diff --git a/cmd/tracker/main.go b/cmd/tracker/main.go index 03a2e4e..a1966c8 100644 --- a/cmd/tracker/main.go +++ b/cmd/tracker/main.go @@ -94,13 +94,14 @@ func main() { chainSyncer, err := syncer.New(syncer.SyncerOpts{ WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), + EnableHistorical: ko.Bool("chain.historical"), + StartBlock: uint64(ko.MustInt64("bootstrap.start_block")), BatchQueue: &batchQueue, BlocksQueue: &blocksQueue, Chain: chain, Logg: lo, Stats: stats, DB: db, - InitialLowerBound: uint64(ko.MustInt64("bootstrap.start_block")), }) if err != nil { lo.Error("could not initialize chain syncer", "error", err) @@ -114,7 +115,9 @@ func main() { cache, err := cache.New(cache.CacheOpts{ Logg: lo, Chain: chain, - Registries: ko.MustStrings("bootstrap.registries"), + Registries: ko.MustStrings("bootstrap.ge_registries"), + Blacklist: ko.MustStrings("bootstrap.blacklist"), + Watchlist: ko.MustStrings("bootstrap.watchlist"), }) if err != nil { lo.Error("could not initialize cache", "error", err) diff --git a/config.toml b/config.toml index c57a78e..5c4b8ff 100644 --- a/config.toml +++ b/config.toml @@ -6,14 +6,17 @@ address = ":5001" [chain] ws_endpoint = "wss://ws.celo.grassecon.net" -rpc_endpoint = "https://rpc.ankr.com/celo/bae2b7745f52c50974d7ecb1a7c23dc05d9ab5b68caf498a7c73f09a3e8bc04a" +rpc_endpoint = "https://1rpc.io/celo" testnet = false +realtime = true +historical = false +start_block = 24905000 [bootstrap] -start_block = 24905000 # https://software.grassecon.org/addresses -registries = [ +ge_registries = [ "0xd1FB944748aca327a1ba036B082993D9dd9Bfa0C", "0x0cc9f4fff962def35bb34a53691180b13e653030", ] +watchlist = [""] blacklist = [""] diff --git a/internal/cache/bootstrap.go b/internal/cache/bootstrap.go index 70df188..7c8ee5e 100644 --- a/internal/cache/bootstrap.go +++ b/internal/cache/bootstrap.go @@ -10,7 +10,7 @@ import ( "github.com/grassrootseconomics/w3-celo/module/eth" ) -func bootstrapAllGESmartContracts(ctx context.Context, registries []string, chain *chain.Chain, cache Cache) (WatchableIndex, error) { +func bootstrapGESmartContracts(ctx context.Context, registries []string, chain *chain.Chain, cache Cache) (WatchableIndex, error) { var ( watchableIndex = make(WatchableIndex) ) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index cef41e7..ba2e723 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -25,6 +25,7 @@ type ( CacheType string Registries []string Blacklist []string + Watchlist []string } WatchableIndex map[string]bool @@ -47,7 +48,7 @@ func New(o CacheOpts) (Cache, error) { cache = NewMapCache() } - watchableIndex, err := bootstrapAllGESmartContracts( + watchableIndex, err := bootstrapGESmartContracts( context.Background(), o.Registries, o.Chain, @@ -56,12 +57,16 @@ func New(o CacheOpts) (Cache, error) { if err != nil { return nil, err } + // We only watch the token and pool indexes + // If at some point we want to eatch the user index, this line should be removed cache.SetWatchableIndex(watchableIndex) + for _, address := range o.Watchlist { + cache.Add(address) + } for _, address := range o.Blacklist { cache.Remove(address) } - o.Logg.Debug("cache bootstrap complete", "cached_addresses", cache.Size()) return cache, nil diff --git a/internal/processor/block.go b/internal/processor/block.go index 63e8f75..4db1413 100644 --- a/internal/processor/block.go +++ b/internal/processor/block.go @@ -13,13 +13,11 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { blockNumber := block.NumberU64() txs, err := p.chain.GetTransactions(ctx, block) - p.logg.Debug("successfully fetched transactions", "txs", len(txs)) if err != nil { return err } receiptsResp, err := p.chain.GetReceipts(ctx, block) - p.logg.Debug("successfully fetched receipts", "receipts", len(txs)) if err != nil { return err } @@ -42,7 +40,7 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { if p.cache.Exists(txs[i].To().Hex()) { from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) if err != nil { - p.logg.Error("hanlder error", "handler_type", "revert", "error", err) + p.logg.Error("handler error", "handler_type", "revert", "error", err) } revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) diff --git a/internal/processor/processor.go b/internal/processor/processor.go index e04a987..59ae45f 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -83,7 +83,7 @@ func (p *Processor) Start() { }) } else { time.Sleep(emptyQueueIdleTime) - p.logg.Debug("queue empty slept for 1 second") + p.logg.Debug("processor queue empty slept for 1 second") } } } diff --git a/internal/syncer/historical.go b/internal/syncer/historical.go index 88a9d67..482b91f 100644 --- a/internal/syncer/historical.go +++ b/internal/syncer/historical.go @@ -3,41 +3,31 @@ package syncer import ( "context" "fmt" - - "github.com/dgraph-io/badger/v4" + "time" ) const ( - blockBatchSize = 100 + blockBatchSize = 100 + emptyQueueIdelTime = 2 * time.Second ) func (s *Syncer) BootstrapHistoricalSyncer() error { - v, err := s.db.GetLowerBound() - if err != nil { - if err == badger.ErrKeyNotFound { - if err := s.db.SetLowerBound(s.initialLowerBound); err != nil { - return err - } - v = s.initialLowerBound - } else { - return err - } - } - - latestBlock, err := s.chain.GetLatestBlock(context.Background()) + lower, err := s.db.GetLowerBound() if err != nil { return err } - if err := s.db.SetUpperBound(latestBlock); err != nil { + + upper, err := s.db.GetUpperBound() + if err != nil { return err } - missingBlocks, err := s.db.GetMissingValuesBitSet(v, latestBlock) + missingBlocks, err := s.db.GetMissingValuesBitSet(lower, upper) if err != nil { return err } missingBlocksCount := missingBlocks.Count() - s.logg.Info("bootstrapping historical syncer", "missing_blocks", missingBlocksCount, "lower_bound", v, "upper_bound", latestBlock) + s.logg.Info("bootstrapping historical syncer", "missing_blocks", missingBlocksCount, "lower_bound", lower, "upper_bound", upper) buffer := make([]uint, missingBlocksCount) missingBlocks.NextSetMany(0, buffer) @@ -48,12 +38,12 @@ func (s *Syncer) BootstrapHistoricalSyncer() error { return nil } -func (s *Syncer) StartHistoricalSyncer(ctx context.Context) error { +func (s *Syncer) StartHistoricalSyncer() error { s.logg.Info("starting historical syncer", "batch_size", blockBatchSize) for { select { - case <-ctx.Done(): - s.logg.Info("historical syncer shutting down") + case <-s.quit: + s.logg.Info("historical syncer stopped") return nil default: if s.batchQueue.Len() > 0 { @@ -78,7 +68,15 @@ func (s *Syncer) StartHistoricalSyncer(ctx context.Context) error { for _, v := range blocks { s.blocksQueue.PushBack(v) } + } else { + time.Sleep(emptyQueueIdelTime) + s.logg.Debug("historical batcher queue empty slept for 2 seconds") } } } } + +func (s *Syncer) StopHistoricalSyncer() { + s.logg.Info("signaling historical syncer shutdown") + s.quit <- struct{}{} +} diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go index 8c72ddf..b870c27 100644 --- a/internal/syncer/realtime.go +++ b/internal/syncer/realtime.go @@ -16,7 +16,7 @@ type ( ) const ( - resubscribeInterval = 15 * time.Second + resubscribeInterval = 5 * time.Second ) func (s *Syncer) StartRealtime() { @@ -55,7 +55,6 @@ func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (ce for { select { case header := <-newHeadersReceiver: - s.logg.Debug("received block", "block", header.Number.Uint64()) if err := fn(eventsCtx, header.Number.Uint64()); err != nil { if !errors.Is(err, context.Canceled) { s.logg.Error("realtime block queuer error", "error", err) @@ -79,7 +78,6 @@ func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) err } } s.blocksQueue.PushFront(block) - s.logg.Debug("queued block", "block", blockNumber) return nil } diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 213a391..99417fa 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -1,7 +1,7 @@ package syncer import ( - "errors" + "context" "log/slog" "github.com/celo-org/celo-blockchain" @@ -16,32 +16,48 @@ import ( type ( SyncerOpts struct { WebSocketEndpoint string + EnableHistorical bool + StartBlock uint64 BatchQueue *deque.Deque[uint64] BlocksQueue *deque.Deque[types.Block] Chain *chain.Chain Logg *slog.Logger Stats *stats.Stats DB *db.DB - InitialLowerBound uint64 } Syncer struct { - batchQueue *deque.Deque[uint64] - blocksQueue *deque.Deque[types.Block] - chain *chain.Chain - logg *slog.Logger - stats *stats.Stats - ethClient *ethclient.Client - db *db.DB - initialLowerBound uint64 - // + batchQueue *deque.Deque[uint64] + blocksQueue *deque.Deque[types.Block] + chain *chain.Chain + logg *slog.Logger + stats *stats.Stats + ethClient *ethclient.Client + db *db.DB + quit chan struct{} + startBlock uint64 realtimeSub celo.Subscription } ) func New(o SyncerOpts) (*Syncer, error) { - if o.InitialLowerBound == 0 { - return nil, errors.New("initial lower bound not set") + if o.EnableHistorical { + latestBlock, err := o.Chain.GetLatestBlock(context.Background()) + if err != nil { + return nil, err + } + + if o.StartBlock == 0 { + o.StartBlock = latestBlock + } + + if err := o.DB.SetLowerBound(o.StartBlock); err != nil { + return nil, err + } + + if err := o.DB.SetUpperBound(latestBlock); err != nil { + return nil, err + } } ethClient, err := ethclient.Dial(o.WebSocketEndpoint) @@ -50,13 +66,14 @@ func New(o SyncerOpts) (*Syncer, error) { } return &Syncer{ - batchQueue: o.BatchQueue, - blocksQueue: o.BlocksQueue, - chain: o.Chain, - logg: o.Logg, - stats: o.Stats, - ethClient: ethClient, - db: o.DB, - initialLowerBound: o.InitialLowerBound, + batchQueue: o.BatchQueue, + blocksQueue: o.BlocksQueue, + chain: o.Chain, + logg: o.Logg, + stats: o.Stats, + ethClient: ethClient, + db: o.DB, + quit: make(chan struct{}), + startBlock: o.StartBlock, }, nil }