refactor: perf, update libraries, CI, and add docs

* update config to better defaults
* add docs (inline and markdown)
* add context support throughout
* replace encoding/json with goccy/go-json for faster decoding of large JSON responses
* update graphql fetcher: replace ioutil with io
* add test runner script (until CI is ready)
* update CI build config
Mohamed Sohail 2023-01-11 08:13:59 +00:00
parent 29e7de6816
commit df88d9df16
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
27 changed files with 204 additions and 144 deletions

.env.test.example (new file, +1)

@@ -0,0 +1 @@
+export TEST_GRAPHQL_ENDPOINT=

(deleted workflow: binary_release)

@@ -1,31 +0,0 @@
-name: binary_release
-on:
-  push:
-    tags:
-      - "v*"
-jobs:
-  goreleaser:
-    runs-on: ubuntu-latest
-    container:
-      image: goreleaser/goreleaser-cross
-    environment: build
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v2
-        with:
-          fetch-depth: 0
-      - name: Set up Go
-        uses: actions/setup-go@v2
-        with:
-          go-version: 1.19
-      - name: Run GoReleaser
-        uses: goreleaser/goreleaser-action@v2
-        with:
-          version: latest
-          args: release --rm-dist
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

(workflow: docker_release renamed to build, goreleaser job folded in)

@@ -1,4 +1,4 @@
-name: docker_release
+name: build
 on:
   push:
@@ -6,9 +6,31 @@ on:
       - "v*"
 jobs:
+  goreleaser:
+    runs-on: ubuntu-latest
+    container:
+      image: goreleaser/goreleaser-cross
+    environment: build
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+      - name: Set up Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: 1.19.3
+      - name: Run GoReleaser
+        uses: goreleaser/goreleaser-action@v2
+        with:
+          version: latest
+          args: release --rm-dist
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
   build-publish:
     runs-on: ubuntu-latest
     steps:
      - uses: actions/checkout@v2
      - uses: docker/login-action@v1.14.1

.gitignore (+1)

@@ -1,2 +1,3 @@
 covprofile
 .env
+.env.test

README.md (new file, +10)

@@ -0,0 +1,10 @@
+# cic-chain-events
+
+![GitHub release (latest by date)](https://img.shields.io/github/v/release/grassrootseconomics/cic-chain-events)
+![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/grassrootseconomics/cic-chain-events/build.yml)
+
+> CIC Chain Events
+
+## Documentation
+
+- [API](docs/api.md)

(package main: bootstrap/init)

@@ -1,6 +1,7 @@
 package main

 import (
+    "context"
     "strings"

     "github.com/alitto/pond"
@@ -60,7 +61,7 @@ func initQueries(queriesPath string) goyesql.Queries {
 func initPgStore() (store.Store[pgx.Rows], error) {
     pgStore, err := store.NewPostgresStore(store.PostgresStoreOpts{
         DSN:               ko.MustString("postgres.dsn"),
-        InitialLowerBound: uint64(ko.MustInt64("indexer.initial_lower_bound")),
+        InitialLowerBound: uint64(ko.MustInt64("syncer.initial_lower_bound")),
         Logg:              lo,
         Queries:           q,
     })
@@ -71,10 +72,10 @@ func initPgStore() (store.Store[pgx.Rows], error) {
     return pgStore, nil
 }

-func initWorkerPool() *pond.WorkerPool {
-    return pool.NewPool(pool.Opts{
-        ConcurrencyFactor: ko.MustInt("indexer.concurrency"),
-        PoolQueueSize:     ko.MustInt("indexer.queue_size"),
+func initWorkerPool(ctx context.Context) *pond.WorkerPool {
+    return pool.NewPool(ctx, pool.Opts{
+        ConcurrencyFactor: ko.MustInt("syncer.concurrency"),
+        PoolQueueSize:     ko.MustInt("syncer.queue_size"),
     })
 }

(package main: service entrypoint)

@@ -48,11 +48,13 @@ func main() {
     ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
     defer stop()

+    workerPool := initWorkerPool(ctx)
+
     pgStore, err := initPgStore()
     if err != nil {
-        lo.Fatal("error loading pg store", "error", err)
+        lo.Fatal("main: critical error loading pg store", "error", err)
     }
-    workerPool := initWorkerPool()

     graphqlFetcher := initFetcher()

     pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{
@@ -60,7 +62,6 @@ func main() {
         Filters: []filter.Filter{
             initAddressFilter(),
             initTransferFilter(),
-            // initNoopFilter(),
         },
         Logg:  lo,
         Store: pgStore,
@@ -74,18 +75,18 @@ func main() {
         WsEndpoint: ko.MustString("chain.ws_endpoint"),
     })
     if err != nil {
-        lo.Fatal("error loading head syncer", "error", err)
+        lo.Fatal("main: critical error loading head syncer", "error", err)
     }

     janitor := syncer.NewJanitor(syncer.JanitorOpts{
-        BatchSize:     uint64(ko.MustInt64("indexer.batch_size")),
-        HeadBlockLag:  uint64(ko.MustInt64("indexer.head_block_lag")),
+        BatchSize:     uint64(ko.MustInt64("syncer.batch_size")),
+        HeadBlockLag:  uint64(ko.MustInt64("syncer.head_block_lag")),
         Logg:          lo,
         Pipeline:      pipeline,
         Pool:          workerPool,
         Stats:         syncerStats,
         Store:         pgStore,
-        SweepInterval: time.Second * time.Duration(ko.MustInt64("indexer.sweep_interval")),
+        SweepInterval: time.Second * time.Duration(ko.MustInt64("syncer.sweep_interval")),
     })

     apiServer.GET("/stats", api.StatsHandler(syncerStats, workerPool, lo))
@@ -94,7 +95,7 @@ func main() {
     go func() {
         defer wg.Done()
         if err := headSyncer.Start(ctx); err != nil {
-            lo.Fatal("head syncer error", "error", err)
+            lo.Fatal("main: critical error starting head syncer", "error", err)
         }
     }()
@@ -102,7 +103,7 @@ func main() {
     go func() {
         defer wg.Done()
         if err := janitor.Start(ctx); err != nil {
-            lo.Fatal("janitor error", "error", err)
+            lo.Fatal("main: critical error starting janitor", "error", err)
         }
     }()
@@ -112,19 +113,19 @@ func main() {
         lo.Info("starting API server")
         if err := apiServer.Start(ko.MustString("api.address")); err != nil {
             if strings.Contains(err.Error(), "Server closed") {
-                lo.Info("shutting down server")
+                lo.Info("main: shutting down server")
             } else {
-                lo.Fatal("could not start api server", "err", err)
+                lo.Fatal("main: critical error shutting down server", "err", err)
             }
         }
     }()

     <-ctx.Done()
+    lo.Info("graceful shutdown triggered")

     workerPool.Stop()

     if err := apiServer.Shutdown(ctx); err != nil {
-        lo.Error("could not gracefully shutdown api server", "err", err)
+        lo.Error("main: could not gracefully shutdown api server", "err", err)
     }

     wg.Wait()
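The reordered startup (worker pool created first, bound to ctx) follows the standard signal.NotifyContext shutdown pattern. Reduced to a skeleton, with the service components stubbed out; this is a sketch of the pattern, not the full wiring:

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

workerPool := initWorkerPool(ctx) // pool is cancelled together with ctx

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // long-running component, e.g. headSyncer.Start(ctx)
}()

<-ctx.Done() // blocks until SIGINT/SIGTERM
workerPool.Stop()
wg.Wait()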

(service config, TOML)

@@ -1,20 +1,31 @@
+# Exposes Prometheus metrics
 [metrics]
 go_process = true

+# API server
 [api]
 address = ":8080"

+# Geth API endpoints
 [chain]
 graphql_endpoint = "https://rpc.celo.grassecon.net/graphql"
 ws_endpoint = "wss://ws.celo.grassecon.net"

-[indexer]
+# Syncer configs
+[syncer]
+# Maximum number of missing blocks pushed into the worker queue on every janitor sweep
 batch_size = 200
+# Number of goroutines assigned to the worker pool
 concurrency = 3
+# Prevents reprocessing of head blocks already in the queue
 head_block_lag = 5
+# Max idle time after which a goroutine is returned to the pool
 idle_worker_timeout = 1
+# Syncer start block
 initial_lower_bound = 17034445
-queue_size = 1250
+# Max blocks in the worker queue awaiting processing
+queue_size = 500
+# Janitor sweep interval; should take concurrency and queue_size into account
 sweep_interval = 10

 [postgres]
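The renamed [syncer] keys line up with the ko.Must* lookups in the service wiring above; a missing key panics at startup, which is why the indexer-to-syncer rename has to land everywhere at once. A minimal sketch of loading this file, assuming the koanf library that the ko helpers appear to come from (the provider/parser wiring here is illustrative):

package main

import (
    "log"

    "github.com/knadh/koanf"
    "github.com/knadh/koanf/parsers/toml"
    "github.com/knadh/koanf/providers/file"
)

func main() {
    ko := koanf.New(".")

    // Load the TOML config; the Must* accessors below panic if a key
    // is absent, surfacing a stale indexer.* key immediately.
    if err := ko.Load(file.Provider("config.toml"), toml.Parser()); err != nil {
        log.Fatal(err)
    }

    batchSize := ko.MustInt64("syncer.batch_size") // 200
    queueSize := ko.MustInt("syncer.queue_size")   // 500
    log.Println(batchSize, queueSize)
}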

docs/api.md (new file, +4)

@@ -0,0 +1,4 @@
+## API
+
+- `/stats` - Syncer stats (ready after the first successful janitor sweep).
+- `/metrics` - Go process metrics (Prometheus format).

go.mod (+1)

@@ -23,6 +23,7 @@ require (
     github.com/fsnotify/fsnotify v1.4.9 // indirect
     github.com/go-ole/go-ole v1.2.5 // indirect
     github.com/go-stack/stack v1.8.0 // indirect
+    github.com/goccy/go-json v0.10.0 // indirect
     github.com/gorilla/websocket v1.4.2 // indirect
     github.com/jackc/pgpassfile v1.0.0 // indirect
     github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect

go.sum (+2)

@@ -178,6 +178,8 @@ github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
 github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
+github.com/goccy/go-json v0.10.0 h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
+github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=

(package fetch: Fetch interface)

@@ -1,8 +1,10 @@
 package fetch

+import "context"
+
 // Fetch defines a block fetcher that must return a full JSON response
 type Fetch interface {
-    Block(block uint64) (fetchResponse FetchResponse, err error)
+    Block(ctx context.Context, block uint64) (fetchResponse FetchResponse, err error)
 }

 // Transaction represents a JSON object of all important mined transaction information
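With ctx now threaded through the interface, any alternative fetcher (or test stub) must accept and honor cancellation. A hypothetical stub satisfying the new interface, for illustration only:

package fetch

import "context"

// stubFetcher is a hypothetical in-memory Fetch implementation, useful
// for exercising the pipeline without a Geth endpoint. Not part of this commit.
type stubFetcher struct {
    responses map[uint64]FetchResponse
}

func (s *stubFetcher) Block(ctx context.Context, block uint64) (FetchResponse, error) {
    // Honor cancellation before doing any "work".
    if err := ctx.Err(); err != nil {
        return FetchResponse{}, err
    }
    return s.responses[block], nil
}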

(package fetch: GraphQL fetcher)

@@ -2,15 +2,16 @@ package fetch
 import (
     "bytes"
-    "encoding/json"
+    "context"
     "fmt"
-    "io/ioutil"
+    "io"
     "net/http"
     "time"
+
+    "github.com/goccy/go-json"
 )

 const (
-    contentType  = "application/json"
     graphqlQuery = `{"query":"{block(number:%d){transactions{block{number,timestamp},hash,index,from{address},to{address},value,inputData,status,gasUsed}}}"}`
 )
@@ -26,31 +27,34 @@ type Graphql struct {
 func NewGraphqlFetcher(o GraphqlOpts) Fetch {
     return &Graphql{
         httpClient: &http.Client{
-            Timeout: time.Second * 5,
+            Timeout: time.Second * 2,
         },
         graphqlEndpoint: o.GraphqlEndpoint,
     }
 }

-func (f *Graphql) Block(blockNumber uint64) (FetchResponse, error) {
+func (f *Graphql) Block(ctx context.Context, blockNumber uint64) (FetchResponse, error) {
     var (
         fetchResponse FetchResponse
     )

-    resp, err := f.httpClient.Post(
-        f.graphqlEndpoint,
-        contentType,
-        bytes.NewBufferString(fmt.Sprintf(graphqlQuery, blockNumber)),
-    )
+    req, err := http.NewRequestWithContext(ctx, http.MethodPost, f.graphqlEndpoint, bytes.NewBufferString(fmt.Sprintf(graphqlQuery, blockNumber)))
     if err != nil {
         return FetchResponse{}, err
     }
+    req.Header.Set("Content-Type", "application/json")
+
+    resp, err := f.httpClient.Do(req)
+    if err != nil {
+        return FetchResponse{}, err
+    }

-    if resp.StatusCode != http.StatusOK {
+    if resp.StatusCode >= http.StatusBadRequest {
         return FetchResponse{}, fmt.Errorf("error fetching block %s", resp.Status)
     }
-    defer resp.Body.Close()

-    out, err := ioutil.ReadAll(resp.Body)
+    out, err := io.ReadAll(resp.Body)
+    _ = resp.Body.Close()
     if err != nil {
         return FetchResponse{}, err
     }
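Because Block now builds its request with http.NewRequestWithContext, callers can bound a single fetch independently of the client's 2s transport timeout. A usage sketch; the endpoint value is a placeholder:

package main

import (
    "context"
    "log"
    "time"

    "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
)

func main() {
    fetcher := fetch.NewGraphqlFetcher(fetch.GraphqlOpts{
        GraphqlEndpoint: "https://example.org/graphql", // placeholder endpoint
    })

    // Bound this one fetch to 1s, regardless of the client-wide timeout.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    resp, err := fetcher.Block(ctx, 14974600)
    if err != nil {
        log.Fatal("fetch error: ", err) // surfaces context.DeadlineExceeded on timeout
    }
    log.Println("transactions in block:", len(resp.Data.Block.Transactions))
}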

(package fetch: GraphQL fetcher integration tests)

@@ -1,13 +1,15 @@
 package fetch

 import (
+    "context"
+    "os"
     "testing"

     "github.com/stretchr/testify/suite"
 )

 var (
-    graphqlEndpoint = "https://rpc.celo.grassecon.net/graphql"
+    graphqlEndpoint = os.Getenv("TEST_GRAPHQL_ENDPOINT")
 )

 type itGraphqlTest struct {
@@ -26,12 +28,12 @@ func (s *itGraphqlTest) SetupSuite() {
 }

 func (s *itGraphqlTest) Test_E2E_Fetch_Existing_Block() {
-    resp, err := s.graphqlFetcher.Block(14974600)
+    resp, err := s.graphqlFetcher.Block(context.Background(), 14974600)
     s.NoError(err)
     s.Len(resp.Data.Block.Transactions, 3)
 }

 func (s *itGraphqlTest) Test_E2E_Fetch_Non_Existing_Block() {
-    _, err := s.graphqlFetcher.Block(14974600000)
+    _, err := s.graphqlFetcher.Block(context.Background(), 14974600000)
     s.Error(err)
 }
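Since graphqlEndpoint now comes from the environment, the suite would hit an empty URL if .env.test is not sourced. A common guard, sketched here as a suggestion (the runner function name is hypothetical, not something this commit adds):

func TestGraphqlSuite(t *testing.T) {
    if graphqlEndpoint == "" {
        t.Skip("TEST_GRAPHQL_ENDPOINT not set; skipping integration tests")
    }
    suite.Run(t, new(itGraphqlTest))
}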

(package filter: address filter)

@@ -1,6 +1,8 @@
 package filter

 import (
+    "context"
+
     "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
     "github.com/zerodha/logf"
 )
@@ -23,7 +25,7 @@ func NewAddressFilter(o AddressFilterOpts) Filter {
     }
 }

-func (f *AddressFilter) Execute(transaction fetch.Transaction) (bool, error) {
+func (f *AddressFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) {
     if transaction.To.Address == cUSD {
         return true, nil
     }

(package filter: Filter interface)

@@ -1,8 +1,12 @@
 package filter

-import "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
+import (
+    "context"
+
+    "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
+)

 // Filter defines a read-only filter which must return next as true/false or an error
 type Filter interface {
-    Execute(inputTransaction fetch.Transaction) (next bool, err error)
+    Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error)
 }
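Every downstream filter now receives the pipeline's context, so long-running side effects (DB writes, queue publishes) can be cancelled mid-sweep. A hypothetical filter following the same shape as the bundled ones, assuming the Transaction struct exposes the status field that the GraphQL query fetches:

package filter

import (
    "context"

    "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
)

// statusFilter is a hypothetical example: it short-circuits the pipeline
// for reverted transactions by returning next=false. Assumes Transaction
// has a numeric Status field (the GraphQL query selects `status`).
type statusFilter struct{}

func NewStatusFilter() Filter {
    return &statusFilter{}
}

func (f *statusFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) {
    if transaction.Status == 0 {
        return false, nil // stop processing this transaction, no error
    }
    return true, nil
}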

(package filter: noop filter)

@@ -1,6 +1,8 @@
 package filter

 import (
+    "context"
+
     "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
     "github.com/zerodha/logf"
 )
@@ -19,7 +21,7 @@ func NewNoopFilter(o NoopFilterOpts) Filter {
     }
 }

-func (f *NoopFilter) Execute(transaction fetch.Transaction) (bool, error) {
+func (f *NoopFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) {
     f.logg.Debug("noop filter", "block", transaction.Block.Number, "index", transaction.Index)
     return true, nil
 }

(package filter: transfer filter)

@@ -1,6 +1,8 @@
 package filter

 import (
+    "context"
+
     "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
     "github.com/zerodha/logf"
 )
@@ -19,7 +21,7 @@ func NewTransferFilter(o TransferFilterOpts) Filter {
     }
 }

-func (f *TransferFilter) Execute(transaction fetch.Transaction) (bool, error) {
+func (f *TransferFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) {
     switch transaction.InputData[:10] {
     case "0xa9059cbb":
         f.logg.Info("cUSD transfer", "block", transaction.Block.Number, "index", transaction.Index)

(package pipeline: pipeline)

@@ -1,6 +1,8 @@
 package pipeline

 import (
+    "context"
+
     "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
     "github.com/grassrootseconomics/cic-chain-events/internal/filter"
     "github.com/grassrootseconomics/cic-chain-events/internal/store"
@@ -31,19 +33,25 @@ func NewPipeline(o PipelineOpts) *Pipeline {
     }
 }

-// Run is the task executor which fetches and processes a block and its transactions through the pipeline filters
-func (md *Pipeline) Run(blockNumber uint64) error {
-    fetchResp, err := md.fetch.Block(blockNumber)
+// Run is the task executor which runs in its own goroutine and does the following:
+// 1. Fetches the block and all transactional data
+// 2. Passes the block through all filters
+// 3. Commits the block to the store as successfully processed
+//
+// Note:
+// - Blocks are processed atomically; a failure in between reprocesses the block from the start
+// - Therefore, any side effect/event sink in a filter should support dedup
+func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error {
+    md.logg.Debug("pipeline: processing block", "block", blockNumber)
+    fetchResp, err := md.fetch.Block(ctx, blockNumber)
     if err != nil {
-        md.logg.Error("pipeline block fetch error", "error", err)
         return err
     }

     for _, tx := range fetchResp.Data.Block.Transactions {
         for _, filter := range md.filters {
-            next, err := filter.Execute(tx)
+            next, err := filter.Execute(ctx, tx)
             if err != nil {
-                md.logg.Error("pipeline run error", "error", err)
                 return err
             }
             if !next {
@@ -52,10 +60,10 @@ func (md *Pipeline) Run(blockNumber uint64) error {
         }
     }

-    if err := md.store.CommitBlock(blockNumber); err != nil {
+    if err := md.store.CommitBlock(ctx, blockNumber); err != nil {
         return err
     }

-    md.logg.Debug("successfully commited block", "block", blockNumber)
+    md.logg.Debug("pipeline: committed block", "block", blockNumber)
     return nil
 }

(package pipeline: integration tests)

@@ -1,7 +1,9 @@
 package pipeline

 import (
+    "context"
     "errors"
+    "os"
     "testing"

     "github.com/grassrootseconomics/cic-chain-events/internal/fetch"
@@ -11,7 +13,7 @@ import (
 )

 var (
-    graphqlEndpoint = "https://rpc.celo.grassecon.net/graphql"
+    graphqlEndpoint = os.Getenv("TEST_GRAPHQL_ENDPOINT")
 )

 type itPipelineTest struct {
@@ -31,7 +33,7 @@ func newErrorFilter() filter.Filter {
     return &errorFilter{}
 }

-func (f *errorFilter) Execute(transaction fetch.Transaction) (bool, error) {
+func (f *errorFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) {
     return false, errors.New("crash")
 }

@@ -41,7 +43,7 @@ func newEarlyExitFilter() filter.Filter {
     return &earlyExitFilter{}
 }

-func (f *earlyExitFilter) Execute(transaction fetch.Transaction) (bool, error) {
+func (f *earlyExitFilter) Execute(ctx context.Context, transaction fetch.Transaction) (bool, error) {
     return false, nil
 }

@@ -91,46 +93,46 @@ func (s *itPipelineTest) SetupSuite() {
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Existing_Block_No_Err() {
-    err := s.normalPipeline.Run(14974600)
+    err := s.normalPipeline.Run(context.Background(), 14974600)
     s.NoError(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Non_Existing_Block_No_Err() {
-    err := s.normalPipeline.Run(14974600000)
+    err := s.normalPipeline.Run(context.Background(), 14974600000)
     s.Error(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Existing_Block_Early() {
-    err := s.earlyExitPipeline.Run(14974600)
+    err := s.earlyExitPipeline.Run(context.Background(), 14974600)
     s.NoError(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Existing_Block_With_Err() {
-    err := s.errorPipeline.Run(14974600)
+    err := s.errorPipeline.Run(context.Background(), 14974600)
     s.Error(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Non_Existing_Block_With_Err() {
-    err := s.errorPipeline.Run(14974600000)
+    err := s.errorPipeline.Run(context.Background(), 14974600000)
     s.Error(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Non_Existing_Block_Early() {
-    err := s.earlyExitPipeline.Run(14974600000)
+    err := s.earlyExitPipeline.Run(context.Background(), 14974600000)
     s.Error(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Empty_Block_With_No_Err() {
-    err := s.normalPipeline.Run(15370320)
+    err := s.normalPipeline.Run(context.Background(), 15370320)
     s.NoError(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Empty_Block_With_Err() {
-    err := s.errorPipeline.Run(15370320)
+    err := s.errorPipeline.Run(context.Background(), 15370320)
     s.NoError(err)
 }

 func (s *itPipelineTest) Test_E2E_Pipeline_Run_On_Empty_Block_Early() {
-    err := s.earlyExitPipeline.Run(15370320)
+    err := s.earlyExitPipeline.Run(context.Background(), 15370320)
     s.NoError(err)
 }

(package pool: worker pool)

@@ -1,6 +1,7 @@
 package pool

 import (
+    "context"
     "time"

     "github.com/alitto/pond"
@@ -11,11 +12,13 @@ type Opts struct {
     PoolQueueSize     int
 }

-func NewPool(o Opts) *pond.WorkerPool {
+// NewPool creates a fixed-size (and buffered) goroutine worker pool.
+func NewPool(ctx context.Context, o Opts) *pond.WorkerPool {
     return pond.New(
         o.ConcurrencyFactor,
         o.PoolQueueSize,
         pond.MinWorkers(o.ConcurrencyFactor),
         pond.IdleTimeout(time.Second*1),
+        pond.Context(ctx),
     )
 }
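Passing the root context through pool.NewPool ties the pond pool's lifecycle to process shutdown. A usage sketch against the API shown above; the numbers mirror the config defaults:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

workerPool := pool.NewPool(ctx, pool.Opts{
    ConcurrencyFactor: 3,   // syncer.concurrency
    PoolQueueSize:     500, // syncer.queue_size
})

workerPool.Submit(func() {
    // task body; long tasks should watch a context themselves,
    // since cancelling the pool cannot preempt a goroutine that
    // is already running
})

workerPool.Stop() // the main wiring calls this on shutdown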

(package store: Postgres store)

@@ -60,14 +60,14 @@ func NewPostgresStore(o PostgresStoreOpts) (Store[pgx.Rows], error) {
     return postgresStore, nil
 }

-func (s *PostgresStore) GetSearchBounds(batchSize uint64, headCursor uint64, headBlockLag uint64) (uint64, uint64, error) {
+func (s *PostgresStore) GetSearchBounds(ctx context.Context, batchSize uint64, headCursor uint64, headBlockLag uint64) (uint64, uint64, error) {
     var (
         lowerBound uint64
         upperBound uint64
     )

     if err := s.pool.QueryRow(
-        context.Background(),
+        ctx,
         s.queries.GetSearchBounds,
         batchSize,
         headCursor,
@@ -80,8 +80,8 @@ func (s *PostgresStore) GetSearchBounds(
     return lowerBound, upperBound, nil
 }

-func (s *PostgresStore) GetMissingBlocks(lowerBound uint64, upperBound uint64) (pgx.Rows, error) {
-    rows, err := s.pool.Query(context.Background(), s.queries.GetMissingBlocks, lowerBound, upperBound)
+func (s *PostgresStore) GetMissingBlocks(ctx context.Context, lowerBound uint64, upperBound uint64) (pgx.Rows, error) {
+    rows, err := s.pool.Query(ctx, s.queries.GetMissingBlocks, lowerBound, upperBound)
     if err != nil {
         return nil, err
     }
@@ -89,8 +89,8 @@ func (s *PostgresStore) GetMissingBlocks(
     return rows, nil
 }

-func (s *PostgresStore) SetSearchLowerBound(newLowerBound uint64) error {
-    _, err := s.pool.Exec(context.Background(), s.queries.SetSearchLowerBound, newLowerBound)
+func (s *PostgresStore) SetSearchLowerBound(ctx context.Context, newLowerBound uint64) error {
+    _, err := s.pool.Exec(ctx, s.queries.SetSearchLowerBound, newLowerBound)
     if err != nil {
         return err
     }
@@ -98,8 +98,8 @@ func (s *PostgresStore) SetSearchLowerBound(
     return nil
 }

-func (s *PostgresStore) CommitBlock(block uint64) error {
-    _, err := s.pool.Exec(context.Background(), s.queries.CommitBlock, block)
+func (s *PostgresStore) CommitBlock(ctx context.Context, block uint64) error {
+    _, err := s.pool.Exec(ctx, s.queries.CommitBlock, block)
     if err != nil {
         return err
     }

(package store: Store interface)

@@ -1,10 +1,11 @@
 package store

-// Store defines indexer get and set queries.
-// GetMissingBlocks returns a generic iterable.
+import "context"
+
+// Store defines all relevant get/set queries against the implemented storage backend.
 type Store[T any] interface {
-    GetSearchBounds(batchSize uint64, headCursor uint64, headBlockLag uint64) (lowerBound uint64, upperBound uint64, err error)
-    GetMissingBlocks(lowerBound uint64, upperBound uint64) (missingBlocksIterable T, err error)
-    SetSearchLowerBound(newLowerBound uint64) (err error)
-    CommitBlock(block uint64) (err error)
+    GetSearchBounds(ctx context.Context, batchSize uint64, headCursor uint64, headBlockLag uint64) (lowerBound uint64, upperBound uint64, err error)
+    GetMissingBlocks(ctx context.Context, lowerBound uint64, upperBound uint64) (missingBlocksIterable T, err error)
+    SetSearchLowerBound(ctx context.Context, newLowerBound uint64) (err error)
+    CommitBlock(ctx context.Context, block uint64) (err error)
 }

(package syncer: head syncer)

@@ -41,33 +41,34 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) {
     }, nil
 }

+// Start creates a websocket subscription and actively receives new blocks until stopped
+// or a critical error occurs.
 func (hs *HeadSyncer) Start(ctx context.Context) error {
-    headerReceiver := make(chan *types.Header, 10)
+    headerReceiver := make(chan *types.Header, 1)

     sub, err := hs.ethClient.SubscribeNewHead(ctx, headerReceiver)
     if err != nil {
         return err
     }
+    defer sub.Unsubscribe()

     for {
         select {
+        case <-ctx.Done():
+            hs.logg.Info("head syncer: shutdown signal received")
+            return nil
+        case err := <-sub.Err():
+            return err
         case header := <-headerReceiver:
-            block := header.Number.Uint64()
+            blockNumber := header.Number.Uint64()

-            hs.logg.Debug("head syncer received new block", "block", block)
-            hs.stats.UpdateHeadCursor(block)
+            hs.logg.Debug("head syncer: received new block", "block", blockNumber)
+            hs.stats.UpdateHeadCursor(blockNumber)

             hs.pool.Submit(func() {
-                if err := hs.pipeline.Run(block); err != nil {
-                    hs.logg.Error("pipeline run error", "error", err)
+                if err := hs.pipeline.Run(context.Background(), blockNumber); err != nil {
+                    hs.logg.Error("head syncer: pipeline run error", "error", err)
                 }
             })
-        case err := <-sub.Err():
-            hs.logg.Error("head syncer error", "error", err)
-            return err
-        case <-ctx.Done():
-            hs.logg.Debug("head syncer shutdown signal received")
-            sub.Unsubscribe()
-            return nil
         }
     }
 }

(package syncer: janitor)

@@ -51,32 +51,35 @@ func (j *Janitor) Start(ctx context.Context) error {
     for {
         select {
+        case <-ctx.Done():
+            j.logg.Info("janitor: shutdown signal received")
+            return nil
         case <-timer.C:
-            j.logg.Debug("janitor starting sweep")
-            if err := j.QueueMissingBlocks(); err != nil {
-                j.logg.Error("janitor error", "error", err)
+            j.logg.Debug("janitor: starting sweep")
+            if err := j.QueueMissingBlocks(context.Background()); err != nil {
+                j.logg.Error("janitor: queue missing blocks error", "error", err)
             }
             timer.Reset(j.sweepInterval)
-        case <-ctx.Done():
-            j.logg.Debug("janitor shutdown signal received")
-            return nil
         }
     }
 }

-func (j *Janitor) QueueMissingBlocks() error {
+// QueueMissingBlocks searches for missing blocks and queues them for processing.
+// It runs twice over a given search range and only then raises the lower bound.
+func (j *Janitor) QueueMissingBlocks(ctx context.Context) error {
     if j.stats.GetHeadCursor() == 0 {
-        j.logg.Debug("janitor waiting for head synchronization")
+        j.logg.Warn("janitor: (skipping) awaiting head synchronization")
         return nil
     }

     if j.pool.WaitingTasks() >= j.batchSize {
-        j.logg.Debug("janitor skipping potential queue pressure")
+        j.logg.Warn("janitor: (skipping) avoiding queue pressure")
         return nil
     }

     lowerBound, upperBound, err := j.store.GetSearchBounds(
+        ctx,
         j.batchSize,
         j.stats.GetHeadCursor(),
         j.headBlockLag,
@@ -84,9 +87,8 @@ func (j *Janitor) QueueMissingBlocks() error {
     if err != nil {
         return err
     }
-    j.logg.Debug("janitor search bounds", "lower_bound", lowerBound, "upper_bound", upperBound)

-    rows, err := j.store.GetMissingBlocks(lowerBound, upperBound)
+    rows, err := j.store.GetMissingBlocks(ctx, lowerBound, upperBound)
     if err != nil {
         return err
     }
@@ -99,19 +101,18 @@ func (j *Janitor) QueueMissingBlocks() error {
         }

         j.pool.Submit(func() {
-            if err := j.pipeline.Run(n); err != nil {
-                j.logg.Error("pipeline run error", "error", err)
+            if err := j.pipeline.Run(ctx, n); err != nil {
+                j.logg.Error("janitor: pipeline run error", "error", err)
             }
         })
         rowsProcessed++
     }

-    j.logg.Debug("janitor missing block count", "count", rowsProcessed)
+    j.logg.Debug("janitor: missing blocks count", "count", rowsProcessed)
     if rowsProcessed == 0 {
-        j.logg.Debug("no missing blocks, raising lower bound")
+        j.logg.Debug("janitor: raising lower bound")
         j.stats.UpdateLowerBound(upperBound)
-        j.store.SetSearchLowerBound(upperBound)
+        j.store.SetSearchLowerBound(ctx, upperBound)
     }

     if rows.Err() != nil {

(package syncer: stats)

@@ -4,8 +4,7 @@ import (
     "sync/atomic"
 )

-// Stats synchronize syncer values across the head and janitor.
-// could also be used to expose Prom gauges
+// Stats synchronizes block cursor values across the head syncer and janitor.
 type Stats struct {
     headCursor atomic.Uint64
     lowerBound atomic.Uint64
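The atomic.Uint64 fields (available since Go 1.19, matching the workflow's go-version) let the head syncer and janitor publish and read cursors without a mutex. The accessors used elsewhere in this commit presumably reduce to plain Store/Load calls, roughly (GetLowerBound is assumed, not shown in the diff):

func (s *Stats) UpdateHeadCursor(v uint64) { s.headCursor.Store(v) }
func (s *Stats) GetHeadCursor() uint64     { return s.headCursor.Load() }

func (s *Stats) UpdateLowerBound(v uint64) { s.lowerBound.Store(v) }
func (s *Stats) GetLowerBound() uint64     { return s.lowerBound.Load() }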

run_tests.sh (new file, +4)

@@ -0,0 +1,4 @@
+#!/bin/bash
+set -e
+
+source .env.test && go test -v -covermode atomic -coverprofile=covprofile ./internal/...