major changes: add verifier and compactor, switch to gammazero workerpool

Mohamed Sohail 2024-05-02 16:52:25 +08:00
parent 23ec001c62
commit 73ea42bfa5
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
35 changed files with 388 additions and 464 deletions
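
The headline change swaps the alitto/pond worker pool for github.com/gammazero/workerpool (see internal/pool and go.mod below). For orientation, here is a minimal standalone sketch of how that pool is driven; the worker count and the task body are illustrative only and are not taken from this repository:

package main

import (
	"fmt"
	"runtime"

	"github.com/gammazero/workerpool"
)

func main() {
	// Bounded pool: at most NumCPU tasks run concurrently, the rest queue up.
	wp := workerpool.New(runtime.NumCPU())

	for i := 0; i < 10; i++ {
		n := i // capture the loop variable for the closure
		wp.Submit(func() {
			fmt.Println("processing item", n)
		})
	}

	// Drain the queued tasks, then stop the workers.
	wp.StopWait()
}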

View File

@@ -14,12 +14,12 @@ import (
 	"github.com/celo-org/celo-blockchain/core/types"
 	"github.com/ef-ds/deque/v2"
 	"github.com/grassrootseconomics/celo-tracker/internal/cache"
-	"github.com/grassrootseconomics/celo-tracker/internal/chain"
 	"github.com/grassrootseconomics/celo-tracker/internal/db"
 	"github.com/grassrootseconomics/celo-tracker/internal/processor"
 	"github.com/grassrootseconomics/celo-tracker/internal/pub"
 	"github.com/grassrootseconomics/celo-tracker/internal/stats"
 	"github.com/grassrootseconomics/celo-tracker/internal/syncer"
+	"github.com/grassrootseconomics/celo-tracker/pkg/chain"
 	"github.com/knadh/koanf/v2"
 )

View File

@@ -9,9 +9,9 @@ ws_endpoint = "wss://ws.celo.grassecon.net"
 rpc_endpoint = "https://celo.grassecon.net"
 testnet = false
 realtime = true
-historical = false
-start_block = 25151040
-batch_size = 100
+historical = true
+start_block = 25091040
+batch_size = 30

 [bootstrap]
 # https://software.grassecon.org/addresses
@@ -20,7 +20,7 @@ ge_registries = [
 "0x0cc9f4fff962def35bb34a53691180b13e653030",
 ]
 watchlist = [""]
-blacklist = [""]
+blacklist = ["0x765DE816845861e75A25fCA122bb6898B8B1282a"]

 [jetstream]
 enable = true

View File

@@ -1,6 +1,7 @@
 services:
   nats:
     image: nats:2
+    restart: unless-stopped
     command: -js -sd /tmp/nats/data -m 8222
     ports:
       - 127.0.0.1:4222:4222

2
go.mod
View File

@@ -37,6 +37,8 @@ require (
 	github.com/dgraph-io/ristretto v0.1.1 // indirect
 	github.com/dustin/go-humanize v1.0.0 // indirect
 	github.com/fsnotify/fsnotify v1.6.0 // indirect
+	github.com/gammazero/deque v0.2.0 // indirect
+	github.com/gammazero/workerpool v1.1.3 // indirect
 	github.com/go-ole/go-ole v1.2.6 // indirect
 	github.com/go-stack/stack v1.8.1 // indirect
 	github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect

4
go.sum
View File

@@ -167,6 +167,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
 github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
+github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
+github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
+github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
+github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
 github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
 github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=

View File

@@ -4,7 +4,7 @@ import (
 	"context"

 	"github.com/celo-org/celo-blockchain/common"
-	"github.com/grassrootseconomics/celo-tracker/internal/chain"
+	"github.com/grassrootseconomics/celo-tracker/pkg/chain"
 	"github.com/grassrootseconomics/celoutils/v2"
 	"github.com/grassrootseconomics/w3-celo"
 	"github.com/grassrootseconomics/w3-celo/module/eth"

View File

@@ -4,7 +4,7 @@ import (
 	"context"
 	"log/slog"

-	"github.com/grassrootseconomics/celo-tracker/internal/chain"
+	"github.com/grassrootseconomics/celo-tracker/pkg/chain"
 	"github.com/grassrootseconomics/w3-celo"
 )

View File

@@ -1,115 +0,0 @@
package db
import (
"bytes"
"github.com/bits-and-blooms/bitset"
bolt "go.etcd.io/bbolt"
)
func (d *DB) SetLowerBound(v uint64) error {
return d.setUint64(lowerBoundKey, v)
}
func (d *DB) GetLowerBound() (uint64, error) {
v, err := d.get(lowerBoundKey)
if err != nil {
return 0, err
}
return unmarshalUint64(v), nil
}
func (d *DB) SetUpperBound(v uint64) error {
return d.setUint64(upperBoundKey, v)
}
func (d *DB) GetUpperBound() (uint64, error) {
v, err := d.get(upperBoundKey)
if err != nil {
return 0, err
}
return unmarshalUint64(v), nil
}
func (d *DB) SetValue(v uint64) error {
return d.setUint64AsKey(v)
}
func (d *DB) GetMissingValuesBitSet(lowerBound uint64, upperBound uint64) (*bitset.BitSet, error) {
var (
b bitset.BitSet
)
err := d.db.View(func(tx *bolt.Tx) error {
var (
lowerRaw = marshalUint64(lowerBound)
upperRaw = marshalUint64(upperBound)
)
for i := lowerBound; i <= upperBound; i++ {
b.Set(uint(i))
}
c := tx.Bucket([]byte("blocks")).Cursor()
for k, _ := c.Seek(lowerRaw); k != nil && bytes.Compare(k, upperRaw) <= 0; k, _ = c.Next() {
b.Clear(uint(unmarshalUint64(k)))
}
return nil
})
if err != nil {
return nil, err
}
return &b, nil
}
// func (d *DB) Cleanup() error {
// var (
// safeToDeleteKeys [][]byte
// )
// err := d.db.View(func(txn *badger.Txn) error {
// lowerBound, err := d.get(lowerBoundKey)
// if err != nil {
// return err
// }
// lowerBound = marshalUint64(unmarshalUint64(lowerBound) - 1)
// opts := badger.DefaultIteratorOptions
// opts.PrefetchValues = false
// it := txn.NewIterator(opts)
// defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
// k := it.Item().Key()
// if bytes.Compare(k, lowerBound) > 0 {
// return nil
// }
// safeToDeleteKeys = append(safeToDeleteKeys, it.Item().KeyCopy(nil))
// }
// return nil
// })
// if err != nil {
// return err
// }
// wb := d.db.NewWriteBatch()
// for _, k := range safeToDeleteKeys {
// if err := wb.Delete(k); err != nil {
// return nil
// }
// }
// if err := wb.Flush(); err != nil {
// return err
// }
// return nil
// }

View File

@@ -1,10 +1,12 @@
 package db

 import (
+	"bytes"
 	"encoding/binary"
 	"fmt"
 	"log/slog"

+	"github.com/bits-and-blooms/bitset"
 	bolt "go.etcd.io/bbolt"
 )
@@ -20,7 +22,7 @@ type (
 )

 const (
-	dbFolderName = "celo_tracker_blocks_db"
+	dbFolderName = "tracker_db"

 	upperBoundKey = "upper"
 	lowerBoundKey = "lower"
@@ -100,3 +102,87 @@ func marshalUint64(v uint64) []byte {
 	sortableOrder.PutUint64(b, v)
 	return b
 }
+
+func (d *DB) SetLowerBound(v uint64) error {
+	return d.setUint64(lowerBoundKey, v)
+}
+
+func (d *DB) GetLowerBound() (uint64, error) {
+	v, err := d.get(lowerBoundKey)
+	if err != nil {
+		return 0, err
+	}
+
+	return unmarshalUint64(v), nil
+}
+
+func (d *DB) SetUpperBound(v uint64) error {
+	return d.setUint64(upperBoundKey, v)
+}
+
+func (d *DB) GetUpperBound() (uint64, error) {
+	v, err := d.get(upperBoundKey)
+	if err != nil {
+		return 0, err
+	}
+
+	return unmarshalUint64(v), nil
+}
+
+func (d *DB) SetValue(v uint64) error {
+	return d.setUint64AsKey(v)
+}
+
+func (d *DB) GetMissingValuesBitSet(lowerBound uint64, upperBound uint64) (*bitset.BitSet, error) {
+	var (
+		b bitset.BitSet
+	)
+
+	err := d.db.View(func(tx *bolt.Tx) error {
+		var (
+			lowerRaw = marshalUint64(lowerBound)
+			upperRaw = marshalUint64(upperBound)
+		)
+
+		for i := lowerBound; i <= upperBound; i++ {
+			b.Set(uint(i))
+		}
+
+		c := tx.Bucket([]byte("blocks")).Cursor()
+		for k, _ := c.Seek(lowerRaw); k != nil && bytes.Compare(k, upperRaw) <= 0; k, _ = c.Next() {
+			b.Clear(uint(unmarshalUint64(k)))
+		}
+
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return &b, nil
+}
+
+func (d *DB) Cleanup() error {
+	lowerBound, err := d.GetLowerBound()
+	if err != nil {
+		return err
+	}
+	target := marshalUint64(lowerBound - 1)
+
+	err = d.db.View(func(tx *bolt.Tx) error {
+		b := tx.Bucket([]byte("blocks"))
+		c := b.Cursor()
+
+		for k, _ := c.First(); k != nil && bytes.Compare(k, target) <= 0; k, _ = c.Next() {
+			if err := b.Delete(k); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
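
The added GetMissingValuesBitSet and Cleanup are the basis of the new verifier and compactor: every height in [lower, upper] is marked in a bitset, heights already stored as keys in the "blocks" bucket are cleared, and whatever stays set is a gap to refetch. A standalone sketch of that idea, with made-up block numbers standing in for the bolt cursor scan:

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bitset"
)

func main() {
	const (
		lower uint = 100
		upper uint = 110
	)
	// Heights that would be found as keys in the "blocks" bucket.
	stored := []uint{100, 101, 103, 104, 107, 108, 109, 110}

	var b bitset.BitSet
	for i := lower; i <= upper; i++ {
		b.Set(i) // assume every block is missing...
	}
	for _, k := range stored {
		b.Clear(k) // ...then clear the ones already persisted
	}

	missing := make([]uint, b.Count())
	b.NextSetMany(0, missing)
	fmt.Println(missing) // [102 105 106]
}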

View File

@@ -48,7 +48,7 @@ func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, pub p
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          faucetGiveEventName,
 		Payload: map[string]any{

View File

@@ -15,9 +15,11 @@ type (
 		HandleRevert(context.Context, RevertMessage, pub.Pub) error
 	}

+	HandlerPipeline []Handler
+
 	LogMessage struct {
 		Log       *types.Log
-		BlockTime uint64
+		Timestamp uint64
 	}

 	RevertMessage struct {
@@ -31,7 +33,7 @@ type (
 	}
 )

-func New(cache cache.Cache) []Handler {
+func New(cache cache.Cache) HandlerPipeline {
 	return []Handler{
 		&TokenTransferHandler{},
 		&PoolSwapHandler{},

View File

@@ -47,7 +47,7 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          indexAddEventName,
 		Payload: map[string]any{

View File

@@ -46,7 +46,7 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, pub
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          indexRemoveEventName,
 		Payload: map[string]any{

View File

@@ -45,7 +45,7 @@ func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, pub pu
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          ownershipEventName,
 		Payload: map[string]any{

View File

@@ -52,7 +52,7 @@ func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, pub
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          poolDepositEventName,
 		Payload: map[string]any{

View File

@@ -58,7 +58,7 @@ func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          poolSwapEventName,
 		Payload: map[string]any{

View File

@@ -46,7 +46,7 @@ func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, pub
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          quoterPriceEventName,
 		Payload: map[string]any{

View File

@@ -46,7 +46,7 @@ func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          sealEventName,
 		Payload: map[string]any{

View File

@@ -46,7 +46,7 @@ func (h *TokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage, pub pu
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          burnEventName,
 		Payload: map[string]any{

View File

@@ -47,7 +47,7 @@ func (h *TokenMintHandler) HandleLog(ctx context.Context, msg LogMessage, pub pu
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          mintEventName,
 		Payload: map[string]any{

View File

@@ -48,7 +48,7 @@ func (h *TokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage, pu
 		Block:           msg.Log.BlockNumber,
 		ContractAddress: msg.Log.Address.Hex(),
 		Success:         true,
-		Timestamp:       msg.BlockTime,
+		Timestamp:       msg.Timestamp,
 		TxHash:          msg.Log.TxHash.Hex(),
 		TxType:          transferEventName,
 		Payload: map[string]any{

View File

@@ -1,28 +1,11 @@
 package pool

 import (
-	"log/slog"
 	"runtime"
-	"runtime/debug"

-	"github.com/alitto/pond"
+	"github.com/gammazero/workerpool"
 )

-const (
-	nProcFactor = 5
-)
-
-func NewPool(logg *slog.Logger) *pond.WorkerPool {
-	return pond.New(
-		runtime.NumCPU()*nProcFactor,
-		1,
-		pond.Strategy(pond.Balanced()),
-		pond.PanicHandler(panicHandler(logg)),
-	)
-}
-
-func panicHandler(logg *slog.Logger) func(interface{}) {
-	return func(panic interface{}) {
-		logg.Error("block processor worker exited from a panic", "error", panic, "stack_trace", string(debug.Stack()))
-	}
+func NewPool() *workerpool.WorkerPool {
+	return workerpool.New(runtime.NumCPU())
 }

View File

@@ -1,94 +0,0 @@
package processor
import (
"context"
"fmt"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celo-tracker/internal/handler"
)
func (p *Processor) processBlock(ctx context.Context, block types.Block) error {
blockNumber := block.NumberU64()
txs, err := p.chain.GetTransactions(ctx, block)
if err != nil {
return err
}
receiptsResp, err := p.chain.GetReceipts(ctx, block)
if err != nil {
return err
}
for i, receipt := range receiptsResp {
if receipt.Status > 0 {
for _, log := range receipt.Logs {
if p.cache.Exists(log.Address.Hex()) {
msg := handler.LogMessage{
Log: log,
BlockTime: block.Time(),
}
if err := p.handleLogs(ctx, msg); err != nil {
return err
}
}
}
} else {
if txs[i].To() != nil && p.cache.Exists(txs[i].To().Hex()) {
from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i])
if err != nil {
return err
}
revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber)
if err != nil {
return err
}
msg := handler.RevertMessage{
From: from.Hex(),
RevertReason: revertReason,
InputData: common.Bytes2Hex(txs[i].Data()),
Block: blockNumber,
ContractAddress: txs[i].To().Hex(),
Timestamp: block.Time(),
TxHash: receipt.TxHash.Hex(),
}
if err := p.handleRevert(ctx, msg); err != nil {
return err
}
}
}
}
if err := p.db.SetValue(blockNumber); err != nil {
return err
}
p.logg.Debug("successfully processed block", "block", blockNumber)
return nil
}
func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) error {
for _, handler := range p.handlers {
if err := handler.HandleLog(ctx, msg, p.pub); err != nil {
return fmt.Errorf("log handler: %s err: %v", handler.Name(), err)
}
}
return nil
}
func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error {
for _, handler := range p.handlers {
if err := handler.HandleRevert(ctx, msg, p.pub); err != nil {
return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err)
}
}
return nil
}

View File

@@ -2,94 +2,132 @@ package processor

 import (
 	"context"
+	"fmt"
 	"log/slog"
-	"time"

-	"github.com/alitto/pond"
+	"github.com/celo-org/celo-blockchain/common"
 	"github.com/celo-org/celo-blockchain/core/types"
-	"github.com/ef-ds/deque/v2"
 	"github.com/grassrootseconomics/celo-tracker/internal/cache"
-	"github.com/grassrootseconomics/celo-tracker/internal/chain"
 	"github.com/grassrootseconomics/celo-tracker/internal/db"
 	"github.com/grassrootseconomics/celo-tracker/internal/handler"
-	"github.com/grassrootseconomics/celo-tracker/internal/pool"
 	"github.com/grassrootseconomics/celo-tracker/internal/pub"
 	"github.com/grassrootseconomics/celo-tracker/internal/stats"
+	"github.com/grassrootseconomics/celo-tracker/pkg/chain"
 )

 type (
 	ProcessorOpts struct {
-		Chain       *chain.Chain
-		BlocksQueue *deque.Deque[types.Block]
-		Logg        *slog.Logger
-		Stats       *stats.Stats
-		DB          *db.DB
 		Cache cache.Cache
+		Chain *chain.Chain
+		DB    *db.DB
+		Logg  *slog.Logger
 		Pub   pub.Pub
+		Stats *stats.Stats
 	}

 	Processor struct {
-		chain       *chain.Chain
-		pool        *pond.WorkerPool
-		blocksQueue *deque.Deque[types.Block]
-		logg        *slog.Logger
-		stats       *stats.Stats
-		db          *db.DB
-		quit        chan struct{}
-		handlers    []handler.Handler
 		cache           cache.Cache
+		chain           *chain.Chain
+		db              *db.DB
+		handlerPipeline handler.HandlerPipeline
+		logg            *slog.Logger
 		pub             pub.Pub
+		quit            chan struct{}
+		stats           *stats.Stats
 	}
 )

-const (
-	emptyQueueIdleTime = 1 * time.Second
-)
-
 func NewProcessor(o ProcessorOpts) *Processor {
 	return &Processor{
-		chain:       o.Chain,
-		pool:        pool.NewPool(o.Logg),
-		blocksQueue: o.BlocksQueue,
-		logg:        o.Logg,
-		stats:       o.Stats,
-		db:          o.DB,
-		quit:        make(chan struct{}),
-		handlers:    handler.New(o.Cache),
 		cache:           o.Cache,
+		chain:           o.Chain,
+		db:              o.DB,
+		handlerPipeline: handler.New(o.Cache),
+		logg:            o.Logg,
 		pub:             o.Pub,
+		quit:            make(chan struct{}),
+		stats:           o.Stats,
 	}
 }

-func (p *Processor) Start() {
-	p.logg.Info("processor started")
-	for {
-		select {
-		case <-p.quit:
-			p.logg.Info("processor stopped, draining workerpool queue")
-			p.pool.StopAndWait()
-			if err := p.db.Close(); err != nil {
-				p.logg.Info("error closing db", "error", err)
-			}
-			return
-		default:
-			if p.blocksQueue.Len() > 0 {
-				v, _ := p.blocksQueue.PopFront()
-				p.pool.Submit(func() {
-					p.logg.Debug("processing", "block", v.Number())
-					if err := p.processBlock(context.Background(), v); err != nil {
-						p.logg.Info("block processor error", "block", v.NumberU64(), "error", err)
-					}
-				})
-			} else {
-				time.Sleep(emptyQueueIdleTime)
-				p.logg.Debug("processor queue empty slept for 1 second")
-			}
-		}
-	}
-}
-
-func (p *Processor) Stop() {
-	p.logg.Info("signaling processor shutdown")
-	p.quit <- struct{}{}
+func (p *Processor) ProcessBlock(ctx context.Context, block types.Block) error {
+	receiptsResp, err := p.chain.GetReceipts(ctx, block)
+	if err != nil {
+		return err
+	}
+
+	for _, receipt := range receiptsResp {
+		if receipt.Status > 0 {
+			for _, log := range receipt.Logs {
+				if p.cache.Exists(log.Address.Hex()) {
+					msg := handler.LogMessage{
+						Log:       log,
+						Timestamp: block.Time(),
+					}
+
+					if err := p.handleLogs(ctx, msg); err != nil {
+						return err
+					}
+				}
+			}
+		} else {
+			tx, err := p.chain.GetTransaction(ctx, receipt.TxHash)
+			if err != nil {
+				return err
+			}
+
+			if tx.To() != nil && p.cache.Exists(tx.To().Hex()) {
+				from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), &tx)
+				if err != nil {
+					return err
+				}
+
+				revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber)
+				if err != nil {
+					return err
+				}
+
+				msg := handler.RevertMessage{
+					From:            from.Hex(),
+					RevertReason:    revertReason,
+					InputData:       common.Bytes2Hex(tx.Data()),
+					Block:           block.NumberU64(),
+					ContractAddress: tx.To().Hex(),
+					Timestamp:       block.Time(),
+					TxHash:          receipt.TxHash.Hex(),
+				}
+
+				if err := p.handleRevert(ctx, msg); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	if err := p.db.SetValue(block.NumberU64()); err != nil {
+		return err
+	}
+
+	p.logg.Debug("successfully processed block", "block", block.NumberU64())
+	return nil
+}
+
+func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) error {
+	for _, handler := range p.handlerPipeline {
+		if err := handler.HandleLog(ctx, msg, p.pub); err != nil {
+			return fmt.Errorf("log handler: %s err: %v", handler.Name(), err)
+		}
+	}
+	return nil
+}
+
+func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error {
+	for _, handler := range p.handlerPipeline {
+		if err := handler.HandleRevert(ctx, msg, p.pub); err != nil {
+			return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err)
+		}
+	}
+	return nil
 }

View File

@@ -19,7 +19,7 @@ type (
 		PersistDuration time.Duration
 	}

-	JetStreamEmitter struct {
+	JetStreamPub struct {
 		natsConn *nats.Conn
 		jsCtx    nats.JetStreamContext
 	}
@@ -60,19 +60,19 @@ func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
 		o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName)
 	}

-	return &JetStreamEmitter{
+	return &JetStreamPub{
 		natsConn: natsConn,
 		jsCtx:    js,
 	}, nil
 }

-func (p *JetStreamEmitter) Close() {
+func (p *JetStreamPub) Close() {
 	if p.natsConn != nil {
 		p.natsConn.Close()
 	}
 }

-func (p *JetStreamEmitter) Send(_ context.Context, payload event.Event) error {
+func (p *JetStreamPub) Send(_ context.Context, payload event.Event) error {
 	data, err := payload.Serialize()
 	if err != nil {
 		return err

View File

@@ -1,83 +0,0 @@
package syncer
import (
"context"
"fmt"
"time"
)
const (
emptyQueueIdelTime = 1 * time.Second
)
func (s *Syncer) BootstrapHistoricalSyncer() error {
lower, err := s.db.GetLowerBound()
if err != nil {
return err
}
upper, err := s.db.GetUpperBound()
if err != nil {
return err
}
missingBlocks, err := s.db.GetMissingValuesBitSet(lower, upper)
if err != nil {
return err
}
missingBlocksCount := missingBlocks.Count()
s.logg.Info("bootstrapping historical syncer", "missing_blocks", missingBlocksCount, "lower_bound", lower, "upper_bound", upper)
buffer := make([]uint, missingBlocksCount)
missingBlocks.NextSetMany(0, buffer)
for _, v := range buffer {
s.batchQueue.PushFront(uint64(v))
}
return nil
}
func (s *Syncer) StartHistoricalSyncer() error {
s.logg.Info("starting historical syncer", "batch_size", s.batchSize)
for {
select {
case <-s.quit:
s.logg.Info("historical syncer stopped")
return nil
default:
if s.batchQueue.Len() > 0 {
var (
currentIterLen = s.batchQueue.Len()
)
if currentIterLen > s.batchSize {
currentIterLen = s.batchSize
}
batch := make([]uint64, currentIterLen)
for i := 0; i < currentIterLen; i++ {
v, _ := s.batchQueue.PopFront()
batch[i] = v
}
blocks, err := s.chain.GetBlocks(context.Background(), batch)
if err != nil {
s.logg.Error("batch blocks fetcher error", "fetch_size", currentIterLen, "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err)
}
for _, v := range blocks {
s.blocksQueue.PushBack(v)
}
} else {
time.Sleep(emptyQueueIdelTime)
s.logg.Debug("historical batcher queue empty slept for 2 seconds")
}
}
}
}
func (s *Syncer) StopHistoricalSyncer() {
if s.historicalEnabled {
s.logg.Info("signaling historical syncer shutdown")
s.quit <- struct{}{}
}
}

View File

@@ -19,23 +19,23 @@ const (
 	resubscribeInterval = 2 * time.Second
 )

-func (s *Syncer) StartRealtime() {
+func (s *Syncer) Start() {
 	s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn())
 }

-func (s *Syncer) StopRealtime() {
+func (s *Syncer) Stop() {
 	if s.realtimeSub != nil {
 		s.realtimeSub.Unsubscribe()
 	}
 }

 func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) {
-	newHeadersReceiver := make(chan *types.Header, 10)
+	newHeadersReceiver := make(chan *types.Header, 1)
 	sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver)
-	s.logg.Info("realtime syncer connected to ws endpoint")
 	if err != nil {
 		return nil, err
 	}
+	s.logg.Info("realtime syncer connected to ws endpoint")

 	return event.NewSubscription(func(quit <-chan struct{}) error {
 		eventsCtx, eventsCancel := context.WithCancel(context.Background())
@@ -77,7 +77,13 @@ func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) err
 			return fmt.Errorf("block %d error: %v", blockNumber, err)
 		}
 	}
-	s.blocksQueue.PushFront(block)
+
+	s.blockWorker.Submit(func() {
+		if err := s.blockProcessor.ProcessBlock(context.Background(), block); err != nil {
+			s.logg.Error("block processor error", "source", "realtime", "block", blockNumber, "error", err)
+		}
+	})
+
 	return nil
 }

View File

@@ -5,63 +5,52 @@ import (
 	"log/slog"

 	"github.com/celo-org/celo-blockchain"
-	"github.com/celo-org/celo-blockchain/core/types"
 	"github.com/celo-org/celo-blockchain/ethclient"
-	"github.com/ef-ds/deque/v2"
-	"github.com/grassrootseconomics/celo-tracker/internal/chain"
+	"github.com/gammazero/workerpool"
 	"github.com/grassrootseconomics/celo-tracker/internal/db"
+	"github.com/grassrootseconomics/celo-tracker/internal/processor"
 	"github.com/grassrootseconomics/celo-tracker/internal/stats"
+	"github.com/grassrootseconomics/celo-tracker/pkg/chain"
 )

 type (
 	SyncerOpts struct {
-		WebSocketEndpoint string
-		EnableHistorical  bool
-		StartBlock        uint64
-		BatchQueue        *deque.Deque[uint64]
-		BlocksQueue       *deque.Deque[types.Block]
-		BatchSize         int
+		BlockWorker       *workerpool.WorkerPool
+		BlockProcessor    *processor.Processor
 		Chain             *chain.Chain
+		DB                *db.DB
 		Logg              *slog.Logger
 		Stats             *stats.Stats
-		DB                *db.DB
+		WebSocketEndpoint string
 	}

 	Syncer struct {
-		batchQueue        *deque.Deque[uint64]
-		blocksQueue       *deque.Deque[types.Block]
+		blockWorker       *workerpool.WorkerPool
+		blockProcessor    *processor.Processor
 		chain             *chain.Chain
-		logg              *slog.Logger
-		stats             *stats.Stats
-		ethClient         *ethclient.Client
-		batchSize         int
 		db                *db.DB
+		ethClient         *ethclient.Client
+		logg              *slog.Logger
 		quit              chan struct{}
-		startBlock        uint64
 		realtimeSub       celo.Subscription
-		historicalEnabled bool
+		stats             *stats.Stats
 	}
 )

 func New(o SyncerOpts) (*Syncer, error) {
-	if o.EnableHistorical {
 	latestBlock, err := o.Chain.GetLatestBlock(context.Background())
+	lowerBound, err := o.DB.GetLowerBound()
 	if err != nil {
 		return nil, err
 	}

-	if o.StartBlock == 0 {
-		o.StartBlock = latestBlock
-	}
-	if err := o.DB.SetLowerBound(o.StartBlock); err != nil {
+	if lowerBound == 0 {
+		if err := o.DB.SetLowerBound(latestBlock); err != nil {
 			return nil, err
 		}
+	}

 	if err := o.DB.SetUpperBound(latestBlock); err != nil {
 		return nil, err
 	}
-	}

 	ethClient, err := ethclient.Dial(o.WebSocketEndpoint)
 	if err != nil {
@@ -69,16 +58,12 @@ func New(o SyncerOpts) (*Syncer, error) {
 	}

 	return &Syncer{
-		batchQueue:        o.BatchQueue,
-		blocksQueue:       o.BlocksQueue,
+		blockWorker: o.BlockWorker,
 		chain:       o.Chain,
-		logg:              o.Logg,
-		stats:             o.Stats,
-		ethClient:         ethClient,
 		db:          o.DB,
-		batchSize:         o.BatchSize,
+		ethClient:   ethClient,
+		logg:        o.Logg,
 		quit:        make(chan struct{}),
-		startBlock:        o.StartBlock,
-		historicalEnabled: o.EnableHistorical,
+		stats:       o.Stats,
 	}, nil
 }

View File

@@ -0,0 +1,111 @@
package verifier
import (
"context"
"fmt"
"log/slog"
"github.com/gammazero/workerpool"
"github.com/grassrootseconomics/celo-tracker/internal/db"
"github.com/grassrootseconomics/celo-tracker/internal/processor"
"github.com/grassrootseconomics/celo-tracker/internal/stats"
"github.com/grassrootseconomics/celo-tracker/pkg/chain"
)
type (
VerifierOpts struct {
BlockWorker *workerpool.WorkerPool
BlockProcessor *processor.Processor
Chain *chain.Chain
DB *db.DB
Logg *slog.Logger
Stats *stats.Stats
}
Verifier struct {
blockWorker *workerpool.WorkerPool
blockProcessor *processor.Processor
chain *chain.Chain
db *db.DB
logg *slog.Logger
quit chan struct{}
stats *stats.Stats
}
)
const (
blockBatchSize = 25
)
func New(o VerifierOpts) *Verifier {
return &Verifier{
blockWorker: o.BlockWorker,
chain: o.Chain,
db: o.DB,
logg: o.Logg,
quit: make(chan struct{}),
stats: o.Stats,
}
}
func (v *Verifier) Start()
func (v *Verifier) getMissingBlocks() error {
lower, err := v.db.GetLowerBound()
if err != nil {
return err
}
upper, err := v.db.GetUpperBound()
if err != nil {
return err
}
missingBlocks, err := v.db.GetMissingValuesBitSet(lower, upper-1)
if err != nil {
return err
}
missingBlocksCount := missingBlocks.Count()
if missingBlocksCount > 0 {
v.logg.Info("verifier found block gap", "missing_blocks_count", missingBlocksCount, "lower_bound", lower, "upper_bound", upper)
buffer := make([]uint, missingBlocksCount)
missingBlocks.NextSetMany(0, buffer)
for i := 0; i < int(missingBlocksCount); i += blockBatchSize {
end := i + blockBatchSize
if end > int(missingBlocksCount) {
end = int(missingBlocksCount)
}
batch := make([]uint64, end-i)
for j := i; j < end; j++ {
batch[j-i] = uint64(buffer[j])
}
v.processMissingBlocksBatch(batch)
}
} else {
v.logg.Debug("verifier running db compactor")
if err := v.db.Cleanup(); err != nil {
v.logg.Error("verifier compactor error", "error", err)
}
}
return nil
}
func (v *Verifier) processMissingBlocksBatch(batch []uint64) {
blocks, err := v.chain.GetBlocks(context.Background(), batch)
if err != nil {
v.logg.Error("batch blocks fetcher error", "error", "block_range", fmt.Sprintf("%d-%d", batch[0], batch[len(batch)-1]), "error", err)
}
for _, block := range blocks {
v.blockWorker.Submit(func() {
if err := v.blockProcessor.ProcessBlock(context.Background(), block); err != nil {
v.logg.Error("block processor error", "source", "verifier", "block", block.NumberU64(), "error", err)
}
})
}
}
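
The body of (*Verifier).Start is not visible in this view of the diff. Purely as an assumption about its shape, a periodic runner over getMissingBlocks could look roughly like the sketch below; the stand-in type, interval, and stop mechanics are placeholders, not the committed implementation:

package verifier

import (
	"log/slog"
	"time"
)

// sketchVerifier stands in for the Verifier fields such a loop would need.
type sketchVerifier struct {
	quit chan struct{}
	logg *slog.Logger
	run  func() error // stands in for getMissingBlocks
}

func (v *sketchVerifier) Start(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-v.quit:
			return
		case <-ticker.C:
			if err := v.run(); err != nil {
				v.logg.Error("verifier run failed", "error", err)
			}
		}
	}
}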

View File

@@ -2,27 +2,21 @@ package chain

 import (
 	"context"
+	"math/big"

+	"github.com/celo-org/celo-blockchain/common"
 	"github.com/celo-org/celo-blockchain/core/types"
 	"github.com/grassrootseconomics/w3-celo/module/eth"
 	"github.com/grassrootseconomics/w3-celo/w3types"
 )

-func (c *Chain) GetTransactions(ctx context.Context, block types.Block) ([]types.Transaction, error) {
-	txCount := len(block.Transactions())
-
-	calls := make([]w3types.RPCCaller, txCount)
-	transactions := make([]types.Transaction, txCount)
-
-	for i, tx := range block.Transactions() {
-		calls[i] = eth.Tx(tx.Hash()).Returns(&transactions[i])
-	}
-
-	if err := c.Provider.Client.CallCtx(ctx, calls...); err != nil {
-		return nil, err
-	}
-
-	return transactions, nil
+func (c *Chain) GetTransaction(ctx context.Context, txHash common.Hash) (types.Transaction, error) {
+	var transaction types.Transaction
+
+	if err := c.Provider.Client.CallCtx(ctx, eth.Tx(txHash).Returns(&transaction)); err != nil {
+		return transaction, err
+	}
+
+	return transaction, nil
 }

 func (c *Chain) GetReceipts(ctx context.Context, block types.Block) ([]types.Receipt, error) {
@@ -41,3 +35,7 @@ func (c *Chain) GetReceipts(ctx context.Context, block types.Block) ([]types.Rec
 	return receipts, nil
 }
+
+func (c *Chain) GetBlockReceipts(ctx context.Context, blockNumber *big.Int) (types.Receipts, error) {
+	return nil, nil
+}
}