diff --git a/cmd/tracker/init.go b/cmd/init.go similarity index 100% rename from cmd/tracker/init.go rename to cmd/init.go diff --git a/cmd/tracker/main.go b/cmd/main.go similarity index 98% rename from cmd/tracker/main.go rename to cmd/main.go index 6ddc2b3..8d43bf4 100644 --- a/cmd/tracker/main.go +++ b/cmd/main.go @@ -14,12 +14,12 @@ import ( "github.com/celo-org/celo-blockchain/core/types" "github.com/ef-ds/deque/v2" "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" "github.com/grassrootseconomics/celo-tracker/internal/processor" "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" "github.com/grassrootseconomics/celo-tracker/internal/syncer" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" "github.com/knadh/koanf/v2" ) diff --git a/config.toml b/config.toml index 71f11d8..43429a9 100644 --- a/config.toml +++ b/config.toml @@ -9,9 +9,9 @@ ws_endpoint = "wss://ws.celo.grassecon.net" rpc_endpoint = "https://celo.grassecon.net" testnet = false realtime = true -historical = false -start_block = 25151040 -batch_size = 100 +historical = true +start_block = 25091040 +batch_size = 30 [bootstrap] # https://software.grassecon.org/addresses @@ -20,7 +20,7 @@ ge_registries = [ "0x0cc9f4fff962def35bb34a53691180b13e653030", ] watchlist = [""] -blacklist = [""] +blacklist = ["0x765DE816845861e75A25fCA122bb6898B8B1282a"] [jetstream] enable = true diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 5c07b87..64a565b 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -1,6 +1,7 @@ services: nats: image: nats:2 + restart: unless-stopped command: -js -sd /tmp/nats/data -m 8222 ports: - 127.0.0.1:4222:4222 diff --git a/go.mod b/go.mod index 15d8cdd..c702f86 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,8 @@ require ( github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/gammazero/deque v0.2.0 // indirect + github.com/gammazero/workerpool v1.1.3 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-stack/stack v1.8.1 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect diff --git a/go.sum b/go.sum index 2e0e81c..d6530cb 100644 --- a/go.sum +++ b/go.sum @@ -167,6 +167,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA= +github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= +github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= diff --git a/internal/cache/bootstrap.go b/internal/cache/bootstrap.go index 7c8ee5e..b3840c9 100644 --- a/internal/cache/bootstrap.go +++ b/internal/cache/bootstrap.go @@ -4,7 +4,7 @@ import ( "context" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/chain" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" "github.com/grassrootseconomics/celoutils/v2" "github.com/grassrootseconomics/w3-celo" "github.com/grassrootseconomics/w3-celo/module/eth" diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7a09385..16efe3c 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -4,7 +4,7 @@ import ( "context" "log/slog" - "github.com/grassrootseconomics/celo-tracker/internal/chain" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" "github.com/grassrootseconomics/w3-celo" ) diff --git a/internal/db/blocks.go b/internal/db/blocks.go deleted file mode 100644 index 74522cf..0000000 --- a/internal/db/blocks.go +++ /dev/null @@ -1,115 +0,0 @@ -package db - -import ( - "bytes" - - "github.com/bits-and-blooms/bitset" - bolt "go.etcd.io/bbolt" -) - -func (d *DB) SetLowerBound(v uint64) error { - return d.setUint64(lowerBoundKey, v) -} - -func (d *DB) GetLowerBound() (uint64, error) { - v, err := d.get(lowerBoundKey) - if err != nil { - return 0, err - } - return unmarshalUint64(v), nil -} - -func (d *DB) SetUpperBound(v uint64) error { - return d.setUint64(upperBoundKey, v) -} - -func (d *DB) GetUpperBound() (uint64, error) { - v, err := d.get(upperBoundKey) - if err != nil { - return 0, err - } - return unmarshalUint64(v), nil -} - -func (d *DB) SetValue(v uint64) error { - return d.setUint64AsKey(v) -} - -func (d *DB) GetMissingValuesBitSet(lowerBound uint64, upperBound uint64) (*bitset.BitSet, error) { - var ( - b bitset.BitSet - ) - - err := d.db.View(func(tx *bolt.Tx) error { - var ( - lowerRaw = marshalUint64(lowerBound) - upperRaw = marshalUint64(upperBound) - ) - - for i := lowerBound; i <= upperBound; i++ { - b.Set(uint(i)) - } - - c := tx.Bucket([]byte("blocks")).Cursor() - - for k, _ := c.Seek(lowerRaw); k != nil && bytes.Compare(k, upperRaw) <= 0; k, _ = c.Next() { - b.Clear(uint(unmarshalUint64(k))) - } - - return nil - }) - if err != nil { - return nil, err - } - - return &b, nil -} - -// func (d *DB) Cleanup() error { -// var ( -// safeToDeleteKeys [][]byte -// ) - -// err := d.db.View(func(txn *badger.Txn) error { -// lowerBound, err := d.get(lowerBoundKey) -// if err != nil { -// return err -// } - -// lowerBound = marshalUint64(unmarshalUint64(lowerBound) - 1) - -// opts := badger.DefaultIteratorOptions -// opts.PrefetchValues = false - -// it := txn.NewIterator(opts) -// defer it.Close() - -// for it.Rewind(); it.Valid(); it.Next() { -// k := it.Item().Key() - -// if bytes.Compare(k, lowerBound) > 0 { -// return nil -// } - -// safeToDeleteKeys = append(safeToDeleteKeys, it.Item().KeyCopy(nil)) -// } - -// return nil -// }) -// if err != nil { -// return err -// } - -// wb := d.db.NewWriteBatch() -// for _, k := range safeToDeleteKeys { -// if err := wb.Delete(k); err != nil { -// return nil -// } -// } - -// if err := wb.Flush(); err != nil { -// return err -// } - -// return nil -// } diff --git a/internal/db/db.go b/internal/db/db.go index 1926cd0..e9cbe11 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -1,10 +1,12 @@ package db import ( + "bytes" "encoding/binary" "fmt" "log/slog" + "github.com/bits-and-blooms/bitset" bolt "go.etcd.io/bbolt" ) @@ -20,7 +22,7 @@ type ( ) const ( - dbFolderName = "celo_tracker_blocks_db" + dbFolderName = "tracker_db" upperBoundKey = "upper" lowerBoundKey = "lower" @@ -100,3 +102,87 @@ func marshalUint64(v uint64) []byte { sortableOrder.PutUint64(b, v) return b } + +func (d *DB) SetLowerBound(v uint64) error { + return d.setUint64(lowerBoundKey, v) +} + +func (d *DB) GetLowerBound() (uint64, error) { + v, err := d.get(lowerBoundKey) + if err != nil { + return 0, err + } + return unmarshalUint64(v), nil +} + +func (d *DB) SetUpperBound(v uint64) error { + return d.setUint64(upperBoundKey, v) +} + +func (d *DB) GetUpperBound() (uint64, error) { + v, err := d.get(upperBoundKey) + if err != nil { + return 0, err + } + return unmarshalUint64(v), nil +} + +func (d *DB) SetValue(v uint64) error { + return d.setUint64AsKey(v) +} + +func (d *DB) GetMissingValuesBitSet(lowerBound uint64, upperBound uint64) (*bitset.BitSet, error) { + var ( + b bitset.BitSet + ) + + err := d.db.View(func(tx *bolt.Tx) error { + var ( + lowerRaw = marshalUint64(lowerBound) + upperRaw = marshalUint64(upperBound) + ) + + for i := lowerBound; i <= upperBound; i++ { + b.Set(uint(i)) + } + + c := tx.Bucket([]byte("blocks")).Cursor() + + for k, _ := c.Seek(lowerRaw); k != nil && bytes.Compare(k, upperRaw) <= 0; k, _ = c.Next() { + b.Clear(uint(unmarshalUint64(k))) + } + + return nil + }) + if err != nil { + return nil, err + } + + return &b, nil +} + +func (d *DB) Cleanup() error { + lowerBound, err := d.GetLowerBound() + if err != nil { + return err + } + target := marshalUint64(lowerBound - 1) + + err = d.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte("blocks")) + c := b.Cursor() + + for k, _ := c.First(); k != nil && bytes.Compare(k, target) <= 0; k, _ = c.Next() { + if err := b.Delete(k); err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + return nil +} diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go index e04371f..8f102f0 100644 --- a/internal/handler/faucet_give.go +++ b/internal/handler/faucet_give.go @@ -48,7 +48,7 @@ func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, pub p Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: faucetGiveEventName, Payload: map[string]any{ diff --git a/internal/handler/handler.go b/internal/handler/handler.go index b83d1f8..345df91 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -15,9 +15,11 @@ type ( HandleRevert(context.Context, RevertMessage, pub.Pub) error } + HandlerPipeline []Handler + LogMessage struct { Log *types.Log - BlockTime uint64 + Timestamp uint64 } RevertMessage struct { @@ -31,7 +33,7 @@ type ( } ) -func New(cache cache.Cache) []Handler { +func New(cache cache.Cache) HandlerPipeline { return []Handler{ &TokenTransferHandler{}, &PoolSwapHandler{}, diff --git a/internal/handler/index_add.go b/internal/handler/index_add.go index 139e905..8ee0589 100644 --- a/internal/handler/index_add.go +++ b/internal/handler/index_add.go @@ -47,7 +47,7 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: indexAddEventName, Payload: map[string]any{ diff --git a/internal/handler/index_remove.go b/internal/handler/index_remove.go index 090e5e6..521866c 100644 --- a/internal/handler/index_remove.go +++ b/internal/handler/index_remove.go @@ -46,7 +46,7 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, pub Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: indexRemoveEventName, Payload: map[string]any{ diff --git a/internal/handler/ownership.go b/internal/handler/ownership.go index 2e23b46..1aed895 100644 --- a/internal/handler/ownership.go +++ b/internal/handler/ownership.go @@ -45,7 +45,7 @@ func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, pub pu Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: ownershipEventName, Payload: map[string]any{ diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go index b66302a..3595468 100644 --- a/internal/handler/pool_deposit.go +++ b/internal/handler/pool_deposit.go @@ -52,7 +52,7 @@ func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, pub Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: poolDepositEventName, Payload: map[string]any{ diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go index 87cae07..90673e8 100644 --- a/internal/handler/pool_swap.go +++ b/internal/handler/pool_swap.go @@ -58,7 +58,7 @@ func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: poolSwapEventName, Payload: map[string]any{ diff --git a/internal/handler/quoter_price.go b/internal/handler/quoter_price.go index 4385ce0..c654e4c 100644 --- a/internal/handler/quoter_price.go +++ b/internal/handler/quoter_price.go @@ -46,7 +46,7 @@ func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, pub Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: quoterPriceEventName, Payload: map[string]any{ diff --git a/internal/handler/seal.go b/internal/handler/seal.go index b94d5a4..0311a0c 100644 --- a/internal/handler/seal.go +++ b/internal/handler/seal.go @@ -46,7 +46,7 @@ func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: sealEventName, Payload: map[string]any{ diff --git a/internal/handler/token_burn.go b/internal/handler/token_burn.go index d5fa847..d262c9b 100644 --- a/internal/handler/token_burn.go +++ b/internal/handler/token_burn.go @@ -46,7 +46,7 @@ func (h *TokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage, pub pu Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: burnEventName, Payload: map[string]any{ diff --git a/internal/handler/token_mint.go b/internal/handler/token_mint.go index 0f9b62f..459755d 100644 --- a/internal/handler/token_mint.go +++ b/internal/handler/token_mint.go @@ -47,7 +47,7 @@ func (h *TokenMintHandler) HandleLog(ctx context.Context, msg LogMessage, pub pu Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: mintEventName, Payload: map[string]any{ diff --git a/internal/handler/token_transfer.go b/internal/handler/token_transfer.go index f3e7a03..1da30c9 100644 --- a/internal/handler/token_transfer.go +++ b/internal/handler/token_transfer.go @@ -48,7 +48,7 @@ func (h *TokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage, pu Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, - Timestamp: msg.BlockTime, + Timestamp: msg.Timestamp, TxHash: msg.Log.TxHash.Hex(), TxType: transferEventName, Payload: map[string]any{ diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 6d05ca0..3177f48 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -1,28 +1,11 @@ package pool import ( - "log/slog" "runtime" - "runtime/debug" - "github.com/alitto/pond" + "github.com/gammazero/workerpool" ) -const ( - nProcFactor = 5 -) - -func NewPool(logg *slog.Logger) *pond.WorkerPool { - return pond.New( - runtime.NumCPU()*nProcFactor, - 1, - pond.Strategy(pond.Balanced()), - pond.PanicHandler(panicHandler(logg)), - ) -} - -func panicHandler(logg *slog.Logger) func(interface{}) { - return func(panic interface{}) { - logg.Error("block processor worker exited from a panic", "error", panic, "stack_trace", string(debug.Stack())) - } +func NewPool() *workerpool.WorkerPool { + return workerpool.New(runtime.NumCPU()) } diff --git a/internal/processor/block.go b/internal/processor/block.go deleted file mode 100644 index c48b7df..0000000 --- a/internal/processor/block.go +++ /dev/null @@ -1,94 +0,0 @@ -package processor - -import ( - "context" - "fmt" - - "github.com/celo-org/celo-blockchain/common" - "github.com/celo-org/celo-blockchain/core/types" - "github.com/grassrootseconomics/celo-tracker/internal/handler" -) - -func (p *Processor) processBlock(ctx context.Context, block types.Block) error { - blockNumber := block.NumberU64() - - txs, err := p.chain.GetTransactions(ctx, block) - if err != nil { - return err - } - - receiptsResp, err := p.chain.GetReceipts(ctx, block) - if err != nil { - return err - } - - for i, receipt := range receiptsResp { - if receipt.Status > 0 { - for _, log := range receipt.Logs { - if p.cache.Exists(log.Address.Hex()) { - msg := handler.LogMessage{ - Log: log, - BlockTime: block.Time(), - } - - if err := p.handleLogs(ctx, msg); err != nil { - return err - } - } - } - } else { - if txs[i].To() != nil && p.cache.Exists(txs[i].To().Hex()) { - from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) - if err != nil { - return err - } - - revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) - if err != nil { - return err - } - - msg := handler.RevertMessage{ - From: from.Hex(), - RevertReason: revertReason, - InputData: common.Bytes2Hex(txs[i].Data()), - Block: blockNumber, - ContractAddress: txs[i].To().Hex(), - Timestamp: block.Time(), - TxHash: receipt.TxHash.Hex(), - } - - if err := p.handleRevert(ctx, msg); err != nil { - return err - } - } - } - } - - if err := p.db.SetValue(blockNumber); err != nil { - return err - } - p.logg.Debug("successfully processed block", "block", blockNumber) - - return nil -} - -func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) error { - for _, handler := range p.handlers { - if err := handler.HandleLog(ctx, msg, p.pub); err != nil { - return fmt.Errorf("log handler: %s err: %v", handler.Name(), err) - } - } - - return nil -} - -func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error { - for _, handler := range p.handlers { - if err := handler.HandleRevert(ctx, msg, p.pub); err != nil { - return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err) - } - } - - return nil -} diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 2406075..0b0a614 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -2,94 +2,132 @@ package processor import ( "context" + "fmt" "log/slog" - "time" - "github.com/alitto/pond" + "github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/core/types" - "github.com/ef-ds/deque/v2" "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" "github.com/grassrootseconomics/celo-tracker/internal/handler" - "github.com/grassrootseconomics/celo-tracker/internal/pool" "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" ) type ( ProcessorOpts struct { - Chain *chain.Chain - BlocksQueue *deque.Deque[types.Block] - Logg *slog.Logger - Stats *stats.Stats - DB *db.DB - Cache cache.Cache - Pub pub.Pub + Cache cache.Cache + Chain *chain.Chain + DB *db.DB + Logg *slog.Logger + Pub pub.Pub + Stats *stats.Stats } Processor struct { - chain *chain.Chain - pool *pond.WorkerPool - blocksQueue *deque.Deque[types.Block] - logg *slog.Logger - stats *stats.Stats - db *db.DB - quit chan struct{} - handlers []handler.Handler - cache cache.Cache - pub pub.Pub + cache cache.Cache + chain *chain.Chain + db *db.DB + handlerPipeline handler.HandlerPipeline + logg *slog.Logger + pub pub.Pub + quit chan struct{} + stats *stats.Stats } ) -const ( - emptyQueueIdleTime = 1 * time.Second -) - func NewProcessor(o ProcessorOpts) *Processor { return &Processor{ - chain: o.Chain, - pool: pool.NewPool(o.Logg), - blocksQueue: o.BlocksQueue, - logg: o.Logg, - stats: o.Stats, - db: o.DB, - quit: make(chan struct{}), - handlers: handler.New(o.Cache), - cache: o.Cache, - pub: o.Pub, + cache: o.Cache, + chain: o.Chain, + db: o.DB, + handlerPipeline: handler.New(o.Cache), + logg: o.Logg, + pub: o.Pub, + quit: make(chan struct{}), + stats: o.Stats, } } -func (p *Processor) Start() { - p.logg.Info("processor started") - for { - select { - case <-p.quit: - p.logg.Info("processor stopped, draining workerpool queue") - p.pool.StopAndWait() - if err := p.db.Close(); err != nil { - p.logg.Info("error closing db", "error", err) - } - return - default: - if p.blocksQueue.Len() > 0 { - v, _ := p.blocksQueue.PopFront() - p.pool.Submit(func() { - p.logg.Debug("processing", "block", v.Number()) - if err := p.processBlock(context.Background(), v); err != nil { - p.logg.Info("block processor error", "block", v.NumberU64(), "error", err) +func (p *Processor) ProcessBlock(ctx context.Context, block types.Block) error { + receiptsResp, err := p.chain.GetReceipts(ctx, block) + if err != nil { + return err + } + + for _, receipt := range receiptsResp { + if receipt.Status > 0 { + for _, log := range receipt.Logs { + if p.cache.Exists(log.Address.Hex()) { + msg := handler.LogMessage{ + Log: log, + Timestamp: block.Time(), } - }) - } else { - time.Sleep(emptyQueueIdleTime) - p.logg.Debug("processor queue empty slept for 1 second") + + if err := p.handleLogs(ctx, msg); err != nil { + return err + } + } + } + } else { + tx, err := p.chain.GetTransaction(ctx, receipt.TxHash) + if err != nil { + return err + } + + if tx.To() != nil && p.cache.Exists(tx.To().Hex()) { + from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), &tx) + if err != nil { + return err + } + + revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) + if err != nil { + return err + } + + msg := handler.RevertMessage{ + From: from.Hex(), + RevertReason: revertReason, + InputData: common.Bytes2Hex(tx.Data()), + Block: block.NumberU64(), + ContractAddress: tx.To().Hex(), + Timestamp: block.Time(), + TxHash: receipt.TxHash.Hex(), + } + + if err := p.handleRevert(ctx, msg); err != nil { + return err + } } } } + + if err := p.db.SetValue(block.NumberU64()); err != nil { + return err + } + p.logg.Debug("successfully processed block", "block", block.NumberU64()) + + return nil } -func (p *Processor) Stop() { - p.logg.Info("signaling processor shutdown") - p.quit <- struct{}{} +func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) error { + for _, handler := range p.handlerPipeline { + if err := handler.HandleLog(ctx, msg, p.pub); err != nil { + return fmt.Errorf("log handler: %s err: %v", handler.Name(), err) + } + } + + return nil +} + +func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error { + for _, handler := range p.handlerPipeline { + if err := handler.HandleRevert(ctx, msg, p.pub); err != nil { + return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err) + } + } + + return nil } diff --git a/internal/pub/jetstream.go b/internal/pub/jetstream.go index 757e9d4..5e86a9e 100644 --- a/internal/pub/jetstream.go +++ b/internal/pub/jetstream.go @@ -19,7 +19,7 @@ type ( PersistDuration time.Duration } - JetStreamEmitter struct { + JetStreamPub struct { natsConn *nats.Conn jsCtx nats.JetStreamContext } @@ -60,19 +60,19 @@ func NewJetStreamPub(o JetStreamOpts) (Pub, error) { o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName) } - return &JetStreamEmitter{ + return &JetStreamPub{ natsConn: natsConn, jsCtx: js, }, nil } -func (p *JetStreamEmitter) Close() { +func (p *JetStreamPub) Close() { if p.natsConn != nil { p.natsConn.Close() } } -func (p *JetStreamEmitter) Send(_ context.Context, payload event.Event) error { +func (p *JetStreamPub) Send(_ context.Context, payload event.Event) error { data, err := payload.Serialize() if err != nil { return err diff --git a/internal/syncer/historical.go b/internal/syncer/historical.go deleted file mode 100644 index 105c51e..0000000 --- a/internal/syncer/historical.go +++ /dev/null @@ -1,83 +0,0 @@ -package syncer - -import ( - "context" - "fmt" - "time" -) - -const ( - emptyQueueIdelTime = 1 * time.Second -) - -func (s *Syncer) BootstrapHistoricalSyncer() error { - lower, err := s.db.GetLowerBound() - if err != nil { - return err - } - - upper, err := s.db.GetUpperBound() - if err != nil { - return err - } - - missingBlocks, err := s.db.GetMissingValuesBitSet(lower, upper) - if err != nil { - return err - } - missingBlocksCount := missingBlocks.Count() - s.logg.Info("bootstrapping historical syncer", "missing_blocks", missingBlocksCount, "lower_bound", lower, "upper_bound", upper) - - buffer := make([]uint, missingBlocksCount) - missingBlocks.NextSetMany(0, buffer) - for _, v := range buffer { - s.batchQueue.PushFront(uint64(v)) - } - - return nil -} - -func (s *Syncer) StartHistoricalSyncer() error { - s.logg.Info("starting historical syncer", "batch_size", s.batchSize) - for { - select { - case <-s.quit: - s.logg.Info("historical syncer stopped") - return nil - default: - if s.batchQueue.Len() > 0 { - var ( - currentIterLen = s.batchQueue.Len() - ) - - if currentIterLen > s.batchSize { - currentIterLen = s.batchSize - } - batch := make([]uint64, currentIterLen) - for i := 0; i < currentIterLen; i++ { - v, _ := s.batchQueue.PopFront() - batch[i] = v - } - - blocks, err := s.chain.GetBlocks(context.Background(), batch) - if err != nil { - s.logg.Error("batch blocks fetcher error", "fetch_size", currentIterLen, "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err) - } - - for _, v := range blocks { - s.blocksQueue.PushBack(v) - } - } else { - time.Sleep(emptyQueueIdelTime) - s.logg.Debug("historical batcher queue empty slept for 2 seconds") - } - } - } -} - -func (s *Syncer) StopHistoricalSyncer() { - if s.historicalEnabled { - s.logg.Info("signaling historical syncer shutdown") - s.quit <- struct{}{} - } -} diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go index d6d232e..ce39d1e 100644 --- a/internal/syncer/realtime.go +++ b/internal/syncer/realtime.go @@ -19,23 +19,23 @@ const ( resubscribeInterval = 2 * time.Second ) -func (s *Syncer) StartRealtime() { +func (s *Syncer) Start() { s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn()) } -func (s *Syncer) StopRealtime() { +func (s *Syncer) Stop() { if s.realtimeSub != nil { s.realtimeSub.Unsubscribe() } } func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) { - newHeadersReceiver := make(chan *types.Header, 10) + newHeadersReceiver := make(chan *types.Header, 1) sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) - s.logg.Info("realtime syncer connected to ws endpoint") if err != nil { return nil, err } + s.logg.Info("realtime syncer connected to ws endpoint") return event.NewSubscription(func(quit <-chan struct{}) error { eventsCtx, eventsCancel := context.WithCancel(context.Background()) @@ -77,7 +77,13 @@ func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) err return fmt.Errorf("block %d error: %v", blockNumber, err) } } - s.blocksQueue.PushFront(block) + + s.blockWorker.Submit(func() { + if err := s.blockProcessor.ProcessBlock(context.Background(), block); err != nil { + s.logg.Error("block processor error", "source", "realtime", "block", blockNumber, "error", err) + } + }) + return nil } diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index ff45544..51b1c58 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -5,63 +5,52 @@ import ( "log/slog" "github.com/celo-org/celo-blockchain" - "github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/ethclient" - "github.com/ef-ds/deque/v2" - "github.com/grassrootseconomics/celo-tracker/internal/chain" + "github.com/gammazero/workerpool" "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/processor" "github.com/grassrootseconomics/celo-tracker/internal/stats" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" ) type ( SyncerOpts struct { - WebSocketEndpoint string - EnableHistorical bool - StartBlock uint64 - BatchQueue *deque.Deque[uint64] - BlocksQueue *deque.Deque[types.Block] - BatchSize int + BlockWorker *workerpool.WorkerPool + BlockProcessor *processor.Processor Chain *chain.Chain + DB *db.DB Logg *slog.Logger Stats *stats.Stats - DB *db.DB + WebSocketEndpoint string } Syncer struct { - batchQueue *deque.Deque[uint64] - blocksQueue *deque.Deque[types.Block] - chain *chain.Chain - logg *slog.Logger - stats *stats.Stats - ethClient *ethclient.Client - batchSize int - db *db.DB - quit chan struct{} - startBlock uint64 - realtimeSub celo.Subscription - historicalEnabled bool + blockWorker *workerpool.WorkerPool + blockProcessor *processor.Processor + chain *chain.Chain + db *db.DB + ethClient *ethclient.Client + logg *slog.Logger + quit chan struct{} + realtimeSub celo.Subscription + stats *stats.Stats } ) func New(o SyncerOpts) (*Syncer, error) { - if o.EnableHistorical { - latestBlock, err := o.Chain.GetLatestBlock(context.Background()) - if err != nil { - return nil, err - } - - if o.StartBlock == 0 { - o.StartBlock = latestBlock - } - - if err := o.DB.SetLowerBound(o.StartBlock); err != nil { - return nil, err - } - - if err := o.DB.SetUpperBound(latestBlock); err != nil { + latestBlock, err := o.Chain.GetLatestBlock(context.Background()) + lowerBound, err := o.DB.GetLowerBound() + if err != nil { + return nil, err + } + if lowerBound == 0 { + if err := o.DB.SetLowerBound(latestBlock); err != nil { return nil, err } } + if err := o.DB.SetUpperBound(latestBlock); err != nil { + return nil, err + } ethClient, err := ethclient.Dial(o.WebSocketEndpoint) if err != nil { @@ -69,16 +58,12 @@ func New(o SyncerOpts) (*Syncer, error) { } return &Syncer{ - batchQueue: o.BatchQueue, - blocksQueue: o.BlocksQueue, - chain: o.Chain, - logg: o.Logg, - stats: o.Stats, - ethClient: ethClient, - db: o.DB, - batchSize: o.BatchSize, - quit: make(chan struct{}), - startBlock: o.StartBlock, - historicalEnabled: o.EnableHistorical, + blockWorker: o.BlockWorker, + chain: o.Chain, + db: o.DB, + ethClient: ethClient, + logg: o.Logg, + quit: make(chan struct{}), + stats: o.Stats, }, nil } diff --git a/internal/verifier/verifier.go b/internal/verifier/verifier.go new file mode 100644 index 0000000..b710f11 --- /dev/null +++ b/internal/verifier/verifier.go @@ -0,0 +1,111 @@ +package verifier + +import ( + "context" + "fmt" + "log/slog" + + "github.com/gammazero/workerpool" + "github.com/grassrootseconomics/celo-tracker/internal/db" + "github.com/grassrootseconomics/celo-tracker/internal/processor" + "github.com/grassrootseconomics/celo-tracker/internal/stats" + "github.com/grassrootseconomics/celo-tracker/pkg/chain" +) + +type ( + VerifierOpts struct { + BlockWorker *workerpool.WorkerPool + BlockProcessor *processor.Processor + Chain *chain.Chain + DB *db.DB + Logg *slog.Logger + Stats *stats.Stats + } + + Verifier struct { + blockWorker *workerpool.WorkerPool + blockProcessor *processor.Processor + chain *chain.Chain + db *db.DB + logg *slog.Logger + quit chan struct{} + stats *stats.Stats + } +) + +const ( + blockBatchSize = 25 +) + +func New(o VerifierOpts) *Verifier { + return &Verifier{ + blockWorker: o.BlockWorker, + chain: o.Chain, + db: o.DB, + logg: o.Logg, + quit: make(chan struct{}), + stats: o.Stats, + } +} + +func (v *Verifier) Start() + +func (v *Verifier) getMissingBlocks() error { + lower, err := v.db.GetLowerBound() + if err != nil { + return err + } + + upper, err := v.db.GetUpperBound() + if err != nil { + return err + } + + missingBlocks, err := v.db.GetMissingValuesBitSet(lower, upper-1) + if err != nil { + return err + } + missingBlocksCount := missingBlocks.Count() + + if missingBlocksCount > 0 { + v.logg.Info("verifier found block gap", "missing_blocks_count", missingBlocksCount, "lower_bound", lower, "upper_bound", upper) + + buffer := make([]uint, missingBlocksCount) + missingBlocks.NextSetMany(0, buffer) + + for i := 0; i < int(missingBlocksCount); i += blockBatchSize { + end := i + blockBatchSize + if end > int(missingBlocksCount) { + end = int(missingBlocksCount) + } + batch := make([]uint64, end-i) + for j := i; j < end; j++ { + batch[j-i] = uint64(buffer[j]) + } + + v.processMissingBlocksBatch(batch) + } + } else { + v.logg.Debug("verifier running db compactor") + if err := v.db.Cleanup(); err != nil { + v.logg.Error("verifier compactor error", "error", err) + } + } + + return nil +} + +func (v *Verifier) processMissingBlocksBatch(batch []uint64) { + blocks, err := v.chain.GetBlocks(context.Background(), batch) + if err != nil { + v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err) + } + + for _, block := range blocks { + v.blockWorker.Submit(func() { + if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil { + v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err) + } + }) + } +} diff --git a/internal/chain/block.go b/pkg/chain/block.go similarity index 100% rename from internal/chain/block.go rename to pkg/chain/block.go diff --git a/internal/chain/chain.go b/pkg/chain/chain.go similarity index 100% rename from internal/chain/chain.go rename to pkg/chain/chain.go diff --git a/internal/chain/revert.go b/pkg/chain/revert.go similarity index 100% rename from internal/chain/revert.go rename to pkg/chain/revert.go diff --git a/internal/chain/token_index.go b/pkg/chain/token_index.go similarity index 100% rename from internal/chain/token_index.go rename to pkg/chain/token_index.go diff --git a/internal/chain/transaction.go b/pkg/chain/transaction.go similarity index 58% rename from internal/chain/transaction.go rename to pkg/chain/transaction.go index 73ef907..d9c2883 100644 --- a/internal/chain/transaction.go +++ b/pkg/chain/transaction.go @@ -2,27 +2,21 @@ package chain import ( "context" + "math/big" + "github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/w3-celo/module/eth" "github.com/grassrootseconomics/w3-celo/w3types" ) -func (c *Chain) GetTransactions(ctx context.Context, block types.Block) ([]types.Transaction, error) { - txCount := len(block.Transactions()) - - calls := make([]w3types.RPCCaller, txCount) - transactions := make([]types.Transaction, txCount) - - for i, tx := range block.Transactions() { - calls[i] = eth.Tx(tx.Hash()).Returns(&transactions[i]) +func (c *Chain) GetTransaction(ctx context.Context, txHash common.Hash) (types.Transaction, error) { + var transaction types.Transaction + if err := c.Provider.Client.CallCtx(ctx, eth.Tx(txHash).Returns(&transaction)); err != nil { + return transaction, err } - if err := c.Provider.Client.CallCtx(ctx, calls...); err != nil { - return nil, err - } - - return transactions, nil + return transaction, nil } func (c *Chain) GetReceipts(ctx context.Context, block types.Block) ([]types.Receipt, error) { @@ -41,3 +35,7 @@ func (c *Chain) GetReceipts(ctx context.Context, block types.Block) ([]types.Rec return receipts, nil } + +func (c *Chain) GetBlockReceipts(ctx context.Context, blockNumber *big.Int) (types.Receipts, error) { + return nil, nil +}