From 1895ae3b24301df0e67070537069da944e4f67a6 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Mon, 6 May 2024 19:27:18 +0800 Subject: [PATCH] improve verifier run logic, perf tests, fix minor bugs --- cmd/main.go | 49 ++++++----- config.toml | 11 +-- go.mod | 19 +---- go.sum | 48 +---------- internal/backfiller/backfiller.go | 126 ++++++++++++++++++++++++++++ internal/cache/bootstrap.go | 6 +- internal/cache/cache.go | 2 +- internal/cache/map.go | 2 +- internal/db/db.go | 5 ++ internal/handler/index_add.go | 2 +- internal/handler/index_remove.go | 2 +- internal/syncer/realtime.go | 4 + internal/syncer/syncer.go | 31 +++++-- internal/verifier/verifier.go | 133 ------------------------------ 14 files changed, 201 insertions(+), 239 deletions(-) create mode 100644 internal/backfiller/backfiller.go delete mode 100644 internal/verifier/verifier.go diff --git a/cmd/main.go b/cmd/main.go index f32164c..94c52ec 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "github.com/grassrootseconomics/celo-tracker/internal/backfiller" "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/db" "github.com/grassrootseconomics/celo-tracker/internal/pool" @@ -19,7 +20,6 @@ import ( "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" "github.com/grassrootseconomics/celo-tracker/internal/syncer" - "github.com/grassrootseconomics/celo-tracker/internal/verifier" "github.com/grassrootseconomics/celo-tracker/pkg/chain" "github.com/knadh/koanf/v2" ) @@ -45,24 +45,24 @@ func init() { lo.Info("starting celo tracker", "build", build) } +/* +Dependency Order +---------------- +- Stats +- Chain +- DB +- Cache +- JetStream Pub +- Worker Pool +- Block Processor +- Chain Syncer +- Verifier +*/ func main() { var wg sync.WaitGroup + ctx, stop := notifyShutdown() - /* - Dependency Order - ---------------- - - Stats - - Chain - - DB - - Cache - - JetStream Pub - - Worker Pool - - Block Processor - - Chain Syncer - - Verifier - - */ stats := stats.New(lo) chain, err := chain.New(chain.ChainOpts{ @@ -107,7 +107,7 @@ func main() { } workerPool := pool.NewPool(pool.PoolOpts{ - PoolSize: runtime.NumCPU(), + PoolSize: runtime.NumCPU()*3 + 1, }) blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ @@ -125,6 +125,7 @@ func main() { Chain: chain, DB: db, Logg: lo, + StartBlock: ko.Int64("chain.start_block"), Stats: stats, WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), }) @@ -133,7 +134,7 @@ func main() { os.Exit(1) } - verifier := verifier.New(verifier.VerifierOpts{ + backfiller := backfiller.New(backfiller.BackfillerOpts{ BlockWorker: workerPool, BlockProcessor: blockProcessor, Chain: chain, @@ -151,7 +152,11 @@ func main() { wg.Add(1) go func() { defer wg.Done() - verifier.Start() + if err := backfiller.Run(false); err != nil { + lo.Error("backfiller initial run error", "error", err) + } + + backfiller.Start() }() <-ctx.Done() @@ -161,10 +166,14 @@ func main() { wg.Add(1) go func() { defer wg.Done() - workerPool.StopWait() chainSyncer.Stop() - verifier.Stop() + backfiller.Stop() + workerPool.Stop() jetStreamPub.Close() + + if err := db.Cleanup(); err != nil { + lo.Error("db compaction failed", "error", err) + } }() go func() { diff --git a/config.toml b/config.toml index 43429a9..ec8b885 100644 --- a/config.toml +++ b/config.toml @@ -5,16 +5,13 @@ go_process = true address = ":5001" [chain] -ws_endpoint = "wss://ws.celo.grassecon.net" -rpc_endpoint = "https://celo.grassecon.net" +ws_endpoint = "wss://forno.celo.org/ws" +rpc_endpoint = "https://forno.celo.org" testnet = false -realtime = true -historical = true -start_block = 25091040 -batch_size = 30 +start_block = 25462593 + [bootstrap] -# https://software.grassecon.org/addresses ge_registries = [ "0xd1FB944748aca327a1ba036B082993D9dd9Bfa0C", "0x0cc9f4fff962def35bb34a53691180b13e653030", diff --git a/go.mod b/go.mod index c702f86..599047f 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,9 @@ module github.com/grassrootseconomics/celo-tracker go 1.22.1 require ( - github.com/alitto/pond v1.8.3 - github.com/arl/statsviz v0.6.0 github.com/bits-and-blooms/bitset v1.13.0 github.com/celo-org/celo-blockchain v1.8.0 - github.com/dgraph-io/badger/v4 v4.2.0 - github.com/ef-ds/deque/v2 v2.0.2 + github.com/gammazero/workerpool v1.1.3 github.com/grassrootseconomics/celoutils/v2 v2.6.0 github.com/grassrootseconomics/w3-celo v0.16.0 github.com/kamikazechaser/common v0.2.0 @@ -16,6 +13,8 @@ require ( github.com/knadh/koanf/providers/env v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 github.com/knadh/koanf/v2 v2.1.0 + github.com/nats-io/nats.go v1.34.1 + github.com/puzpuzpuz/xsync/v3 v3.1.0 go.etcd.io/bbolt v1.3.9 ) @@ -34,20 +33,12 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gammazero/deque v0.2.0 // indirect - github.com/gammazero/workerpool v1.1.3 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect - github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/glog v1.0.0 // indirect - github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect - github.com/google/flatbuffers v1.12.1 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/hdevalence/ed25519consensus v0.0.0-20201207055737-7fde80a9d5ff // indirect @@ -61,7 +52,6 @@ require ( github.com/mattn/go-runewidth v0.0.14 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect - github.com/nats-io/nats.go v1.34.1 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect @@ -69,13 +59,11 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/tsdb v0.7.1 // indirect - github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect github.com/rivo/uniseg v0.4.2 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect - go.opencensus.io v0.22.5 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sync v0.5.0 // indirect @@ -83,7 +71,6 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect - google.golang.org/protobuf v1.28.1 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index d6530cb..b79ec16 100644 --- a/go.sum +++ b/go.sum @@ -45,14 +45,10 @@ github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBA github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs= -github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= -github.com/arl/statsviz v0.6.0 h1:jbW1QJkEYQkufd//4NDYRSNBpwJNrdzPahF7ZmoGdyE= -github.com/arl/statsviz v0.6.0/go.mod h1:0toboo+YGSUXDaS4g1D5TVS4dXs7S7YYT5J/qnW2h8s= github.com/aws/aws-sdk-go-v2 v1.2.0/go.mod h1:zEQs02YRBw1DjK0PoJv3ygDYOFTre1ejlJWl8FwAuQo= github.com/aws/aws-sdk-go-v2/config v1.1.1/go.mod h1:0XsVy9lBI/BCXm+2Tuvt39YmdHwS5unDQmxZOYe8F5Y= github.com/aws/aws-sdk-go-v2/credentials v1.1.1/go.mod h1:mM2iIjwl7LULWtS6JCACyInboHirisUUdkBPoTHMOUo= @@ -140,23 +136,13 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeC github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M= github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= -github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= -github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= -github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= -github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= -github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= -github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/docker/docker v1.6.1/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498/go.mod h1:Mw6PkjjMXWbTj+nnj4s3QPXq1jaT0s5pC0iFD4+BOAA= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= -github.com/ef-ds/deque/v2 v2.0.2 h1:GQtDK1boBMu/qsNbSLQsqzwNptaioxZI39X3UxT5ALA= -github.com/ef-ds/deque/v2 v2.0.2/go.mod h1:hoZy4VooWLhRT4uS+sSCilfgBQUNptJU2FGqr08a5sc= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -200,15 +186,10 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDs github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= -github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= -github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= -github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 h1:5ZkaAPbicIKTF2I64qf5Fh8Aa83Q/dnOafMYV0OMwjA= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -223,9 +204,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -235,15 +213,12 @@ github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGS github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= -github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= -github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -263,10 +238,6 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= -github.com/grassrootseconomics/celoutils/v2 v2.4.1 h1:8S4+TfXVevxu3+tBIyzGM4z6iT3Rw3fZHoS93rGFCVo= -github.com/grassrootseconomics/celoutils/v2 v2.4.1/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg= -github.com/grassrootseconomics/celoutils/v2 v2.4.2 h1:EAXLMLJhv9ukAlM2me8A+jHInxXeSSOmEWKY9zHPONQ= -github.com/grassrootseconomics/celoutils/v2 v2.4.2/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg= github.com/grassrootseconomics/celoutils/v2 v2.6.0 h1:Ccr4KQ7lvsO73xaOmu+OW1jn1p9l0/lGm5vEw81HXGQ= github.com/grassrootseconomics/celoutils/v2 v2.6.0/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg= github.com/grassrootseconomics/w3-celo v0.16.0 h1:AKPd+LGqR4YgkLw44V4Jgq/+prhJfTnaWzFOdS8JRgg= @@ -323,12 +294,9 @@ github.com/kamikazechaser/common v0.2.0 h1:bqi5UaMTDm/wtZlJEvQDNhsLVJP4Beg+HKWeQ github.com/kamikazechaser/common v0.2.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= github.com/karalabe/usb v0.0.0-20190919080040-51dc0efba356/go.mod h1:Od972xHfMJowv7NGVDiWVxk2zxnWgjLlJzE+F4F7AGU= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= -github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= -github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= @@ -503,16 +471,15 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= -github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= -go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -554,7 +521,6 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -573,7 +539,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -640,7 +605,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= @@ -686,11 +650,7 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= -golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -740,10 +700,6 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/backfiller/backfiller.go b/internal/backfiller/backfiller.go new file mode 100644 index 0000000..3c0aa67 --- /dev/null +++ b/internal/backfiller/backfiller.go @@ -0,0 +1,126 @@ +package backfiller + +import ( + "context" + "log/slog" + "time" + + "github.com/gammazero/workerpool" + "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/processor" + "github.com/grassrootseconomics/celo-tracker/internal/stats" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" +) + +type ( + BackfillerOpts struct { + BlockWorker *workerpool.WorkerPool + BlockProcessor *processor.Processor + Chain *chain.Chain + DB *db.DB + Logg *slog.Logger + Stats *stats.Stats + } + + Backfiller struct { + blockWorker *workerpool.WorkerPool + blockProcessor *processor.Processor + chain *chain.Chain + db *db.DB + logg *slog.Logger + quit chan struct{} + stats *stats.Stats + } +) + +const ( + blockBatchSize = 50 + verifierInterval = 20 * time.Second +) + +func New(o BackfillerOpts) *Backfiller { + return &Backfiller{ + blockWorker: o.BlockWorker, + blockProcessor: o.BlockProcessor, + chain: o.Chain, + db: o.DB, + logg: o.Logg, + quit: make(chan struct{}), + stats: o.Stats, + } +} + +func (b *Backfiller) Start() { + ticker := time.NewTicker(verifierInterval) + + for { + select { + case <-b.quit: + b.logg.Info("verifier shutting down") + return + case <-ticker.C: + if b.blockWorker.WaitingQueueSize() <= 1 { + if err := b.Run(true); err != nil { + b.logg.Error("verifier run error", "err", err) + } + } + } + } +} + +func (b *Backfiller) Stop() { + b.quit <- struct{}{} +} + +func (b *Backfiller) Run(skipLatest bool) error { + lower, err := b.db.GetLowerBound() + if err != nil { + return err + } + upper, err := b.db.GetUpperBound() + if err != nil { + return err + } + + if skipLatest { + upper-- + } + + missingBlocks, err := b.db.GetMissingValuesBitSet(lower, upper) + if err != nil { + return err + } + missingBlocksCount := missingBlocks.Count() + if missingBlocksCount > 0 { + b.logg.Info("found missing blocks", "skip_latest", skipLatest, "missing_blocks_count", missingBlocksCount) + buffer := make([]uint, missingBlocksCount) + missingBlocks.NextSetMany(0, buffer) + defer missingBlocks.ClearAll() + + for i := 0; i < int(missingBlocksCount); i += blockBatchSize { + end := i + blockBatchSize + if end > int(missingBlocksCount) { + end = int(missingBlocksCount) + } + batch := make([]uint64, end-i) + for j := i; j < end; j++ { + batch[j-i] = uint64(buffer[j]) + } + + blocks, err := b.chain.GetBlocks(context.Background(), batch) + if err != nil { + return err + } + + for _, block := range blocks { + b.blockWorker.Submit(func() { + if err := b.blockProcessor.ProcessBlock(context.Background(), block); err != nil { + b.logg.Error("block processor error", "source", "backfiller", "block", block.NumberU64(), "error", err) + } + }) + } + } + } + + return nil +} diff --git a/internal/cache/bootstrap.go b/internal/cache/bootstrap.go index b3840c9..7478b0e 100644 --- a/internal/cache/bootstrap.go +++ b/internal/cache/bootstrap.go @@ -11,9 +11,7 @@ import ( ) func bootstrapGESmartContracts(ctx context.Context, registries []string, chain *chain.Chain, cache Cache) (WatchableIndex, error) { - var ( - watchableIndex = make(WatchableIndex) - ) + var watchableIndex = make(WatchableIndex) for _, registry := range registries { registryMap, err := chain.Provider.RegistryMap(ctx, w3.A(registry)) @@ -76,5 +74,5 @@ func bootstrapGESmartContracts(ctx context.Context, registries []string, chain * } } - return watchableIndex, nil + return nil, nil } diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 16efe3c..49e31e5 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -15,7 +15,7 @@ type ( Add(string) Remove(string) SetWatchableIndex(WatchableIndex) - ISWatchAbleIndex(string) bool + IsWatchAbleIndex(string) bool Size() int } diff --git a/internal/cache/map.go b/internal/cache/map.go index 432439d..edf16ba 100644 --- a/internal/cache/map.go +++ b/internal/cache/map.go @@ -46,7 +46,7 @@ func (c *MapCache) SetWatchableIndex(watchableIndex WatchableIndex) { c.watchableIndex = watchableIndex } -func (c *MapCache) ISWatchAbleIndex(key string) bool { +func (c *MapCache) IsWatchAbleIndex(key string) bool { _, ok := c.watchableIndex[key] return ok } diff --git a/internal/db/db.go b/internal/db/db.go index e9cbe11..d7b8b6d 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -112,6 +112,11 @@ func (d *DB) GetLowerBound() (uint64, error) { if err != nil { return 0, err } + + if v == nil { + return 0, nil + } + return unmarshalUint64(v), nil } diff --git a/internal/handler/index_add.go b/internal/handler/index_add.go index 8ee0589..693f1d5 100644 --- a/internal/handler/index_add.go +++ b/internal/handler/index_add.go @@ -55,7 +55,7 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub }, } - if h.cache.ISWatchAbleIndex(address.Hex()) { + if h.cache.IsWatchAbleIndex(address.Hex()) { h.cache.Add(address.Hex()) } diff --git a/internal/handler/index_remove.go b/internal/handler/index_remove.go index 521866c..2bd6ae6 100644 --- a/internal/handler/index_remove.go +++ b/internal/handler/index_remove.go @@ -54,7 +54,7 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, pub }, } - if h.cache.ISWatchAbleIndex(address.Hex()) { + if h.cache.IsWatchAbleIndex(address.Hex()) { h.cache.Remove(address.Hex()) } diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go index ce39d1e..f13f224 100644 --- a/internal/syncer/realtime.go +++ b/internal/syncer/realtime.go @@ -84,6 +84,10 @@ func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) err } }) + if err := s.db.SetUpperBound(blockNumber); err != nil { + return err + } + return nil } diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 51b1c58..51751d7 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -20,6 +20,7 @@ type ( Chain *chain.Chain DB *db.DB Logg *slog.Logger + StartBlock int64 Stats *stats.Stats WebSocketEndpoint string } @@ -39,15 +40,26 @@ type ( func New(o SyncerOpts) (*Syncer, error) { latestBlock, err := o.Chain.GetLatestBlock(context.Background()) + if err != nil { + return nil, err + } + lowerBound, err := o.DB.GetLowerBound() if err != nil { return nil, err } if lowerBound == 0 { - if err := o.DB.SetLowerBound(latestBlock); err != nil { - return nil, err + if o.StartBlock > 0 { + if err := o.DB.SetLowerBound(uint64(o.StartBlock)); err != nil { + return nil, err + } + } else { + if err := o.DB.SetLowerBound(latestBlock); err != nil { + return nil, err + } } } + if err := o.DB.SetUpperBound(latestBlock); err != nil { return nil, err } @@ -58,12 +70,13 @@ func New(o SyncerOpts) (*Syncer, error) { } return &Syncer{ - blockWorker: o.BlockWorker, - chain: o.Chain, - db: o.DB, - ethClient: ethClient, - logg: o.Logg, - quit: make(chan struct{}), - stats: o.Stats, + blockWorker: o.BlockWorker, + blockProcessor: o.BlockProcessor, + chain: o.Chain, + db: o.DB, + ethClient: ethClient, + logg: o.Logg, + quit: make(chan struct{}), + stats: o.Stats, }, nil } diff --git a/internal/verifier/verifier.go b/internal/verifier/verifier.go deleted file mode 100644 index d493afd..0000000 --- a/internal/verifier/verifier.go +++ /dev/null @@ -1,133 +0,0 @@ -package verifier - -import ( - "context" - "fmt" - "log/slog" - "time" - - "github.com/gammazero/workerpool" - "github.com/grassrootseconomics/celo-tracker/internal/db" - "github.com/grassrootseconomics/celo-tracker/internal/processor" - "github.com/grassrootseconomics/celo-tracker/internal/stats" - "github.com/grassrootseconomics/celo-tracker/pkg/chain" -) - -type ( - VerifierOpts struct { - BlockWorker *workerpool.WorkerPool - BlockProcessor *processor.Processor - Chain *chain.Chain - DB *db.DB - Logg *slog.Logger - Stats *stats.Stats - } - - Verifier struct { - blockWorker *workerpool.WorkerPool - blockProcessor *processor.Processor - chain *chain.Chain - db *db.DB - logg *slog.Logger - quit chan struct{} - stats *stats.Stats - } -) - -const ( - blockBatchSize = 25 - verifierInterval = 1 * time.Minute -) - -func New(o VerifierOpts) *Verifier { - return &Verifier{ - blockWorker: o.BlockWorker, - chain: o.Chain, - db: o.DB, - logg: o.Logg, - quit: make(chan struct{}), - stats: o.Stats, - } -} - -func (v *Verifier) Start() { - ticker := time.NewTicker(verifierInterval) - - for { - select { - case <-v.quit: - v.logg.Info("janitor: shutdown signal received") - return - case <-ticker.C: - batch, err := v.getMissingBlocks() - if err != nil { - v.logg.Error("verifier error getting missing blocks", "err", err) - } - - if batch != nil { - v.logg.Info("verifier found missing block gap requeuing missing blocks") - blocks, err := v.chain.GetBlocks(context.Background(), batch) - if err != nil { - v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err) - } - - for _, block := range blocks { - v.blockWorker.Submit(func() { - if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil { - v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err) - } - }) - } - } else { - v.logg.Debug("verifier found no missing blocks running db compactor") - if err := v.db.Cleanup(); err != nil { - v.logg.Error("verifier compactor error", "error", err) - } - } - } - } -} - -func (v *Verifier) Stop() { - // TODO: Run with sync.Once - v.quit <- struct{}{} -} - -func (v *Verifier) getMissingBlocks() ([]uint64, error) { - lower, err := v.db.GetLowerBound() - if err != nil { - return nil, err - } - - upper, err := v.db.GetUpperBound() - if err != nil { - return nil, err - } - - missingBlocks, err := v.db.GetMissingValuesBitSet(lower, upper-1) - if err != nil { - return nil, err - } - missingBlocksCount := missingBlocks.Count() - - if missingBlocksCount > 0 { - - buffer := make([]uint, missingBlocksCount) - missingBlocks.NextSetMany(0, buffer) - - for i := 0; i < int(missingBlocksCount); i += blockBatchSize { - end := i + blockBatchSize - if end > int(missingBlocksCount) { - end = int(missingBlocksCount) - } - batch := make([]uint64, end-i) - for j := i; j < end; j++ { - batch[j-i] = uint64(buffer[j]) - } - - return batch, nil - } - } - - return nil, nil -}