refactor: update handler implemhtations, make batch size configurable

This commit is contained in:
Mohamed Sohail 2024-04-22 12:38:04 +08:00
parent 03b765a3bb
commit 8a4c56c56a
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
14 changed files with 107 additions and 26 deletions

View File

@ -96,6 +96,7 @@ func main() {
WebSocketEndpoint: ko.MustString("chain.ws_endpoint"),
EnableHistorical: ko.Bool("chain.historical"),
StartBlock: uint64(ko.MustInt64("chain.start_block")),
BatchSize: ko.MustInt("chain.batch_size"),
BatchQueue: &batchQueue,
BlocksQueue: &blocksQueue,
Chain: chain,

View File

@ -10,7 +10,8 @@ rpc_endpoint = "https://celo.grassecon.net"
testnet = false
realtime = true
historical = true
start_block = 25217425
start_block = 25151040
batch_size = 100
[bootstrap]
# https://software.grassecon.org/addresses

View File

@ -16,6 +16,10 @@ type (
}
)
const (
faucetGiveEventName = "FAUCET_GIVE"
)
var (
faucetGiveTopicHash = w3.H("0x26162814817e23ec5035d6a2edc6c422da2da2119e27cfca6be65cc2dc55ca4c")
faucetGiveEvent = w3.MustNewEvent("Give(address indexed _recipient, address indexed _token, uint256 _amount)")
@ -23,6 +27,10 @@ var (
faucetGimmeSig = w3.MustNewFunc("gimme()", "uint256")
)
func (h *FaucetGiveHandler) Name() string {
return faucetGiveEventName
}
func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == faucetGiveTopicHash {
var (
@ -41,7 +49,7 @@ func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitt
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "FAUCET_GIVE",
TxType: faucetGiveEventName,
Payload: map[string]any{
"recipient": recipient.Hex(),
"token": token.Hex(),
@ -76,7 +84,7 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage,
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "FAUCET_GIVE",
TxType: faucetGiveEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"recipient": to.Hex(),
@ -93,7 +101,7 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage,
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "FAUCET_GIVE",
TxType: faucetGiveEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"recipient": common.ZeroAddress.Hex(),

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/emitter"
)
@ -40,10 +41,22 @@ type (
}
)
func New() []Handler {
func New(cache cache.Cache) []Handler {
return []Handler{
&TokenTransferHandler{},
&PoolSwapHandler{},
&FaucetGiveHandler{},
&PoolDepositHandler{},
&TokenMintHandler{},
&TokenBurnHandler{},
&QuoterPriceHandler{},
&OwnershipHandler{},
&SealHandler{},
&IndexAddHandler{
cache: cache,
},
&IndexRemoveHandler{
cache: cache,
},
}
}

View File

@ -17,6 +17,10 @@ type (
}
)
const (
indexAddEventName = "INDEX_ADD"
)
var (
indexAddTopicHash = w3.H("0xa226db3f664042183ee0281230bba26cbf7b5057e50aee7f25a175ff45ce4d7f")
indexAddEvent = w3.MustNewEvent("AddressAdded(address _token)")
@ -24,6 +28,10 @@ var (
indexRegisterSig = w3.MustNewFunc("register(address)", "bool")
)
func (h *IndexAddHandler) Name() string {
return indexAddEventName
}
func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == indexAddTopicHash {
var (
@ -40,7 +48,7 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "INDEX_ADD",
TxType: indexAddEventName,
Payload: map[string]any{
"address": address.Hex(),
},
@ -77,7 +85,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "INDEX_ADD",
TxType: indexAddEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"address": address.Hex(),
@ -100,7 +108,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "INDEX_ADD",
TxType: indexAddEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"address": address.Hex(),

View File

@ -17,12 +17,20 @@ type (
}
)
const (
indexRemoveEventName = "INDEX_REMOVE"
)
var (
indexRemoveTopicHash = w3.H("0x24a12366c02e13fe4a9e03d86a8952e85bb74a456c16e4a18b6d8295700b74bb")
indexRemoveEvent = w3.MustNewEvent("AddressRemoved(address _token)")
indexRemoveSig = w3.MustNewFunc("remove(address)", "bool")
)
func (h *IndexRemoveHandler) Name() string {
return indexRemoveEventName
}
func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == indexRemoveTopicHash {
var (
@ -39,7 +47,7 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emit
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "INDEX_REMOVE",
TxType: indexRemoveEventName,
Payload: map[string]any{
"address": address.Hex(),
},
@ -76,7 +84,7 @@ func (h *IndexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "INDEX_REMOVE",
TxType: indexRemoveEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"address": address.Hex(),

View File

@ -15,12 +15,20 @@ type (
}
)
const (
ownershipEventName = "OWNERSHIP_TRANSFERRED"
)
var (
ownershipTopicHash = w3.H("0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0")
ownershipEvent = w3.MustNewEvent("OwnershipTransferred(address indexed previousOwner, address indexed newOwner)")
ownershipToSig = w3.MustNewFunc("transferOwnership(address)", "bool")
)
func (h *OwnershipHandler) Name() string {
return ownershipEventName
}
func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == ownershipTopicHash {
var (
@ -38,7 +46,7 @@ func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitte
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "OWNERSHIP_TRANSFERRED",
TxType: ownershipEventName,
Payload: map[string]any{
"previousOwner": previousOwner.Hex(),
"newOwner": newOwner.Hex(),
@ -72,7 +80,7 @@ func (h *OwnershipHandler) HandleRevert(ctx context.Context, msg RevertMessage,
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "ownership",
TxType: ownershipEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"previousOwner": msg.From,

View File

@ -16,12 +16,20 @@ type (
}
)
const (
poolDepositEventName = "POOL_DEPOSIT"
)
var (
poolDepositTopicHash = w3.H("0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62")
poolDepositEvent = w3.MustNewEvent("Deposit(address indexed initiator, address indexed tokenIn, uint256 amountIn)")
poolDepositSig = w3.MustNewFunc("deposit(address, uint256)", "")
)
func (h *PoolDepositHandler) Name() string {
return poolDepositEventName
}
func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == poolDepositTopicHash {
var (
@ -45,7 +53,7 @@ func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emit
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "POOL_DEPOSIT",
TxType: poolDepositEventName,
Payload: map[string]any{
"initiator": initiator.Hex(),
"tokenIn": tokenIn.Hex(),
@ -81,7 +89,7 @@ func (h *PoolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "POOL_DEPOSIT",
TxType: poolDepositEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"initiator": msg.From,

View File

@ -16,12 +16,20 @@ type (
}
)
const (
poolSwapEventName = "POOL_SWAP"
)
var (
poolSwapTopicHash = w3.H("0xd6d34547c69c5ee3d2667625c188acf1006abb93e0ee7cf03925c67cf7760413")
poolSwapEvent = w3.MustNewEvent("Swap(address indexed initiator, address indexed tokenIn, address tokenOut, uint256 amountIn, uint256 amountOut, uint256 fee)")
poolSwapSig = w3.MustNewFunc("withdraw(address, address, uint256)", "")
)
func (h *PoolSwapHandler) Name() string {
return poolSwapEventName
}
func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == poolSwapTopicHash {
var (
@ -51,7 +59,7 @@ func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "POOL_SWAP",
TxType: poolSwapEventName,
Payload: map[string]any{
"initiator": initiator.Hex(),
"tokenIn": tokenIn.Hex(),
@ -91,7 +99,7 @@ func (h *PoolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage, e
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "POOL_SWAP",
TxType: poolSwapEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"initiator": msg.From,

View File

@ -16,12 +16,20 @@ type (
}
)
const (
quoterPriceEventName = "QUOTER_PRICE_INDEX_UPDATED"
)
var (
quoterPriceTopicHash = w3.H("0xdb9ce1a76955721ca61ac50cd1b87f9ab8620325c8619a62192c2dc7871d56b1")
quoterPriceEvent = w3.MustNewEvent("PriceIndexUpdated(address _tokenAddress, uint256 _exchangeRate)")
quoterPriceToSig = w3.MustNewFunc("setPriceIndexValue(address, uint256)", "uint256")
)
func (h *QuoterPriceHandler) Name() string {
return quoterPriceEventName
}
func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == quoterPriceTopicHash {
var (
@ -39,7 +47,7 @@ func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emit
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "QUOTER_PRICE_INDEX_UPDATED",
TxType: quoterPriceEventName,
Payload: map[string]any{
"token": token.Hex(),
"exchangeRate": exchangeRate.String(),
@ -74,7 +82,7 @@ func (h *QuoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "QUOTER_PRICE_INDEX_UPDATED",
TxType: quoterPriceEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"token": token.Hex(),

View File

@ -16,12 +16,20 @@ type (
}
)
const (
sealEventName = "SEAL_STATE_CHANGE"
)
var (
sealTopicHash = w3.H("0x6b7e2e653f93b645d4ed7292d6429f96637084363e477c8aaea1a43ed13c284e")
sealEvent = w3.MustNewEvent("SealStateChange(bool indexed _final, uint256 _sealState)")
sealToSig = w3.MustNewFunc("seal(uint256)", "uint256")
)
func (h *SealHandler) Name() string {
return sealEventName
}
func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error {
if msg.Log.Topics[0] == sealTopicHash {
var (
@ -39,7 +47,7 @@ func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emi
Success: true,
Timestamp: msg.BlockTime,
TxHash: msg.Log.TxHash.Hex(),
TxType: "SEAL_STATE_CHANGE",
TxType: sealEventName,
Payload: map[string]any{
"final": final,
"sealState": sealState.String(),
@ -73,7 +81,7 @@ func (h *SealHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitt
Success: false,
Timestamp: msg.Timestamp,
TxHash: msg.TxHash,
TxType: "SEAL_STATE_CHANGE",
TxType: sealEventName,
Payload: map[string]any{
"revertReason": msg.RevertReason,
"sealState": sealState.String(),

View File

@ -55,7 +55,7 @@ func NewProcessor(o ProcessorOpts) *Processor {
stats: o.Stats,
db: o.DB,
quit: make(chan struct{}),
handlers: handler.New(),
handlers: handler.New(o.Cache),
cache: o.Cache,
emitter: o.Emitter,
}

View File

@ -7,8 +7,7 @@ import (
)
const (
blockBatchSize = 10
emptyQueueIdelTime = 2 * time.Second
emptyQueueIdelTime = 1 * time.Second
)
func (s *Syncer) BootstrapHistoricalSyncer() error {
@ -39,7 +38,7 @@ func (s *Syncer) BootstrapHistoricalSyncer() error {
}
func (s *Syncer) StartHistoricalSyncer() error {
s.logg.Info("starting historical syncer", "batch_size", blockBatchSize)
s.logg.Info("starting historical syncer", "batch_size", s.batchSize)
for {
select {
case <-s.quit:
@ -51,8 +50,8 @@ func (s *Syncer) StartHistoricalSyncer() error {
currentIterLen = s.batchQueue.Len()
)
if currentIterLen > blockBatchSize {
currentIterLen = blockBatchSize
if currentIterLen > s.batchSize {
currentIterLen = s.batchSize
}
batch := make([]uint64, currentIterLen)
for i := 0; i < currentIterLen; i++ {

View File

@ -20,6 +20,7 @@ type (
StartBlock uint64
BatchQueue *deque.Deque[uint64]
BlocksQueue *deque.Deque[types.Block]
BatchSize int
Chain *chain.Chain
Logg *slog.Logger
Stats *stats.Stats
@ -33,6 +34,7 @@ type (
logg *slog.Logger
stats *stats.Stats
ethClient *ethclient.Client
batchSize int
db *db.DB
quit chan struct{}
startBlock uint64
@ -73,6 +75,7 @@ func New(o SyncerOpts) (*Syncer, error) {
stats: o.Stats,
ethClient: ethClient,
db: o.DB,
batchSize: o.BatchSize,
quit: make(chan struct{}),
startBlock: o.StartBlock,
}, nil