From 1a49f07db61dd93555f6812a05c6e4a286adfedb Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Tue, 23 Apr 2024 15:34:17 +0800 Subject: [PATCH] feat: add NATS JetStream --- cmd/tracker/main.go | 16 ++++-- config.toml | 10 +++- dev/docker-compose.yaml | 8 +++ go.mod | 5 +- go.sum | 8 +++ internal/cache/cache.go | 2 +- internal/emitter/console.go | 29 ---------- internal/emitter/emitter.go | 20 ------- internal/event/event.go | 24 ++++++++ internal/handler/faucet_give.go | 19 ++++--- internal/handler/handler.go | 16 +----- internal/handler/index_add.go | 19 ++++--- internal/handler/index_remove.go | 15 ++--- internal/handler/ownership.go | 15 ++--- internal/handler/pool_deposit.go | 15 ++--- internal/handler/pool_swap.go | 15 ++--- internal/handler/quoter_price.go | 15 ++--- internal/handler/seal.go | 15 ++--- internal/handler/token_burn.go | 15 ++--- internal/handler/token_mint.go | 15 ++--- internal/handler/token_transfer.go | 19 ++++--- internal/processor/block.go | 20 +++---- internal/processor/processor.go | 8 +-- internal/pub/console.go | 32 +++++++++++ internal/pub/jetstream.go | 91 ++++++++++++++++++++++++++++++ internal/pub/pub.go | 14 +++++ internal/syncer/historical.go | 6 +- internal/syncer/syncer.go | 44 ++++++++------- 28 files changed, 338 insertions(+), 192 deletions(-) create mode 100644 dev/docker-compose.yaml delete mode 100644 internal/emitter/console.go delete mode 100644 internal/emitter/emitter.go create mode 100644 internal/event/event.go create mode 100644 internal/pub/console.go create mode 100644 internal/pub/jetstream.go create mode 100644 internal/pub/pub.go diff --git a/cmd/tracker/main.go b/cmd/tracker/main.go index 17f4618..6ddc2b3 100644 --- a/cmd/tracker/main.go +++ b/cmd/tracker/main.go @@ -16,8 +16,8 @@ import ( "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/celo-tracker/internal/processor" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" "github.com/grassrootseconomics/celo-tracker/internal/syncer" "github.com/knadh/koanf/v2" @@ -121,9 +121,16 @@ func main() { os.Exit(1) } - defaultEmitter := emitter.New(emitter.EmitterOpts{ - Logg: lo, + jetStreamPub, err := pub.NewJetStreamPub(pub.JetStreamOpts{ + Endpoint: ko.MustString("jetstream.endpoint"), + PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, + DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour, + Logg: lo, }) + if err != nil { + lo.Error("could not initialize jetstream pub", "error", err) + os.Exit(1) + } blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ Chain: chain, @@ -132,7 +139,7 @@ func main() { Stats: stats, DB: db, Cache: cache, - Emitter: defaultEmitter, + Pub: jetStreamPub, }) if ko.Bool("chain.historical") { @@ -168,6 +175,7 @@ func main() { go func() { defer wg.Done() blockProcessor.Stop() + jetStreamPub.Close() chainSyncer.StopHistoricalSyncer() chainSyncer.StopRealtime() }() diff --git a/config.toml b/config.toml index 1b5c1b0..71f11d8 100644 --- a/config.toml +++ b/config.toml @@ -9,7 +9,7 @@ ws_endpoint = "wss://ws.celo.grassecon.net" rpc_endpoint = "https://celo.grassecon.net" testnet = false realtime = true -historical = true +historical = false start_block = 25151040 batch_size = 100 @@ -20,4 +20,10 @@ ge_registries = [ "0x0cc9f4fff962def35bb34a53691180b13e653030", ] watchlist = [""] -blacklist = ["0x765DE816845861e75A25fCA122bb6898B8B1282a"] +blacklist = [""] + +[jetstream] +enable = true +endpoint = "nats://127.0.0.1:4222" +persist_duration_hrs = 48 +dedup_duration_hrs = 6 diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml new file mode 100644 index 0000000..64a565b --- /dev/null +++ b/dev/docker-compose.yaml @@ -0,0 +1,8 @@ +services: + nats: + image: nats:2 + restart: unless-stopped + command: -js -sd /tmp/nats/data -m 8222 + ports: + - 127.0.0.1:4222:4222 + - 127.0.0.1:8222:8222 \ No newline at end of file diff --git a/go.mod b/go.mod index 82bc27e..15d8cdd 100644 --- a/go.mod +++ b/go.mod @@ -53,12 +53,15 @@ require ( github.com/holiman/uint256 v1.2.4 // indirect github.com/huin/goupnp v1.0.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect - github.com/klauspost/compress v1.12.3 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/lmittmann/tint v1.0.4 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/nats-io/nats.go v1.34.1 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onsi/gomega v1.10.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect diff --git a/go.sum b/go.sum index 1593949..2e0e81c 100644 --- a/go.sum +++ b/go.sum @@ -325,6 +325,8 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6 github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= @@ -392,6 +394,12 @@ github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= diff --git a/internal/cache/cache.go b/internal/cache/cache.go index ba2e723..7a09385 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -58,7 +58,7 @@ func New(o CacheOpts) (Cache, error) { return nil, err } // We only watch the token and pool indexes - // If at some point we want to eatch the user index, this line should be removed + // If at some point we want to watch the user index, this line should be removed cache.SetWatchableIndex(watchableIndex) for _, address := range o.Watchlist { diff --git a/internal/emitter/console.go b/internal/emitter/console.go deleted file mode 100644 index 4be9378..0000000 --- a/internal/emitter/console.go +++ /dev/null @@ -1,29 +0,0 @@ -package emitter - -import ( - "context" - "encoding/json" - "log/slog" -) - -type ( - ConsoleEmitter struct { - logg *slog.Logger - } -) - -func NewConsoleEmitter(logg *slog.Logger) *ConsoleEmitter { - return &ConsoleEmitter{ - logg: logg, - } -} - -func (l *ConsoleEmitter) Emit(_ context.Context, payload interface{}) error { - jsonData, err := json.Marshal(payload) - if err != nil { - return err - } - - l.logg.Info("emitted event", "json_payload", string(jsonData)) - return nil -} diff --git a/internal/emitter/emitter.go b/internal/emitter/emitter.go deleted file mode 100644 index 9cb4285..0000000 --- a/internal/emitter/emitter.go +++ /dev/null @@ -1,20 +0,0 @@ -package emitter - -import ( - "context" - "log/slog" -) - -type ( - Emitter interface { - Emit(context.Context, interface{}) error - } - - EmitterOpts struct { - Logg *slog.Logger - } -) - -func New(o EmitterOpts) Emitter { - return NewConsoleEmitter(o.Logg) -} diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..f62dd27 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,24 @@ +package event + +import "encoding/json" + +type ( + Event struct { + Block uint64 `json:"block"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + Timestamp uint64 `json:"timestamp"` + TxHash string `json:"transactionHash"` + TxType string `json:"transactionType"` + Payload map[string]any `json:"payload"` + } +) + +func (e Event) Serialize() ([]byte, error) { + jsonData, err := json.Marshal(e) + if err != nil { + return nil, err + } + + return jsonData, err +} diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go index 6bb6ee0..7e301ff 100644 --- a/internal/handler/faucet_give.go +++ b/internal/handler/faucet_give.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -31,7 +32,7 @@ func (h *FaucetGiveHandler) Name() string { return faucetGiveEventName } -func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == faucetGiveTopicHash { var ( recipient common.Address @@ -43,7 +44,7 @@ func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitt return err } - faucetGiveEvent := Event{ + faucetGiveEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -57,13 +58,13 @@ func (h *FaucetGiveHandler) HandleLog(ctx context.Context, msg LogMessage, emitt }, } - return emitter.Emit(ctx, faucetGiveEvent) + return pub.Send(ctx, faucetGiveEvent) } return nil } -func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -78,7 +79,7 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, return err } - faucetGiveEvent := Event{ + faucetGiveEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -93,9 +94,9 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, }, } - return emitter.Emit(ctx, faucetGiveEvent) + return pub.Send(ctx, faucetGiveEvent) case "de82efb4": - faucetGiveEvent := Event{ + faucetGiveEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -110,7 +111,7 @@ func (h *FaucetGiveHandler) HandleRevert(ctx context.Context, msg RevertMessage, }, } - return emitter.Emit(ctx, faucetGiveEvent) + return pub.Send(ctx, faucetGiveEvent) } return nil diff --git a/internal/handler/handler.go b/internal/handler/handler.go index f73bbd8..b83d1f8 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -5,14 +5,14 @@ import ( "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/pub" ) type ( Handler interface { Name() string - HandleLog(context.Context, LogMessage, emitter.Emitter) error - HandleRevert(context.Context, RevertMessage, emitter.Emitter) error + HandleLog(context.Context, LogMessage, pub.Pub) error + HandleRevert(context.Context, RevertMessage, pub.Pub) error } LogMessage struct { @@ -29,16 +29,6 @@ type ( Timestamp uint64 TxHash string } - - Event struct { - Block uint64 `json:"block"` - ContractAddress string `json:"contractAddress"` - Success bool `json:"success"` - Timestamp uint64 `json:"timestamp"` - TxHash string `json:"transactionHash"` - TxType string `json:"transactionType"` - Payload map[string]any `json:"payload"` - } ) func New(cache cache.Cache) []Handler { diff --git a/internal/handler/index_add.go b/internal/handler/index_add.go index 9cfbc91..6dc83c7 100644 --- a/internal/handler/index_add.go +++ b/internal/handler/index_add.go @@ -5,7 +5,8 @@ import ( "github.com/celo-org/celo-blockchain/common" "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -32,7 +33,7 @@ func (h *IndexAddHandler) Name() string { return indexAddEventName } -func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == indexAddTopicHash { var ( address common.Address @@ -42,7 +43,7 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter return err } - indexAddEvent := Event{ + indexAddEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -58,13 +59,13 @@ func (h *IndexAddHandler) HandleLog(ctx context.Context, msg LogMessage, emitter h.cache.Add(address.Hex()) } - return emitter.Emit(ctx, indexAddEvent) + return pub.Send(ctx, indexAddEvent) } return nil } -func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -79,7 +80,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e return err } - indexAddEvent := Event{ + indexAddEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -92,7 +93,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e }, } - return emitter.Emit(ctx, indexAddEvent) + return pub.Send(ctx, indexAddEvent) case "4420e486": var ( address common.Address @@ -102,7 +103,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e return err } - indexAddEvent := Event{ + indexAddEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -115,7 +116,7 @@ func (h *IndexAddHandler) HandleRevert(ctx context.Context, msg RevertMessage, e }, } - return emitter.Emit(ctx, indexAddEvent) + return pub.Send(ctx, indexAddEvent) } return nil diff --git a/internal/handler/index_remove.go b/internal/handler/index_remove.go index 891aab5..9c7b37e 100644 --- a/internal/handler/index_remove.go +++ b/internal/handler/index_remove.go @@ -5,7 +5,8 @@ import ( "github.com/celo-org/celo-blockchain/common" "github.com/grassrootseconomics/celo-tracker/internal/cache" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -31,7 +32,7 @@ func (h *IndexRemoveHandler) Name() string { return indexRemoveEventName } -func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == indexRemoveTopicHash { var ( address common.Address @@ -41,7 +42,7 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emit return err } - indexRemoveEvent := Event{ + indexRemoveEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -57,13 +58,13 @@ func (h *IndexRemoveHandler) HandleLog(ctx context.Context, msg LogMessage, emit h.cache.Remove(address.Hex()) } - return emitter.Emit(ctx, indexRemoveEvent) + return pub.Send(ctx, indexRemoveEvent) } return nil } -func (h *IndexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *IndexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -78,7 +79,7 @@ func (h *IndexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage return err } - indexRemoveEvent := Event{ + indexRemoveEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -91,7 +92,7 @@ func (h *IndexRemoveHandler) HandleRevert(ctx context.Context, msg RevertMessage }, } - return emitter.Emit(ctx, indexRemoveEvent) + return pub.Send(ctx, indexRemoveEvent) } return nil diff --git a/internal/handler/ownership.go b/internal/handler/ownership.go index af6ec7b..4710466 100644 --- a/internal/handler/ownership.go +++ b/internal/handler/ownership.go @@ -4,7 +4,8 @@ import ( "context" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -29,7 +30,7 @@ func (h *OwnershipHandler) Name() string { return ownershipEventName } -func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == ownershipTopicHash { var ( previousOwner common.Address @@ -40,7 +41,7 @@ func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitte return err } - ownershipEvent := Event{ + ownershipEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -53,13 +54,13 @@ func (h *OwnershipHandler) HandleLog(ctx context.Context, msg LogMessage, emitte }, } - return emitter.Emit(ctx, ownershipEvent) + return pub.Send(ctx, ownershipEvent) } return nil } -func (h *OwnershipHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *OwnershipHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -74,7 +75,7 @@ func (h *OwnershipHandler) HandleRevert(ctx context.Context, msg RevertMessage, return err } - ownershipEvent := Event{ + ownershipEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -88,7 +89,7 @@ func (h *OwnershipHandler) HandleRevert(ctx context.Context, msg RevertMessage, }, } - return emitter.Emit(ctx, ownershipEvent) + return pub.Send(ctx, ownershipEvent) } return nil diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go index 0525563..5acea19 100644 --- a/internal/handler/pool_deposit.go +++ b/internal/handler/pool_deposit.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -30,7 +31,7 @@ func (h *PoolDepositHandler) Name() string { return poolDepositEventName } -func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == poolDepositTopicHash { var ( initiator common.Address @@ -47,7 +48,7 @@ func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emit return err } - poolDepositEvent := Event{ + poolDepositEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -61,13 +62,13 @@ func (h *PoolDepositHandler) HandleLog(ctx context.Context, msg LogMessage, emit }, } - return emitter.Emit(ctx, poolDepositEvent) + return pub.Send(ctx, poolDepositEvent) } return nil } -func (h *PoolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *PoolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -83,7 +84,7 @@ func (h *PoolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage return err } - poolDepositEvent := Event{ + poolDepositEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -98,7 +99,7 @@ func (h *PoolDepositHandler) HandleRevert(ctx context.Context, msg RevertMessage }, } - return emitter.Emit(ctx, poolDepositEvent) + return pub.Send(ctx, poolDepositEvent) } return nil diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go index 21f41d3..e3247f1 100644 --- a/internal/handler/pool_swap.go +++ b/internal/handler/pool_swap.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -30,7 +31,7 @@ func (h *PoolSwapHandler) Name() string { return poolSwapEventName } -func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == poolSwapTopicHash { var ( initiator common.Address @@ -53,7 +54,7 @@ func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter return err } - poolSwapEvent := Event{ + poolSwapEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -70,13 +71,13 @@ func (h *PoolSwapHandler) HandleLog(ctx context.Context, msg LogMessage, emitter }, } - return emitter.Emit(ctx, poolSwapEvent) + return pub.Send(ctx, poolSwapEvent) } return nil } -func (h *PoolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *PoolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -93,7 +94,7 @@ func (h *PoolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage, e return err } - poolSwapEvent := Event{ + poolSwapEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -111,7 +112,7 @@ func (h *PoolSwapHandler) HandleRevert(ctx context.Context, msg RevertMessage, e }, } - return emitter.Emit(ctx, poolSwapEvent) + return pub.Send(ctx, poolSwapEvent) } return nil diff --git a/internal/handler/quoter_price.go b/internal/handler/quoter_price.go index cf0d734..a03c859 100644 --- a/internal/handler/quoter_price.go +++ b/internal/handler/quoter_price.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -30,7 +31,7 @@ func (h *QuoterPriceHandler) Name() string { return quoterPriceEventName } -func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == quoterPriceTopicHash { var ( token common.Address @@ -41,7 +42,7 @@ func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emit return err } - quoterPriceEvent := Event{ + quoterPriceEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -54,13 +55,13 @@ func (h *QuoterPriceHandler) HandleLog(ctx context.Context, msg LogMessage, emit }, } - return emitter.Emit(ctx, quoterPriceEvent) + return pub.Send(ctx, quoterPriceEvent) } return nil } -func (h *QuoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *QuoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -76,7 +77,7 @@ func (h *QuoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage return err } - quoterPriceEvent := Event{ + quoterPriceEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -90,7 +91,7 @@ func (h *QuoterPriceHandler) HandleRevert(ctx context.Context, msg RevertMessage }, } - return emitter.Emit(ctx, quoterPriceEvent) + return pub.Send(ctx, quoterPriceEvent) } return nil diff --git a/internal/handler/seal.go b/internal/handler/seal.go index 245106f..3a98e38 100644 --- a/internal/handler/seal.go +++ b/internal/handler/seal.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -30,7 +31,7 @@ func (h *SealHandler) Name() string { return sealEventName } -func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == sealTopicHash { var ( final bool @@ -41,7 +42,7 @@ func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emi return err } - sealEvent := Event{ + sealEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -54,13 +55,13 @@ func (h *SealHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emi }, } - return emitter.Emit(ctx, sealEvent) + return pub.Send(ctx, sealEvent) } return nil } -func (h *SealHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *SealHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -75,7 +76,7 @@ func (h *SealHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitt return err } - sealEvent := Event{ + sealEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -88,7 +89,7 @@ func (h *SealHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitt }, } - return emitter.Emit(ctx, sealEvent) + return pub.Send(ctx, sealEvent) } return nil diff --git a/internal/handler/token_burn.go b/internal/handler/token_burn.go index f70802b..da0330e 100644 --- a/internal/handler/token_burn.go +++ b/internal/handler/token_burn.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -30,7 +31,7 @@ func (h *TokenBurnHandler) Name() string { return burnEventName } -func (h *TokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *TokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == tokenBurnTopicHash { var ( tokenBurner common.Address @@ -41,7 +42,7 @@ func (h *TokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage, emitte return err } - tokenBurnEvent := Event{ + tokenBurnEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -54,13 +55,13 @@ func (h *TokenBurnHandler) HandleLog(ctx context.Context, msg LogMessage, emitte }, } - return emitter.Emit(ctx, tokenBurnEvent) + return pub.Send(ctx, tokenBurnEvent) } return nil } -func (h *TokenBurnHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *TokenBurnHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -75,7 +76,7 @@ func (h *TokenBurnHandler) HandleRevert(ctx context.Context, msg RevertMessage, return err } - tokenBurnEvent := Event{ + tokenBurnEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -89,7 +90,7 @@ func (h *TokenBurnHandler) HandleRevert(ctx context.Context, msg RevertMessage, }, } - return emitter.Emit(ctx, tokenBurnEvent) + return pub.Send(ctx, tokenBurnEvent) } return nil diff --git a/internal/handler/token_mint.go b/internal/handler/token_mint.go index bb2b5d9..1572447 100644 --- a/internal/handler/token_mint.go +++ b/internal/handler/token_mint.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -30,7 +31,7 @@ func (h *TokenMintHandler) Name() string { return mintEventName } -func (h *TokenMintHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *TokenMintHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == tokenMintTopicHash { var ( tokenMinter common.Address @@ -42,7 +43,7 @@ func (h *TokenMintHandler) HandleLog(ctx context.Context, msg LogMessage, emitte return err } - tokenMintEvent := Event{ + tokenMintEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -56,13 +57,13 @@ func (h *TokenMintHandler) HandleLog(ctx context.Context, msg LogMessage, emitte }, } - return emitter.Emit(ctx, tokenMintEvent) + return pub.Send(ctx, tokenMintEvent) } return nil } -func (h *TokenMintHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *TokenMintHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -78,7 +79,7 @@ func (h *TokenMintHandler) HandleRevert(ctx context.Context, msg RevertMessage, return err } - tokenMintEvent := Event{ + tokenMintEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -93,7 +94,7 @@ func (h *TokenMintHandler) HandleRevert(ctx context.Context, msg RevertMessage, }, } - return emitter.Emit(ctx, tokenMintEvent) + return pub.Send(ctx, tokenMintEvent) } return nil diff --git a/internal/handler/token_transfer.go b/internal/handler/token_transfer.go index 1e83700..7372105 100644 --- a/internal/handler/token_transfer.go +++ b/internal/handler/token_transfer.go @@ -5,7 +5,8 @@ import ( "math/big" "github.com/celo-org/celo-blockchain/common" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/w3-celo" ) @@ -31,7 +32,7 @@ func (h *TokenTransferHandler) Name() string { return transferEventName } -func (h *TokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage, emitter emitter.Emitter) error { +func (h *TokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage, pub pub.Pub) error { if msg.Log.Topics[0] == tokenTransferTopicHash { var ( from common.Address @@ -43,7 +44,7 @@ func (h *TokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage, em return err } - tokenTransferEvent := Event{ + tokenTransferEvent := event.Event{ Block: msg.Log.BlockNumber, ContractAddress: msg.Log.Address.Hex(), Success: true, @@ -57,13 +58,13 @@ func (h *TokenTransferHandler) HandleLog(ctx context.Context, msg LogMessage, em }, } - return emitter.Emit(ctx, tokenTransferEvent) + return pub.Send(ctx, tokenTransferEvent) } return nil } -func (h *TokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessage, emitter emitter.Emitter) error { +func (h *TokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessage, pub pub.Pub) error { if len(msg.InputData) < 8 { return nil } @@ -79,7 +80,7 @@ func (h *TokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessa return err } - tokenTransferEvent := Event{ + tokenTransferEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -94,7 +95,7 @@ func (h *TokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessa }, } - return emitter.Emit(ctx, tokenTransferEvent) + return pub.Send(ctx, tokenTransferEvent) case "23b872dd": var ( from common.Address @@ -106,7 +107,7 @@ func (h *TokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessa return err } - tokenTransferEvent := Event{ + tokenTransferEvent := event.Event{ Block: msg.Block, ContractAddress: msg.ContractAddress, Success: false, @@ -121,7 +122,7 @@ func (h *TokenTransferHandler) HandleRevert(ctx context.Context, msg RevertMessa }, } - return emitter.Emit(ctx, tokenTransferEvent) + return pub.Send(ctx, tokenTransferEvent) } return nil diff --git a/internal/processor/block.go b/internal/processor/block.go index 0bc43c8..c48b7df 100644 --- a/internal/processor/block.go +++ b/internal/processor/block.go @@ -22,10 +22,6 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { return err } - if len(receiptsResp) != len(txs) { - return fmt.Errorf("block txs receipts len mismatch %d", blockNumber) - } - for i, receipt := range receiptsResp { if receipt.Status > 0 { for _, log := range receipt.Logs { @@ -36,7 +32,7 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { } if err := p.handleLogs(ctx, msg); err != nil { - p.logg.Error("handler error", "handler_type", "log", "handler_name", "error", err) + return err } } } @@ -44,12 +40,12 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { if txs[i].To() != nil && p.cache.Exists(txs[i].To().Hex()) { from, err := types.Sender(types.LatestSignerForChainID(txs[i].ChainId()), &txs[i]) if err != nil { - p.logg.Error("handler error", "handler_type", "revert", "error", err) + return err } revertReason, err := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) if err != nil { - p.logg.Error("handler error", "handler_type", "revert", "error", err) + return err } msg := handler.RevertMessage{ @@ -63,7 +59,7 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { } if err := p.handleRevert(ctx, msg); err != nil { - p.logg.Error("handler error", "handler_type", "revert", "error", err) + return err } } } @@ -79,8 +75,8 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) error { for _, handler := range p.handlers { - if err := handler.HandleLog(ctx, msg, p.emitter); err != nil { - return fmt.Errorf("handler: %s err: %v", handler.Name(), err) + if err := handler.HandleLog(ctx, msg, p.pub); err != nil { + return fmt.Errorf("log handler: %s err: %v", handler.Name(), err) } } @@ -89,8 +85,8 @@ func (p *Processor) handleLogs(ctx context.Context, msg handler.LogMessage) erro func (p *Processor) handleRevert(ctx context.Context, msg handler.RevertMessage) error { for _, handler := range p.handlers { - if err := handler.HandleRevert(ctx, msg, p.emitter); err != nil { - return fmt.Errorf("handler: %s err: %v", handler.Name(), err) + if err := handler.HandleRevert(ctx, msg, p.pub); err != nil { + return fmt.Errorf("revert handler: %s err: %v", handler.Name(), err) } } diff --git a/internal/processor/processor.go b/internal/processor/processor.go index df1e0f2..2406075 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -11,9 +11,9 @@ import ( "github.com/grassrootseconomics/celo-tracker/internal/cache" "github.com/grassrootseconomics/celo-tracker/internal/chain" "github.com/grassrootseconomics/celo-tracker/internal/db" - "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/celo-tracker/internal/handler" "github.com/grassrootseconomics/celo-tracker/internal/pool" + "github.com/grassrootseconomics/celo-tracker/internal/pub" "github.com/grassrootseconomics/celo-tracker/internal/stats" ) @@ -25,7 +25,7 @@ type ( Stats *stats.Stats DB *db.DB Cache cache.Cache - Emitter emitter.Emitter + Pub pub.Pub } Processor struct { @@ -38,7 +38,7 @@ type ( quit chan struct{} handlers []handler.Handler cache cache.Cache - emitter emitter.Emitter + pub pub.Pub } ) @@ -57,7 +57,7 @@ func NewProcessor(o ProcessorOpts) *Processor { quit: make(chan struct{}), handlers: handler.New(o.Cache), cache: o.Cache, - emitter: o.Emitter, + pub: o.Pub, } } diff --git a/internal/pub/console.go b/internal/pub/console.go new file mode 100644 index 0000000..31f4806 --- /dev/null +++ b/internal/pub/console.go @@ -0,0 +1,32 @@ +package pub + +import ( + "context" + "log/slog" + + "github.com/grassrootseconomics/celo-tracker/internal/event" +) + +type ( + ConsolePub struct { + logg *slog.Logger + } +) + +func NewConsolePub(logg *slog.Logger) Pub { + return &ConsolePub{ + logg: logg, + } +} + +func (p *ConsolePub) Send(_ context.Context, payload event.Event) error { + data, err := payload.Serialize() + if err != nil { + return err + } + + p.logg.Info("emitted event", "json_payload", string(data)) + return nil +} + +func (p *ConsolePub) Close() {} diff --git a/internal/pub/jetstream.go b/internal/pub/jetstream.go new file mode 100644 index 0000000..4b610c2 --- /dev/null +++ b/internal/pub/jetstream.go @@ -0,0 +1,91 @@ +package pub + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/grassrootseconomics/celo-tracker/internal/event" + "github.com/nats-io/nats.go" +) + +type ( + JetStreamOpts struct { + Logg *slog.Logger + Endpoint string + DedupDuration time.Duration + PersistDuration time.Duration + } + + JetStreamEmitter struct { + natsConn *nats.Conn + jsCtx nats.JetStreamContext + } +) + +const ( + streamName string = "TRACKER" + streamSubjects string = "TRACKER.*" +) + +func NewJetStreamPub(o JetStreamOpts) (Pub, error) { + natsConn, err := nats.Connect(o.Endpoint) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + o.Logg.Info("successfully connected to NATS server") + + stream, err := js.StreamInfo(streamName) + if err != nil && !errors.Is(err, nats.ErrStreamNotFound) { + return nil, err + } + if stream == nil { + _, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + MaxAge: o.PersistDuration, + Storage: nats.FileStorage, + Subjects: []string{streamSubjects}, + Duplicates: o.DedupDuration, + }) + if err != nil { + return nil, err + } + o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName) + } + + return &JetStreamEmitter{ + natsConn: natsConn, + jsCtx: js, + }, nil +} + +func (p *JetStreamEmitter) Close() { + if p.natsConn != nil { + p.natsConn.Close() + } +} + +func (p *JetStreamEmitter) Send(_ context.Context, payload event.Event) error { + data, err := payload.Serialize() + if err != nil { + return err + } + + _, err = p.jsCtx.Publish( + fmt.Sprintf("%s.%s", streamName, payload.TxType), + data, + nats.MsgId(fmt.Sprintf("%s:%s", payload.TxType, payload.TxHash)), + ) + if err != nil { + return err + } + + return nil +} diff --git a/internal/pub/pub.go b/internal/pub/pub.go new file mode 100644 index 0000000..5ad9c51 --- /dev/null +++ b/internal/pub/pub.go @@ -0,0 +1,14 @@ +package pub + +import ( + "context" + + "github.com/grassrootseconomics/celo-tracker/internal/event" +) + +type ( + Pub interface { + Send(context.Context, event.Event) error + Close() + } +) diff --git a/internal/syncer/historical.go b/internal/syncer/historical.go index 991bf4b..105c51e 100644 --- a/internal/syncer/historical.go +++ b/internal/syncer/historical.go @@ -76,6 +76,8 @@ func (s *Syncer) StartHistoricalSyncer() error { } func (s *Syncer) StopHistoricalSyncer() { - s.logg.Info("signaling historical syncer shutdown") - s.quit <- struct{}{} + if s.historicalEnabled { + s.logg.Info("signaling historical syncer shutdown") + s.quit <- struct{}{} + } } diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 0aad43c..ff45544 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -28,17 +28,18 @@ type ( } Syncer struct { - batchQueue *deque.Deque[uint64] - blocksQueue *deque.Deque[types.Block] - chain *chain.Chain - logg *slog.Logger - stats *stats.Stats - ethClient *ethclient.Client - batchSize int - db *db.DB - quit chan struct{} - startBlock uint64 - realtimeSub celo.Subscription + batchQueue *deque.Deque[uint64] + blocksQueue *deque.Deque[types.Block] + chain *chain.Chain + logg *slog.Logger + stats *stats.Stats + ethClient *ethclient.Client + batchSize int + db *db.DB + quit chan struct{} + startBlock uint64 + realtimeSub celo.Subscription + historicalEnabled bool } ) @@ -68,15 +69,16 @@ func New(o SyncerOpts) (*Syncer, error) { } return &Syncer{ - batchQueue: o.BatchQueue, - blocksQueue: o.BlocksQueue, - chain: o.Chain, - logg: o.Logg, - stats: o.Stats, - ethClient: ethClient, - db: o.DB, - batchSize: o.BatchSize, - quit: make(chan struct{}), - startBlock: o.StartBlock, + batchQueue: o.BatchQueue, + blocksQueue: o.BlocksQueue, + chain: o.Chain, + logg: o.Logg, + stats: o.Stats, + ethClient: ethClient, + db: o.DB, + batchSize: o.BatchSize, + quit: make(chan struct{}), + startBlock: o.StartBlock, + historicalEnabled: o.EnableHistorical, }, nil }