release: v1.0.0-rc

This commit is contained in:
2024-09-05 09:48:59 +03:00
commit 30729e9318
50 changed files with 4404 additions and 0 deletions

23
internal/api/api.go Normal file
View File

@@ -0,0 +1,23 @@
package api
import (
"net/http"
"github.com/VictoriaMetrics/metrics"
"github.com/uptrace/bunrouter"
)
func New() *bunrouter.Router {
router := bunrouter.New()
router.GET("/metrics", metricsHandler())
return router
}
func metricsHandler() bunrouter.HandlerFunc {
return func(w http.ResponseWriter, _ bunrouter.Request) error {
metrics.WritePrometheus(w, true)
return nil
}
}

View File

@@ -0,0 +1,122 @@
package backfill
import (
"fmt"
"log/slog"
"time"
"github.com/grassrootseconomics/celo-tracker/db"
"github.com/grassrootseconomics/celo-tracker/internal/pool"
)
type (
BackfillOpts struct {
DB db.DB
Logg *slog.Logger
Pool *pool.Pool
}
Backfill struct {
db db.DB
logg *slog.Logger
pool *pool.Pool
stopCh chan struct{}
ticker *time.Ticker
}
)
const (
idleCheckInterval = 60 * time.Second
busyCheckInterval = 1 * time.Second
maxPoolSizePush = 100
)
func New(o BackfillOpts) *Backfill {
return &Backfill{
db: o.DB,
logg: o.Logg,
pool: o.Pool,
stopCh: make(chan struct{}),
ticker: time.NewTicker(idleCheckInterval),
}
}
func (b *Backfill) Stop() {
b.ticker.Stop()
b.stopCh <- struct{}{}
}
func (b *Backfill) Start() {
for {
select {
case <-b.stopCh:
b.logg.Debug("backfill shutting down")
return
case <-b.ticker.C:
if b.pool.Size() <= 1 {
if err := b.Run(true); err != nil {
b.logg.Error("backfill run error", "err", err)
}
b.logg.Debug("backfill successful run", "queue_size", b.pool.Size())
} else {
b.logg.Debug("skipping backfill tick", "queue_size", b.pool.Size())
}
}
}
}
func (b *Backfill) Run(skipLatest bool) error {
lower, err := b.db.GetLowerBound()
if err != nil {
return fmt.Errorf("verifier could not get lower bound from db: err %v", err)
}
upper, err := b.db.GetUpperBound()
if err != nil {
return fmt.Errorf("verifier could not get upper bound from db: err %v", err)
}
if skipLatest {
upper--
}
missingBlocks, err := b.db.GetMissingValuesBitSet(lower, upper)
if err != nil {
return fmt.Errorf("verifier could not get missing values bitset: err %v", err)
}
missingBlocksCount := missingBlocks.Count()
if missingBlocksCount > 0 {
b.logg.Info("found missing blocks", "skip_latest", skipLatest, "missing_blocks_count", missingBlocksCount)
buffer := make([]uint, maxPoolSizePush)
j := uint(0)
pushedCount := 0
j, buffer = missingBlocks.NextSetMany(j, buffer)
for ; len(buffer) > 0; j, buffer = missingBlocks.NextSetMany(j, buffer) {
for k := range buffer {
if pushedCount >= maxPoolSizePush {
break
}
b.pool.Push(uint64(buffer[k]))
b.logg.Debug("pushed block from backfill", "block", buffer[k])
pushedCount++
}
j += 1
}
}
if missingBlocksCount > maxPoolSizePush {
b.ticker.Reset(busyCheckInterval)
} else {
b.ticker.Reset(idleCheckInterval)
}
missingBlocks.ClearAll()
missingBlocks = nil
b.logg.Info("backfill tick run complete")
return nil
}

62
internal/cache/cache.go vendored Normal file
View File

@@ -0,0 +1,62 @@
package cache
import (
"context"
"log/slog"
)
type (
Cache interface {
Add(context.Context, string) error
Remove(context.Context, string) error
Exists(context.Context, string) (bool, error)
Size(context.Context) (int64, error)
}
CacheOpts struct {
Logg *slog.Logger
RedisDSN string
CacheType string
}
)
func New(o CacheOpts) (Cache, error) {
var cache Cache
switch o.CacheType {
case "map":
cache = NewMapCache()
case "redis":
redisCache, err := NewRedisCache(redisOpts{
DSN: o.RedisDSN,
})
if err != nil {
return nil, err
}
cache = redisCache
default:
cache = NewMapCache()
o.Logg.Warn("invalid cache type, using default type (map)")
}
// geSmartContracts, err := o.Chain.Provider().GetGESmartContracts(
// context.Background(),
// o.Registries,
// )
// if err != nil {
// return nil, fmt.Errorf("cache could not bootstrap GE smart contracts: err %v", err)
// }
// for k, v := range geSmartContracts {
// cache.Add(k, v)
// }
// for _, address := range o.Watchlist {
// cache.Add(address, false)
// }
// for _, address := range o.Blacklist {
// cache.Remove(address)
// }
// o.Logg.Info("cache bootstrap complete", "cached_addresses", cache.Size())
return cache, nil
}

56
internal/cache/redis.go vendored Normal file
View File

@@ -0,0 +1,56 @@
package cache
import (
"context"
"github.com/redis/rueidis"
)
type (
redisOpts struct {
DSN string
}
redisCache struct {
client rueidis.Client
}
)
func NewRedisCache(o redisOpts) (Cache, error) {
client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{o.DSN},
})
if err != nil {
return nil, err
}
return &redisCache{
client: client,
}, nil
}
func (c *redisCache) Add(ctx context.Context, key string) error {
// Without NX it will overwrite any existing KEY
cmd := c.client.B().Set().Key(key).Value("true").Build()
return c.client.Do(ctx, cmd).Error()
}
func (c *redisCache) Remove(ctx context.Context, key string) error {
cmd := c.client.B().Del().Key(key).Build()
return c.client.Do(ctx, cmd).Error()
}
func (c *redisCache) Exists(ctx context.Context, key string) (bool, error) {
cmd := c.client.B().Exists().Key(key).Build()
res, err := c.client.Do(ctx, cmd).AsBool()
if err != nil {
return false, err
}
return res, nil
}
func (c *redisCache) Size(ctx context.Context) (int64, error) {
cmd := c.client.B().Dbsize().Build()
return c.client.Do(ctx, cmd).AsInt64()
}

36
internal/cache/xmap.go vendored Normal file
View File

@@ -0,0 +1,36 @@
package cache
import (
"context"
"github.com/puzpuzpuz/xsync/v3"
)
type mapCache struct {
xmap *xsync.Map
}
func NewMapCache() Cache {
return &mapCache{
xmap: xsync.NewMap(),
}
}
func (c *mapCache) Add(_ context.Context, key string) error {
c.xmap.Store(key, true)
return nil
}
func (c *mapCache) Remove(_ context.Context, key string) error {
c.xmap.Delete(key)
return nil
}
func (c *mapCache) Exists(_ context.Context, key string) (bool, error) {
_, ok := c.xmap.Load(key)
return ok, nil
}
func (c *mapCache) Size(_ context.Context) (int64, error) {
return int64(c.xmap.Size()), nil
}

121
internal/chain/celo_rpc.go Normal file
View File

@@ -0,0 +1,121 @@
package chain
import (
"context"
"math/big"
"net/http"
"time"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/rpc"
"github.com/grassrootseconomics/celoutils/v3"
"github.com/grassrootseconomics/w3-celo"
"github.com/grassrootseconomics/w3-celo/module/eth"
"github.com/grassrootseconomics/w3-celo/w3types"
)
type (
CeloRPCOpts struct {
RPCEndpoint string
ChainID int64
}
CeloRPC struct {
provider *celoutils.Provider
}
)
func NewRPCFetcher(o CeloRPCOpts) (Chain, error) {
customRPCClient, err := lowTimeoutRPCClient(o.RPCEndpoint)
if err != nil {
return nil, err
}
chainProvider := celoutils.NewProvider(
o.RPCEndpoint,
o.ChainID,
celoutils.WithClient(customRPCClient),
)
return &CeloRPC{
provider: chainProvider,
}, nil
}
func lowTimeoutRPCClient(rpcEndpoint string) (*w3.Client, error) {
httpClient := &http.Client{
Timeout: 10 * time.Second,
}
rpcClient, err := rpc.DialHTTPWithClient(
rpcEndpoint,
httpClient,
)
if err != nil {
return nil, err
}
return w3.NewClient(rpcClient), nil
}
func (c *CeloRPC) GetBlocks(ctx context.Context, blockNumbers []uint64) ([]types.Block, error) {
blocksCount := len(blockNumbers)
calls := make([]w3types.RPCCaller, blocksCount)
blocks := make([]types.Block, blocksCount)
for i, v := range blockNumbers {
calls[i] = eth.BlockByNumber(new(big.Int).SetUint64(v)).Returns(&blocks[i])
}
if err := c.provider.Client.CallCtx(ctx, calls...); err != nil {
return nil, err
}
return blocks, nil
}
func (c *CeloRPC) GetBlock(ctx context.Context, blockNumber uint64) (*types.Block, error) {
var block types.Block
blockCall := eth.BlockByNumber(new(big.Int).SetUint64(blockNumber)).Returns(&block)
if err := c.provider.Client.CallCtx(ctx, blockCall); err != nil {
return nil, err
}
return &block, nil
}
func (c *CeloRPC) GetLatestBlock(ctx context.Context) (uint64, error) {
var latestBlock big.Int
latestBlockCall := eth.BlockNumber().Returns(&latestBlock)
if err := c.provider.Client.CallCtx(ctx, latestBlockCall); err != nil {
return 0, err
}
return latestBlock.Uint64(), nil
}
func (c *CeloRPC) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, error) {
var transaction types.Transaction
if err := c.provider.Client.CallCtx(ctx, eth.Tx(txHash).Returns(&transaction)); err != nil {
return nil, err
}
return &transaction, nil
}
func (c *CeloRPC) GetReceipts(ctx context.Context, blockNumber *big.Int) (types.Receipts, error) {
var receipts types.Receipts
if err := c.provider.Client.CallCtx(ctx, eth.BlockReceipts(blockNumber).Returns(&receipts)); err != nil {
return nil, err
}
return receipts, nil
}
func (c *CeloRPC) Provider() *celoutils.Provider {
return c.provider
}

View File

@@ -0,0 +1,98 @@
package chain
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
const (
testRPCEndpoint = "https://celo.archive.grassecon.net"
testChainID = 42220
)
func setupCeloRPC() (Chain, error) {
opts := CeloRPCOpts{
RPCEndpoint: testRPCEndpoint,
ChainID: testChainID,
}
return NewRPCFetcher(opts)
}
func TestRPC_GetBlocks(t *testing.T) {
rpcFetcher, err := setupCeloRPC()
require.NoError(t, err)
blockNumbers := []uint64{
19_600_000,
23_000_000,
27_000_000,
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
blocks, err := rpcFetcher.GetBlocks(ctx, blockNumbers)
require.NoError(t, err)
t.Logf("blocks %+v\n", blocks)
}
func TestRPC_GetBlock(t *testing.T) {
rpcFetcher, err := setupCeloRPC()
require.NoError(t, err)
var blockNumber uint64 = 19_900_000
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
block, err := rpcFetcher.GetBlock(ctx, blockNumber)
require.NoError(t, err)
t.Logf("block %+v\n", block)
}
func TestRPC_GetLatestBlock(t *testing.T) {
rpcFetcher, err := setupCeloRPC()
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
block, err := rpcFetcher.GetLatestBlock(ctx)
require.NoError(t, err)
t.Logf("block %+v\n", block)
}
func TestRPC_GetTransaction(t *testing.T) {
rpcFetcher, err := setupCeloRPC()
require.NoError(t, err)
var blockNumber uint64 = 19_900_000
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
block, err := rpcFetcher.GetBlock(ctx, blockNumber)
require.NoError(t, err)
require.NotNil(t, block)
transaction, err := rpcFetcher.GetTransaction(ctx, block.Transactions()[0].Hash())
require.NoError(t, err)
t.Logf("transaction %+v\n", transaction)
}
func TestRPC_GetReceipts(t *testing.T) {
rpcFetcher, err := setupCeloRPC()
require.NoError(t, err)
var blockNumber uint64 = 19_900_000
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
block, err := rpcFetcher.GetBlock(ctx, blockNumber)
require.NoError(t, err)
require.NotNil(t, block)
receipts, err := rpcFetcher.GetReceipts(ctx, block.Number())
require.NoError(t, err)
t.Logf("receipts %+v\n", receipts)
}

20
internal/chain/chain.go Normal file
View File

@@ -0,0 +1,20 @@
package chain
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celoutils/v3"
)
type Chain interface {
GetBlocks(context.Context, []uint64) ([]types.Block, error)
GetBlock(context.Context, uint64) (*types.Block, error)
GetLatestBlock(context.Context) (uint64, error)
GetTransaction(context.Context, common.Hash) (*types.Transaction, error)
GetReceipts(context.Context, *big.Int) (types.Receipts, error)
// Expose provider until we eject from celoutils
Provider() *celoutils.Provider
}

View File

@@ -0,0 +1,3 @@
package chain
// TBA

66
internal/pool/pool.go Normal file
View File

@@ -0,0 +1,66 @@
package pool
import (
"context"
"log/slog"
"runtime/debug"
"github.com/alitto/pond"
"github.com/grassrootseconomics/celo-tracker/internal/processor"
)
type (
PoolOpts struct {
Logg *slog.Logger
WorkerCount int
Processor *processor.Processor
}
Pool struct {
logg *slog.Logger
workerPool *pond.WorkerPool
processor *processor.Processor
}
)
const blocksBuffer = 100
func New(o PoolOpts) *Pool {
return &Pool{
logg: o.Logg,
workerPool: pond.New(
o.WorkerCount,
blocksBuffer,
pond.Strategy(pond.Balanced()),
pond.PanicHandler(panicHandler(o.Logg)),
),
processor: o.Processor,
}
}
func (p *Pool) Stop() {
p.workerPool.StopAndWait()
}
func (p *Pool) Push(block uint64) {
p.workerPool.Submit(func() {
err := p.processor.ProcessBlock(context.Background(), block)
if err != nil {
p.logg.Error("block processor error", "block_number", block, "error", err)
}
})
}
func (p *Pool) Size() uint64 {
return p.workerPool.WaitingTasks()
}
func (p *Pool) ActiveWorkers() int {
return p.workerPool.RunningWorkers()
}
func panicHandler(logg *slog.Logger) func(interface{}) {
return func(panic interface{}) {
logg.Error("block processor goroutine exited from a panic", "error", panic, "stack_trace", string(debug.Stack()))
}
}

View File

@@ -0,0 +1,116 @@
package processor
import (
"context"
"errors"
"fmt"
"log/slog"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celo-tracker/db"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celo-tracker/internal/router"
)
type (
ProcessorOpts struct {
Cache cache.Cache
Chain chain.Chain
DB db.DB
Router *router.Router
Logg *slog.Logger
}
Processor struct {
cache cache.Cache
chain chain.Chain
db db.DB
router *router.Router
logg *slog.Logger
}
)
func NewProcessor(o ProcessorOpts) *Processor {
return &Processor{
cache: o.Cache,
chain: o.Chain,
db: o.DB,
router: o.Router,
logg: o.Logg,
}
}
func (p *Processor) ProcessBlock(ctx context.Context, blockNumber uint64) error {
block, err := p.chain.GetBlock(ctx, blockNumber)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("block %d error: %v", blockNumber, err)
}
receipts, err := p.chain.GetReceipts(ctx, block.Number())
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("receipts fetch error: block %d: %v", blockNumber, err)
}
for _, receipt := range receipts {
if receipt.Status > 0 {
for _, log := range receipt.Logs {
exists, err := p.cache.Exists(ctx, log.Address.Hex())
if err != nil {
return err
}
if exists {
if err := p.router.RouteSuccessTx(
ctx,
router.SuccessTx{
Log: log,
Timestamp: block.Time(),
},
); err != nil && !errors.Is(err, context.Canceled) {
return err
}
}
}
} else {
tx, err := p.chain.GetTransaction(ctx, receipt.TxHash)
if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("get transaction error: tx %s: %v", receipt.TxHash.Hex(), err)
}
if tx.To() != nil {
exists, err := p.cache.Exists(ctx, tx.To().Hex())
if err != nil {
return err
}
if exists {
from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx)
if err != nil {
return fmt.Errorf("transaction decode error: tx %s: %v", receipt.TxHash.Hex(), err)
}
if err := p.router.RouteRevertTx(
ctx,
router.RevertTx{
From: from.Hex(),
InputData: common.Bytes2Hex(tx.Data()),
Block: blockNumber,
ContractAddress: tx.To().Hex(),
Timestamp: block.Time(),
TxHash: receipt.TxHash.Hex(),
},
); err != nil && !errors.Is(err, context.Canceled) {
return err
}
}
}
}
}
if err := p.db.SetValue(blockNumber); err != nil {
return err
}
p.logg.Debug("successfully processed block", "block", blockNumber)
return nil
}

92
internal/pub/jetstream.go Normal file
View File

@@ -0,0 +1,92 @@
package pub
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/nats-io/nats.go"
)
type (
JetStreamOpts struct {
Logg *slog.Logger
Endpoint string
DedupDuration time.Duration
PersistDuration time.Duration
}
jetStreamPub struct {
natsConn *nats.Conn
jsCtx nats.JetStreamContext
}
)
const streamName string = "TRACKER"
var streamSubjects = []string{
"TRACKER.*",
}
func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
natsConn, err := nats.Connect(o.Endpoint)
if err != nil {
return nil, err
}
js, err := natsConn.JetStream()
if err != nil {
return nil, err
}
o.Logg.Info("successfully connected to NATS server")
stream, err := js.StreamInfo(streamName)
if err != nil && !errors.Is(err, nats.ErrStreamNotFound) {
return nil, err
}
if stream == nil {
_, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: streamSubjects,
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName)
}
return &jetStreamPub{
natsConn: natsConn,
jsCtx: js,
}, nil
}
func (p *jetStreamPub) Close() {
if p.natsConn != nil {
p.natsConn.Close()
}
}
func (p *jetStreamPub) Send(_ context.Context, payload event.Event) error {
data, err := payload.Serialize()
if err != nil {
return err
}
_, err = p.jsCtx.Publish(
fmt.Sprintf("%s.%s", streamName, payload.TxType),
data,
nats.MsgId(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)),
)
if err != nil {
return err
}
return nil
}

12
internal/pub/pub.go Normal file
View File

@@ -0,0 +1,12 @@
package pub
import (
"context"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
)
type Pub interface {
Send(context.Context, event.Event) error
Close()
}

73
internal/queue/queue.go Normal file
View File

@@ -0,0 +1,73 @@
package queue
//
// import (
// "context"
// "log/slog"
// "github.com/alitto/pond"
// "github.com/grassrootseconomics/celo-tracker/internal/processor"
// )
// type (
// QueueOpts struct {
// QueueSize int
// Logg *slog.Logger
// Processor *processor.Processor
// Pool *pond.WorkerPool
// }
// Queue struct {
// logg *slog.Logger
// processChan chan uint64
// stopSignal chan interface{}
// processor *processor.Processor
// pool *pond.WorkerPool
// }
// )
// func New(o QueueOpts) *Queue {
// return &Queue{
// logg: o.Logg,
// processChan: make(chan uint64, o.QueueSize),
// stopSignal: make(chan interface{}),
// processor: o.Processor,
// pool: o.Pool,
// }
// }
// func (q *Queue) Stop() {
// q.stopSignal <- struct{}{}
// }
// func (q *Queue) Process() {
// for {
// select {
// case <-q.stopSignal:
// q.logg.Info("shutdown signal received stopping queue processing")
// return
// case block, ok := <-q.processChan:
// if !ok {
// return
// }
// q.pool.Submit(func() {
// err := q.processor.ProcessBlock(context.Background(), block)
// if err != nil {
// q.logg.Error("block processor error", "block_number", block, "error", err)
// }
// })
// }
// }
// }
// func (q *Queue) Push(block uint64) {
// q.processChan <- block
// }
// func (q *Queue) Size() int {
// return len(q.processChan)
// }
// func (q *Queue) WaitingSize() uint64 {
// return q.pool.WaitingTasks()
// }

View File

@@ -0,0 +1,94 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/celoutils/v3"
"github.com/grassrootseconomics/w3-celo"
)
type faucetGiveHandler struct{}
const faucetGiveEventName = "FAUCET_GIVE"
var (
_ Handler = (*faucetGiveHandler)(nil)
faucetGiveEvent = w3.MustNewEvent("Give(address indexed _recipient, address indexed _token, uint256 _amount)")
faucetGiveToSig = w3.MustNewFunc("giveTo(address)", "uint256")
faucetGimmeSig = w3.MustNewFunc("gimme()", "uint256")
)
func (h *faucetGiveHandler) Name() string {
return faucetGiveEventName
}
func (h *faucetGiveHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
recipient common.Address
token common.Address
amount big.Int
)
if err := faucetGiveEvent.DecodeArgs(tx.Log, &recipient, &token, &amount); err != nil {
return err
}
faucetGiveEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: faucetGiveEventName,
Payload: map[string]any{
"recipient": recipient.Hex(),
"token": token.Hex(),
"amount": amount.String(),
},
}
return pubCB(ctx, faucetGiveEvent)
}
func (h *faucetGiveHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
faucetGiveEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: faucetGiveEventName,
}
switch tx.InputData[:8] {
case "63e4bff4":
var to common.Address
if err := faucetGiveToSig.DecodeArgs(w3.B(tx.InputData), &to); err != nil {
return err
}
faucetGiveEvent.Payload = map[string]any{
"recipient": to.Hex(),
"token": celoutils.ZeroAddress,
"amount": "0",
}
return pubCB(ctx, faucetGiveEvent)
case "de82efb4":
faucetGiveEvent.Payload = map[string]any{
"recipient": celoutils.ZeroAddress,
"token": celoutils.ZeroAddress,
"amount": "0",
}
return pubCB(ctx, faucetGiveEvent)
}
return nil
}

View File

@@ -0,0 +1,95 @@
package router
import (
"context"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type indexAddHandler struct {
cache cache.Cache
}
const indexAddEventName = "INDEX_ADD"
var (
_ Handler = (*indexAddHandler)(nil)
indexAddEvent = w3.MustNewEvent("AddressAdded(address _token)")
indexAddSig = w3.MustNewFunc("add(address)", "bool")
indexRegisterSig = w3.MustNewFunc("register(address)", "bool")
)
func (h *indexAddHandler) Name() string {
return indexAddEventName
}
func (h *indexAddHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var address common.Address
if err := indexAddEvent.DecodeArgs(tx.Log, &address); err != nil {
return err
}
indexAddEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: indexAddEventName,
Payload: map[string]any{
"address": address.Hex(),
},
}
if err := h.cache.Add(ctx, address.Hex()); err != nil {
return err
}
return pubCB(ctx, indexAddEvent)
}
func (h *indexAddHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
indexAddEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: indexAddEventName,
}
switch tx.InputData[:8] {
case "0a3b0a4f":
var address common.Address
indexAddEvent.Payload = map[string]any{
"address": address.Hex(),
}
if err := indexAddSig.DecodeArgs(w3.B(tx.InputData), &address); err != nil {
return err
}
return pubCB(ctx, indexAddEvent)
case "4420e486":
var address common.Address
indexAddEvent.Payload = map[string]any{
"address": address.Hex(),
}
if err := indexRegisterSig.DecodeArgs(w3.B(tx.InputData), &address); err != nil {
return err
}
return pubCB(ctx, indexAddEvent)
}
return nil
}

View File

@@ -0,0 +1,77 @@
package router
import (
"context"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type indexRemoveHandler struct {
cache cache.Cache
}
const indexRemoveEventName = "INDEX_REMOVE"
var (
_ Handler = (*indexRemoveHandler)(nil)
indexRemoveEvent = w3.MustNewEvent("AddressRemoved(address _token)")
indexRemoveSig = w3.MustNewFunc("remove(address)", "bool")
)
func (h *indexRemoveHandler) Name() string {
return indexRemoveEventName
}
func (h *indexRemoveHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var address common.Address
if err := indexRemoveEvent.DecodeArgs(tx.Log, &address); err != nil {
return err
}
indexRemoveEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: indexRemoveEventName,
Payload: map[string]any{
"address": address.Hex(),
},
}
if err := h.cache.Remove(ctx, address.Hex()); err != nil {
return err
}
return pubCB(ctx, indexRemoveEvent)
}
func (h *indexRemoveHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var address common.Address
if err := indexRemoveSig.DecodeArgs(w3.B(tx.InputData), &address); err != nil {
return err
}
indexRemoveEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: indexRemoveEventName,
Payload: map[string]any{
"address": address.Hex(),
},
}
return pubCB(ctx, indexRemoveEvent)
}

View File

@@ -0,0 +1,77 @@
package router
import (
"context"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type ownershipHandler struct{}
const (
ownershipEventName = "OWNERSHIP_TRANSFERRED"
)
var (
_ Handler = (*ownershipHandler)(nil)
ownershipEvent = w3.MustNewEvent("OwnershipTransferred(address indexed previousOwner, address indexed newOwner)")
ownershipToSig = w3.MustNewFunc("transferOwnership(address)", "bool")
)
func (h *ownershipHandler) Name() string {
return ownershipEventName
}
func (h *ownershipHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
previousOwner common.Address
newOwner common.Address
)
if err := ownershipEvent.DecodeArgs(tx.Log, &previousOwner, &newOwner); err != nil {
return err
}
ownershipEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: ownershipEventName,
Payload: map[string]any{
"previousOwner": previousOwner.Hex(),
"newOwner": newOwner.Hex(),
},
}
return pubCB(ctx, ownershipEvent)
}
func (h *ownershipHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var newOwner common.Address
if err := ownershipToSig.DecodeArgs(w3.B(tx.InputData), &newOwner); err != nil {
return err
}
ownershipEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: ownershipEventName,
Payload: map[string]any{
"previousOwner": tx.From,
"newOwner": newOwner.Hex(),
},
}
return pubCB(ctx, ownershipEvent)
}

View File

@@ -0,0 +1,86 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type poolDepositHandler struct{}
const poolDepositEventName = "POOL_DEPOSIT"
var (
_ Handler = (*poolDepositHandler)(nil)
poolDepositEvent = w3.MustNewEvent("Deposit(address indexed initiator, address indexed tokenIn, uint256 amountIn)")
poolDepositSig = w3.MustNewFunc("deposit(address, uint256)", "")
)
func (h *poolDepositHandler) Name() string {
return poolDepositEventName
}
func (h *poolDepositHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
initiator common.Address
tokenIn common.Address
amountIn big.Int
)
if err := poolDepositEvent.DecodeArgs(
tx.Log,
&initiator,
&tokenIn,
&amountIn,
); err != nil {
return err
}
poolDepositEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: poolDepositEventName,
Payload: map[string]any{
"initiator": initiator.Hex(),
"tokenIn": tokenIn.Hex(),
"amountIn": amountIn.String(),
},
}
return pubCB(ctx, poolDepositEvent)
}
func (h *poolDepositHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var (
tokenIn common.Address
amountIn big.Int
)
if err := poolDepositSig.DecodeArgs(w3.B(tx.InputData), &tokenIn, &amountIn); err != nil {
return err
}
poolDepositEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: poolDepositEventName,
Payload: map[string]any{
"initiator": tx.From,
"tokenIn": tokenIn.Hex(),
"amountIn": amountIn.String(),
},
}
return pubCB(ctx, poolDepositEvent)
}

View File

@@ -0,0 +1,99 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type poolSwapHandler struct{}
const poolSwapEventName = "POOL_SWAP"
var (
_ Handler = (*poolSwapHandler)(nil)
poolSwapEvent = w3.MustNewEvent("Swap(address indexed initiator, address indexed tokenIn, address tokenOut, uint256 amountIn, uint256 amountOut, uint256 fee)")
poolSwapSig = w3.MustNewFunc("withdraw(address, address, uint256)", "")
)
func (h *poolSwapHandler) Name() string {
return poolSwapEventName
}
func (h *poolSwapHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
initiator common.Address
tokenIn common.Address
tokenOut common.Address
amountIn big.Int
amountOut big.Int
fee big.Int
)
if err := poolSwapEvent.DecodeArgs(
tx.Log,
&initiator,
&tokenIn,
&tokenOut,
&amountIn,
&amountOut,
&fee,
); err != nil {
return err
}
poolSwapEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: poolSwapEventName,
Payload: map[string]any{
"initiator": initiator.Hex(),
"tokenIn": tokenIn.Hex(),
"tokenOut": tokenOut.Hex(),
"amountIn": amountIn.String(),
"amountOut": amountOut.String(),
"fee": fee.String(),
},
}
return pubCB(ctx, poolSwapEvent)
}
func (h *poolSwapHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var (
tokenOut common.Address
tokenIn common.Address
amountIn big.Int
)
if err := poolSwapSig.DecodeArgs(w3.B(tx.InputData), &tokenOut, &tokenIn, &amountIn); err != nil {
return err
}
poolSwapEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: poolSwapEventName,
Payload: map[string]any{
"initiator": tx.From,
"tokenIn": tokenIn.Hex(),
"tokenOut": tokenOut.Hex(),
"amountIn": amountIn.String(),
"amountOut": "0",
"fee": "0",
},
}
return pubCB(ctx, poolSwapEvent)
}

View File

@@ -0,0 +1,78 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type quoterPriceHandler struct{}
const quoterPriceEventName = "QUOTER_PRICE_INDEX_UPDATED"
var (
_ Handler = (*quoterPriceHandler)(nil)
quoterPriceEvent = w3.MustNewEvent("PriceIndexUpdated(address _tokenAddress, uint256 _exchangeRate)")
quoterPriceToSig = w3.MustNewFunc("setPriceIndexValue(address, uint256)", "uint256")
)
func (h *quoterPriceHandler) Name() string {
return quoterPriceEventName
}
func (h *quoterPriceHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
token common.Address
exchangeRate big.Int
)
if err := quoterPriceEvent.DecodeArgs(tx.Log, &token, &exchangeRate); err != nil {
return err
}
quoterPriceEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: quoterPriceEventName,
Payload: map[string]any{
"token": token.Hex(),
"exchangeRate": exchangeRate.String(),
},
}
return pubCB(ctx, quoterPriceEvent)
}
func (h *quoterPriceHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var (
token common.Address
exchangeRate big.Int
)
if err := quoterPriceToSig.DecodeArgs(w3.B(tx.InputData), &token, &exchangeRate); err != nil {
return err
}
quoterPriceEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: quoterPriceEventName,
Payload: map[string]any{
"token": token.Hex(),
"exchangeRate": exchangeRate.String(),
},
}
return pubCB(ctx, quoterPriceEvent)
}

123
internal/router/router.go Normal file
View File

@@ -0,0 +1,123 @@
package router
import (
"context"
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/pub"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type (
PubCallback func(context.Context, event.Event) error
SuccessTx struct {
Log *types.Log
Timestamp uint64
}
RevertTx struct {
From string
InputData string
Block uint64
ContractAddress string
Timestamp uint64
TxHash string
}
Handler interface {
Name() string
SuccessTx(context.Context, SuccessTx, PubCallback) error
RevertTx(context.Context, RevertTx, PubCallback) error
}
RouterOpts struct {
Pub pub.Pub
Cache cache.Cache
}
Router struct {
pub pub.Pub
cache cache.Cache
logHandlers map[common.Hash]Handler
inputDataHandlers map[string]Handler
}
)
func New(o RouterOpts) *Router {
var (
indexAddHandler *indexAddHandler = &indexAddHandler{cache: o.Cache}
indexRemoveHandler *indexRemoveHandler = &indexRemoveHandler{cache: o.Cache}
tokenTransferHandler *tokenTransferHandler = &tokenTransferHandler{cache: o.Cache}
faucetGiveHandler *faucetGiveHandler = &faucetGiveHandler{}
ownershipHandler *ownershipHandler = &ownershipHandler{}
poolDepositHandler *poolDepositHandler = &poolDepositHandler{}
poolSwapHandler *poolSwapHandler = &poolSwapHandler{}
quoterPriceHandler *quoterPriceHandler = &quoterPriceHandler{}
sealHandler *sealHandler = &sealHandler{}
tokenBurnHandler *tokenBurnHandler = &tokenBurnHandler{}
tokenMintHandler *tokenMintHandler = &tokenMintHandler{}
)
logHandlers := map[common.Hash]Handler{
w3.H("0x26162814817e23ec5035d6a2edc6c422da2da2119e27cfca6be65cc2dc55ca4c"): faucetGiveHandler,
w3.H("0xa226db3f664042183ee0281230bba26cbf7b5057e50aee7f25a175ff45ce4d7f"): indexAddHandler,
w3.H("0x24a12366c02e13fe4a9e03d86a8952e85bb74a456c16e4a18b6d8295700b74bb"): indexRemoveHandler,
w3.H("0x8be0079c531659141344cd1fd0a4f28419497f9722a3daafe3b4186f6b6457e0"): ownershipHandler,
w3.H("0x5548c837ab068cf56a2c2479df0882a4922fd203edb7517321831d95078c5f62"): poolDepositHandler,
w3.H("0xd6d34547c69c5ee3d2667625c188acf1006abb93e0ee7cf03925c67cf7760413"): poolSwapHandler,
w3.H("0xdb9ce1a76955721ca61ac50cd1b87f9ab8620325c8619a62192c2dc7871d56b1"): quoterPriceHandler,
w3.H("0x6b7e2e653f93b645d4ed7292d6429f96637084363e477c8aaea1a43ed13c284e"): sealHandler,
w3.H("0xcc16f5dbb4873280815c1ee09dbd06736cffcc184412cf7a71a0fdb75d397ca5"): tokenBurnHandler,
w3.H("0xab8530f87dc9b59234c4623bf917212bb2536d647574c8e7e5da92c2ede0c9f8"): tokenMintHandler,
w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"): tokenTransferHandler,
}
inputDataHandlers := map[string]Handler{
"63e4bff4": faucetGiveHandler,
"de82efb4": faucetGiveHandler,
"0a3b0a4f": indexAddHandler,
"4420e486": indexAddHandler,
"29092d0e": indexRemoveHandler,
"f2fde38b": ownershipHandler,
"47e7ef24": poolDepositHandler,
"d9caed12": poolSwapHandler,
"ebc59dff": quoterPriceHandler,
"86fe212d": sealHandler,
"42966c68": tokenBurnHandler,
"449a52f8": tokenMintHandler,
"a9059cbb": tokenTransferHandler,
"23b872dd": tokenTransferHandler,
}
return &Router{
pub: o.Pub,
logHandlers: logHandlers,
inputDataHandlers: inputDataHandlers,
}
}
func (r *Router) RouteSuccessTx(ctx context.Context, msg SuccessTx) error {
handler, ok := r.logHandlers[msg.Log.Topics[0]]
if ok {
return handler.SuccessTx(ctx, msg, r.pub.Send)
}
return nil
}
func (r *Router) RouteRevertTx(ctx context.Context, msg RevertTx) error {
if len(msg.InputData) < 8 {
return nil
}
handler, ok := r.inputDataHandlers[msg.InputData[:8]]
if ok {
return handler.RevertTx(ctx, msg, r.pub.Send)
}
return nil
}

73
internal/router/seal.go Normal file
View File

@@ -0,0 +1,73 @@
package router
import (
"context"
"math/big"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type sealHandler struct{}
const sealEventName = "SEAL_STATE_CHANGE"
var (
_ Handler = (*sealHandler)(nil)
sealEvent = w3.MustNewEvent("SealStateChange(bool indexed _final, uint256 _sealState)")
sealToSig = w3.MustNewFunc("seal(uint256)", "uint256")
)
func (h *sealHandler) Name() string {
return sealEventName
}
func (h *sealHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
final bool
sealState big.Int
)
if err := sealEvent.DecodeArgs(tx.Log, &final, &sealState); err != nil {
return err
}
sealEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: sealEventName,
Payload: map[string]any{
"final": final,
"sealState": sealState.String(),
},
}
return pubCB(ctx, sealEvent)
}
func (h *sealHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var sealState big.Int
if err := sealToSig.DecodeArgs(w3.B(tx.InputData), &sealState); err != nil {
return err
}
sealEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: sealEventName,
Payload: map[string]any{
"sealState": sealState.String(),
},
}
return pubCB(ctx, sealEvent)
}

View File

@@ -0,0 +1,75 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type tokenBurnHandler struct{}
const burnEventName = "TOKEN_BURN"
var (
_ Handler = (*tokenBurnHandler)(nil)
tokenBurnEvent = w3.MustNewEvent("Burn(address indexed _tokenBurner, uint256 _value)")
tokenBurnToSig = w3.MustNewFunc("Burn(uint256)", "bool")
)
func (h *tokenBurnHandler) Name() string {
return burnEventName
}
func (h *tokenBurnHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
tokenBurner common.Address
value big.Int
)
if err := tokenBurnEvent.DecodeArgs(tx.Log, &tokenBurner, &value); err != nil {
return err
}
tokenBurnEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: burnEventName,
Payload: map[string]any{
"tokenBurner": tokenBurner.Hex(),
"value": value.String(),
},
}
return pubCB(ctx, tokenBurnEvent)
}
func (h *tokenBurnHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var value big.Int
if err := tokenBurnToSig.DecodeArgs(w3.B(tx.InputData), &value); err != nil {
return err
}
tokenBurnEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: burnEventName,
Payload: map[string]any{
"tokenBurner": tx.From,
"value": value.String(),
},
}
return pubCB(ctx, tokenBurnEvent)
}

View File

@@ -0,0 +1,82 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/w3-celo"
)
type tokenMintHandler struct{}
const mintEventName = "TOKEN_MINT"
var (
_ Handler = (*tokenMintHandler)(nil)
tokenMintEvent = w3.MustNewEvent("Mint(address indexed _tokenMinter, address indexed _beneficiary, uint256 _value)")
tokenMintToSig = w3.MustNewFunc("MintTo(address, uint256)", "bool")
)
func (h *tokenMintHandler) Name() string {
return mintEventName
}
func (h *tokenMintHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
tokenMinter common.Address
to common.Address
value big.Int
)
if err := tokenMintEvent.DecodeArgs(tx.Log, &tokenMinter, &to, &value); err != nil {
return err
}
tokenMintEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: mintEventName,
Payload: map[string]any{
"tokenMinter": tokenMinter.Hex(),
"to": to.Hex(),
"value": value.String(),
},
}
return pubCB(ctx, tokenMintEvent)
}
func (h *tokenMintHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
var (
to common.Address
value big.Int
)
if err := tokenMintToSig.DecodeArgs(w3.B(tx.InputData), &to, &value); err != nil {
return err
}
tokenMintEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: mintEventName,
Payload: map[string]any{
"tokenMinter": tx.From,
"to": to.Hex(),
"value": value.String(),
},
}
return pubCB(ctx, tokenMintEvent)
}

View File

@@ -0,0 +1,161 @@
package router
import (
"context"
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/grassrootseconomics/celoutils/v3"
"github.com/grassrootseconomics/w3-celo"
)
type tokenTransferHandler struct {
cache cache.Cache
}
const transferEventName = "TOKEN_TRANSFER"
var (
_ Handler = (*tokenTransferHandler)(nil)
tokenTransferEvent = w3.MustNewEvent("Transfer(address indexed _from, address indexed _to, uint256 _value)")
tokenTransferSig = w3.MustNewFunc("transfer(address, uint256)", "bool")
tokenTransferFromSig = w3.MustNewFunc("transferFrom(address, address, uint256)", "bool")
stables = map[string]bool{
celoutils.CUSDContractMainnet: true,
celoutils.CKESContractMainnet: true,
celoutils.USDTContractMainnet: true,
celoutils.USDCContractMainnet: true,
}
)
func (h *tokenTransferHandler) Name() string {
return transferEventName
}
func (h *tokenTransferHandler) SuccessTx(ctx context.Context, tx SuccessTx, pubCB PubCallback) error {
var (
from common.Address
to common.Address
value big.Int
)
if err := tokenTransferEvent.DecodeArgs(tx.Log, &from, &to, &value); err != nil {
return err
}
proceed, err := h.checkStables(ctx, from.Hex(), to.Hex(), tx.Log.Address.Hex())
if err != nil {
return err
}
if !proceed {
return nil
}
tokenTransferEvent := event.Event{
Index: tx.Log.Index,
Block: tx.Log.BlockNumber,
ContractAddress: tx.Log.Address.Hex(),
Success: true,
Timestamp: tx.Timestamp,
TxHash: tx.Log.TxHash.Hex(),
TxType: transferEventName,
Payload: map[string]any{
"from": from.Hex(),
"to": to.Hex(),
"value": value.String(),
},
}
return pubCB(ctx, tokenTransferEvent)
}
func (h *tokenTransferHandler) RevertTx(ctx context.Context, tx RevertTx, pubCB PubCallback) error {
tokenTransferEvent := event.Event{
Block: tx.Block,
ContractAddress: tx.ContractAddress,
Success: false,
Timestamp: tx.Timestamp,
TxHash: tx.TxHash,
TxType: transferEventName,
}
switch tx.InputData[:8] {
case "a9059cbb":
var (
to common.Address
value big.Int
)
if err := tokenTransferSig.DecodeArgs(w3.B(tx.InputData), &to, &value); err != nil {
return err
}
proceed, err := h.checkStables(ctx, tx.From, to.Hex(), tx.ContractAddress)
if err != nil {
return err
}
if !proceed {
return nil
}
tokenTransferEvent.Payload = map[string]any{
"from": tx.From,
"to": to.Hex(),
"value": value.String(),
}
return pubCB(ctx, tokenTransferEvent)
case "23b872dd":
var (
from common.Address
to common.Address
value big.Int
)
if err := tokenTransferFromSig.DecodeArgs(w3.B(tx.InputData), &from, &to, &value); err != nil {
return err
}
proceed, err := h.checkStables(ctx, from.Hex(), to.Hex(), tx.ContractAddress)
if err != nil {
return err
}
if !proceed {
return nil
}
tokenTransferEvent.Payload = map[string]any{
"from": from.Hex(),
"to": to.Hex(),
"value": value.String(),
}
return pubCB(ctx, tokenTransferEvent)
}
return nil
}
func (h *tokenTransferHandler) checkStables(ctx context.Context, from string, to string, contractAddress string) (bool, error) {
_, ok := stables[contractAddress]
if !ok {
return true, nil
}
// TODO: Pipeline this check on Redis with a new method
fromExists, err := h.cache.Exists(ctx, from)
if err != nil {
return false, err
}
toExists, err := h.cache.Exists(ctx, to)
if err != nil {
return false, err
}
return fromExists || toExists, nil
}

88
internal/stats/stats.go Normal file
View File

@@ -0,0 +1,88 @@
package stats
import (
"context"
"log/slog"
"sync/atomic"
"time"
"github.com/grassrootseconomics/celo-tracker/internal/cache"
"github.com/grassrootseconomics/celo-tracker/internal/pool"
)
type (
StatsOpts struct {
Cache cache.Cache
Logg *slog.Logger
Pool *pool.Pool
}
Stats struct {
cache cache.Cache
logg *slog.Logger
pool *pool.Pool
stopCh chan struct{}
latestBlock atomic.Uint64
}
)
const statsPrinterInterval = 15 * time.Second
func New(o StatsOpts) *Stats {
return &Stats{
cache: o.Cache,
logg: o.Logg,
pool: o.Pool,
stopCh: make(chan struct{}),
}
}
func (s *Stats) SetLatestBlock(v uint64) {
s.latestBlock.Store(v)
}
func (s *Stats) GetLatestBlock() uint64 {
return s.latestBlock.Load()
}
func (s *Stats) Stop() {
s.stopCh <- struct{}{}
}
func (s *Stats) APIStatsResponse(ctx context.Context) (map[string]interface{}, error) {
cacheSize, err := s.cache.Size(ctx)
if err != nil {
return nil, err
}
return map[string]interface{}{
"latestBlock": s.GetLatestBlock(),
"poolQueueSize": s.pool.Size(),
"poolActiveWorkers": s.pool.ActiveWorkers(),
"cacheSize": cacheSize,
}, nil
}
func (s *Stats) StartStatsPrinter() {
ticker := time.NewTicker(statsPrinterInterval)
for {
select {
case <-s.stopCh:
s.logg.Debug("stats shutting down")
return
case <-ticker.C:
cacheSize, err := s.cache.Size(context.Background())
if err != nil {
s.logg.Error("stats printer could not fetch cache size", "error", err)
}
s.logg.Info("block stats",
"latest_block", s.GetLatestBlock(),
"pool_queue_size", s.pool.Size(),
"pool_active_workers", s.pool.ActiveWorkers(),
"cache_size", cacheSize,
)
}
}
}

View File

@@ -0,0 +1,80 @@
package syncer
import (
"context"
"time"
"github.com/celo-org/celo-blockchain"
"github.com/celo-org/celo-blockchain/core/types"
"github.com/celo-org/celo-blockchain/event"
)
type BlockQueueFn func(uint64) error
const resubscribeInterval = 2 * time.Second
func (s *Syncer) Stop() {
if s.realtimeSub != nil {
s.realtimeSub.Unsubscribe()
}
}
func (s *Syncer) Start() {
s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn())
}
func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) {
newHeadersReceiver := make(chan *types.Header, 1)
sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver)
s.logg.Info("realtime syncer connected to ws endpoint")
if err != nil {
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
eventsCtx, eventsCancel := context.WithCancel(context.Background())
defer eventsCancel()
go func() {
select {
case <-quit:
s.logg.Info("realtime syncer stopping")
eventsCancel()
case <-eventsCtx.Done():
return
}
}()
for {
select {
case header := <-newHeadersReceiver:
if err := fn(header.Number.Uint64()); err != nil {
s.logg.Error("realtime block queuer error", "error", err)
}
case <-eventsCtx.Done():
s.logg.Info("realtime syncer shutting down")
return nil
case err := <-sub.Err():
return err
}
}
}), nil
}
func (s *Syncer) queueRealtimeBlock(blockNumber uint64) error {
s.pool.Push(blockNumber)
s.stats.SetLatestBlock(blockNumber)
if err := s.db.SetUpperBound(blockNumber); err != nil {
return err
}
return nil
}
func (s *Syncer) resubscribeFn() event.ResubscribeErrFunc {
return func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
s.logg.Error("resubscribing after failed subscription", "error", err)
}
return s.receiveRealtimeBlocks(ctx, s.queueRealtimeBlock)
}
}

77
internal/syncer/syncer.go Normal file
View File

@@ -0,0 +1,77 @@
package syncer
import (
"context"
"log/slog"
"github.com/celo-org/celo-blockchain"
"github.com/celo-org/celo-blockchain/ethclient"
"github.com/grassrootseconomics/celo-tracker/db"
"github.com/grassrootseconomics/celo-tracker/internal/chain"
"github.com/grassrootseconomics/celo-tracker/internal/pool"
"github.com/grassrootseconomics/celo-tracker/internal/stats"
)
type (
SyncerOpts struct {
DB db.DB
Chain chain.Chain
Logg *slog.Logger
Pool *pool.Pool
Stats *stats.Stats
StartBlock int64
WebSocketEndpoint string
}
Syncer struct {
db db.DB
ethClient *ethclient.Client
logg *slog.Logger
realtimeSub celo.Subscription
pool *pool.Pool
stats *stats.Stats
stopCh chan struct{}
}
)
func New(o SyncerOpts) (*Syncer, error) {
latestBlock, err := o.Chain.GetLatestBlock(context.Background())
if err != nil {
return nil, err
}
lowerBound, err := o.DB.GetLowerBound()
if err != nil {
return nil, err
}
if lowerBound == 0 {
if o.StartBlock > 0 {
if err := o.DB.SetLowerBound(uint64(o.StartBlock)); err != nil {
return nil, err
}
} else {
if err := o.DB.SetLowerBound(latestBlock); err != nil {
return nil, err
}
}
}
if err := o.DB.SetUpperBound(latestBlock); err != nil {
return nil, err
}
o.Stats.SetLatestBlock(latestBlock)
ethClient, err := ethclient.Dial(o.WebSocketEndpoint)
if err != nil {
return nil, err
}
return &Syncer{
db: o.DB,
ethClient: ethClient,
logg: o.Logg,
pool: o.Pool,
stats: o.Stats,
stopCh: make(chan struct{}),
}, nil
}

53
internal/util/init.go Normal file
View File

@@ -0,0 +1,53 @@
package util
import (
"log/slog"
"os"
"strings"
"github.com/kamikazechaser/common/logg"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/v2"
)
func InitLogger() *slog.Logger {
loggOpts := logg.LoggOpts{
FormatType: logg.Logfmt,
LogLevel: slog.LevelInfo,
}
if os.Getenv("DEBUG") != "" {
loggOpts.LogLevel = slog.LevelDebug
}
if os.Getenv("DEV") != "" {
loggOpts.LogLevel = slog.LevelDebug
loggOpts.FormatType = logg.Human
}
return logg.NewLogg(loggOpts)
}
func InitConfig(lo *slog.Logger, confFilePath string) *koanf.Koanf {
var (
ko = koanf.New(".")
)
confFile := file.Provider(confFilePath)
if err := ko.Load(confFile, toml.Parser()); err != nil {
lo.Error("could not parse configuration file", "error", err)
os.Exit(1)
}
if err := ko.Load(env.Provider("TRACKER_", ".", func(s string) string {
return strings.ReplaceAll(strings.ToLower(
strings.TrimPrefix(s, "TRACKER_")), "__", ".")
}), nil); err != nil {
lo.Error("could not override config from env vars", "error", err)
os.Exit(1)
}
return ko
}