mirror of
				https://github.com/grassrootseconomics/eth-tracker.git
				synced 2025-11-03 18:11:53 +01:00 
			
		
		
		
	cel2: merge v1.4.0 changes
This commit is contained in:
		
						commit
						b650e44392
					
				
							
								
								
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							@ -3,4 +3,5 @@ tracker_db
 | 
			
		||||
.idx
 | 
			
		||||
**/*.env
 | 
			
		||||
eth-tracker
 | 
			
		||||
eth-tracker-cache-bootstrap
 | 
			
		||||
eth-tracker-cache-bootstrap
 | 
			
		||||
*.pprof
 | 
			
		||||
							
								
								
									
										53
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										53
									
								
								README.md
									
									
									
									
									
								
							@ -2,35 +2,45 @@
 | 
			
		||||
 | 
			
		||||

 | 
			
		||||
 | 
			
		||||
A fast and lightweight tracker designed to monitor EVM blockchains for live and historical transaction events, including reverted transactions. It filters these events and publishes them to NATS for further processing.
 | 
			
		||||
A fast and lightweight tracker designed to monitor EVM blockchains for live and
 | 
			
		||||
historical transaction events, including reverted transactions. It filters these
 | 
			
		||||
events and publishes them to NATS for further processing.
 | 
			
		||||
 | 
			
		||||
It applies deduplication at the NATS level, making it safe to run in a distributed fashion.
 | 
			
		||||
It applies deduplication at the NATS level, making it safe to run in a
 | 
			
		||||
distributed fashion.
 | 
			
		||||
 | 
			
		||||
Note: To run it against an L2/EVM chain, you will need to manually add a replace directive in the `go.mod` file pointing to the EVM chain's `*geth` compatible source code. This will allow the tracker to process transaction types other than Ethereum's `0x0, 0x1 and 0x2`.
 | 
			
		||||
Note: To run it against an L2/EVM chain, you will need to manually add a replace
 | 
			
		||||
directive in the `go.mod` file pointing to the EVM chain's `*geth` compatible
 | 
			
		||||
source code. This will allow the tracker to process transaction types other than
 | 
			
		||||
Ethereum's `0x0, 0x1 and 0x2`.
 | 
			
		||||
 | 
			
		||||
### CEL2
 | 
			
		||||
 | 
			
		||||
We maintain a CEL2 compatible tracker (source and container image) on the `cel2` branch.
 | 
			
		||||
We maintain a CEL2 compatible tracker (source and container image) on the `cel2`
 | 
			
		||||
branch.
 | 
			
		||||
 | 
			
		||||
## Getting Started
 | 
			
		||||
 | 
			
		||||
A `Makefile` is also provided to build the required binaries to run eth-tracker.
 | 
			
		||||
 | 
			
		||||
### Bootstrap Cache
 | 
			
		||||
### Cache Bootstrap
 | 
			
		||||
 | 
			
		||||
An optional binary, `eth-tracker-cache-bootstrap`, is included to build the Redis cache with all relevant Grassroots Economics smart contract and user addresses to allow filtering on very busy smart contracts e.g. cUSD.
 | 
			
		||||
During startup `eth-tracker` will always build the cache with all relevant
 | 
			
		||||
Grassroots Economics smart contract and user addresses to allow filtering on
 | 
			
		||||
very busy smart contracts e.g. cUSD.
 | 
			
		||||
 | 
			
		||||
The cache will auto-update based on any additions/removals from all indexes.
 | 
			
		||||
 | 
			
		||||
### Prerequisites
 | 
			
		||||
 | 
			
		||||
* Git
 | 
			
		||||
* Docker
 | 
			
		||||
* NATS server
 | 
			
		||||
* Redis server
 | 
			
		||||
* Access to a Celo RPC node
 | 
			
		||||
- Git
 | 
			
		||||
- Docker
 | 
			
		||||
- NATS server
 | 
			
		||||
- Redis server (Optional)
 | 
			
		||||
- Access to a Celo RPC node
 | 
			
		||||
 | 
			
		||||
See [docker-compose.yaml](dev/docker-compose.yaml) for an example on how to run and deploy a single instance.
 | 
			
		||||
See [docker-compose.yaml](dev/docker-compose.yaml) for an example on how to run
 | 
			
		||||
and deploy a single instance.
 | 
			
		||||
 | 
			
		||||
### 1. Build the Docker image
 | 
			
		||||
 | 
			
		||||
@ -48,9 +58,12 @@ docker images
 | 
			
		||||
### 2. Run NATS and Redis
 | 
			
		||||
 | 
			
		||||
For an example, see `dev/docker-compose.yaml`.
 | 
			
		||||
 | 
			
		||||
### 3. Update config values
 | 
			
		||||
 | 
			
		||||
See `.env.example` on how to override default values defined in `config.toml` using env variables. Alternatively, mount your own config.toml either during build time or Docker runtime.
 | 
			
		||||
See `.env.example` on how to override default values defined in `config.toml`
 | 
			
		||||
using env variables. Alternatively, mount your own config.toml either during
 | 
			
		||||
build time or Docker runtime.
 | 
			
		||||
 | 
			
		||||
```bash
 | 
			
		||||
# Override only specific config values
 | 
			
		||||
@ -58,8 +71,8 @@ nano .env.example
 | 
			
		||||
mv .env.example .env
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
Refer to [`config.toml`](config.toml) to understand different config value settings.
 | 
			
		||||
 | 
			
		||||
Refer to [`config.toml`](config.toml) to understand different config value
 | 
			
		||||
settings.
 | 
			
		||||
 | 
			
		||||
### 4. Run the tracker
 | 
			
		||||
 | 
			
		||||
@ -86,7 +99,8 @@ docker compose up
 | 
			
		||||
 | 
			
		||||
### Monitoring with NATS CLI
 | 
			
		||||
 | 
			
		||||
Install NATS CLI from [here](https://github.com/nats-io/natscli?tab=readme-ov-file#installation).
 | 
			
		||||
Install NATS CLI from
 | 
			
		||||
[here](https://github.com/nats-io/natscli?tab=readme-ov-file#installation).
 | 
			
		||||
 | 
			
		||||
```bash
 | 
			
		||||
nats subscribe "TRACKER.*"
 | 
			
		||||
@ -94,8 +108,11 @@ nats subscribe "TRACKER.*"
 | 
			
		||||
 | 
			
		||||
### DB File
 | 
			
		||||
 | 
			
		||||
A `tracker_db` file is created on the first run. This keeps track of all blocks missed by the processor to attempt a retry later on. This file should not be deleted if you want to maintain resume support for historical tracking across restarts.
 | 
			
		||||
A `tracker_db` file is created on the first run. This keeps track of all blocks
 | 
			
		||||
missed by the processor to attempt a retry later on. This file should not be
 | 
			
		||||
deleted if you want to maintain resume support for historical tracking across
 | 
			
		||||
restarts.
 | 
			
		||||
 | 
			
		||||
## License
 | 
			
		||||
 | 
			
		||||
[AGPL-3.0](LICENSE).
 | 
			
		||||
[AGPL-3.0](LICENSE).
 | 
			
		||||
 | 
			
		||||
@ -25,6 +25,7 @@ import (
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/internal/syncer"
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/internal/util"
 | 
			
		||||
	"github.com/knadh/koanf/v2"
 | 
			
		||||
	"github.com/knadh/profiler"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const defaultGracefulShutdownPeriod = time.Second * 30
 | 
			
		||||
@ -49,6 +50,14 @@ func init() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	// PROFILE
 | 
			
		||||
	p := profiler.New(profiler.Conf{
 | 
			
		||||
		MemProfileRate: 1,
 | 
			
		||||
		NoShutdownHook: true,
 | 
			
		||||
	}, profiler.Cpu, profiler.Mem)
 | 
			
		||||
	p.Start()
 | 
			
		||||
	// PROFILE
 | 
			
		||||
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	ctx, stop := notifyShutdown()
 | 
			
		||||
 | 
			
		||||
@ -71,9 +80,13 @@ func main() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cache, err := cache.New(cache.CacheOpts{
 | 
			
		||||
		Logg:      lo,
 | 
			
		||||
		CacheType: ko.MustString("core.cache_type"),
 | 
			
		||||
		RedisDSN:  ko.MustString("redis.dsn"),
 | 
			
		||||
		Chain:      chain,
 | 
			
		||||
		Registries: ko.Strings("bootstrap.ge_registries"),
 | 
			
		||||
		Watchlist:  ko.Strings("bootstrap.watchlist"),
 | 
			
		||||
		Blacklist:  ko.Strings("bootstrap.blacklist"),
 | 
			
		||||
		CacheType:  ko.MustString("core.cache_type"),
 | 
			
		||||
		RedisDSN:   ko.MustString("redis.dsn"),
 | 
			
		||||
		Logg:       lo,
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lo.Error("could not initialize cache", "error", err)
 | 
			
		||||
@ -185,6 +198,12 @@ func main() {
 | 
			
		||||
		lo.Info("graceful shutdown routine complete")
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// PROFILE
 | 
			
		||||
	runtime.GC()
 | 
			
		||||
	p.Stop()
 | 
			
		||||
	time.Sleep(time.Second * 10)
 | 
			
		||||
	// PROFILE
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
		stop()
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,7 @@ address = ":5001"
 | 
			
		||||
 | 
			
		||||
[core]
 | 
			
		||||
# Use a specific cache implementation
 | 
			
		||||
cache_type = "redis"
 | 
			
		||||
cache_type = "internal"
 | 
			
		||||
# Use a specific db implementation
 | 
			
		||||
db_type = "bolt"
 | 
			
		||||
# Tune max go routines that can process blocks
 | 
			
		||||
@ -28,7 +28,7 @@ start_block = 0
 | 
			
		||||
[bootstrap]
 | 
			
		||||
# This will bootstrap the cache on which addresses to track
 | 
			
		||||
ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"]
 | 
			
		||||
watchlist = ["0x14dc79964da2c08b23698b3d3cc7ca32193d9955"]
 | 
			
		||||
watchlist = [""]
 | 
			
		||||
blacklist = [""]
 | 
			
		||||
 | 
			
		||||
[jetstream]
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.mod
									
									
									
									
									
								
							@ -15,10 +15,11 @@ require (
 | 
			
		||||
	github.com/knadh/koanf/providers/env v1.0.0
 | 
			
		||||
	github.com/knadh/koanf/providers/file v1.1.2
 | 
			
		||||
	github.com/knadh/koanf/v2 v2.1.1
 | 
			
		||||
	github.com/knadh/profiler v0.2.0
 | 
			
		||||
	github.com/lmittmann/w3 v0.17.1
 | 
			
		||||
	github.com/nats-io/nats.go v1.36.0
 | 
			
		||||
	github.com/puzpuzpuz/xsync/v3 v3.4.0
 | 
			
		||||
	github.com/redis/rueidis v1.0.47
 | 
			
		||||
	github.com/redis/rueidis v1.0.48
 | 
			
		||||
	github.com/stretchr/testify v1.9.0
 | 
			
		||||
	github.com/uptrace/bunrouter v1.0.22
 | 
			
		||||
	go.etcd.io/bbolt v1.3.11
 | 
			
		||||
@ -63,6 +64,7 @@ require (
 | 
			
		||||
	golang.org/x/mod v0.20.0 // indirect
 | 
			
		||||
	golang.org/x/sync v0.8.0 // indirect
 | 
			
		||||
	golang.org/x/sys v0.24.0 // indirect
 | 
			
		||||
	golang.org/x/text v0.17.0 // indirect
 | 
			
		||||
	golang.org/x/time v0.7.0 // indirect
 | 
			
		||||
	gopkg.in/yaml.v3 v3.0.1 // indirect
 | 
			
		||||
	rsc.io/tmplfunc v0.0.3 // indirect
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										6
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								go.sum
									
									
									
									
									
								
							@ -118,6 +118,8 @@ github.com/knadh/koanf/providers/file v1.1.2 h1:aCC36YGOgV5lTtAFz2qkgtWdeQsgfxUk
 | 
			
		||||
github.com/knadh/koanf/providers/file v1.1.2/go.mod h1:/faSBcv2mxPVjFrXck95qeoyoZ5myJ6uxN8OOVNJJCI=
 | 
			
		||||
github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM=
 | 
			
		||||
github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es=
 | 
			
		||||
github.com/knadh/profiler v0.2.0 h1:jaY0xlQs8iaWxKdvGHOftaZnX7d8l7yrCGQPSecwnng=
 | 
			
		||||
github.com/knadh/profiler v0.2.0/go.mod h1:LqNkAu++MfFkbEDA63AmRaIf6UkGrLXyZ5VQQdekZiI=
 | 
			
		||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 | 
			
		||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 | 
			
		||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 | 
			
		||||
@ -175,8 +177,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0
 | 
			
		||||
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 | 
			
		||||
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
 | 
			
		||||
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
 | 
			
		||||
github.com/redis/rueidis v1.0.47 h1:41UdeXOo4eJuW+cfpUJuLtVGyO0QJY3A2rEYgJWlfHs=
 | 
			
		||||
github.com/redis/rueidis v1.0.47/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM=
 | 
			
		||||
github.com/redis/rueidis v1.0.48 h1:ggZHjEtc/echUmPkGTfssRisnc3p/mIUEwrpbNsZ1mQ=
 | 
			
		||||
github.com/redis/rueidis v1.0.48/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM=
 | 
			
		||||
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
 | 
			
		||||
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
 | 
			
		||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 | 
			
		||||
 | 
			
		||||
@ -1,80 +1,38 @@
 | 
			
		||||
package main
 | 
			
		||||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"log/slog"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/ethereum/go-ethereum/common"
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/internal/cache"
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/internal/chain"
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/internal/util"
 | 
			
		||||
	"github.com/grassrootseconomics/ethutils"
 | 
			
		||||
	"github.com/knadh/koanf/v2"
 | 
			
		||||
	"github.com/lmittmann/w3"
 | 
			
		||||
	"github.com/lmittmann/w3/module/eth"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	build = "dev"
 | 
			
		||||
 | 
			
		||||
	confFlag string
 | 
			
		||||
 | 
			
		||||
	lo *slog.Logger
 | 
			
		||||
	ko *koanf.Koanf
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	flag.StringVar(&confFlag, "config", "config.toml", "Config file location")
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
 | 
			
		||||
	lo = util.InitLogger()
 | 
			
		||||
	ko = util.InitConfig(lo, confFlag)
 | 
			
		||||
 | 
			
		||||
	lo.Info("starting GE redis cache bootstrapper", "build", build)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	if err := bootstrapCache(); err != nil {
 | 
			
		||||
		lo.Error("critical error bootstrapping cache", "error", err)
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func bootstrapCache() error {
 | 
			
		||||
func bootstrapCache(
 | 
			
		||||
	chain chain.Chain,
 | 
			
		||||
	cache Cache,
 | 
			
		||||
	registries []string,
 | 
			
		||||
	watchlist []string,
 | 
			
		||||
	blacklist []string,
 | 
			
		||||
	lo *slog.Logger,
 | 
			
		||||
) error {
 | 
			
		||||
	var (
 | 
			
		||||
		tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address")
 | 
			
		||||
		quoterGetter        = w3.MustNewFunc("quoter()", "address")
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	chain, err := chain.NewRPCFetcher(chain.EthRPCOpts{
 | 
			
		||||
		RPCEndpoint: ko.MustString("chain.rpc_endpoint"),
 | 
			
		||||
		ChainID:     ko.MustInt64("chain.chainid"),
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lo.Error("could not initialize chain client", "error", err)
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cache, err := cache.New(cache.CacheOpts{
 | 
			
		||||
		Logg:      lo,
 | 
			
		||||
		CacheType: ko.MustString("core.cache_type"),
 | 
			
		||||
		RedisDSN:  ko.MustString("redis.dsn"),
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lo.Error("could not initialize cache", "error", err)
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	for _, registry := range ko.MustStrings("bootstrap.ge_registries") {
 | 
			
		||||
	for _, registry := range registries {
 | 
			
		||||
		registryMap, err := chain.Provider().RegistryMap(ctx, ethutils.HexToAddress(registry))
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			lo.Error("could not fetch registry", "error", err)
 | 
			
		||||
			lo.Error("could not fetch registry", "registry", registry, "error", err)
 | 
			
		||||
			os.Exit(1)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -229,12 +187,12 @@ func bootstrapCache() error {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for _, address := range ko.MustStrings("bootstrap.watchlist") {
 | 
			
		||||
		for _, address := range watchlist {
 | 
			
		||||
			if err := cache.Add(ctx, ethutils.HexToAddress(address).Hex()); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		for _, address := range ko.MustStrings("bootstrap.blacklist") {
 | 
			
		||||
		for _, address := range blacklist {
 | 
			
		||||
			if err := cache.Remove(ctx, ethutils.HexToAddress(address).Hex()); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
							
								
								
									
										25
									
								
								internal/cache/cache.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										25
									
								
								internal/cache/cache.go
									
									
									
									
										vendored
									
									
								
							@ -3,6 +3,8 @@ package cache
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"log/slog"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/internal/chain"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
@ -14,9 +16,13 @@ type (
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	CacheOpts struct {
 | 
			
		||||
		Logg      *slog.Logger
 | 
			
		||||
		RedisDSN  string
 | 
			
		||||
		CacheType string
 | 
			
		||||
		RedisDSN   string
 | 
			
		||||
		CacheType  string
 | 
			
		||||
		Registries []string
 | 
			
		||||
		Watchlist  []string
 | 
			
		||||
		Blacklist  []string
 | 
			
		||||
		Chain      chain.Chain
 | 
			
		||||
		Logg       *slog.Logger
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -24,7 +30,7 @@ func New(o CacheOpts) (Cache, error) {
 | 
			
		||||
	var cache Cache
 | 
			
		||||
 | 
			
		||||
	switch o.CacheType {
 | 
			
		||||
	case "map":
 | 
			
		||||
	case "internal":
 | 
			
		||||
		cache = NewMapCache()
 | 
			
		||||
	case "redis":
 | 
			
		||||
		redisCache, err := NewRedisCache(redisOpts{
 | 
			
		||||
@ -39,5 +45,16 @@ func New(o CacheOpts) (Cache, error) {
 | 
			
		||||
		o.Logg.Warn("invalid cache type, using default type (map)")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := bootstrapCache(
 | 
			
		||||
		o.Chain,
 | 
			
		||||
		cache,
 | 
			
		||||
		o.Registries,
 | 
			
		||||
		o.Watchlist,
 | 
			
		||||
		o.Blacklist,
 | 
			
		||||
		o.Logg,
 | 
			
		||||
	); err != nil {
 | 
			
		||||
		return cache, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return cache, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										4
									
								
								internal/cache/xmap.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								internal/cache/xmap.go
									
									
									
									
										vendored
									
									
								
							@ -7,12 +7,12 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type mapCache struct {
 | 
			
		||||
	xmap *xsync.Map
 | 
			
		||||
	xmap *xsync.MapOf[string, bool]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewMapCache() Cache {
 | 
			
		||||
	return &mapCache{
 | 
			
		||||
		xmap: xsync.NewMap(),
 | 
			
		||||
		xmap: xsync.NewMapOf[string, bool](),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -2,25 +2,25 @@ package pub
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log/slog"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/eth-tracker/pkg/event"
 | 
			
		||||
	"github.com/nats-io/nats.go"
 | 
			
		||||
	"github.com/nats-io/nats.go/jetstream"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	JetStreamOpts struct {
 | 
			
		||||
		Logg            *slog.Logger
 | 
			
		||||
		Endpoint        string
 | 
			
		||||
		PersistDuration time.Duration
 | 
			
		||||
		Logg            *slog.Logger
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	jetStreamPub struct {
 | 
			
		||||
		js       jetstream.JetStream
 | 
			
		||||
		natsConn *nats.Conn
 | 
			
		||||
		jsCtx    nats.JetStreamContext
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -36,33 +36,25 @@ func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	js, err := natsConn.JetStream()
 | 
			
		||||
	js, err := jetstream.New(natsConn)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	o.Logg.Info("successfully connected to NATS server")
 | 
			
		||||
 | 
			
		||||
	stream, err := js.StreamInfo(streamName)
 | 
			
		||||
	if err != nil && !errors.Is(err, nats.ErrStreamNotFound) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if stream == nil {
 | 
			
		||||
		_, err := js.AddStream(&nats.StreamConfig{
 | 
			
		||||
			Name:       streamName,
 | 
			
		||||
			MaxAge:     o.PersistDuration,
 | 
			
		||||
			Storage:    nats.FileStorage,
 | 
			
		||||
			Subjects:   streamSubjects,
 | 
			
		||||
			Duplicates: time.Minute,
 | 
			
		||||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName)
 | 
			
		||||
	}
 | 
			
		||||
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	js.CreateStream(ctx, jetstream.StreamConfig{
 | 
			
		||||
		Name:       streamName,
 | 
			
		||||
		Subjects:   streamSubjects,
 | 
			
		||||
		MaxAge:     o.PersistDuration,
 | 
			
		||||
		Storage:    jetstream.FileStorage,
 | 
			
		||||
		Duplicates: time.Minute,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return &jetStreamPub{
 | 
			
		||||
		natsConn: natsConn,
 | 
			
		||||
		jsCtx:    js,
 | 
			
		||||
		js:       js,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -72,16 +64,17 @@ func (p *jetStreamPub) Close() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *jetStreamPub) Send(_ context.Context, payload event.Event) error {
 | 
			
		||||
func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error {
 | 
			
		||||
	data, err := payload.Serialize()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = p.jsCtx.Publish(
 | 
			
		||||
	_, err = p.js.Publish(
 | 
			
		||||
		ctx,
 | 
			
		||||
		fmt.Sprintf("%s.%s", streamName, payload.TxType),
 | 
			
		||||
		data,
 | 
			
		||||
		nats.MsgId(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)),
 | 
			
		||||
		jetstream.WithMsgID(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)),
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user