From 2640ecd03baa1dea9067d1995fc8aeb2409eebb3 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 23 May 2024 14:41:39 +0800 Subject: [PATCH] release: v1.0.0 --- .gitignore | 3 + cmd/init.go | 53 ++++++ cmd/main.go | 242 ++++++++++++++++++++++++ config.toml | 28 +++ dev/docker-compose.yaml | 8 + go.mod | 82 +++++++++ go.sum | 283 +++++++++++++++++++++++++++++ internal/api/api.go | 31 ++++ internal/backfiller/backfiller.go | 107 +++++++++++ internal/cache/cache.go | 61 +++++++ internal/cache/xmap.go | 48 +++++ internal/chain/chain.go | 21 +++ internal/chain/rpc.go | 139 ++++++++++++++ internal/db/bolt.go | 182 +++++++++++++++++++ internal/db/db.go | 48 +++++ internal/event/event.go | 37 ++++ internal/handler/faucet_give.go | 112 ++++++++++++ internal/handler/handler.go | 50 +++++ internal/handler/index_add.go | 113 ++++++++++++ internal/handler/index_remove.go | 98 ++++++++++ internal/handler/ownership.go | 98 ++++++++++ internal/handler/pool_deposit.go | 108 +++++++++++ internal/handler/pool_swap.go | 121 ++++++++++++ internal/handler/quoter_price.go | 100 ++++++++++ internal/handler/seal.go | 95 ++++++++++ internal/handler/token_burn.go | 97 ++++++++++ internal/handler/token_mint.go | 103 +++++++++++ internal/handler/token_transfer.go | 124 +++++++++++++ internal/pool/pool.go | 28 +++ internal/processor/processor.go | 147 +++++++++++++++ internal/pub/console.go | 30 +++ internal/pub/jetstream.go | 92 ++++++++++ internal/pub/pub.go | 12 ++ internal/queue/queue.go | 67 +++++++ internal/stats/stats.go | 78 ++++++++ internal/syncer/realtime.go | 80 ++++++++ internal/syncer/syncer.go | 77 ++++++++ revive.toml | 44 +++++ 38 files changed, 3247 insertions(+) create mode 100644 .gitignore create mode 100644 cmd/init.go create mode 100644 cmd/main.go create mode 100644 config.toml create mode 100644 dev/docker-compose.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/api/api.go create mode 100644 internal/backfiller/backfiller.go create mode 100644 internal/cache/cache.go create mode 100644 internal/cache/xmap.go create mode 100644 internal/chain/chain.go create mode 100644 internal/chain/rpc.go create mode 100644 internal/db/bolt.go create mode 100644 internal/db/db.go create mode 100644 internal/event/event.go create mode 100644 internal/handler/faucet_give.go create mode 100644 internal/handler/handler.go create mode 100644 internal/handler/index_add.go create mode 100644 internal/handler/index_remove.go create mode 100644 internal/handler/ownership.go create mode 100644 internal/handler/pool_deposit.go create mode 100644 internal/handler/pool_swap.go create mode 100644 internal/handler/quoter_price.go create mode 100644 internal/handler/seal.go create mode 100644 internal/handler/token_burn.go create mode 100644 internal/handler/token_mint.go create mode 100644 internal/handler/token_transfer.go create mode 100644 internal/pool/pool.go create mode 100644 internal/processor/processor.go create mode 100644 internal/pub/console.go create mode 100644 internal/pub/jetstream.go create mode 100644 internal/pub/pub.go create mode 100644 internal/queue/queue.go create mode 100644 internal/stats/stats.go create mode 100644 internal/syncer/realtime.go create mode 100644 internal/syncer/syncer.go create mode 100644 revive.toml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..36a5bea --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +tracker_db +.vscode +.idx \ No newline at end of file diff --git a/cmd/init.go b/cmd/init.go new file mode 100644 index 0000000..c08ec24 --- /dev/null +++ b/cmd/init.go @@ -0,0 +1,53 @@ +package main + +import ( + "log/slog" + "os" + "strings" + + "github.com/kamikazechaser/common/logg" + "github.com/knadh/koanf/parsers/toml" + "github.com/knadh/koanf/providers/env" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/v2" +) + +func initLogger() *slog.Logger { + loggOpts := logg.LoggOpts{ + FormatType: logg.Logfmt, + LogLevel: slog.LevelInfo, + } + + if os.Getenv("DEBUG") != "" { + loggOpts.LogLevel = slog.LevelDebug + } + + if os.Getenv("DEV") != "" { + loggOpts.LogLevel = slog.LevelDebug + loggOpts.FormatType = logg.Human + } + + return logg.NewLogg(loggOpts) +} + +func initConfig() *koanf.Koanf { + var ( + ko = koanf.New(".") + ) + + confFile := file.Provider(confFlag) + if err := ko.Load(confFile, toml.Parser()); err != nil { + lo.Error("could not parse configuration file", "error", err) + os.Exit(1) + } + + if err := ko.Load(env.Provider("TRACKER_", ".", func(s string) string { + return strings.ReplaceAll(strings.ToLower( + strings.TrimPrefix(s, "TRACKER_")), "__", ".") + }), nil); err != nil { + lo.Error("could not override config from env vars", "error", err) + os.Exit(1) + } + + return ko +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..be4ba8c --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,242 @@ +package main + +import ( + "context" + "errors" + "flag" + "log/slog" + "net/http" + "os" + "os/signal" + "runtime" + "sync" + "syscall" + "time" + + "github.com/grassrootseconomics/celo-tracker/internal/api" + "github.com/grassrootseconomics/celo-tracker/internal/backfiller" + "github.com/grassrootseconomics/celo-tracker/internal/cache" + "github.com/grassrootseconomics/celo-tracker/internal/chain" + "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/pool" + "github.com/grassrootseconomics/celo-tracker/internal/processor" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/celo-tracker/internal/queue" + "github.com/grassrootseconomics/celo-tracker/internal/stats" + "github.com/grassrootseconomics/celo-tracker/internal/syncer" + "github.com/knadh/koanf/v2" +) + +const defaultGracefulShutdownPeriod = time.Second * 30 + +var ( + build = "dev" + + confFlag string + + lo *slog.Logger + ko *koanf.Koanf +) + +func init() { + flag.StringVar(&confFlag, "config", "config.toml", "Config file location") + flag.Parse() + + lo = initLogger() + ko = initConfig() + + lo.Info("starting celo tracker", "build", build) +} + +/* +Dependency Order +---------------- +- Chain +- DB +- Cache +- JetStream Pub +- Worker Pool +- Block Processor +- Queue +- Stats +- Chain Syncer +- Backfiller +- API +*/ +func main() { + var wg sync.WaitGroup + ctx, stop := notifyShutdown() + + chain, err := chain.NewRPCFetcher(chain.RPCOpts{ + RPCEndpoint: ko.MustString("chain.rpc_endpoint"), + ChainID: ko.MustInt64("chain.chainid"), + IsArchiveNode: ko.Bool("chain.archive_node"), + }) + if err != nil { + lo.Error("could not initialize chain client", "error", err) + os.Exit(1) + } + + db, err := db.New(db.DBOpts{ + Logg: lo, + DBType: ko.MustString("core.db_type"), + }) + if err != nil { + lo.Error("could not initialize blocks db", "error", err) + os.Exit(1) + } + + cache, err := cache.New(cache.CacheOpts{ + Chain: chain, + Logg: lo, + CacheType: ko.MustString("core.cache_type"), + Blacklist: ko.MustStrings("bootstrap.blacklist"), + Registries: ko.MustStrings("bootstrap.ge_registries"), + Watchlist: ko.MustStrings("bootstrap.watchlist"), + }) + if err != nil { + lo.Error("could not initialize cache", "error", err) + os.Exit(1) + } + + jetStreamPub, err := pub.NewJetStreamPub(pub.JetStreamOpts{ + Endpoint: ko.MustString("jetstream.endpoint"), + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, + DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour, + Logg: lo, + }) + if err != nil { + lo.Error("could not initialize jetstream pub", "error", err) + os.Exit(1) + } + + poolOpts := pool.PoolOpts{ + Logg: lo, + WorkerCount: ko.Int("core.pool_size"), + } + if ko.Int("core.pool_size") <= 0 { + poolOpts.WorkerCount = runtime.NumCPU() * 3 + } + workerPool := pool.NewPool(poolOpts) + + stats := stats.New(stats.StatsOpts{ + Cache: cache, + Logg: lo, + Pool: workerPool, + }) + + blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ + Cache: cache, + DB: db, + Chain: chain, + Pub: jetStreamPub, + Logg: lo, + Stats: stats, + }) + + queue := queue.New(queue.QueueOpts{ + Logg: lo, + Processor: blockProcessor, + Pool: workerPool, + }) + + chainSyncer, err := syncer.New(syncer.SyncerOpts{ + DB: db, + Chain: chain, + Logg: lo, + Queue: queue, + Stats: stats, + StartBlock: ko.Int64("chain.start_block"), + WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), + }) + if err != nil { + lo.Error("could not initialize chain syncer", "error", err) + os.Exit(1) + } + + backfiller := backfiller.New(backfiller.BackfillerOpts{ + DB: db, + Logg: lo, + Queue: queue, + }) + + apiServer := &http.Server{ + Addr: ko.MustString("api.address"), + Handler: api.New(stats), + } + + wg.Add(1) + go func() { + defer wg.Done() + queue.Process() + }() + + wg.Add(1) + go func() { + defer wg.Done() + chainSyncer.Start() + }() + + wg.Add(1) + go func() { + defer wg.Done() + if err := backfiller.Run(false); err != nil { + lo.Error("backfiller initial run error", "error", err) + } + backfiller.Start() + }() + + wg.Add(1) + go func() { + defer wg.Done() + lo.Info("metrics and stats server starting", "address", ko.MustString("api.address")) + if err := apiServer.ListenAndServe(); err != http.ErrServerClosed { + lo.Error("failed to start API server", "error", err) + os.Exit(1) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + stats.StartStatsPrinter() + }() + + <-ctx.Done() + lo.Info("shutdown signal received") + shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod) + + wg.Add(1) + go func() { + defer wg.Done() + queue.Stop() + workerPool.StopAndWait() + stats.Stop() + chainSyncer.Stop() + backfiller.Stop() + jetStreamPub.Close() + db.Cleanup() + db.Close() + apiServer.Shutdown(shutdownCtx) + lo.Info("graceful shutdown routine complete") + }() + + go func() { + wg.Wait() + stop() + cancel() + os.Exit(0) + }() + + <-shutdownCtx.Done() + if errors.Is(shutdownCtx.Err(), context.DeadlineExceeded) { + stop() + cancel() + lo.Error("graceful shutdown period exceeded, forcefully shutting down") + } + os.Exit(1) +} + +func notifyShutdown() (context.Context, context.CancelFunc) { + return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT) +} diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..816c998 --- /dev/null +++ b/config.toml @@ -0,0 +1,28 @@ +[api] +address = ":5001" + +[core] +cache_type = "map" +db_type = "bolt" +pool_size = 0 +archive_node = false + +[chain] +ws_endpoint = "wss://forno.celo.org/ws" +rpc_endpoint = "https://forno.celo.org" +chainid = 42220 +start_block = 0 + +[bootstrap] +ge_registries = [ + "0xd1FB944748aca327a1ba036B082993D9dd9Bfa0C", + "0x0cc9f4fff962def35bb34a53691180b13e653030", +] +watchlist = [""] +blacklist = ["0x765DE816845861e75A25fCA122bb6898B8B1282a"] + +[jetstream] +enable = true +endpoint = "nats://127.0.0.1:4222" +persist_duration_hrs = 48 +dedup_duration_hrs = 6 diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml new file mode 100644 index 0000000..e456040 --- /dev/null +++ b/dev/docker-compose.yaml @@ -0,0 +1,8 @@ +services: + nats: + image: nats:2 + restart: unless-stopped + command: -js -sd /tmp/nats/data -m 8222 + ports: + - 0.0.0.0:4222:4222 + - 0.0.0.0:8222:8222 \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..813da43 --- /dev/null +++ b/go.mod @@ -0,0 +1,82 @@ +module github.com/grassrootseconomics/celo-tracker + +go 1.22.3 + +require ( + github.com/VictoriaMetrics/metrics v1.33.1 + github.com/alitto/pond v1.8.3 + github.com/bits-and-blooms/bitset v1.13.0 + github.com/celo-org/celo-blockchain v1.8.4 + github.com/grassrootseconomics/celoutils/v3 v3.0.1 + github.com/grassrootseconomics/w3-celo v0.17.2 + github.com/kamikazechaser/common v0.2.0 + github.com/knadh/koanf/parsers/toml v0.1.0 + github.com/knadh/koanf/providers/env v0.1.0 + github.com/knadh/koanf/providers/file v0.1.0 + github.com/knadh/koanf/v2 v2.1.0 + github.com/nats-io/nats.go v1.34.1 + github.com/puzpuzpuz/xsync/v3 v3.1.0 + github.com/uptrace/bunrouter v1.0.21 + go.etcd.io/bbolt v1.3.9 +) + +require ( + filippo.io/edwards25519 v1.0.0-alpha.2 // indirect + github.com/StackExchange/wmi v1.2.1 // indirect + github.com/VictoriaMetrics/fastcache v1.12.1 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/celo-org/celo-bls-go v0.3.4 // indirect + github.com/celo-org/celo-bls-go-android v0.3.3 // indirect + github.com/celo-org/celo-bls-go-ios v0.3.3 // indirect + github.com/celo-org/celo-bls-go-linux v0.3.3 // indirect + github.com/celo-org/celo-bls-go-macos v0.3.3 // indirect + github.com/celo-org/celo-bls-go-other v0.3.3 // indirect + github.com/celo-org/celo-bls-go-windows v0.3.3 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/deckarep/golang-set v1.8.0 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/fjl/memsize v0.0.2 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect + github.com/go-stack/stack v1.8.1 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect + github.com/gorilla/websocket v1.5.0 // indirect + github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect + github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff // indirect + github.com/holiman/bloomfilter/v2 v2.0.3 // indirect + github.com/holiman/uint256 v1.2.4 // indirect + github.com/huin/goupnp v1.3.0 // indirect + github.com/jackpal/go-nat-pmp v1.0.2 // indirect + github.com/klauspost/compress v1.17.2 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/lmittmann/tint v1.0.4 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/onsi/gomega v1.10.1 // indirect + github.com/pelletier/go-toml v1.9.5 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/tsdb v0.7.1 // indirect + github.com/rivo/uniseg v0.4.2 // indirect + github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/stretchr/testify v1.8.4 // indirect + github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/valyala/fastrand v1.1.0 // indirect + github.com/valyala/histogram v1.2.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + golang.org/x/time v0.5.0 // indirect + golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect + gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..81c2bb6 --- /dev/null +++ b/go.sum @@ -0,0 +1,283 @@ +filippo.io/edwards25519 v1.0.0-alpha.2 h1:EWbZLqGEPSIj2W69gx04KtNVkyPIfe3uj0DhDQJonbQ= +filippo.io/edwards25519 v1.0.0-alpha.2/go.mod h1:X+pm78QAUPtFLi1z9PYIlS/bdDnvbCOGKtZ+ACWEf7o= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= +github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= +github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= +github.com/VictoriaMetrics/metrics v1.33.1 h1:CNV3tfm2Kpv7Y9W3ohmvqgFWPR55tV2c7M2U6OIo+UM= +github.com/VictoriaMetrics/metrics v1.33.1/go.mod h1:r7hveu6xMdUACXvB8TYdAj8WEsKzWB0EkpJN+RDtOf8= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs= +github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE= +github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/btcsuite/btcd v0.23.2 h1:/YOgUp25sdCnP5ho6Hl3s0E438zlX+Kak7E6TgBgoT0= +github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= +github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= +github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA= +github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg= +github.com/celo-org/celo-blockchain v1.8.4 h1:QJiRRGcZyO+PutT8WkFO1juORANAOGhN8yGNP7HvBqk= +github.com/celo-org/celo-blockchain v1.8.4/go.mod h1:0udpV9QnZ2rnVu3vf8G9Wjs5di6Bf9urjwGVgAUEGB8= +github.com/celo-org/celo-bls-go v0.3.4 h1:slNePT/gVjgUi7f8M4KTwBz/YYgv3JWU6XqyY0xKN84= +github.com/celo-org/celo-bls-go v0.3.4/go.mod h1:qDZHMC3bBqOw5qle28cRtKlEyJhslZtckcc2Tomqdks= +github.com/celo-org/celo-bls-go-android v0.3.3 h1:iZ2Gragn3JItkptmppeq1SENmNVc1f1W25UE4u231HY= +github.com/celo-org/celo-bls-go-android v0.3.3/go.mod h1:cFgtFRH8+6x5b+EyG5SqniXY3aKd03NBSGDgITscX34= +github.com/celo-org/celo-bls-go-ios v0.3.3 h1:/yHaEYft9WfXyPIGuJz7V2/r+tp2IqSpkvKsMsVgbuY= +github.com/celo-org/celo-bls-go-ios v0.3.3/go.mod h1:eaSoMpx29YV5oF7jXVChzJpNfxeZHnAa8G4PjL5CyW0= +github.com/celo-org/celo-bls-go-linux v0.3.3 h1:ukSQSIRyFCQeC1i7LJJunRKvlLuG1JMwNZ6DQZC51fE= +github.com/celo-org/celo-bls-go-linux v0.3.3/go.mod h1:DVpJadg22OrxBtMb0ub6iNVdqDBL/r6EDdWVAA0bHa0= +github.com/celo-org/celo-bls-go-macos v0.3.3 h1:H4ZGc+kS3e/w9Q6qru6FtlkYtVDS8eIQgw6UURB/Jlo= +github.com/celo-org/celo-bls-go-macos v0.3.3/go.mod h1:mYPuRqGMVxj6yZUeL6Q6ggtP52HPBS1jz+FvBPXQ7QA= +github.com/celo-org/celo-bls-go-other v0.3.3 h1:/Q9SLJK22hibPm/WI/OPxbBmgXTgUhjUgobfzz7qj/w= +github.com/celo-org/celo-bls-go-other v0.3.3/go.mod h1:tNxZNfekzyT7TdYQbyNPhkfpcYtA3KCU/IKX5FNxM/U= +github.com/celo-org/celo-bls-go-windows v0.3.3 h1:0IP+Ad9l+op50TIfkmFr+j7+TIjKksVROe+EoF7Ixa4= +github.com/celo-org/celo-bls-go-windows v0.3.3/go.mod h1:82GC5iJA9Qw5gynhYqR8ht3J+l/MO8eSNzgSTMI8UdA= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/deckarep/golang-set v1.8.0 h1:sk9/l/KqpunDwP7pSjUg0keiOOLEnOBHzykLrsPppp4= +github.com/deckarep/golang-set v1.8.0/go.mod h1:5nI87KwE7wgsBU1F4GKAw2Qod7p5kyS383rP6+o6qqo= +github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/fjl/memsize v0.0.2 h1:27txuSD9or+NZlnOWdKUxeBzTAUkWCVh+4Gf2dWFOzA= +github.com/fjl/memsize v0.0.2/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= +github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-stack/stack v1.8.1 h1:ntEHSVwIt7PNXNpgPmVfMrNhLtgjlmnZha2kOpuRiDw= +github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4= +github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= +github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXiurYmW7fx4GZkL8feAMVq7nEjURHk= +github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grassrootseconomics/celoutils/v3 v3.0.1 h1:R8W7jueJTFVhASV8ixDMaX5voJ2G/pW/tXLhf+KB43E= +github.com/grassrootseconomics/celoutils/v3 v3.0.1/go.mod h1:VVo5Ge0Lop9rjVhTd0fnidyJmIolYAXsEJ6bzkangwQ= +github.com/grassrootseconomics/w3-celo v0.17.2 h1:sEBlOe/H2F6YvBRTGD/kiObVqUFW7AczbzC8xxsVLDY= +github.com/grassrootseconomics/w3-celo v0.17.2/go.mod h1:M3KJaj25DspF9siaqNXyamAuzC5DmWXx74tpyPp+R4Y= +github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= +github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= +github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuWpEqDnvIw251EVy4zlP8gWbsGj4BsUKCRpYs= +github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff h1:LeVKjw8pcDQj7WVVnbFvbD7ovcv+r/l15ka1NH6Lswc= +github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff/go.mod h1:Feit0l8NcNO4g69XNjwvsR0LGcwMMfzI1TF253rOIlQ= +github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= +github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= +github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU= +github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= +github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= +github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/kamikazechaser/common v0.2.0 h1:bqi5UaMTDm/wtZlJEvQDNhsLVJP4Beg+HKWeQ+dhpss= +github.com/kamikazechaser/common v0.2.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/parsers/toml v0.1.0 h1:S2hLqS4TgWZYj4/7mI5m1CQQcWurxUz6ODgOub/6LCI= +github.com/knadh/koanf/parsers/toml v0.1.0/go.mod h1:yUprhq6eo3GbyVXFFMdbfZSo928ksS+uo0FFqNMnO18= +github.com/knadh/koanf/providers/env v0.1.0 h1:LqKteXqfOWyx5Ab9VfGHmjY9BvRXi+clwyZozgVRiKg= +github.com/knadh/koanf/providers/env v0.1.0/go.mod h1:RE8K9GbACJkeEnkl8L/Qcj8p4ZyPXZIQ191HJi44ZaQ= +github.com/knadh/koanf/providers/file v0.1.0 h1:fs6U7nrV58d3CFAFh8VTde8TM262ObYf3ODrc//Lp+c= +github.com/knadh/koanf/providers/file v0.1.0/go.mod h1:rjJ/nHQl64iYCtAW2QQnF0eSmDEX/YZ/eNFj5yR6BvA= +github.com/knadh/koanf/v2 v2.1.0 h1:eh4QmHHBuU8BybfIJ8mB8K8gsGCD/AUQTdwGq/GzId8= +github.com/knadh/koanf/v2 v2.1.0/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= +github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= +github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= +github.com/mitchellh/pointerstructure v1.2.0/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8ohIXc3tViBH44KcwB2g4= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= +github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= +github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= +github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= +github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= +github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= +github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= +github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/uptrace/bunrouter v1.0.21 h1:HXarvX+N834sXyHpl+I/TuE11m19kLW/qG5u3YpHUag= +github.com/uptrace/bunrouter v1.0.21/go.mod h1:TwT7Bc0ztF2Z2q/ZzMuSVkcb/Ig/d3MQeP2cxn3e1hI= +github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8= +github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= +github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= +github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618= +golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU= +gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0= +gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/api.go b/internal/api/api.go new file mode 100644 index 0000000..e7c161a --- /dev/null +++ b/internal/api/api.go @@ -0,0 +1,31 @@ +package api + +import ( + "net/http" + + "github.com/VictoriaMetrics/metrics" + "github.com/grassrootseconomics/celo-tracker/internal/stats" + "github.com/uptrace/bunrouter" +) + +func New(statsCollector *stats.Stats) *bunrouter.Router { + router := bunrouter.New() + + router.GET("/metrics", metricsHandler()) + router.GET("/stats", statsHandler(statsCollector)) + + return router +} + +func metricsHandler() bunrouter.HandlerFunc { + return func(w http.ResponseWriter, _ bunrouter.Request) error { + metrics.WritePrometheus(w, true) + return nil + } +} + +func statsHandler(s *stats.Stats) bunrouter.HandlerFunc { + return func(w http.ResponseWriter, _ bunrouter.Request) error { + return bunrouter.JSON(w, s.APIStatsResponse()) + } +} diff --git a/internal/backfiller/backfiller.go b/internal/backfiller/backfiller.go new file mode 100644 index 0000000..38c3f77 --- /dev/null +++ b/internal/backfiller/backfiller.go @@ -0,0 +1,107 @@ +package backfiller + +import ( + "fmt" + "log/slog" + "os" + "time" + + "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/queue" +) + +type ( + BackfillerOpts struct { + DB db.DB + Logg *slog.Logger + Queue *queue.Queue + } + + backfiller struct { + db db.DB + logg *slog.Logger + queue *queue.Queue + stopCh chan struct{} + ticker *time.Ticker + } +) + +const ( + verifierInterval = 20 * time.Second + epochBlocksCount = 17_280 +) + +func New(o BackfillerOpts) *backfiller { + return &backfiller{ + db: o.DB, + logg: o.Logg, + queue: o.Queue, + stopCh: make(chan struct{}), + ticker: time.NewTicker(verifierInterval), + } +} + +func (b *backfiller) Stop() { + b.ticker.Stop() + b.stopCh <- struct{}{} +} + +func (b *backfiller) Start() { + for { + select { + case <-b.stopCh: + b.logg.Info("verifier shutting down") + b.ticker.Stop() + return + case <-b.ticker.C: + if b.queue.Size() <= 1 { + if err := b.Run(true); err != nil { + b.logg.Error("verifier tick run error", "err", err) + } + } + } + } +} + +func (b *backfiller) Run(skipLatest bool) error { + lower, err := b.db.GetLowerBound() + if err != nil { + return fmt.Errorf("verifier could not get lower bound from db: err %v", err) + } + upper, err := b.db.GetUpperBound() + if err != nil { + return fmt.Errorf("verifier could not get upper bound from db: err %v", err) + } + + if skipLatest { + upper-- + } + + missingBlocks, err := b.db.GetMissingValuesBitSet(lower, upper) + if err != nil { + return fmt.Errorf("verifier could not get missing values bitset: err %v", err) + } + missingBlocksCount := missingBlocks.Count() + + if missingBlocksCount > 0 { + if missingBlocksCount >= epochBlocksCount { + b.logg.Warn("large number of blocks missing this may result in degraded RPC performance set FORCE_BACKFILL=* to continue", "missing_blocks", missingBlocksCount) + _, ok := os.LookupEnv("FORCE_BACKFILL") + if !ok { + os.Exit(0) + } + } + b.logg.Info("bootstrapping queue with missing blocks") + + b.logg.Info("found missing blocks", "skip_latest", skipLatest, "missing_blocks_count", missingBlocksCount) + buffer := make([]uint, missingBlocksCount) + missingBlocks.NextSetMany(0, buffer) + defer missingBlocks.ClearAll() + + for _, block := range buffer { + b.queue.Push(uint64(block)) + } + } + + return nil +} diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..cc58cd3 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,61 @@ +package cache + +import ( + "context" + "fmt" + "log/slog" + + "github.com/grassrootseconomics/celo-tracker/internal/chain" +) + +type ( + Cache interface { + Purge() error + Exists(string) bool + Add(string, bool) + Remove(string) + IsWatchableIndex(string) bool + Size() int + } + CacheOpts struct { + Chain chain.Chain + Logg *slog.Logger + CacheType string + Blacklist []string + Registries []string + Watchlist []string + } +) + +func New(o CacheOpts) (Cache, error) { + var cache Cache + + switch o.CacheType { + case "map": + cache = NewMapCache() + default: + cache = NewMapCache() + o.Logg.Warn("invalid cache type, using default type (map)") + } + + geSmartContracts, err := o.Chain.Provider().GetGESmartContracts( + context.Background(), + o.Registries, + ) + if err != nil { + return nil, fmt.Errorf("cache could not bootstrap GE smart contracts: err %v", err) + } + + for k, v := range geSmartContracts { + cache.Add(k, v) + } + for _, address := range o.Watchlist { + cache.Add(address, false) + } + for _, address := range o.Blacklist { + cache.Remove(address) + } + o.Logg.Info("cache bootstrap complete", "cached_addresses", cache.Size()) + + return cache, nil +} diff --git a/internal/cache/xmap.go b/internal/cache/xmap.go new file mode 100644 index 0000000..b2b8ea0 --- /dev/null +++ b/internal/cache/xmap.go @@ -0,0 +1,48 @@ +package cache + +import ( + "github.com/puzpuzpuz/xsync/v3" +) + +type mapCache struct { + xmap *xsync.Map +} + +func NewMapCache() Cache { + return &mapCache{ + xmap: xsync.NewMap(), + } +} + +func (c *mapCache) Purge() error { + c.xmap.Clear() + return nil +} + +func (c *mapCache) Exists(key string) bool { + _, ok := c.xmap.Load(key) + return ok +} + +func (c *mapCache) Add(key string, value bool) { + c.xmap.Store(key, value) +} + +func (c *mapCache) Remove(key string) { + c.xmap.Delete(key) +} + +func (c *mapCache) Size() int { + return c.xmap.Size() +} + +func (c *mapCache) IsWatchableIndex(key string) bool { + watchable, ok := c.xmap.Load(key) + if !ok { + return false + } + watchableBool, ok := watchable.(bool) + if !ok { + } + return watchableBool +} diff --git a/internal/chain/chain.go b/internal/chain/chain.go new file mode 100644 index 0000000..f15f671 --- /dev/null +++ b/internal/chain/chain.go @@ -0,0 +1,21 @@ +package chain + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celoutils/v3" +) + +type Chain interface { + GetBlocks(context.Context, []uint64) ([]types.Block, error) + GetBlock(context.Context, uint64) (*types.Block, error) + GetLatestBlock(context.Context) (uint64, error) + GetTransaction(context.Context, common.Hash) (*types.Transaction, error) + GetReceipts(context.Context, *types.Block) ([]types.Receipt, error) + GetRevertReason(context.Context, common.Hash, *big.Int) (string, error) + Provider() *celoutils.Provider + IsArchiveNode() bool +} diff --git a/internal/chain/rpc.go b/internal/chain/rpc.go new file mode 100644 index 0000000..91dd5ca --- /dev/null +++ b/internal/chain/rpc.go @@ -0,0 +1,139 @@ +package chain + +import ( + "context" + "math/big" + "net/http" + "time" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/celo-org/celo-blockchain/rpc" + "github.com/grassrootseconomics/celoutils/v3" + "github.com/grassrootseconomics/w3-celo" + "github.com/grassrootseconomics/w3-celo/module/eth" + "github.com/grassrootseconomics/w3-celo/w3types" +) + +type ( + RPCOpts struct { + RPCEndpoint string + ChainID int64 + IsArchiveNode bool + } + + RPC struct { + provider *celoutils.Provider + isArchiveNode bool + } +) + +func NewRPCFetcher(o RPCOpts) (Chain, error) { + customRPCClient, err := lowTimeoutRPCClient(o.RPCEndpoint) + if err != nil { + return nil, err + } + + chainProvider := celoutils.NewProvider( + o.RPCEndpoint, + o.ChainID, + celoutils.WithClient(customRPCClient), + ) + + return &RPC{ + provider: chainProvider, + isArchiveNode: o.IsArchiveNode, + }, nil +} + +func lowTimeoutRPCClient(rpcEndpoint string) (*w3.Client, error) { + httpClient := &http.Client{ + Timeout: 10 * time.Second, + } + + rpcClient, err := rpc.DialHTTPWithClient( + rpcEndpoint, + httpClient, + ) + if err != nil { + return nil, err + } + + return w3.NewClient(rpcClient), nil +} + +func (c *RPC) GetBlocks(ctx context.Context, blockNumbers []uint64) ([]types.Block, error) { + blocksCount := len(blockNumbers) + calls := make([]w3types.RPCCaller, blocksCount) + blocks := make([]types.Block, blocksCount) + + for i, v := range blockNumbers { + calls[i] = eth.BlockByNumber(new(big.Int).SetUint64(v)).Returns(&blocks[i]) + } + + if err := c.provider.Client.CallCtx(ctx, calls...); err != nil { + return nil, err + } + + return blocks, nil +} + +func (c *RPC) GetBlock(ctx context.Context, blockNumber uint64) (*types.Block, error) { + var block types.Block + blockCall := eth.BlockByNumber(new(big.Int).SetUint64(blockNumber)).Returns(&block) + + if err := c.provider.Client.CallCtx(ctx, blockCall); err != nil { + return nil, err + } + + return &block, nil +} + +func (c *RPC) GetLatestBlock(ctx context.Context) (uint64, error) { + var latestBlock big.Int + latestBlockCall := eth.BlockNumber().Returns(&latestBlock) + + if err := c.provider.Client.CallCtx(ctx, latestBlockCall); err != nil { + return 0, err + } + + return latestBlock.Uint64(), nil +} + +func (c *RPC) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, error) { + var transaction types.Transaction + if err := c.provider.Client.CallCtx(ctx, eth.Tx(txHash).Returns(&transaction)); err != nil { + return nil, err + } + + return &transaction, nil +} + +func (c *RPC) GetReceipts(ctx context.Context, block *types.Block) ([]types.Receipt, error) { + txCount := len(block.Transactions()) + + calls := make([]w3types.RPCCaller, txCount) + receipts := make([]types.Receipt, txCount) + + for i, tx := range block.Transactions() { + calls[i] = eth.TxReceipt(tx.Hash()).Returns(&receipts[i]) + } + + if err := c.provider.Client.CallCtx(ctx, calls...); err != nil { + return nil, err + } + + return receipts, nil +} + +func (c *RPC) GetRevertReason(ctx context.Context, txHash common.Hash, blockNumber *big.Int) (string, error) { + return c.provider.SimulateRevertedTx(ctx, txHash, blockNumber) +} + +func (c *RPC) Provider() *celoutils.Provider { + return c.provider +} + +func (c *RPC) IsArchiveNode() bool { + return c.isArchiveNode +} diff --git a/internal/db/bolt.go b/internal/db/bolt.go new file mode 100644 index 0000000..5464c23 --- /dev/null +++ b/internal/db/bolt.go @@ -0,0 +1,182 @@ +package db + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/bits-and-blooms/bitset" + bolt "go.etcd.io/bbolt" +) + +type boltDB struct { + db *bolt.DB +} + +const ( + dbFolderName = "tracker_db" + + upperBoundKey = "upper" + lowerBoundKey = "lower" +) + +var sortableOrder = binary.BigEndian + +func NewBoltDB() (DB, error) { + db, err := bolt.Open(dbFolderName, 0600, nil) + if err != nil { + return nil, err + } + + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte("blocks")) + if err != nil { + return fmt.Errorf("create bucket: %s", err) + } + return nil + }) + + return &boltDB{ + db: db, + }, nil +} + +func (d *boltDB) Close() error { + return d.db.Close() +} + +func (d *boltDB) get(k string) ([]byte, error) { + var v []byte + err := d.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("blocks")) + v = b.Get([]byte(k)) + return nil + }) + + if err != nil { + return nil, err + } + + return v, nil +} + +func (d *boltDB) setUint64(k string, v uint64) error { + err := d.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("blocks")) + return b.Put([]byte(k), marshalUint64(v)) + }) + if err != nil { + return err + } + return nil +} + +func (d *boltDB) setUint64AsKey(v uint64) error { + err := d.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("blocks")) + return b.Put(marshalUint64(v), nil) + }) + if err != nil { + return err + } + return nil +} + +func unmarshalUint64(b []byte) uint64 { + return sortableOrder.Uint64(b) +} + +func marshalUint64(v uint64) []byte { + b := make([]byte, 8) + sortableOrder.PutUint64(b, v) + return b +} + +func (d *boltDB) SetLowerBound(v uint64) error { + return d.setUint64(lowerBoundKey, v) +} + +func (d *boltDB) GetLowerBound() (uint64, error) { + v, err := d.get(lowerBoundKey) + if err != nil { + return 0, err + } + + if v == nil { + return 0, nil + } + + return unmarshalUint64(v), nil +} + +func (d *boltDB) SetUpperBound(v uint64) error { + return d.setUint64(upperBoundKey, v) +} + +func (d *boltDB) GetUpperBound() (uint64, error) { + v, err := d.get(upperBoundKey) + if err != nil { + return 0, err + } + return unmarshalUint64(v), nil +} + +func (d *boltDB) SetValue(v uint64) error { + return d.setUint64AsKey(v) +} + +func (d *boltDB) GetMissingValuesBitSet(lowerBound uint64, upperBound uint64) (*bitset.BitSet, error) { + var ( + b bitset.BitSet + ) + + err := d.db.View(func(tx *bolt.Tx) error { + var ( + lowerRaw = marshalUint64(lowerBound) + upperRaw = marshalUint64(upperBound) + ) + + for i := lowerBound; i <= upperBound; i++ { + b.Set(uint(i)) + } + + c := tx.Bucket([]byte("blocks")).Cursor() + + for k, _ := c.Seek(lowerRaw); k != nil && bytes.Compare(k, upperRaw) <= 0; k, _ = c.Next() { + b.Clear(uint(unmarshalUint64(k))) + } + + return nil + }) + if err != nil { + return nil, err + } + + return &b, nil +} + +func (d *boltDB) Cleanup() error { + lowerBound, err := d.GetLowerBound() + if err != nil { + return err + } + target := marshalUint64(lowerBound - 1) + + err = d.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("blocks")) + c := b.Cursor() + + for k, _ := c.First(); k != nil && bytes.Compare(k, target) <= 0; k, _ = c.Next() { + if err := b.Delete(k); err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + return nil +} diff --git a/internal/db/db.go b/internal/db/db.go new file mode 100644 index 0000000..46d9389 --- /dev/null +++ b/internal/db/db.go @@ -0,0 +1,48 @@ +package db + +import ( + "log/slog" + + "github.com/bits-and-blooms/bitset" +) + +type ( + DB interface { + Close() error + GetLowerBound() (uint64, error) + SetLowerBound(v uint64) error + SetUpperBound(uint64) error + GetUpperBound() (uint64, error) + SetValue(uint64) error + GetMissingValuesBitSet(uint64, uint64) (*bitset.BitSet, error) + Cleanup() error + } + + DBOpts struct { + Logg *slog.Logger + DBType string + } +) + +func New(o DBOpts) (DB, error) { + var ( + err error + db DB + ) + + switch o.DBType { + case "bolt": + db, err = NewBoltDB() + if err != nil { + return nil, err + } + default: + db, err = NewBoltDB() + if err != nil { + return nil, err + } + o.Logg.Warn("invalid db type, using default type (bolt)") + } + + return db, nil +} diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..fea65c9 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,37 @@ +package event + +import "encoding/json" + +type ( + Event struct { + Block uint64 `json:"block"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + Timestamp uint64 `json:"timestamp"` + TxHash string `json:"transactionHash"` + TxType string `json:"transactionType"` + Payload map[string]any `json:"payload"` + Index uint `json:"-"` + } +) + +func (e Event) Serialize() ([]byte, error) { + jsonData, err := json.Marshal(e) + if err != nil { + return nil, err + } + + return jsonData, err +} + +func Deserialize(jsonData []byte) (Event, error) { + var ( + event Event + ) + + if err := json.Unmarshal(jsonData, &event); err != nil { + return event, err + } + + return event, nil +} diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go new file mode 100644 index 0000000..d7281bd --- /dev/null +++ b/internal/handler/faucet_give.go @@ -0,0 +1,112 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/celoutils/v3" + "github.com/grassrootseconomics/w3-celo" +) + +type faucetGiveHandler struct { + pub pub.Pub +} + +const faucetGiveEventName = "FAUCET_GIVE" + +var ( + faucetGiveTopicHash = w3.H("0x26162814817e23ec5035d6a2edc6c422da2da2119e27cfca6be65cc2dc55ca4c") + faucetGiveEvent = w3.MustNewEvent("Give(address indexed _recipient, address indexed _token, uint256 _amount)") + faucetGiveToSig = w3.MustNewFunc("giveTo(address)", "uint256") + faucetGimmeSig = w3.MustNewFunc("gimme()", "uint256") +) + +func NewFaucetGiveHandler(pub pub.Pub) *faucetGiveHandler { + return &faucetGiveHandler{ + pub: pub, + } +} + +func (h *faucetGiveHandler) Name() string { + return faucetGiveEventName +} + +func (h *faucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == faucetGiveTopicHash { + var ( + recipient common.Address + token common.Address + amount big.Int + ) + + if err := faucetGiveEvent.DecodeArgs(msg.Log, &recipient, &token, &amount); err != nil { + return err + } + + faucetGiveEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: faucetGiveEventName, + Payload: map[string]any{ + "recipient": recipient.Hex(), + "token": token.Hex(), + "amount": amount.String(), + }, + } + + return h.pub.Send(ctx, faucetGiveEvent) + } + + return nil +} + +func (h *faucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + faucetGiveEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: faucetGiveEventName, + } + + switch msg.InputData[:8] { + case "63e4bff4": + var to common.Address + + if err := faucetGiveToSig.DecodeArgs(w3.B(msg.InputData), &to); err != nil { + return err + } + + faucetGiveEvent.Payload = map[string]any{ + "revertReason": msg.RevertReason, + "recipient": to.Hex(), + "token": celoutils.ZeroAddress, + "amount": "0", + } + + return h.pub.Send(ctx, faucetGiveEvent) + case "de82efb4": + faucetGiveEvent.Payload = map[string]any{ + "revertReason": msg.RevertReason, + "recipient": celoutils.ZeroAddress, + "token": celoutils.ZeroAddress, + "amount": "0", + } + + return h.pub.Send(ctx, faucetGiveEvent) + } + + return nil +} diff --git a/internal/handler/handler.go b/internal/handler/handler.go new file mode 100644 index 0000000..8e45557 --- /dev/null +++ b/internal/handler/handler.go @@ -0,0 +1,50 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celo-tracker/internal/cache" + "github.com/grassrootseconomics/celo-tracker/internal/pub" +) + +type ( + Handler interface { + Name() string + HandleLog(context.Context, LogMessage) error + HandleRevert(context.Context, RevertMessage) error + } + + HandlerPipeline []Handler + + LogMessage struct { + Log *types.Log + Timestamp uint64 + } + + RevertMessage struct { + From string + RevertReason string + InputData string + Block uint64 + ContractAddress string + Timestamp uint64 + TxHash string + } +) + +func New(pub pub.Pub, cache cache.Cache) HandlerPipeline { + return []Handler{ + NewTokenTransferHandler(pub), + NewPoolSwapHandler(pub), + NewFaucetGiveHandler(pub), + NewPoolDepositHandler(pub), + NewTokenMintHandler(pub), + NewTokenBurnHandler(pub), + NewQuoterPriceHandler(pub), + NewOwnershipHandler(pub), + NewSealHandler(pub), + NewIndexAddHandler(pub, cache), + NewIndexRemoveHandler(pub, cache), + } +} diff --git a/internal/handler/index_add.go b/internal/handler/index_add.go new file mode 100644 index 0000000..3c14b0c --- /dev/null +++ b/internal/handler/index_add.go @@ -0,0 +1,113 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/cache" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type indexAddHandler struct { + pub pub.Pub + cache cache.Cache +} + +const indexAddEventName = "INDEX_ADD" + +var ( + indexAddTopicHash = w3.H("0xa226db3f664042183ee0281230bba26cbf7b5057e50aee7f25a175ff45ce4d7f") + indexAddEvent = w3.MustNewEvent("AddressAdded(address _token)") + indexAddSig = w3.MustNewFunc("add(address)", "bool") + indexRegisterSig = w3.MustNewFunc("register(address)", "bool") +) + +func NewIndexAddHandler(pub pub.Pub, cache cache.Cache) *indexAddHandler { + return &indexAddHandler{ + pub: pub, + cache: cache, + } +} + +func (h *indexAddHandler) Name() string { + return indexAddEventName +} + +func (h *indexAddHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == indexAddTopicHash { + var address common.Address + + if err := indexAddEvent.DecodeArgs(msg.Log, &address); err != nil { + return err + } + + indexAddEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: indexAddEventName, + Payload: map[string]any{ + "address": address.Hex(), + }, + } + + if h.cache.IsWatchableIndex(address.Hex()) { + h.cache.Add(address.Hex(), false) + } + + return h.pub.Send(ctx, indexAddEvent) + } + + return nil +} + +func (h *indexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + indexAddEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: indexAddEventName, + } + + switch msg.InputData[:8] { + case "0a3b0a4f": + var address common.Address + + indexAddEvent.Payload = map[string]any{ + "revertReason": msg.RevertReason, + "address": address.Hex(), + } + + if err := indexAddSig.DecodeArgs(w3.B(msg.InputData), &address); err != nil { + return err + } + + return h.pub.Send(ctx, indexAddEvent) + case "4420e486": + var address common.Address + + indexAddEvent.Payload = map[string]any{ + "revertReason": msg.RevertReason, + "address": address.Hex(), + } + + if err := indexRegisterSig.DecodeArgs(w3.B(msg.InputData), &address); err != nil { + return err + } + + return h.pub.Send(ctx, indexAddEvent) + } + + return nil +} diff --git a/internal/handler/index_remove.go b/internal/handler/index_remove.go new file mode 100644 index 0000000..cd89536 --- /dev/null +++ b/internal/handler/index_remove.go @@ -0,0 +1,98 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/cache" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type indexRemoveHandler struct { + pub pub.Pub + cache cache.Cache +} + +const indexRemoveEventName = "INDEX_REMOVE" + +var ( + indexRemoveTopicHash = w3.H("0x24a12366c02e13fe4a9e03d86a8952e85bb74a456c16e4a18b6d8295700b74bb") + indexRemoveEvent = w3.MustNewEvent("AddressRemoved(address _token)") + indexRemoveSig = w3.MustNewFunc("remove(address)", "bool") +) + +func NewIndexRemoveHandler(pub pub.Pub, cache cache.Cache) *indexRemoveHandler { + return &indexRemoveHandler{ + pub: pub, + cache: cache, + } +} + +func (h *indexRemoveHandler) Name() string { + return indexRemoveEventName +} + +func (h *indexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == indexRemoveTopicHash { + var address common.Address + + if err := indexRemoveEvent.DecodeArgs(msg.Log, &address); err != nil { + return err + } + + indexRemoveEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: indexRemoveEventName, + Payload: map[string]any{ + "address": address.Hex(), + }, + } + + if h.cache.IsWatchableIndex(address.Hex()) { + h.cache.Remove(address.Hex()) + } + + return h.pub.Send(ctx, indexRemoveEvent) + } + + return nil +} + +func (h *indexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "29092d0e": + var address common.Address + + if err := indexRemoveSig.DecodeArgs(w3.B(msg.InputData), &address); err != nil { + return err + } + + indexRemoveEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: indexRemoveEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "address": address.Hex(), + }, + } + + return h.pub.Send(ctx, indexRemoveEvent) + } + + return nil +} diff --git a/internal/handler/ownership.go b/internal/handler/ownership.go new file mode 100644 index 0000000..c56aee4 --- /dev/null +++ b/internal/handler/ownership.go @@ -0,0 +1,98 @@ +package handler + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type ownershipHandler struct { + pub pub.Pub +} + +const ( + ownershipEventName = "OWNERSHIP_TRANSFERRED" +) + +var ( + ownershipTopicHash = w3.H("0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0") + ownershipEvent = w3.MustNewEvent("OwnershipTransferred(address indexed previousOwner, address indexed newOwner)") + ownershipToSig = w3.MustNewFunc("transferOwnership(address)", "bool") +) + +func NewOwnershipHandler(pub pub.Pub) *ownershipHandler { + return &ownershipHandler{ + pub: pub, + } +} + +func (h *ownershipHandler) Name() string { + return ownershipEventName +} + +func (h *ownershipHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == ownershipTopicHash { + var ( + previousOwner common.Address + newOwner common.Address + ) + + if err := ownershipEvent.DecodeArgs(msg.Log, &previousOwner, &newOwner); err != nil { + return err + } + + ownershipEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: ownershipEventName, + Payload: map[string]any{ + "previousOwner": previousOwner.Hex(), + "newOwner": newOwner.Hex(), + }, + } + + return h.pub.Send(ctx, ownershipEvent) + } + + return nil +} + +func (h *ownershipHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "f2fde38b": + var newOwner common.Address + + if err := ownershipToSig.DecodeArgs(w3.B(msg.InputData), &newOwner); err != nil { + return err + } + + ownershipEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: ownershipEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "previousOwner": msg.From, + "newOwner": newOwner.Hex(), + }, + } + + return h.pub.Send(ctx, ownershipEvent) + } + + return nil +} diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go new file mode 100644 index 0000000..34042eb --- /dev/null +++ b/internal/handler/pool_deposit.go @@ -0,0 +1,108 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type poolDepositHandler struct { + pub pub.Pub +} + +const poolDepositEventName = "POOL_DEPOSIT" + +var ( + poolDepositTopicHash = w3.H("0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62") + poolDepositEvent = w3.MustNewEvent("Deposit(address indexed initiator, address indexed tokenIn, uint256 amountIn)") + poolDepositSig = w3.MustNewFunc("deposit(address, uint256)", "") +) + +func NewPoolDepositHandler(pub pub.Pub) *poolDepositHandler { + return &poolDepositHandler{ + pub: pub, + } +} + +func (h *poolDepositHandler) Name() string { + return poolDepositEventName +} + +func (h *poolDepositHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == poolDepositTopicHash { + var ( + initiator common.Address + tokenIn common.Address + amountIn big.Int + ) + + if err := poolDepositEvent.DecodeArgs( + msg.Log, + &initiator, + &tokenIn, + &amountIn, + ); err != nil { + return err + } + + poolDepositEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: poolDepositEventName, + Payload: map[string]any{ + "initiator": initiator.Hex(), + "tokenIn": tokenIn.Hex(), + "amountIn": amountIn.String(), + }, + } + + return h.pub.Send(ctx, poolDepositEvent) + } + + return nil +} + +func (h *poolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "47e7ef24": + var ( + tokenIn common.Address + amountIn big.Int + ) + + if err := poolDepositSig.DecodeArgs(w3.B(msg.InputData), &tokenIn, &amountIn); err != nil { + return err + } + + poolDepositEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: poolDepositEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "initiator": msg.From, + "tokenIn": tokenIn.Hex(), + "amountIn": amountIn.String(), + }, + } + + return h.pub.Send(ctx, poolDepositEvent) + } + + return nil +} diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go new file mode 100644 index 0000000..8f90e95 --- /dev/null +++ b/internal/handler/pool_swap.go @@ -0,0 +1,121 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type poolSwapHandler struct { + pub pub.Pub +} + +const poolSwapEventName = "POOL_SWAP" + +var ( + poolSwapTopicHash = w3.H("0xd6d34547c69c5ee3d2667625c188acf1006abb93e0ee7cf03925c67cf7760413") + poolSwapEvent = w3.MustNewEvent("Swap(address indexed initiator, address indexed tokenIn, address tokenOut, uint256 amountIn, uint256 amountOut, uint256 fee)") + poolSwapSig = w3.MustNewFunc("withdraw(address, address, uint256)", "") +) + +func NewPoolSwapHandler(pub pub.Pub) *poolSwapHandler { + return &poolSwapHandler{ + pub: pub, + } +} + +func (h *poolSwapHandler) Name() string { + return poolSwapEventName +} + +func (h *poolSwapHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == poolSwapTopicHash { + var ( + initiator common.Address + tokenIn common.Address + tokenOut common.Address + amountIn big.Int + amountOut big.Int + fee big.Int + ) + + if err := poolSwapEvent.DecodeArgs( + msg.Log, + &initiator, + &tokenIn, + &tokenOut, + &amountIn, + &amountOut, + &fee, + ); err != nil { + return err + } + + poolSwapEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: poolSwapEventName, + Payload: map[string]any{ + "initiator": initiator.Hex(), + "tokenIn": tokenIn.Hex(), + "tokenOut": tokenOut.Hex(), + "amountIn": amountIn.String(), + "amountOut": amountOut.String(), + "fee": fee.String(), + }, + } + + return h.pub.Send(ctx, poolSwapEvent) + } + + return nil +} + +func (h *poolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "d9caed12": + var ( + tokenOut common.Address + tokenIn common.Address + amountIn big.Int + ) + + if err := poolSwapSig.DecodeArgs(w3.B(msg.InputData), &tokenOut, &tokenIn, &amountIn); err != nil { + return err + } + + poolSwapEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: poolSwapEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "initiator": msg.From, + "tokenIn": tokenIn.Hex(), + "tokenOut": tokenOut.Hex(), + "amountIn": amountIn.String(), + "amountOut": "0", + "fee": "0", + }, + } + + return h.pub.Send(ctx, poolSwapEvent) + } + + return nil +} diff --git a/internal/handler/quoter_price.go b/internal/handler/quoter_price.go new file mode 100644 index 0000000..fd2f109 --- /dev/null +++ b/internal/handler/quoter_price.go @@ -0,0 +1,100 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type quoterPriceHandler struct { + pub pub.Pub +} + +const quoterPriceEventName = "QUOTER_PRICE_INDEX_UPDATED" + +var ( + quoterPriceTopicHash = w3.H("0xdb9ce1a76955721ca61ac50cd1b87f9ab8620325c8619a62192c2dc7871d56b1") + quoterPriceEvent = w3.MustNewEvent("PriceIndexUpdated(address _tokenAddress, uint256 _exchangeRate)") + quoterPriceToSig = w3.MustNewFunc("setPriceIndexValue(address, uint256)", "uint256") +) + +func NewQuoterPriceHandler(pub pub.Pub) *quoterPriceHandler { + return "erPriceHandler{ + pub: pub, + } +} + +func (h *quoterPriceHandler) Name() string { + return quoterPriceEventName +} + +func (h *quoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == quoterPriceTopicHash { + var ( + token common.Address + exchangeRate big.Int + ) + + if err := quoterPriceEvent.DecodeArgs(msg.Log, &token, &exchangeRate); err != nil { + return err + } + + quoterPriceEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: quoterPriceEventName, + Payload: map[string]any{ + "token": token.Hex(), + "exchangeRate": exchangeRate.String(), + }, + } + + return h.pub.Send(ctx, quoterPriceEvent) + } + + return nil +} + +func (h *quoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "ebc59dff": + var ( + token common.Address + exchangeRate big.Int + ) + + if err := quoterPriceToSig.DecodeArgs(w3.B(msg.InputData), &token, &exchangeRate); err != nil { + return err + } + + quoterPriceEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: quoterPriceEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "token": token.Hex(), + "exchangeRate": exchangeRate.String(), + }, + } + + return h.pub.Send(ctx, quoterPriceEvent) + } + + return nil +} diff --git a/internal/handler/seal.go b/internal/handler/seal.go new file mode 100644 index 0000000..0803958 --- /dev/null +++ b/internal/handler/seal.go @@ -0,0 +1,95 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type sealHandler struct { + pub pub.Pub +} + +const sealEventName = "SEAL_STATE_CHANGE" + +var ( + sealTopicHash = w3.H("0x6b7e2e653f93b645d4ed7292d6429f96637084363e477c8aaea1a43ed13c284e") + sealEvent = w3.MustNewEvent("SealStateChange(bool indexed _final, uint256 _sealState)") + sealToSig = w3.MustNewFunc("seal(uint256)", "uint256") +) + +func NewSealHandler(pub pub.Pub) *sealHandler { + return &sealHandler{ + pub: pub, + } +} + +func (h *sealHandler) Name() string { + return sealEventName +} + +func (h *sealHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == sealTopicHash { + var ( + final bool + sealState big.Int + ) + + if err := sealEvent.DecodeArgs(msg.Log, &final, &sealState); err != nil { + return err + } + + sealEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: sealEventName, + Payload: map[string]any{ + "final": final, + "sealState": sealState.String(), + }, + } + + return h.pub.Send(ctx, sealEvent) + } + + return nil +} + +func (h *sealHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "86fe212d": + var sealState big.Int + + if err := sealToSig.DecodeArgs(w3.B(msg.InputData), &sealState); err != nil { + return err + } + + sealEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: sealEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "sealState": sealState.String(), + }, + } + + return h.pub.Send(ctx, sealEvent) + } + + return nil +} diff --git a/internal/handler/token_burn.go b/internal/handler/token_burn.go new file mode 100644 index 0000000..c4d7dab --- /dev/null +++ b/internal/handler/token_burn.go @@ -0,0 +1,97 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type tokenBurnHandler struct { + pub pub.Pub +} + +const burnEventName = "TOKEN_BURN" + +var ( + tokenBurnTopicHash = w3.H("0xcc16f5dbb4873280815c1ee09dbd06736cffcc184412cf7a71a0fdb75d397ca5") + tokenBurnEvent = w3.MustNewEvent("tokenBurn(address indexed _tokenBurner, uint256 _value)") + tokenBurnToSig = w3.MustNewFunc("tokenBurn(uint256)", "bool") +) + +func NewTokenBurnHandler(pub pub.Pub) *tokenBurnHandler { + return &tokenBurnHandler{ + pub: pub, + } +} + +func (h *tokenBurnHandler) Name() string { + return burnEventName +} + +func (h *tokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == tokenBurnTopicHash { + var ( + tokenBurner common.Address + value big.Int + ) + + if err := tokenBurnEvent.DecodeArgs(msg.Log, &tokenBurner, &value); err != nil { + return err + } + + tokenBurnEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: burnEventName, + Payload: map[string]any{ + "tokenBurner": tokenBurner.Hex(), + "value": value.String(), + }, + } + + return h.pub.Send(ctx, tokenBurnEvent) + } + + return nil +} + +func (h *tokenBurnHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "42966c68": + var value big.Int + + if err := tokenBurnToSig.DecodeArgs(w3.B(msg.InputData), &value); err != nil { + return err + } + + tokenBurnEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: burnEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "tokenBurner": msg.From, + "value": value.String(), + }, + } + + return h.pub.Send(ctx, tokenBurnEvent) + } + + return nil +} diff --git a/internal/handler/token_mint.go b/internal/handler/token_mint.go new file mode 100644 index 0000000..439143d --- /dev/null +++ b/internal/handler/token_mint.go @@ -0,0 +1,103 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type tokenMintHandler struct { + pub pub.Pub +} + +const mintEventName = "TOKEN_MINT" + +var ( + tokenMintTopicHash = w3.H("0xab8530f87dc9b59234c4623bf917212bb2536d647574c8e7e5da92c2ede0c9f8") + tokenMintEvent = w3.MustNewEvent("Mint(address indexed _tokenMinter, address indexed _beneficiary, uint256 _value)") + tokenMintToSig = w3.MustNewFunc("MintTo(address, uint256)", "bool") +) + +func NewTokenMintHandler(pub pub.Pub) *tokenMintHandler { + return &tokenMintHandler{ + pub: pub, + } +} + +func (h *tokenMintHandler) Name() string { + return mintEventName +} + +func (h *tokenMintHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == tokenMintTopicHash { + var ( + tokenMinter common.Address + to common.Address + value big.Int + ) + + if err := tokenMintEvent.DecodeArgs(msg.Log, &tokenMinter, &to, &value); err != nil { + return err + } + + tokenMintEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: mintEventName, + Payload: map[string]any{ + "tokenMinter": tokenMinter.Hex(), + "to": to.Hex(), + "value": value.String(), + }, + } + + return h.pub.Send(ctx, tokenMintEvent) + } + + return nil +} + +func (h *tokenMintHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + switch msg.InputData[:8] { + case "449a52f8": + var ( + to common.Address + value big.Int + ) + + if err := tokenMintToSig.DecodeArgs(w3.B(msg.InputData), &to, &value); err != nil { + return err + } + + tokenMintEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: mintEventName, + Payload: map[string]any{ + "revertReason": msg.RevertReason, + "tokenMinter": msg.From, + "to": to.Hex(), + "value": value.String(), + }, + } + + return h.pub.Send(ctx, tokenMintEvent) + } + + return nil +} diff --git a/internal/handler/token_transfer.go b/internal/handler/token_transfer.go new file mode 100644 index 0000000..7b9acba --- /dev/null +++ b/internal/handler/token_transfer.go @@ -0,0 +1,124 @@ +package handler + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/w3-celo" +) + +type tokenTransferHandler struct { + pub pub.Pub +} + +const transferEventName = "TOKEN_TRANSFER" + +var ( + tokenTransferTopicHash = w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") + tokenTransferEvent = w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)") + tokenTransferSig = w3.MustNewFunc("transfer(address, uint256)", "bool") + tokenTransferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool") +) + +func NewTokenTransferHandler(pub pub.Pub) *tokenTransferHandler { + return &tokenTransferHandler{ + pub: pub, + } +} + +func (h *tokenTransferHandler) Name() string { + return transferEventName +} + +func (h *tokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage) error { + if msg.Log.Topics[0] == tokenTransferTopicHash { + var ( + from common.Address + to common.Address + value big.Int + ) + + if err := tokenTransferEvent.DecodeArgs(msg.Log, &from, &to, &value); err != nil { + return err + } + + tokenTransferEvent := event.Event{ + Index: msg.Log.Index, + Block: msg.Log.BlockNumber, + ContractAddress: msg.Log.Address.Hex(), + Success: true, + Timestamp: msg.Timestamp, + TxHash: msg.Log.TxHash.Hex(), + TxType: transferEventName, + Payload: map[string]any{ + "from": from.Hex(), + "to": to.Hex(), + "value": value.String(), + }, + } + + return h.pub.Send(ctx, tokenTransferEvent) + } + + return nil +} + +func (h *tokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessage) error { + if len(msg.InputData) < 8 { + return nil + } + + tokenTransferEvent := event.Event{ + Block: msg.Block, + ContractAddress: msg.ContractAddress, + Success: false, + Timestamp: msg.Timestamp, + TxHash: msg.TxHash, + TxType: transferEventName, + } + + switch msg.InputData[:8] { + case "a9059cbb": + var ( + to common.Address + value big.Int + ) + + if err := tokenTransferSig.DecodeArgs(w3.B(msg.InputData), &to, &value); err != nil { + return err + } + + tokenTransferEvent.Payload = map[string]any{ + "revertReason": msg.RevertReason, + "from": msg.From, + "to": to.Hex(), + "value": value.String(), + } + + return h.pub.Send(ctx, tokenTransferEvent) + case "23b872dd": + var ( + from common.Address + to common.Address + value big.Int + ) + + if err := tokenTransferFromSig.DecodeArgs(w3.B(msg.InputData), &from, &to, &value); err != nil { + return err + } + + tokenTransferEvent.Payload = map[string]any{ + "revertReason": msg.RevertReason, + "from": from.Hex(), + "to": to.Hex(), + "value": value.String(), + } + + return h.pub.Send(ctx, tokenTransferEvent) + } + + return nil +} diff --git a/internal/pool/pool.go b/internal/pool/pool.go new file mode 100644 index 0000000..40a8cdb --- /dev/null +++ b/internal/pool/pool.go @@ -0,0 +1,28 @@ +package pool + +import ( + "log/slog" + "runtime/debug" + + "github.com/alitto/pond" +) + +type PoolOpts struct { + Logg *slog.Logger + WorkerCount int +} + +func NewPool(o PoolOpts) *pond.WorkerPool { + return pond.New( + o.WorkerCount, + 1, + pond.Strategy(pond.Balanced()), + pond.PanicHandler(panicHandler(o.Logg)), + ) +} + +func panicHandler(logg *slog.Logger) func(interface{}) { + return func(panic interface{}) { + logg.Error("block processor goroutine exited from a panic", "error", panic, "stack_trace", string(debug.Stack())) + } +} diff --git a/internal/processor/processor.go b/internal/processor/processor.go new file mode 100644 index 0000000..7583d33 --- /dev/null +++ b/internal/processor/processor.go @@ -0,0 +1,147 @@ +package processor + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celo-tracker/internal/cache" + "github.com/grassrootseconomics/celo-tracker/internal/chain" + "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/handler" + "github.com/grassrootseconomics/celo-tracker/internal/pub" + "github.com/grassrootseconomics/celo-tracker/internal/stats" +) + +type ( + ProcessorOpts struct { + Cache cache.Cache + DB db.DB + Chain chain.Chain + Pub pub.Pub + Logg *slog.Logger + Stats *stats.Stats + } + + Processor struct { + cache cache.Cache + db db.DB + chain chain.Chain + handlerPipeline handler.HandlerPipeline + logg *slog.Logger + stats *stats.Stats + } +) + +func NewProcessor(o ProcessorOpts) *Processor { + return &Processor{ + cache: o.Cache, + db: o.DB, + handlerPipeline: handler.New(o.Pub, o.Cache), + chain: o.Chain, + logg: o.Logg, + stats: o.Stats, + } +} + +func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error { + block, err := p.chain.GetBlock(ctx, blockNumber) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("block %d error: %v", blockNumber, err) + } + + receiptsResp, err := p.chain.GetReceipts(ctx, block) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("receipts fetch error: block %d: %v", blockNumber, err) + } + + for _, receipt := range receiptsResp { + if receipt.Status > 0 { + for _, log := range receipt.Logs { + if p.cache.Exists(log.Address.Hex()) { + msg := handler.LogMessage{ + Log: log, + Timestamp: block.Time(), + } + + if err := p.handleLog(ctx, msg); err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("handle logs error: block %d: %v", blockNumber, err) + } + } + } + } else if p.isTrieAvailable(blockNumber) { + tx, err := p.chain.GetTransaction(ctx, receipt.TxHash) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("get transaction error: tx %s: %v", receipt.TxHash.Hex(), err) + } + + if tx.To() == nil { + return nil + } + + if p.cache.Exists(tx.To().Hex()) { + from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx) + if err != nil { + return fmt.Errorf("transaction decode error: tx %s: %v", receipt.TxHash.Hex(), err) + } + + revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("get revert reason error: tx %s: %v", receipt.TxHash.Hex(), err) + } + + msg := handler.RevertMessage{ + From: from.Hex(), + RevertReason: revertReason, + InputData: common.Bytes2Hex(tx.Data()), + Block: blockNumber, + ContractAddress: tx.To().Hex(), + Timestamp: block.Time(), + TxHash: receipt.TxHash.Hex(), + } + + if err := p.handleRevert(ctx, msg); err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("handle revert error: tx %s: %v", receipt.TxHash.Hex(), err) + } + } + } + } + + if err := p.db.SetValue(blockNumber); err != nil { + return err + } + p.logg.Debug("successfully processed block", "block", blockNumber) + + return nil +} + +func (p *Processor) isTrieAvailable(blockNumber uint64) bool { + available := p.chain.IsArchiveNode() || p.stats.GetLatestBlock()-blockNumber <= 256 + if !available { + p.logg.Warn("skipping block due to potentially missing trie", "block_number", blockNumber) + } + return available +} + +func (p *Processor) handleLog(ctx context.Context, msg handler.LogMessage) error { + for _, handler := range p.handlerPipeline { + if err := handler.HandleLog(ctx, msg); err != nil { + return fmt.Errorf("log handler: %s err: %v", handler.Name(), err) + } + } + + return nil +} + +func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error { + for _, handler := range p.handlerPipeline { + if err := handler.HandleRevert(ctx, msg); err != nil { + return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err) + } + } + + return nil +} diff --git a/internal/pub/console.go b/internal/pub/console.go new file mode 100644 index 0000000..aca98f1 --- /dev/null +++ b/internal/pub/console.go @@ -0,0 +1,30 @@ +package pub + +import ( + "context" + "log/slog" + + "github.com/grassrootseconomics/celo-tracker/internal/event" +) + +type consolePub struct { + logg *slog.Logger +} + +func NewConsolePub(logg *slog.Logger) Pub { + return &consolePub{ + logg: logg, + } +} + +func (p *consolePub) Send(_ context.Context, payload event.Event) error { + data, err := payload.Serialize() + if err != nil { + return err + } + + p.logg.Info("emitted event", "json_payload", string(data)) + return nil +} + +func (p *consolePub) Close() {} diff --git a/internal/pub/jetstream.go b/internal/pub/jetstream.go new file mode 100644 index 0000000..4ec5c68 --- /dev/null +++ b/internal/pub/jetstream.go @@ -0,0 +1,92 @@ +package pub + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/nats-io/nats.go" +) + +type ( + JetStreamOpts struct { + Logg *slog.Logger + Endpoint string + DedupDuration time.Duration + PersistDuration time.Duration + } + + jetStreamPub struct { + natsConn *nats.Conn + jsCtx nats.JetStreamContext + } +) + +const streamName string = "TRACKER" + +var streamSubjects = []string{ + "TRACKER.*", +} + +func NewJetStreamPub(o JetStreamOpts) (Pub, error) { + natsConn, err := nats.Connect(o.Endpoint) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + o.Logg.Info("successfully connected to NATS server") + + stream, err := js.StreamInfo(streamName) + if err != nil && !errors.Is(err, nats.ErrStreamNotFound) { + return nil, err + } + if stream == nil { + _, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + MaxAge: o.PersistDuration, + Storage: nats.FileStorage, + Subjects: streamSubjects, + Duplicates: o.DedupDuration, + }) + if err != nil { + return nil, err + } + o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName) + } + + return &jetStreamPub{ + natsConn: natsConn, + jsCtx: js, + }, nil +} + +func (p *jetStreamPub) Close() { + if p.natsConn != nil { + p.natsConn.Close() + } +} + +func (p *jetStreamPub) Send(_ context.Context, payload event.Event) error { + data, err := payload.Serialize() + if err != nil { + return err + } + + _, err = p.jsCtx.Publish( + fmt.Sprintf("%s.%s", streamName, payload.TxType), + data, + nats.MsgId(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)), + ) + if err != nil { + return err + } + + return nil +} diff --git a/internal/pub/pub.go b/internal/pub/pub.go new file mode 100644 index 0000000..1a3c829 --- /dev/null +++ b/internal/pub/pub.go @@ -0,0 +1,12 @@ +package pub + +import ( + "context" + + "github.com/grassrootseconomics/celo-tracker/internal/event" +) + +type Pub interface { + Send(context.Context, event.Event) error + Close() +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..0bc6a0e --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,67 @@ +package queue + +import ( + "context" + "log/slog" + + "github.com/alitto/pond" + "github.com/grassrootseconomics/celo-tracker/internal/processor" +) + +type ( + QueueOpts struct { + Logg *slog.Logger + Processor *processor.Processor + Pool *pond.WorkerPool + } + + Queue struct { + logg *slog.Logger + processChan chan uint64 + stopSignal chan interface{} + processor *processor.Processor + pool *pond.WorkerPool + } +) + +func New(o QueueOpts) *Queue { + return &Queue{ + logg: o.Logg, + processChan: make(chan uint64, 17_280), + stopSignal: make(chan interface{}), + processor: o.Processor, + pool: o.Pool, + } +} + +func (q *Queue) Stop() { + q.stopSignal <- struct{}{} +} + +func (q *Queue) Process() { + for { + select { + case <-q.stopSignal: + q.logg.Info("shutdown signal received stopping queue processing") + return + case block, ok := <-q.processChan: + if !ok { + return + } + q.pool.Submit(func() { + err := q.processor.ProcessBlock(context.Background(), block) + if err != nil { + q.logg.Error("block processor error", "block_number", block, "error", err) + } + }) + } + } +} + +func (q *Queue) Push(block uint64) { + q.processChan <- block +} + +func (q *Queue) Size() int { + return len(q.processChan) +} diff --git a/internal/stats/stats.go b/internal/stats/stats.go new file mode 100644 index 0000000..e125df2 --- /dev/null +++ b/internal/stats/stats.go @@ -0,0 +1,78 @@ +package stats + +import ( + "log/slog" + "sync/atomic" + "time" + + "github.com/alitto/pond" + "github.com/grassrootseconomics/celo-tracker/internal/cache" +) + +type ( + StatsOpts struct { + Cache cache.Cache + Logg *slog.Logger + Pool *pond.WorkerPool + } + + Stats struct { + cache cache.Cache + logg *slog.Logger + pool *pond.WorkerPool + stopCh chan struct{} + + latestBlock atomic.Uint64 + } +) + +const statsPrinterInterval = 5 * time.Second + +func New(o StatsOpts) *Stats { + return &Stats{ + cache: o.Cache, + logg: o.Logg, + pool: o.Pool, + stopCh: make(chan struct{}), + } +} + +func (s *Stats) SetLatestBlock(v uint64) { + s.latestBlock.Store(v) +} + +func (s *Stats) GetLatestBlock() uint64 { + return s.latestBlock.Load() +} + +func (s *Stats) Stop() { + s.stopCh <- struct{}{} +} + +func (s *Stats) APIStatsResponse() map[string]interface{} { + return map[string]interface{}{ + "latestBlock": s.GetLatestBlock(), + "poolQueueSize": s.pool.WaitingTasks(), + "poolActiveWorkers": s.pool.RunningWorkers(), + "cacheSize": s.cache.Size(), + } +} + +func (s *Stats) StartStatsPrinter() { + ticker := time.NewTicker(statsPrinterInterval) + + for { + select { + case <-s.stopCh: + s.logg.Debug("stats shutting down") + return + case <-ticker.C: + s.logg.Info("block stats", + "latest_block", s.GetLatestBlock(), + "pool_queue_size", s.pool.WaitingTasks(), + "pool_active_workers", s.pool.RunningWorkers(), + "cache_size", s.cache.Size(), + ) + } + } +} diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go new file mode 100644 index 0000000..d49ae52 --- /dev/null +++ b/internal/syncer/realtime.go @@ -0,0 +1,80 @@ +package syncer + +import ( + "context" + "time" + + "github.com/celo-org/celo-blockchain" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/celo-org/celo-blockchain/event" +) + +type BlockQueueFn func(uint64) error + +const resubscribeInterval = 2 * time.Second + +func (s *Syncer) Stop() { + if s.realtimeSub != nil { + s.realtimeSub.Unsubscribe() + } +} + +func (s *Syncer) Start() { + s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn()) +} + +func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) { + newHeadersReceiver := make(chan *types.Header, 1) + sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) + s.logg.Info("realtime syncer connected to ws endpoint") + if err != nil { + return nil, err + } + + return event.NewSubscription(func(quit <-chan struct{}) error { + eventsCtx, eventsCancel := context.WithCancel(context.Background()) + defer eventsCancel() + + go func() { + select { + case <-quit: + s.logg.Info("realtime syncer stopping") + eventsCancel() + case <-eventsCtx.Done(): + return + } + }() + + for { + select { + case header := <-newHeadersReceiver: + if err := fn(header.Number.Uint64()); err != nil { + s.logg.Error("realtime block queuer error", "error", err) + } + case <-eventsCtx.Done(): + s.logg.Info("realtime syncer shutting down") + return nil + case err := <-sub.Err(): + return err + } + } + }), nil +} + +func (s *Syncer) queueRealtimeBlock(blockNumber uint64) error { + s.queue.Push(blockNumber) + if err := s.db.SetUpperBound(blockNumber); err != nil { + return err + } + s.stats.SetLatestBlock(blockNumber) + return nil +} + +func (s *Syncer) resubscribeFn() event.ResubscribeErrFunc { + return func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + s.logg.Error("resubscribing after failed subscription", "error", err) + } + return s.receiveRealtimeBlocks(ctx, s.queueRealtimeBlock) + } +} diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go new file mode 100644 index 0000000..a1fbcd7 --- /dev/null +++ b/internal/syncer/syncer.go @@ -0,0 +1,77 @@ +package syncer + +import ( + "context" + "log/slog" + + "github.com/celo-org/celo-blockchain" + "github.com/celo-org/celo-blockchain/ethclient" + "github.com/grassrootseconomics/celo-tracker/internal/chain" + "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/queue" + "github.com/grassrootseconomics/celo-tracker/internal/stats" +) + +type ( + SyncerOpts struct { + DB db.DB + Chain chain.Chain + Logg *slog.Logger + Queue *queue.Queue + Stats *stats.Stats + StartBlock int64 + WebSocketEndpoint string + } + + Syncer struct { + db db.DB + ethClient *ethclient.Client + logg *slog.Logger + realtimeSub celo.Subscription + stats *stats.Stats + queue *queue.Queue + stopCh chan struct{} + } +) + +func New(o SyncerOpts) (*Syncer, error) { + latestBlock, err := o.Chain.GetLatestBlock(context.Background()) + if err != nil { + return nil, err + } + + lowerBound, err := o.DB.GetLowerBound() + if err != nil { + return nil, err + } + if lowerBound == 0 { + if o.StartBlock > 0 { + if err := o.DB.SetLowerBound(uint64(o.StartBlock)); err != nil { + return nil, err + } + } else { + if err := o.DB.SetLowerBound(latestBlock); err != nil { + return nil, err + } + } + } + + if err := o.DB.SetUpperBound(latestBlock); err != nil { + return nil, err + } + o.Stats.SetLatestBlock(latestBlock) + + ethClient, err := ethclient.Dial(o.WebSocketEndpoint) + if err != nil { + return nil, err + } + + return &Syncer{ + db: o.DB, + ethClient: ethClient, + logg: o.Logg, + stats: o.Stats, + queue: o.Queue, + stopCh: make(chan struct{}), + }, nil +} diff --git a/revive.toml b/revive.toml new file mode 100644 index 0000000..500fd87 --- /dev/null +++ b/revive.toml @@ -0,0 +1,44 @@ +ignoreGeneratedHeader = true +severity = "warning" +confidence = 0.8 +errorCode = 0 +warningCode = 0 + +[rule.blank-imports] +[rule.context-as-argument] +[rule.context-keys-type] +[rule.dot-imports] +[rule.error-return] +[rule.error-strings] +[rule.error-naming] +[rule.increment-decrement] +[rule.var-naming] +[rule.var-declaration] +[rule.range] +[rule.receiver-naming] +[rule.time-naming] +[rule.unexported-return] +[rule.indent-error-flow] +[rule.errorf] +[rule.empty-block] +[rule.superfluous-else] +[rule.unused-parameter] +[rule.unreachable-code] +[rule.redefines-builtin-id] + + +[rule.defer] +arguments = [["loop", "method-call", "recover", "return", "immediate-recover"]] + +[rule.string-of-int] + +[rule.atomic] +[rule.call-to-gc] +[rule.constant-logical-expr] +[rule.identical-branches] +[rule.modifies-parameter] +[rule.modifies-value-receiver] +[rule.range-val-address] +[rule.range-val-in-closure] +[rule.unconditional-recursion] +[rule.waitgroup-by-value]