diff --git a/cmd/tracker/main.go b/cmd/tracker/main.go index 365ee6c..17f4618 100644 --- a/cmd/tracker/main.go +++ b/cmd/tracker/main.go @@ -96,6 +96,7 @@ func main() { WebSocketEndpoint: ko.MustString("chain.ws_endpoint"), EnableHistorical: ko.Bool("chain.historical"), StartBlock: uint64(ko.MustInt64("chain.start_block")), + BatchSize: ko.MustInt("chain.batch_size"), BatchQueue: &batchQueue, BlocksQueue: &blocksQueue, Chain: chain, diff --git a/config.toml b/config.toml index e31e656..1b5c1b0 100644 --- a/config.toml +++ b/config.toml @@ -10,7 +10,8 @@ rpc_endpoint = "https://celo.grassecon.net" testnet = false realtime = true historical = true -start_block = 25217425 +start_block = 25151040 +batch_size = 100 [bootstrap] # https://software.grassecon.org/addresses diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go index b589455..6bb6ee0 100644 --- a/internal/handler/faucet_give.go +++ b/internal/handler/faucet_give.go @@ -16,6 +16,10 @@ type ( } ) +const ( + faucetGiveEventName = "FAUCET_GIVE" +) + var ( faucetGiveTopicHash = w3.H("0x26162814817e23ec5035d6a2edc6c422da2da2119e27cfca6be65cc2dc55ca4c") faucetGiveEvent = w3.MustNewEvent("Give(address indexed _recipient, address indexed _token, uint256 _amount)") @@ -23,6 +27,10 @@ var ( faucetGimmeSig = w3.MustNewFunc("gimme()", "uint256") ) +func (h *FaucetGiveHandler) Name() string { + return faucetGiveEventName +} + func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == faucetGiveTopicHash { var ( @@ -41,7 +49,7 @@ func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitt Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "FAUCET_GIVE", + TxType: faucetGiveEventName, Payload: map[string]any{ "recipient": recipient.Hex(), "token": token.Hex(), @@ -76,7 +84,7 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "FAUCET_GIVE", + TxType: faucetGiveEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "recipient": to.Hex(), @@ -93,7 +101,7 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "FAUCET_GIVE", + TxType: faucetGiveEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "recipient": common.ZeroAddress.Hex(), diff --git a/internal/handler/handler.go b/internal/handler/handler.go index cbb03b1..f73bbd8 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -4,6 +4,7 @@ import ( "context" "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/emitter" ) @@ -40,10 +41,22 @@ type ( } ) -func New() []Handler { +func New(cache cache.Cache) []Handler { return []Handler{ &TokenTransferHandler{}, + &PoolSwapHandler{}, + &FaucetGiveHandler{}, + &PoolDepositHandler{}, &TokenMintHandler{}, &TokenBurnHandler{}, + &QuoterPriceHandler{}, + &OwnershipHandler{}, + &SealHandler{}, + &IndexAddHandler{ + cache: cache, + }, + &IndexRemoveHandler{ + cache: cache, + }, } } diff --git a/internal/handler/index_add.go b/internal/handler/index_add.go index df8707f..9cfbc91 100644 --- a/internal/handler/index_add.go +++ b/internal/handler/index_add.go @@ -17,6 +17,10 @@ type ( } ) +const ( + indexAddEventName = "INDEX_ADD" +) + var ( indexAddTopicHash = w3.H("0xa226db3f664042183ee0281230bba26cbf7b5057e50aee7f25a175ff45ce4d7f") indexAddEvent = w3.MustNewEvent("AddressAdded(address _token)") @@ -24,6 +28,10 @@ var ( indexRegisterSig = w3.MustNewFunc("register(address)", "bool") ) +func (h *IndexAddHandler) Name() string { + return indexAddEventName +} + func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == indexAddTopicHash { var ( @@ -40,7 +48,7 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "INDEX_ADD", + TxType: indexAddEventName, Payload: map[string]any{ "address": address.Hex(), }, @@ -77,7 +85,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "INDEX_ADD", + TxType: indexAddEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "address": address.Hex(), @@ -100,7 +108,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "INDEX_ADD", + TxType: indexAddEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "address": address.Hex(), diff --git a/internal/handler/index_remove.go b/internal/handler/index_remove.go index 2e5d173..891aab5 100644 --- a/internal/handler/index_remove.go +++ b/internal/handler/index_remove.go @@ -17,12 +17,20 @@ type ( } ) +const ( + indexRemoveEventName = "INDEX_REMOVE" +) + var ( indexRemoveTopicHash = w3.H("0x24a12366c02e13fe4a9e03d86a8952e85bb74a456c16e4a18b6d8295700b74bb") indexRemoveEvent = w3.MustNewEvent("AddressRemoved(address _token)") indexRemoveSig = w3.MustNewFunc("remove(address)", "bool") ) +func (h *IndexRemoveHandler) Name() string { + return indexRemoveEventName +} + func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == indexRemoveTopicHash { var ( @@ -39,7 +47,7 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emit Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "INDEX_REMOVE", + TxType: indexRemoveEventName, Payload: map[string]any{ "address": address.Hex(), }, @@ -76,7 +84,7 @@ func (h *IndexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "INDEX_REMOVE", + TxType: indexRemoveEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "address": address.Hex(), diff --git a/internal/handler/ownership.go b/internal/handler/ownership.go index e0fc77c..af6ec7b 100644 --- a/internal/handler/ownership.go +++ b/internal/handler/ownership.go @@ -15,12 +15,20 @@ type ( } ) +const ( + ownershipEventName = "OWNERSHIP_TRANSFERRED" +) + var ( ownershipTopicHash = w3.H("0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0") ownershipEvent = w3.MustNewEvent("OwnershipTransferred(address indexed previousOwner, address indexed newOwner)") ownershipToSig = w3.MustNewFunc("transferOwnership(address)", "bool") ) +func (h *OwnershipHandler) Name() string { + return ownershipEventName +} + func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == ownershipTopicHash { var ( @@ -38,7 +46,7 @@ func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitte Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "OWNERSHIP_TRANSFERRED", + TxType: ownershipEventName, Payload: map[string]any{ "previousOwner": previousOwner.Hex(), "newOwner": newOwner.Hex(), @@ -72,7 +80,7 @@ func (h *OwnershipHandler) HandleRevert(ctx context.Context, msg RevertMessage, Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "ownership", + TxType: ownershipEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "previousOwner": msg.From, diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go index 6d11d5e..0525563 100644 --- a/internal/handler/pool_deposit.go +++ b/internal/handler/pool_deposit.go @@ -16,12 +16,20 @@ type ( } ) +const ( + poolDepositEventName = "POOL_DEPOSIT" +) + var ( poolDepositTopicHash = w3.H("0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62") poolDepositEvent = w3.MustNewEvent("Deposit(address indexed initiator, address indexed tokenIn, uint256 amountIn)") poolDepositSig = w3.MustNewFunc("deposit(address, uint256)", "") ) +func (h *PoolDepositHandler) Name() string { + return poolDepositEventName +} + func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == poolDepositTopicHash { var ( @@ -45,7 +53,7 @@ func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emit Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "POOL_DEPOSIT", + TxType: poolDepositEventName, Payload: map[string]any{ "initiator": initiator.Hex(), "tokenIn": tokenIn.Hex(), @@ -81,7 +89,7 @@ func (h *PoolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "POOL_DEPOSIT", + TxType: poolDepositEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "initiator": msg.From, diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go index f4aaf7c..21f41d3 100644 --- a/internal/handler/pool_swap.go +++ b/internal/handler/pool_swap.go @@ -16,12 +16,20 @@ type ( } ) +const ( + poolSwapEventName = "POOL_SWAP" +) + var ( poolSwapTopicHash = w3.H("0xd6d34547c69c5ee3d2667625c188acf1006abb93e0ee7cf03925c67cf7760413") poolSwapEvent = w3.MustNewEvent("Swap(address indexed initiator, address indexed tokenIn, address tokenOut, uint256 amountIn, uint256 amountOut, uint256 fee)") poolSwapSig = w3.MustNewFunc("withdraw(address, address, uint256)", "") ) +func (h *PoolSwapHandler) Name() string { + return poolSwapEventName +} + func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == poolSwapTopicHash { var ( @@ -51,7 +59,7 @@ func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "POOL_SWAP", + TxType: poolSwapEventName, Payload: map[string]any{ "initiator": initiator.Hex(), "tokenIn": tokenIn.Hex(), @@ -91,7 +99,7 @@ func (h *PoolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage, e Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "POOL_SWAP", + TxType: poolSwapEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "initiator": msg.From, diff --git a/internal/handler/quoter_price.go b/internal/handler/quoter_price.go index 6e07228..cf0d734 100644 --- a/internal/handler/quoter_price.go +++ b/internal/handler/quoter_price.go @@ -16,12 +16,20 @@ type ( } ) +const ( + quoterPriceEventName = "QUOTER_PRICE_INDEX_UPDATED" +) + var ( quoterPriceTopicHash = w3.H("0xdb9ce1a76955721ca61ac50cd1b87f9ab8620325c8619a62192c2dc7871d56b1") quoterPriceEvent = w3.MustNewEvent("PriceIndexUpdated(address _tokenAddress, uint256 _exchangeRate)") quoterPriceToSig = w3.MustNewFunc("setPriceIndexValue(address, uint256)", "uint256") ) +func (h *QuoterPriceHandler) Name() string { + return quoterPriceEventName +} + func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == quoterPriceTopicHash { var ( @@ -39,7 +47,7 @@ func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emit Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "QUOTER_PRICE_INDEX_UPDATED", + TxType: quoterPriceEventName, Payload: map[string]any{ "token": token.Hex(), "exchangeRate": exchangeRate.String(), @@ -74,7 +82,7 @@ func (h *QuoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "QUOTER_PRICE_INDEX_UPDATED", + TxType: quoterPriceEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "token": token.Hex(), diff --git a/internal/handler/seal.go b/internal/handler/seal.go index a76f03e..245106f 100644 --- a/internal/handler/seal.go +++ b/internal/handler/seal.go @@ -16,12 +16,20 @@ type ( } ) +const ( + sealEventName = "SEAL_STATE_CHANGE" +) + var ( sealTopicHash = w3.H("0x6b7e2e653f93b645d4ed7292d6429f96637084363e477c8aaea1a43ed13c284e") sealEvent = w3.MustNewEvent("SealStateChange(bool indexed _final, uint256 _sealState)") sealToSig = w3.MustNewFunc("seal(uint256)", "uint256") ) +func (h *SealHandler) Name() string { + return sealEventName +} + func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { if msg.Log.Topics[0] == sealTopicHash { var ( @@ -39,7 +47,7 @@ func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emi Success: true, Timestamp: msg.BlockTime, TxHash: msg.Log.TxHash.Hex(), - TxType: "SEAL_STATE_CHANGE", + TxType: sealEventName, Payload: map[string]any{ "final": final, "sealState": sealState.String(), @@ -73,7 +81,7 @@ func (h *SealHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitt Success: false, Timestamp: msg.Timestamp, TxHash: msg.TxHash, - TxType: "SEAL_STATE_CHANGE", + TxType: sealEventName, Payload: map[string]any{ "revertReason": msg.RevertReason, "sealState": sealState.String(), diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 59ae45f..df1e0f2 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -55,7 +55,7 @@ func NewProcessor(o ProcessorOpts) *Processor { stats: o.Stats, db: o.DB, quit: make(chan struct{}), - handlers: handler.New(), + handlers: handler.New(o.Cache), cache: o.Cache, emitter: o.Emitter, } diff --git a/internal/syncer/historical.go b/internal/syncer/historical.go index 284e1aa..991bf4b 100644 --- a/internal/syncer/historical.go +++ b/internal/syncer/historical.go @@ -7,8 +7,7 @@ import ( ) const ( - blockBatchSize = 10 - emptyQueueIdelTime = 2 * time.Second + emptyQueueIdelTime = 1 * time.Second ) func (s *Syncer) BootstrapHistoricalSyncer() error { @@ -39,7 +38,7 @@ func (s *Syncer) BootstrapHistoricalSyncer() error { } func (s *Syncer) StartHistoricalSyncer() error { - s.logg.Info("starting historical syncer", "batch_size", blockBatchSize) + s.logg.Info("starting historical syncer", "batch_size", s.batchSize) for { select { case <-s.quit: @@ -51,8 +50,8 @@ func (s *Syncer) StartHistoricalSyncer() error { currentIterLen = s.batchQueue.Len() ) - if currentIterLen > blockBatchSize { - currentIterLen = blockBatchSize + if currentIterLen > s.batchSize { + currentIterLen = s.batchSize } batch := make([]uint64, currentIterLen) for i := 0; i < currentIterLen; i++ { diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 99417fa..0aad43c 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -20,6 +20,7 @@ type ( StartBlock uint64 BatchQueue *deque.Deque[uint64] BlocksQueue *deque.Deque[types.Block] + BatchSize int Chain *chain.Chain Logg *slog.Logger Stats *stats.Stats @@ -33,6 +34,7 @@ type ( logg *slog.Logger stats *stats.Stats ethClient *ethclient.Client + batchSize int db *db.DB quit chan struct{} startBlock uint64 @@ -73,6 +75,7 @@ func New(o SyncerOpts) (*Syncer, error) { stats: o.Stats, ethClient: ethClient, db: o.DB, + batchSize: o.BatchSize, quit: make(chan struct{}), startBlock: o.StartBlock, }, nil