feat: self-bootstrapping tracker, jetstream updates

* This removes redis as a hard dependency
* add profiler utils (temp)
This commit is contained in:
Mohamed Sohail 2024-10-31 14:52:51 +03:00
parent e03dabdf0f
commit 75b402bbf6
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
9 changed files with 85 additions and 93 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@ tracker_db
.idx .idx
**/*.env **/*.env
eth-tracker eth-tracker
eth-tracker-cache-bootstrap eth-tracker-cache-bootstrap
*.pprof

View File

@ -25,6 +25,7 @@ import (
"github.com/grassrootseconomics/eth-tracker/internal/syncer" "github.com/grassrootseconomics/eth-tracker/internal/syncer"
"github.com/grassrootseconomics/eth-tracker/internal/util" "github.com/grassrootseconomics/eth-tracker/internal/util"
"github.com/knadh/koanf/v2" "github.com/knadh/koanf/v2"
"github.com/knadh/profiler"
) )
const defaultGracefulShutdownPeriod = time.Second * 30 const defaultGracefulShutdownPeriod = time.Second * 30
@ -49,6 +50,14 @@ func init() {
} }
func main() { func main() {
// PROFILE
p := profiler.New(profiler.Conf{
MemProfileRate: 1,
NoShutdownHook: true,
}, profiler.Cpu, profiler.Mem)
p.Start()
// PROFILE
var wg sync.WaitGroup var wg sync.WaitGroup
ctx, stop := notifyShutdown() ctx, stop := notifyShutdown()
@ -71,9 +80,13 @@ func main() {
} }
cache, err := cache.New(cache.CacheOpts{ cache, err := cache.New(cache.CacheOpts{
Logg: lo, Chain: chain,
CacheType: ko.MustString("core.cache_type"), Registries: ko.Strings("bootstrap.ge_registries"),
RedisDSN: ko.MustString("redis.dsn"), Watchlist: ko.Strings("bootstrap.watchlist"),
Blacklist: ko.Strings("bootstrap.blacklist"),
CacheType: ko.MustString("core.cache_type"),
RedisDSN: ko.MustString("redis.dsn"),
Logg: lo,
}) })
if err != nil { if err != nil {
lo.Error("could not initialize cache", "error", err) lo.Error("could not initialize cache", "error", err)
@ -185,6 +198,12 @@ func main() {
lo.Info("graceful shutdown routine complete") lo.Info("graceful shutdown routine complete")
}() }()
// PROFILE
runtime.GC()
p.Stop()
time.Sleep(time.Second * 10)
// PROFILE
go func() { go func() {
wg.Wait() wg.Wait()
stop() stop()

View File

@ -4,7 +4,7 @@ address = ":5001"
[core] [core]
# Use a specific cache implementation # Use a specific cache implementation
cache_type = "redis" cache_type = "internal"
# Use a specific db implementation # Use a specific db implementation
db_type = "bolt" db_type = "bolt"
# Tune max go routines that can process blocks # Tune max go routines that can process blocks
@ -28,7 +28,7 @@ start_block = 0
[bootstrap] [bootstrap]
# This will bootstrap the cache on which addresses to track # This will bootstrap the cache on which addresses to track
ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"] ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"]
watchlist = ["0x14dc79964da2c08b23698b3d3cc7ca32193d9955"] watchlist = [""]
blacklist = [""] blacklist = [""]
[jetstream] [jetstream]

2
go.mod
View File

@ -13,6 +13,7 @@ require (
github.com/knadh/koanf/providers/env v1.0.0 github.com/knadh/koanf/providers/env v1.0.0
github.com/knadh/koanf/providers/file v1.1.2 github.com/knadh/koanf/providers/file v1.1.2
github.com/knadh/koanf/v2 v2.1.1 github.com/knadh/koanf/v2 v2.1.1
github.com/knadh/profiler v0.2.0
github.com/lmittmann/w3 v0.17.1 github.com/lmittmann/w3 v0.17.1
github.com/nats-io/nats.go v1.36.0 github.com/nats-io/nats.go v1.36.0
github.com/puzpuzpuz/xsync/v3 v3.4.0 github.com/puzpuzpuz/xsync/v3 v3.4.0
@ -59,6 +60,7 @@ require (
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/sync v0.8.0 // indirect golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.7.0 // indirect golang.org/x/time v0.7.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect rsc.io/tmplfunc v0.0.3 // indirect

2
go.sum
View File

@ -114,6 +114,8 @@ github.com/knadh/koanf/providers/file v1.1.2 h1:aCC36YGOgV5lTtAFz2qkgtWdeQsgfxUk
github.com/knadh/koanf/providers/file v1.1.2/go.mod h1:/faSBcv2mxPVjFrXck95qeoyoZ5myJ6uxN8OOVNJJCI= github.com/knadh/koanf/providers/file v1.1.2/go.mod h1:/faSBcv2mxPVjFrXck95qeoyoZ5myJ6uxN8OOVNJJCI=
github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM=
github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es=
github.com/knadh/profiler v0.2.0 h1:jaY0xlQs8iaWxKdvGHOftaZnX7d8l7yrCGQPSecwnng=
github.com/knadh/profiler v0.2.0/go.mod h1:LqNkAu++MfFkbEDA63AmRaIf6UkGrLXyZ5VQQdekZiI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=

View File

@ -1,80 +1,38 @@
package main package cache
import ( import (
"context" "context"
"flag"
"log/slog" "log/slog"
"os" "os"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/grassrootseconomics/eth-tracker/internal/cache"
"github.com/grassrootseconomics/eth-tracker/internal/chain" "github.com/grassrootseconomics/eth-tracker/internal/chain"
"github.com/grassrootseconomics/eth-tracker/internal/util"
"github.com/grassrootseconomics/ethutils" "github.com/grassrootseconomics/ethutils"
"github.com/knadh/koanf/v2"
"github.com/lmittmann/w3" "github.com/lmittmann/w3"
"github.com/lmittmann/w3/module/eth" "github.com/lmittmann/w3/module/eth"
) )
var ( func bootstrapCache(
build = "dev" chain chain.Chain,
cache Cache,
confFlag string registries []string,
watchlist []string,
lo *slog.Logger blacklist []string,
ko *koanf.Koanf lo *slog.Logger,
) ) error {
func init() {
flag.StringVar(&confFlag, "config", "config.toml", "Config file location")
flag.Parse()
lo = util.InitLogger()
ko = util.InitConfig(lo, confFlag)
lo.Info("starting GE redis cache bootstrapper", "build", build)
}
func main() {
if err := bootstrapCache(); err != nil {
lo.Error("critical error bootstrapping cache", "error", err)
os.Exit(1)
}
}
func bootstrapCache() error {
var ( var (
tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address") tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address")
quoterGetter = w3.MustNewFunc("quoter()", "address") quoterGetter = w3.MustNewFunc("quoter()", "address")
) )
chain, err := chain.NewRPCFetcher(chain.EthRPCOpts{
RPCEndpoint: ko.MustString("chain.rpc_endpoint"),
ChainID: ko.MustInt64("chain.chainid"),
})
if err != nil {
lo.Error("could not initialize chain client", "error", err)
os.Exit(1)
}
cache, err := cache.New(cache.CacheOpts{
Logg: lo,
CacheType: ko.MustString("core.cache_type"),
RedisDSN: ko.MustString("redis.dsn"),
})
if err != nil {
lo.Error("could not initialize cache", "error", err)
os.Exit(1)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel() defer cancel()
for _, registry := range ko.MustStrings("bootstrap.ge_registries") { for _, registry := range registries {
registryMap, err := chain.Provider().RegistryMap(ctx, ethutils.HexToAddress(registry)) registryMap, err := chain.Provider().RegistryMap(ctx, ethutils.HexToAddress(registry))
if err != nil { if err != nil {
lo.Error("could not fetch registry", "error", err) lo.Error("could not fetch registry", "registry", registry, "error", err)
os.Exit(1) os.Exit(1)
} }
@ -229,12 +187,12 @@ func bootstrapCache() error {
} }
} }
for _, address := range ko.MustStrings("bootstrap.watchlist") { for _, address := range watchlist {
if err := cache.Add(ctx, ethutils.HexToAddress(address).Hex()); err != nil { if err := cache.Add(ctx, ethutils.HexToAddress(address).Hex()); err != nil {
return err return err
} }
} }
for _, address := range ko.MustStrings("bootstrap.blacklist") { for _, address := range blacklist {
if err := cache.Remove(ctx, ethutils.HexToAddress(address).Hex()); err != nil { if err := cache.Remove(ctx, ethutils.HexToAddress(address).Hex()); err != nil {
return err return err
} }

View File

@ -3,6 +3,8 @@ package cache
import ( import (
"context" "context"
"log/slog" "log/slog"
"github.com/grassrootseconomics/eth-tracker/internal/chain"
) )
type ( type (
@ -14,9 +16,13 @@ type (
} }
CacheOpts struct { CacheOpts struct {
Logg *slog.Logger RedisDSN string
RedisDSN string CacheType string
CacheType string Registries []string
Watchlist []string
Blacklist []string
Chain chain.Chain
Logg *slog.Logger
} }
) )
@ -24,7 +30,7 @@ func New(o CacheOpts) (Cache, error) {
var cache Cache var cache Cache
switch o.CacheType { switch o.CacheType {
case "map": case "internal":
cache = NewMapCache() cache = NewMapCache()
case "redis": case "redis":
redisCache, err := NewRedisCache(redisOpts{ redisCache, err := NewRedisCache(redisOpts{
@ -39,5 +45,16 @@ func New(o CacheOpts) (Cache, error) {
o.Logg.Warn("invalid cache type, using default type (map)") o.Logg.Warn("invalid cache type, using default type (map)")
} }
if err := bootstrapCache(
o.Chain,
cache,
o.Registries,
o.Watchlist,
o.Blacklist,
o.Logg,
); err != nil {
return cache, err
}
return cache, nil return cache, nil
} }

View File

@ -7,12 +7,12 @@ import (
) )
type mapCache struct { type mapCache struct {
xmap *xsync.Map xmap *xsync.MapOf[string, bool]
} }
func NewMapCache() Cache { func NewMapCache() Cache {
return &mapCache{ return &mapCache{
xmap: xsync.NewMap(), xmap: xsync.NewMapOf[string, bool](),
} }
} }

View File

@ -2,25 +2,25 @@ package pub
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"time" "time"
"github.com/grassrootseconomics/eth-tracker/pkg/event" "github.com/grassrootseconomics/eth-tracker/pkg/event"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
) )
type ( type (
JetStreamOpts struct { JetStreamOpts struct {
Logg *slog.Logger
Endpoint string Endpoint string
PersistDuration time.Duration PersistDuration time.Duration
Logg *slog.Logger
} }
jetStreamPub struct { jetStreamPub struct {
js jetstream.JetStream
natsConn *nats.Conn natsConn *nats.Conn
jsCtx nats.JetStreamContext
} }
) )
@ -36,33 +36,25 @@ func NewJetStreamPub(o JetStreamOpts) (Pub, error) {
return nil, err return nil, err
} }
js, err := natsConn.JetStream() js, err := jetstream.New(natsConn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
o.Logg.Info("successfully connected to NATS server")
stream, err := js.StreamInfo(streamName) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err != nil && !errors.Is(err, nats.ErrStreamNotFound) { defer cancel()
return nil, err
} js.CreateStream(ctx, jetstream.StreamConfig{
if stream == nil { Name: streamName,
_, err := js.AddStream(&nats.StreamConfig{ Subjects: streamSubjects,
Name: streamName, MaxAge: o.PersistDuration,
MaxAge: o.PersistDuration, Storage: jetstream.FileStorage,
Storage: nats.FileStorage, Duplicates: time.Minute,
Subjects: streamSubjects, })
Duplicates: time.Minute,
})
if err != nil {
return nil, err
}
o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName)
}
return &jetStreamPub{ return &jetStreamPub{
natsConn: natsConn, natsConn: natsConn,
jsCtx: js, js: js,
}, nil }, nil
} }
@ -72,16 +64,17 @@ func (p *jetStreamPub) Close() {
} }
} }
func (p *jetStreamPub) Send(_ context.Context, payload event.Event) error { func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error {
data, err := payload.Serialize() data, err := payload.Serialize()
if err != nil { if err != nil {
return err return err
} }
_, err = p.jsCtx.Publish( _, err = p.js.Publish(
ctx,
fmt.Sprintf("%s.%s", streamName, payload.TxType), fmt.Sprintf("%s.%s", streamName, payload.TxType),
data, data,
nats.MsgId(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)), jetstream.WithMsgID(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)),
) )
if err != nil { if err != nil {
return err return err