From df88d9df16ffacaeba3a08a549b1d259bfc8b12b Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Wed, 11 Jan 2023 08:13:59 +0000 Subject: [PATCH] refactor: perf, update libraries ci, and add docs * update config to better defaults * add docs, inline and md * add context support throughout * replace json with goccy/go-json for better decoding of large JSON * update graphql fetcher: replace ioutil with io * test runner script (until CI is ready) * update CI build config --- .env.test.example | 1 + .github/workflows/binary_release.yaml | 31 ---------------- .../{docker_release.yaml => build.yaml} | 30 +++++++++++++--- .gitignore | 1 + README.md | 10 ++++++ cmd/init.go | 11 +++--- cmd/main.go | 27 +++++++------- config.toml | 15 ++++++-- docs/api.md | 4 +++ go.mod | 1 + go.sum | 2 ++ internal/fetch/fetch.go | 4 ++- internal/fetch/graphql.go | 30 +++++++++------- internal/fetch/graphql_test.go | 8 +++-- internal/filter/address_filter.go | 4 ++- internal/filter/filter.go | 8 +++-- internal/filter/noop_filter.go | 4 ++- internal/filter/transfer_filter.go | 4 ++- internal/pipeline/pipeline.go | 24 ++++++++----- internal/pipeline/pipeline_test.go | 26 +++++++------- internal/pool/pool.go | 5 ++- internal/store/postgres.go | 16 ++++----- internal/store/store.go | 13 +++---- internal/syncer/head.go | 27 +++++++------- internal/syncer/janitor.go | 35 ++++++++++--------- internal/syncer/stats.go | 3 +- run_tests.sh | 4 +++ 27 files changed, 204 insertions(+), 144 deletions(-) create mode 100644 .env.test.example delete mode 100644 .github/workflows/binary_release.yaml rename .github/workflows/{docker_release.yaml => build.yaml} (52%) create mode 100644 README.md create mode 100644 docs/api.md create mode 100644 run_tests.sh diff --git a/.env.test.example b/.env.test.example new file mode 100644 index 0000000..0412928 --- /dev/null +++ b/.env.test.example @@ -0,0 +1 @@ +export TEST_GRAPHQL_ENDPOINT= \ No newline at end of file diff --git a/.github/workflows/binary_release.yaml 
b/.github/workflows/binary_release.yaml deleted file mode 100644 index 6a783c2..0000000 --- a/.github/workflows/binary_release.yaml +++ /dev/null @@ -1,31 +0,0 @@ -name: binary_release - -on: - push: - tags: - - "v*" - -jobs: - goreleaser: - runs-on: ubuntu-latest - container: - image: goreleaser/goreleaser-cross - environment: build - steps: - - name: Checkout - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - - name: Set up Go - uses: actions/setup-go@v2 - with: - go-version: 1.19 - - - name: Run GoReleaser - uses: goreleaser/goreleaser-action@v2 - with: - version: latest - args: release --rm-dist - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/docker_release.yaml b/.github/workflows/build.yaml similarity index 52% rename from .github/workflows/docker_release.yaml rename to .github/workflows/build.yaml index e25ffb0..f5a8e6f 100644 --- a/.github/workflows/docker_release.yaml +++ b/.github/workflows/build.yaml @@ -1,4 +1,4 @@ -name: docker_release +name: build on: push: @@ -6,9 +6,31 @@ on: - "v*" jobs: + goreleaser: + runs-on: ubuntu-latest + container: + image: goreleaser/goreleaser-cross + environment: build + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Set up Go + uses: actions/setup-go@v2 + with: + go-version: 1.19.3 + + - name: Run GoReleaser + uses: goreleaser/goreleaser-action@v2 + with: + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} build-publish: runs-on: ubuntu-latest - steps: - uses: actions/checkout@v2 - uses: docker/login-action@v1.14.1 @@ -26,5 +48,5 @@ jobs: docker-compose -f docker-compose.build.yaml push export TAG=latest docker-compose -f docker-compose.build.yaml build --progress plain - docker-compose -f docker-compose.build.yaml push - \ No newline at end of file + docker-compose -f docker-compose.build.yaml push + \ No newline at end of file diff --git a/.gitignore b/.gitignore 
index d13e3a1..d4f63f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ covprofile .env +.env.test diff --git a/README.md b/README.md new file mode 100644 index 0000000..b6a4366 --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +# cic-chain-events + +![GitHub release (latest by date)](https://img.shields.io/github/v/release/grassrootseconomics/cic-chain-events) +![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/grassrootseconomics/cic-chain-events/build.yaml) + +> CIC Chain Events + +## Documentation + +- [API](docs/api.md) diff --git a/cmd/init.go b/cmd/init.go index dc175d0..8096f2e 100644 --- a/cmd/init.go +++ b/cmd/init.go @@ -1,6 +1,7 @@ package main import ( + "context" "strings" "github.com/alitto/pond" @@ -60,7 +61,7 @@ func initQueries(queriesPath string) goyesql.Queries { func initPgStore() (store.Store[pgx.Rows], error) { pgStore, err := store.NewPostgresStore(store.PostgresStoreOpts{ DSN: ko.MustString("postgres.dsn"), - InitialLowerBound: uint64(ko.MustInt64("indexer.initial_lower_bound")), + InitialLowerBound: uint64(ko.MustInt64("syncer.initial_lower_bound")), Logg: lo, Queries: q, }) @@ -71,10 +72,10 @@ func initPgStore() (store.Store[pgx.Rows], error) { return pgStore, nil } -func initWorkerPool() *pond.WorkerPool { - return pool.NewPool(pool.Opts{ - ConcurrencyFactor: ko.MustInt("indexer.concurrency"), - PoolQueueSize: ko.MustInt("indexer.queue_size"), +func initWorkerPool(ctx context.Context) *pond.WorkerPool { + return pool.NewPool(ctx, pool.Opts{ + ConcurrencyFactor: ko.MustInt("syncer.concurrency"), + PoolQueueSize: ko.MustInt("syncer.queue_size"), }) } diff --git a/cmd/main.go b/cmd/main.go index 866677c..b679dc3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -48,11 +48,13 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() + workerPool := initWorkerPool(ctx) + pgStore, err := initPgStore() if err != nil { - lo.Fatal("error loading pg store", 
"error", err) + lo.Fatal("main: critical error loading pg store", "error", err) } - workerPool := initWorkerPool() + graphqlFetcher := initFetcher() pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{ @@ -60,7 +62,6 @@ func main() { Filters: []filter.Filter{ initAddressFilter(), initTransferFilter(), - // initNoopFilter(), }, Logg: lo, Store: pgStore, @@ -74,18 +75,18 @@ func main() { WsEndpoint: ko.MustString("chain.ws_endpoint"), }) if err != nil { - lo.Fatal("error loading head syncer", "error", err) + lo.Fatal("main: crticial error loading head syncer", "error", err) } janitor := syncer.NewJanitor(syncer.JanitorOpts{ - BatchSize: uint64(ko.MustInt64("indexer.batch_size")), - HeadBlockLag: uint64(ko.MustInt64("indexer.head_block_lag")), + BatchSize: uint64(ko.MustInt64("syncer.batch_size")), + HeadBlockLag: uint64(ko.MustInt64("syncer.head_block_lag")), Logg: lo, Pipeline: pipeline, Pool: workerPool, Stats: syncerStats, Store: pgStore, - SweepInterval: time.Second * time.Duration(ko.MustInt64("indexer.sweep_interval")), + SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.sweep_interval")), }) apiServer.GET("/stats", api.StatsHandler(syncerStats, workerPool, lo)) @@ -94,7 +95,7 @@ func main() { go func() { defer wg.Done() if err := headSyncer.Start(ctx); err != nil { - lo.Fatal("head syncer error", "error", err) + lo.Fatal("main: critical error starting head syncer", "error", err) } }() @@ -102,7 +103,7 @@ func main() { go func() { defer wg.Done() if err := janitor.Start(ctx); err != nil { - lo.Fatal("janitor error", "error", err) + lo.Fatal("main: critical error starting janitor", "error", err) } }() @@ -112,19 +113,19 @@ func main() { lo.Info("starting API server") if err := apiServer.Start(ko.MustString("api.address")); err != nil { if strings.Contains(err.Error(), "Server closed") { - lo.Info("shutting down server") + lo.Info("main: shutting down server") } else { - lo.Fatal("could not start api server", "err", err) + lo.Fatal("main: 
critical error shutting down server", "err", err) } } }() <-ctx.Done() - lo.Info("graceful shutdown triggered") workerPool.Stop() + if err := apiServer.Shutdown(ctx); err != nil { - lo.Error("could not gracefully shutdown api server", "err", err) + lo.Error("main: could not gracefully shutdown api server", "err", err) } wg.Wait() diff --git a/config.toml b/config.toml index 8d9ad4e..37deb26 100644 --- a/config.toml +++ b/config.toml @@ -1,20 +1,31 @@ +# Exposes Prometheus metrics [metrics] go_process = true +# API server [api] address = ":8080" +# Geth API endpoints [chain] graphql_endpoint = "https://rpc.celo.grassecon.net/graphql" ws_endpoint = "wss://ws.celo.grassecon.net" -[indexer] +# Syncer configs +[syncer] +# Maximum number of missing blocks pushed into the worker queue every janitor sweep batch_size = 200 +# Number of goroutines assigned to the worker pool concurrency = 3 +# Prevents reprocessing head block already in queue head_block_lag = 5 +# Max idle time after which goroutine is returned back to the pool idle_worker_timeout = 1 +# Syncer start block initial_lower_bound = 17034445 -queue_size = 1250 +# Max blocks in worker queue awaiting processing +queue_size = 500 +# Janitor sweep interval, should take into account concurrency and queue_size sweep_interval = 10 [postgres] diff --git a/docs/api.md b/docs/api.md new file mode 100644 index 0000000..828e482 --- /dev/null +++ b/docs/api.md @@ -0,0 +1,4 @@ +## API + +- `/stats` - Syncer stats (Ready after 1st successful janitor sweep). +- `/metrics` - Go process metrics (Prometheus format). 
\ No newline at end of file diff --git a/go.mod b/go.mod index 8bbe44c..816b124 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-ole/go-ole v1.2.5 // indirect github.com/go-stack/stack v1.8.0 // indirect + github.com/goccy/go-json v0.10.0 // indirect github.com/gorilla/websocket v1.4.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect diff --git a/go.sum b/go.sum index 7e808f5..efe6a8a 100644 --- a/go.sum +++ b/go.sum @@ -178,6 +178,8 @@ github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= +github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA= +github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go index 216189e..5eb6c50 100644 --- a/internal/fetch/fetch.go +++ b/internal/fetch/fetch.go @@ -1,8 +1,10 @@ package fetch +import "context" + // Fetch defines a block fetcher that must return a full JSON response type Fetch interface { - Block(block uint64) (fetchResponse FetchResponse, err error) + Block(ctx context.Context, block uint64) (fetchResponse FetchResponse, err error) } // Transaction reprsents a JSON object of all important mined transaction information diff --git a/internal/fetch/graphql.go 
b/internal/fetch/graphql.go index 8b22dca..29d28e8 100644 --- a/internal/fetch/graphql.go +++ b/internal/fetch/graphql.go @@ -2,15 +2,16 @@ package fetch import ( "bytes" - "encoding/json" + "context" "fmt" - "io/ioutil" + "io" "net/http" "time" + + "github.com/goccy/go-json" ) const ( - contentType = "application/json" graphqlQuery = `{"query":"{block(number:%d){transactions{block{number,timestamp},hash,index,from{address},to{address},value,inputData,status,gasUsed}}}"}` ) @@ -26,31 +27,34 @@ type Graphql struct { func NewGraphqlFetcher(o GraphqlOpts) Fetch { return &Graphql{ httpClient: &http.Client{ - Timeout: time.Second * 5, + Timeout: time.Second * 2, }, graphqlEndpoint: o.GraphqlEndpoint, } } -func (f *Graphql) Block(blockNumber uint64) (FetchResponse, error) { +func (f *Graphql) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) { var ( fetchResponse FetchResponse ) - resp, err := f.httpClient.Post( - f.graphqlEndpoint, - contentType, - bytes.NewBufferString(fmt.Sprintf(graphqlQuery, blockNumber)), - ) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.graphqlEndpoint, bytes.NewBufferString(fmt.Sprintf(graphqlQuery, blockNumber))) if err != nil { return FetchResponse{}, err } - if resp.StatusCode != http.StatusOK { + req.Header.Set("Content-Type", "application/json") + + resp, err := f.httpClient.Do(req) + if err != nil { + return FetchResponse{}, err + } + + if resp.StatusCode >= http.StatusBadRequest { return FetchResponse{}, fmt.Errorf("error fetching block %s", resp.Status) } - defer resp.Body.Close() - out, err := ioutil.ReadAll(resp.Body) + out, err := io.ReadAll(resp.Body) + _ = resp.Body.Close() if err != nil { return FetchResponse{}, nil } diff --git a/internal/fetch/graphql_test.go b/internal/fetch/graphql_test.go index debed01..251734d 100644 --- a/internal/fetch/graphql_test.go +++ b/internal/fetch/graphql_test.go @@ -1,13 +1,15 @@ package fetch import ( + "context" + "os" "testing" 
"github.com/stretchr/testify/suite" ) var ( - graphqlEndpoint = "https://rpc.celo.grassecon.net/graphql" + graphqlEndpoint = os.Getenv("TEST_GRAPHQL_ENDPOINT") ) type itGraphqlTest struct { @@ -26,12 +28,12 @@ func (s *itGraphqlTest) SetupSuite() { } func (s *itGraphqlTest) Test_E2E_Fetch_Existing_Block() { - resp, err := s.graphqlFetcher.Block(14974600) + resp, err := s.graphqlFetcher.Block(context.Background(), 14974600) s.NoError(err) s.Len(resp.Data.Block.Transactions, 3) } func (s *itGraphqlTest) Test_E2E_Fetch_Non_Existing_Block() { - _, err := s.graphqlFetcher.Block(14974600000) + _, err := s.graphqlFetcher.Block(context.Background(), 14974600000) s.Error(err) } diff --git a/internal/filter/address_filter.go b/internal/filter/address_filter.go index 365336e..a3594f8 100644 --- a/internal/filter/address_filter.go +++ b/internal/filter/address_filter.go @@ -1,6 +1,8 @@ package filter import ( + "context" + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" "github.com/zerodha/logf" ) @@ -23,7 +25,7 @@ func NewAddressFilter(o AddressFilterOpts) Filter { } } -func (f *AddressFilter) Execute(transaction fetch.Transaction) (bool, error) { +func (f *AddressFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) { if transaction.To.Address == cUSD { return true, nil } diff --git a/internal/filter/filter.go b/internal/filter/filter.go index 600e7c5..bf9ed2a 100644 --- a/internal/filter/filter.go +++ b/internal/filter/filter.go @@ -1,8 +1,12 @@ package filter -import "github.com/grassrootseconomics/cic-chain-events/internal/fetch" +import ( + "context" + + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" +) // Filter defines a read only filter which must return next as true/false or an error type Filter interface { - Execute(inputTransaction fetch.Transaction) (next bool, err error) + Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error) } diff --git 
a/internal/filter/noop_filter.go b/internal/filter/noop_filter.go index fcfb5c1..75f213d 100644 --- a/internal/filter/noop_filter.go +++ b/internal/filter/noop_filter.go @@ -1,6 +1,8 @@ package filter import ( + "context" + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" "github.com/zerodha/logf" ) @@ -19,7 +21,7 @@ func NewNoopFilter(o NoopFilterOpts) Filter { } } -func (f *NoopFilter) Execute(transaction fetch.Transaction) (bool, error) { +func (f *NoopFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) { f.logg.Debug("noop filter", "block", transaction.Block.Number, "index", transaction.Index) return true, nil } diff --git a/internal/filter/transfer_filter.go b/internal/filter/transfer_filter.go index a69c1b3..daf2857 100644 --- a/internal/filter/transfer_filter.go +++ b/internal/filter/transfer_filter.go @@ -1,6 +1,8 @@ package filter import ( + "context" + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" "github.com/zerodha/logf" ) @@ -19,7 +21,7 @@ func NewTransferFilter(o TransferFilterOpts) Filter { } } -func (f *TransferFilter) Execute(transaction fetch.Transaction) (bool, error) { +func (f *TransferFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) { switch transaction.InputData[:10] { case "0xa9059cbb": f.logg.Info("cUSD transfer", "block", transaction.Block.Number, "index", transaction.Index) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 51eecd8..a1dbd70 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -1,6 +1,8 @@ package pipeline import ( + "context" + "github.com/grassrootseconomics/cic-chain-events/internal/fetch" "github.com/grassrootseconomics/cic-chain-events/internal/filter" "github.com/grassrootseconomics/cic-chain-events/internal/store" @@ -31,19 +33,25 @@ func NewPipeline(o PipelineOpts) *Pipeline { } } -// Run is the task executor which fetches and processes a block and its 
transactions through the pipeline filters -func (md *Pipeline) Run(blockNumber uint64) error { - fetchResp, err := md.fetch.Block(blockNumber) +// Run is the task executor which runs in its own goroutine and does the following: +// 1. Fetches the block and all transactional data +// 2. Passes the block through all filters +// 3. Commits the block to store as successfully processed +// +// Note: +// - Blocks are processed atomically, a failure in between will process the block from the start +// - Therefore, any side effect/event sink in the filter should support dedup +func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error { + md.logg.Debug("pipeline: processing block", "block", blockNumber) + fetchResp, err := md.fetch.Block(ctx, blockNumber) if err != nil { - md.logg.Error("pipeline block fetch error", "error", err) return err } for _, tx := range fetchResp.Data.Block.Transactions { for _, filter := range md.filters { - next, err := filter.Execute(tx) + next, err := filter.Execute(ctx, tx) if err != nil { - md.logg.Error("pipeline run error", "error", err) return err } if !next { @@ -52,10 +60,10 @@ func (md *Pipeline) Run(blockNumber uint64) error { } } - if err := md.store.CommitBlock(blockNumber); err != nil { + if err := md.store.CommitBlock(ctx, blockNumber); err != nil { return err } - md.logg.Debug("successfully commited block", "block", blockNumber) + md.logg.Debug("pipeline: commited block", "block", blockNumber) return nil } diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go index 737b245..87a06b7 100644 --- a/internal/pipeline/pipeline_test.go +++ b/internal/pipeline/pipeline_test.go @@ -1,7 +1,9 @@ package pipeline import ( + "context" "errors" + "os" "testing" "github.com/grassrootseconomics/cic-chain-events/internal/fetch" @@ -11,7 +13,7 @@ import ( ) var ( - graphqlEndpoint = "https://rpc.celo.grassecon.net/graphql" + graphqlEndpoint = os.Getenv("TEST_GRAPHQL_ENDPOINT") ) type itPipelineTest struct { 
@@ -31,7 +33,7 @@ func newErrorFilter() filter.Filter { return &errorFilter{} } -func (f *errorFilter) Execute(transaction fetch.Transaction) (bool, error) { +func (f *errorFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) { return false, errors.New("crash") } @@ -41,7 +43,7 @@ func newEarlyExitFilter() filter.Filter { return &earlyExitFilter{} } -func (f *earlyExitFilter) Execute(transaction fetch.Transaction) (bool, error) { +func (f *earlyExitFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) { return false, nil } @@ -91,46 +93,46 @@ func (s *itPipelineTest) SetupSuite() { } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Existing_Block_No_Err() { - err := s.normalPipeline.Run(14974600) + err := s.normalPipeline.Run(context.Background(), 14974600) s.NoError(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Non_Existing_Block_No_Err() { - err := s.normalPipeline.Run(14974600000) + err := s.normalPipeline.Run(context.Background(), 14974600000) s.Error(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Existing_Block_Early() { - err := s.earlyExitPipeline.Run(14974600) + err := s.earlyExitPipeline.Run(context.Background(), 14974600) s.NoError(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Existing_Block_With_Err() { - err := s.errorPipeline.Run(14974600) + err := s.errorPipeline.Run(context.Background(), 14974600) s.Error(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Non_Existing_Block_With_Err() { - err := s.errorPipeline.Run(14974600000) + err := s.errorPipeline.Run(context.Background(), 14974600000) s.Error(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Non_Existing_Block_Early() { - err := s.earlyExitPipeline.Run(14974600000) + err := s.earlyExitPipeline.Run(context.Background(), 14974600000) s.Error(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Empty_Block_With_No_Err() { - err := s.normalPipeline.Run(15370320) + err := 
s.normalPipeline.Run(context.Background(), 15370320) s.NoError(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Empty_Block_With_Err() { - err := s.errorPipeline.Run(15370320) + err := s.errorPipeline.Run(context.Background(), 15370320) s.NoError(err) } func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Empty_Block_Early() { - err := s.earlyExitPipeline.Run(15370320) + err := s.earlyExitPipeline.Run(context.Background(), 15370320) s.NoError(err) } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 4aba90d..aa20b6c 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -1,6 +1,7 @@ package pool import ( + "context" "time" "github.com/alitto/pond" @@ -11,11 +12,13 @@ type Opts struct { PoolQueueSize int } -func NewPool(o Opts) *pond.WorkerPool { +// NewPool creates a fixed size (and buffered) go routine worker pool. +func NewPool(ctx context.Context, o Opts) *pond.WorkerPool { return pond.New( o.ConcurrencyFactor, o.PoolQueueSize, pond.MinWorkers(o.ConcurrencyFactor), pond.IdleTimeout(time.Second*1), + pond.Context(ctx), ) } diff --git a/internal/store/postgres.go b/internal/store/postgres.go index ec885a0..233cb0b 100644 --- a/internal/store/postgres.go +++ b/internal/store/postgres.go @@ -60,14 +60,14 @@ func NewPostgresStore(o PostgresStoreOpts) (Store[pgx.Rows], error) { return postgresStore, nil } -func (s *PostgresStore) GetSearchBounds(batchSize uint64, headCursor uint64, headBlockLag uint64) (uint64, uint64, error) { +func (s *PostgresStore) GetSearchBounds(ctx context.Context, batchSize uint64, headCursor uint64, headBlockLag uint64) (uint64, uint64, error) { var ( lowerBound uint64 upperBound uint64 ) if err := s.pool.QueryRow( - context.Background(), + ctx, s.queries.GetSearchBounds, batchSize, headCursor, @@ -80,8 +80,8 @@ func (s *PostgresStore) GetSearchBounds(batchSize uint64, headCursor uint64, hea return lowerBound, upperBound, nil } -func (s *PostgresStore) GetMissingBlocks(lowerBound uint64, upperBound uint64) 
(pgx.Rows, error) { - rows, err := s.pool.Query(context.Background(), s.queries.GetMissingBlocks, lowerBound, upperBound) +func (s *PostgresStore) GetMissingBlocks(ctx context.Context, lowerBound uint64, upperBound uint64) (pgx.Rows, error) { + rows, err := s.pool.Query(ctx, s.queries.GetMissingBlocks, lowerBound, upperBound) if err != nil { return nil, err } @@ -89,8 +89,8 @@ func (s *PostgresStore) GetMissingBlocks(lowerBound uint64, upperBound uint64) ( return rows, nil } -func (s *PostgresStore) SetSearchLowerBound(newLowerBound uint64) error { - _, err := s.pool.Exec(context.Background(), s.queries.SetSearchLowerBound, newLowerBound) +func (s *PostgresStore) SetSearchLowerBound(ctx context.Context, newLowerBound uint64) error { + _, err := s.pool.Exec(ctx, s.queries.SetSearchLowerBound, newLowerBound) if err != nil { return err } @@ -98,8 +98,8 @@ func (s *PostgresStore) SetSearchLowerBound(newLowerBound uint64) error { return nil } -func (s *PostgresStore) CommitBlock(block uint64) error { - _, err := s.pool.Exec(context.Background(), s.queries.CommitBlock, block) +func (s *PostgresStore) CommitBlock(ctx context.Context, block uint64) error { + _, err := s.pool.Exec(ctx, s.queries.CommitBlock, block) if err != nil { return err } diff --git a/internal/store/store.go b/internal/store/store.go index c48633a..a88d060 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -1,10 +1,11 @@ package store -// Store defines indexer get and set queries. -// GetMissingBlocks returns a generic iterable. +import "context" + +// Store defines all relevant get/set queries against the implemented storage backend. 
type Store[T any] interface { - GetSearchBounds(batchSize uint64, headCursor uint64, headBlockLag uint64) (lowerBound uint64, upperBound uint64, err error) - GetMissingBlocks(lowerBound uint64, upperBound uint64) (missingBlocksIterable T, err error) - SetSearchLowerBound(newLowerBound uint64) (err error) - CommitBlock(block uint64) (err error) + GetSearchBounds(ctx context.Context, batchSize uint64, headCursor uint64, headBlockLag uint64) (lowerBound uint64, upperBound uint64, err error) + GetMissingBlocks(ctx context.Context, lowerBound uint64, upperBound uint64) (missingBlocksIterable T, err error) + SetSearchLowerBound(ctx context.Context, newLowerBound uint64) (err error) + CommitBlock(ctx context.Context, block uint64) (err error) } diff --git a/internal/syncer/head.go b/internal/syncer/head.go index e4a7b0d..6c86be9 100644 --- a/internal/syncer/head.go +++ b/internal/syncer/head.go @@ -41,33 +41,34 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) { }, nil } +// Start creates a websocket subscription and actively receives new blocks until stopped +// or a critical error occurs. 
func (hs *HeadSyncer) Start(ctx context.Context) error { - headerReceiver := make(chan *types.Header, 10) + headerReceiver := make(chan *types.Header, 1) sub, err := hs.ethClient.SubscribeNewHead(ctx, headerReceiver) if err != nil { return err } + defer sub.Unsubscribe() for { select { + case <-ctx.Done(): + hs.logg.Info("head syncer: shutdown signal received") + return nil + case err := <-sub.Err(): + return err case header := <-headerReceiver: - block := header.Number.Uint64() - hs.logg.Debug("head syncer received new block", "block", block) + blockNumber := header.Number.Uint64() + hs.logg.Debug("head syncer: received new block", "block", blockNumber) - hs.stats.UpdateHeadCursor(block) + hs.stats.UpdateHeadCursor(blockNumber) hs.pool.Submit(func() { - if err := hs.pipeline.Run(block); err != nil { - hs.logg.Error("pipeline run error", "error", err) + if err := hs.pipeline.Run(context.Background(), blockNumber); err != nil { + hs.logg.Error("head syncer: piepline run error", "error", err) } }) - case err := <-sub.Err(): - hs.logg.Error("head syncer error", "error", err) - return err - case <-ctx.Done(): - hs.logg.Debug("head syncer shutdown signnal received") - sub.Unsubscribe() - return nil } } } diff --git a/internal/syncer/janitor.go b/internal/syncer/janitor.go index e9a159a..b721bd7 100644 --- a/internal/syncer/janitor.go +++ b/internal/syncer/janitor.go @@ -51,32 +51,35 @@ func (j *Janitor) Start(ctx context.Context) error { for { select { + case <-ctx.Done(): + j.logg.Info("janitor: shutdown signal received") + return nil case <-timer.C: - j.logg.Debug("janitor starting sweep") - if err := j.QueueMissingBlocks(); err != nil { - j.logg.Error("janitor error", "error", err) + j.logg.Debug("janitor: starting sweep") + if err := j.QueueMissingBlocks(context.Background()); err != nil { + j.logg.Error("janitor: queue missing blocks error", "error", err) } timer.Reset(j.sweepInterval) - case <-ctx.Done(): - j.logg.Debug("janitor shutdown signal received") - return 
nil } } } -func (j *Janitor) QueueMissingBlocks() error { +// QueueMissingBlocks searches for missing block and queues the block for processing. +// It will run twice for a given search range and only after, raise the lower bound. +func (j *Janitor) QueueMissingBlocks(ctx context.Context) error { if j.stats.GetHeadCursor() == 0 { - j.logg.Debug("janitor waiting for head synchronization") + j.logg.Warn("janitor: (skipping) awaiting head synchronization") return nil } if j.pool.WaitingTasks() >= j.batchSize { - j.logg.Debug("janitor skipping potential queue pressure") + j.logg.Warn("janitor: (skipping) avoiding queue pressure") return nil } lowerBound, upperBound, err := j.store.GetSearchBounds( + ctx, j.batchSize, j.stats.GetHeadCursor(), j.headBlockLag, @@ -84,9 +87,8 @@ func (j *Janitor) QueueMissingBlocks() error { if err != nil { return err } - j.logg.Debug("janitor search bounds", "lower_bound", lowerBound, "upper_bound", upperBound) - rows, err := j.store.GetMissingBlocks(lowerBound, upperBound) + rows, err := j.store.GetMissingBlocks(ctx, lowerBound, upperBound) if err != nil { return err } @@ -99,19 +101,18 @@ func (j *Janitor) QueueMissingBlocks() error { } j.pool.Submit(func() { - if err := j.pipeline.Run(n); err != nil { - j.logg.Error("pipeline run error", "error", err) + if err := j.pipeline.Run(ctx, n); err != nil { + j.logg.Error("janitor: pipeline run error", "error", err) } }) - rowsProcessed++ } - j.logg.Debug("janitor missing block count", "count", rowsProcessed) + j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed) if rowsProcessed == 0 { - j.logg.Debug("no missing blocks, rasing lower bound") + j.logg.Debug("janitor: rasing lower bound") j.stats.UpdateLowerBound(upperBound) - j.store.SetSearchLowerBound(upperBound) + j.store.SetSearchLowerBound(ctx, upperBound) } if rows.Err() != nil { diff --git a/internal/syncer/stats.go b/internal/syncer/stats.go index 62c11b7..a86a458 100644 --- a/internal/syncer/stats.go +++ 
b/internal/syncer/stats.go @@ -4,8 +4,7 @@ import ( "sync/atomic" ) -// Stats synchronize syncer values across the head and janitor. -// could also be used to expose Prom gauges +// Stats synchronizes block cursors values across the head and janitor. type Stats struct { headCursor atomic.Uint64 lowerBound atomic.Uint64 diff --git a/run_tests.sh b/run_tests.sh new file mode 100644 index 0000000..869445b --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,4 @@ +#! /bin/bash +set -e + +source .env.test && go test -v -covermode atomic -coverprofile=covprofile ./internal/...