From 22ffc224ca6eb814b4d8daf326dacc00604651cf Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 5 Sep 2024 14:24:32 +0300 Subject: [PATCH] fix: mint and burn func signatures, adjustable batch size for pool and backfill buffer --- cmd/service/main.go | 8 ++++--- config.toml | 1 + internal/backfill/backfill.go | 39 +++++++++++++++++---------------- internal/pool/pool.go | 5 ++--- internal/processor/processor.go | 4 ++-- internal/router/token_burn.go | 2 +- internal/router/token_mint.go | 2 +- 7 files changed, 32 insertions(+), 29 deletions(-) diff --git a/cmd/service/main.go b/cmd/service/main.go index 4064da2..643c4ed 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -108,6 +108,7 @@ func main() { poolOpts := pool.PoolOpts{ Logg: lo, WorkerCount: ko.Int("core.pool_size"), + BatchSize: ko.MustInt("core.batch_size"), Processor: blockProcessor, } if ko.Int("core.pool_size") <= 0 { @@ -136,9 +137,10 @@ func main() { } backfill := backfill.New(backfill.BackfillOpts{ - DB: db, - Logg: lo, - Pool: workerPool, + BatchSize: ko.MustInt("core.batch_size"), + DB: db, + Logg: lo, + Pool: workerPool, }) apiServer := &http.Server{ diff --git a/config.toml b/config.toml index a419d20..ea05463 100644 --- a/config.toml +++ b/config.toml @@ -11,6 +11,7 @@ db_type = "bolt" # Defaults to (nproc * 3) pool_size = 0 # If you are using an archive node, set this to true +batch_size = 100 [redis] dsn = "127.0.0.1:6379" diff --git a/internal/backfill/backfill.go b/internal/backfill/backfill.go index 1ccf8c6..30e16f3 100644 --- a/internal/backfill/backfill.go +++ b/internal/backfill/backfill.go @@ -11,34 +11,35 @@ import ( type ( BackfillOpts struct { - DB db.DB - Logg *slog.Logger - Pool *pool.Pool + BatchSize int + DB db.DB + Logg *slog.Logger + Pool *pool.Pool } Backfill struct { - db db.DB - logg *slog.Logger - pool *pool.Pool - stopCh chan struct{} - ticker *time.Ticker + batchSize int + db db.DB + logg *slog.Logger + pool *pool.Pool + stopCh chan struct{} + ticker *time.Ticker } ) const ( idleCheckInterval = 60 * time.Second busyCheckInterval = 1 * time.Second - - maxPoolSizePush = 100 ) func New(o BackfillOpts) *Backfill { return &Backfill{ - db: o.DB, - logg: o.Logg, - pool: o.Pool, - stopCh: make(chan struct{}), - ticker: time.NewTicker(idleCheckInterval), + batchSize: o.BatchSize, + db: o.DB, + logg: o.Logg, + pool: o.Pool, + stopCh: make(chan struct{}), + ticker: time.NewTicker(idleCheckInterval), } } @@ -90,13 +91,13 @@ func (b *Backfill) Run(skipLatest bool) error { if missingBlocksCount > 0 { b.logg.Info("found missing blocks", "skip_latest", skipLatest, "missing_blocks_count", missingBlocksCount) - buffer := make([]uint, maxPoolSizePush) + buffer := make([]uint, b.batchSize) j := uint(0) pushedCount := 0 j, buffer = missingBlocks.NextSetMany(j, buffer) for ; len(buffer) > 0; j, buffer = missingBlocks.NextSetMany(j, buffer) { for k := range buffer { - if pushedCount >= maxPoolSizePush { + if pushedCount >= b.batchSize { break } @@ -104,11 +105,11 @@ func (b *Backfill) Run(skipLatest bool) error { b.logg.Debug("pushed block from backfill", "block", buffer[k]) pushedCount++ } - j += 1 + j++ } } - if missingBlocksCount > maxPoolSizePush { + if missingBlocksCount > uint(b.batchSize) { b.ticker.Reset(busyCheckInterval) } else { b.ticker.Reset(idleCheckInterval) diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 9697794..2cf2ab5 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -11,6 +11,7 @@ import ( type ( PoolOpts struct { + BatchSize int Logg *slog.Logger WorkerCount int Processor *processor.Processor @@ -23,14 +24,12 @@ type ( } ) -const blocksBuffer = 100 - func New(o PoolOpts) *Pool { return &Pool{ logg: o.Logg, workerPool: pond.New( o.WorkerCount, - blocksBuffer, + o.BatchSize, pond.Strategy(pond.Balanced()), pond.PanicHandler(panicHandler(o.Logg)), ), diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 554dee7..4db3c51 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -68,7 +68,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error Timestamp: block.Time(), }, ); err != nil && !errors.Is(err, context.Canceled) { - return err + return fmt.Errorf("route success transaction error: tx %s: %v", receipt.TxHash.Hex(), err) } } } @@ -100,7 +100,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error TxHash: receipt.TxHash.Hex(), }, ); err != nil && !errors.Is(err, context.Canceled) { - return err + return fmt.Errorf("route revert transaction error: tx %s: %v", receipt.TxHash.Hex(), err) } } } diff --git a/internal/router/token_burn.go b/internal/router/token_burn.go index cfc458c..ee39c6e 100644 --- a/internal/router/token_burn.go +++ b/internal/router/token_burn.go @@ -17,7 +17,7 @@ var ( _ Handler = (*tokenBurnHandler)(nil) tokenBurnEvent = w3.MustNewEvent("Burn(address indexed _tokenBurner, uint256 _value)") - tokenBurnToSig = w3.MustNewFunc("Burn(uint256)", "bool") + tokenBurnToSig = w3.MustNewFunc("burn(uint256)", "bool") ) func (h *tokenBurnHandler) Name() string { diff --git a/internal/router/token_mint.go b/internal/router/token_mint.go index 8c2258d..ed11050 100644 --- a/internal/router/token_mint.go +++ b/internal/router/token_mint.go @@ -17,7 +17,7 @@ var ( _ Handler = (*tokenMintHandler)(nil) tokenMintEvent = w3.MustNewEvent("Mint(address indexed _tokenMinter, address indexed _beneficiary, uint256 _value)") - tokenMintToSig = w3.MustNewFunc("MintTo(address, uint256)", "bool") + tokenMintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool") ) func (h *tokenMintHandler) Name() string {