diff --git a/cmd/service/main.go b/cmd/service/main.go index c29c20f..a0dc559 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -13,10 +13,13 @@ import ( "time" "github.com/grassrootseconomics/eth-indexer/internal/api" + "github.com/grassrootseconomics/eth-indexer/internal/cache" "github.com/grassrootseconomics/eth-indexer/internal/handler" "github.com/grassrootseconomics/eth-indexer/internal/store" "github.com/grassrootseconomics/eth-indexer/internal/sub" + "github.com/grassrootseconomics/eth-indexer/internal/telegram" "github.com/grassrootseconomics/eth-indexer/internal/util" + "github.com/grassrootseconomics/ethutils" "github.com/knadh/koanf/v2" ) @@ -60,14 +63,31 @@ func main() { os.Exit(1) } - handler := handler.NewHandler(handler.HandlerOpts{ - Store: store, + cache := cache.New() + + chainProvider := ethutils.NewProvider( + ko.MustString("chain.rpc_endpoint"), + ko.MustInt64("chain.chainid"), + ) + + telegram := telegram.New(telegram.TelegramOpts{ + BotToken: ko.MustString("telegram.bot_token"), + NotificationChannel: ko.MustInt64("telegram.notification_channel"), }) + handlerContainer := handler.NewHandler(handler.HandlerOpts{ + Store: store, + Cache: cache, + ChainProvider: chainProvider, + Telegram: telegram, + Logg: lo, + }) + + router := bootstrapRouter(handlerContainer) + jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{ Logg: lo, - Store: store, - Handler: handler, + Router: router, Endpoint: ko.MustString("jetstream.endpoint"), JetStreamID: ko.MustString("jetstream.id"), }) diff --git a/cmd/service/router.go b/cmd/service/router.go new file mode 100644 index 0000000..fc8757b --- /dev/null +++ b/cmd/service/router.go @@ -0,0 +1,43 @@ +package main + +import ( + "github.com/grassrootseconomics/eth-indexer/internal/handler" + "github.com/grassrootseconomics/eth-indexer/pkg/router" +) + +func bootstrapRouter(handlerContainer *handler.Handler) *router.Router { + router := router.New() + + router.RegisterRoute( + "TRACKER.TOKEN_TRANSFER", + handlerContainer.IndexTransfer, + handlerContainer.AddToken, + ) + router.RegisterRoute( + "TRACKER.TOKEN_MINT", + handlerContainer.IndexTokenMint, + handlerContainer.AddToken, + ) + router.RegisterRoute( + "TRACKER.TOKEN_BURN", + handlerContainer.IndexTokenMint, + handlerContainer.AddToken, + ) + router.RegisterRoute( + "TRACKER.POOL_SWAP", + handlerContainer.IndexPoolSwap, + handlerContainer.AddPool, + ) + router.RegisterRoute( + "TRACKER.POOL_DEPOSIT", + handlerContainer.IndexPoolDeposit, + handlerContainer.AddPool, + ) + router.RegisterRoute( + "TRACKER.FAUCET_GIVE", + handlerContainer.IndexFaucetGive, + handlerContainer.FaucetHealthCheck, + ) + + return router +} diff --git a/go.mod b/go.mod index 517b2b6..4bf5a98 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,8 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect github.com/google/uuid v1.6.0 // indirect @@ -45,24 +47,34 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/kalbhor/tasqueue/v2 v2.2.1 // indirect github.com/klauspost/compress v1.17.2 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/lmittmann/tint v1.0.4 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/mmcloughlin/addchain v0.4.0 // indirect + github.com/mr-linch/go-tg v0.15.0 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect + github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/shopspring/decimal v1.4.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/cast v1.7.0 // indirect github.com/supranational/blst v0.3.11 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect + github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + go.opentelemetry.io/otel v1.9.0 // indirect + go.opentelemetry.io/otel/sdk v1.9.0 // indirect + go.opentelemetry.io/otel/trace v1.9.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/go.sum b/go.sum index e668124..315bec4 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,11 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= @@ -112,6 +117,8 @@ github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFr github.com/jackc/tern/v2 v2.2.3 h1:UWD24+m3zP7eRSlX9vYg2tb6Bf0V161IdOuo4YWWyd4= github.com/jackc/tern/v2 v2.2.3/go.mod h1:EStqJVUowhII9OpCTcZISE1BfpGlwE4oq0oQtHAGuuI= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/kalbhor/tasqueue/v2 v2.2.1 h1:jSRcbPYdF1qMeuizgdcVBCcFKx0fInMOxTPHoc6A//w= +github.com/kalbhor/tasqueue/v2 v2.2.1/go.mod h1:OOPWDU65QhGlzq9fpyW2pBvXrsPzpHiVBtrIaDgn+Rc= github.com/kamikazechaser/common v0.2.0 h1:bqi5UaMTDm/wtZlJEvQDNhsLVJP4Beg+HKWeQ+dhpss= github.com/kamikazechaser/common v0.2.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= @@ -153,6 +160,8 @@ github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= +github.com/mr-linch/go-tg v0.15.0 h1:OKI+SAk9aT3sU9iGNO1Hd209pyE8ILwiblfeVFeszoA= +github.com/mr-linch/go-tg v0.15.0/go.mod h1:poS5Fu8a76VHcoG0QOjYzytuMIs0570DkITujOtdOI8= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= @@ -176,18 +185,25 @@ github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuI github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= +github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible h1:Bn1aCHHRnjv4Bl16T8rcaFjYSrGrIZvpiGO6P3Q4GpU= github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= @@ -205,6 +221,16 @@ github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ= github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ= github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw= +go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo= +go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo= +go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4= +go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc= +go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..d22a203 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,26 @@ +package cache + +import "github.com/puzpuzpuz/xsync/v3" + +type Cache struct { + provider *xsync.MapOf[string, bool] +} + +func New() *Cache { + return &Cache{ + provider: xsync.NewMapOf[string, bool](), + } +} + +func (c *Cache) Set(key string) { + c.provider.Store(key, true) +} + +func (c *Cache) Get(key string) bool { + v, _ := c.provider.Load(key) + return v +} + +func (c *Cache) Size() int { + return c.provider.Size() +} diff --git a/internal/handler/add_contract.go b/internal/handler/add_contract.go new file mode 100644 index 0000000..19c50f1 --- /dev/null +++ b/internal/handler/add_contract.go @@ -0,0 +1,67 @@ +package handler + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" +) + +var ( + nameGetter = w3.MustNewFunc("name()", "string") + symbolGetter = w3.MustNewFunc("symbol()", "string") + decimalsGetter = w3.MustNewFunc("decimals()", "uint8") + sinkAddressGetter = w3.MustNewFunc("sinkAddress", "address") +) + +func (h *Handler) AddToken(ctx context.Context, event event.Event) error { + if h.cache.Get(event.ContractAddress) { + return nil + } + + var ( + tokenName string + tokenSymbol string + tokenDecimals uint8 + sinkAddress common.Address + ) + + contractAddress := w3.A(event.ContractAddress) + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName), + eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol), + eth.CallFunc(contractAddress, decimalsGetter).Returns(&tokenDecimals), + eth.CallFunc(contractAddress, sinkAddressGetter).Returns(&sinkAddress), + ); err != nil { + return err + } + + return h.store.InsertToken(ctx, event.ContractAddress, tokenName, tokenSymbol, tokenDecimals, sinkAddress.Hex()) +} + +func (h *Handler) AddPool(ctx context.Context, event event.Event) error { + if h.cache.Get(event.ContractAddress) { + return nil + } + + var ( + tokenName string + tokenSymbol string + ) + + contractAddress := w3.A(event.ContractAddress) + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.CallFunc(contractAddress, nameGetter).Returns(&tokenName), + eth.CallFunc(contractAddress, symbolGetter).Returns(&tokenSymbol), + ); err != nil { + return err + } + + return h.store.InsertPool(ctx, event.ContractAddress, tokenName, tokenSymbol) +} diff --git a/internal/handler/faucet_give.go b/internal/handler/faucet_give.go new file mode 100644 index 0000000..5a136d4 --- /dev/null +++ b/internal/handler/faucet_give.go @@ -0,0 +1,35 @@ +package handler + +import ( + "context" + "fmt" + "math/big" + + "github.com/grassrootseconomics/eth-indexer/internal/telegram" + "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" +) + +const balanceThreshold = 50 + +func (h *Handler) IndexFaucetGive(ctx context.Context, event event.Event) error { + return h.store.InsertFaucetGive(ctx, event) +} + +func (h *Handler) FaucetHealthCheck(ctx context.Context, event event.Event) error { + var balance *big.Int + + if err := h.chainProvider.Client.CallCtx( + ctx, + eth.Balance(w3.A(event.ContractAddress), nil).Returns(&balance), + ); err != nil { + return err + } + + if balance.Cmp(new(big.Int).Mul(w3.BigEther, big.NewInt(balanceThreshold))) < 0 { + return h.telegram.Notify(ctx, fmt.Sprintf("%s: %s", event.ContractAddress, telegram.NOTIFY_LOW_BALANCE_ON_GAS_FAUCET)) + } + + return nil +} diff --git a/internal/handler/handler.go b/internal/handler/handler.go index cec1ab2..79f3505 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -1,52 +1,38 @@ package handler import ( - "context" - "encoding/json" + "log/slog" + "github.com/grassrootseconomics/eth-indexer/internal/cache" "github.com/grassrootseconomics/eth-indexer/internal/store" - "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/grassrootseconomics/eth-indexer/internal/telegram" + "github.com/grassrootseconomics/ethutils" ) type ( HandlerOpts struct { - Store store.Store + Store store.Store + Cache *cache.Cache + ChainProvider *ethutils.Provider + Telegram *telegram.Telegram + Logg *slog.Logger } Handler struct { - store store.Store + store store.Store + cache *cache.Cache + chainProvider *ethutils.Provider + telegram *telegram.Telegram + logg *slog.Logger } ) func NewHandler(o HandlerOpts) *Handler { return &Handler{ - store: o.Store, + store: o.Store, + cache: o.Cache, + chainProvider: o.ChainProvider, + telegram: o.Telegram, + logg: o.Logg, } } - -func (h *Handler) Handle(ctx context.Context, msgSubject string, msgData []byte) error { - var chainEvent event.Event - - if err := json.Unmarshal(msgData, &chainEvent); err != nil { - return err - } - - switch msgSubject { - case "TRACKER.TOKEN_TRANSFER": - return h.store.InsertTokenTransfer(ctx, chainEvent) - case "TRACKER.POOL_SWAP": - return h.store.InsertPoolSwap(ctx, chainEvent) - case "TRACKER.FAUCET_GIVE": - return h.store.InsertFaucetGive(ctx, chainEvent) - case "TRACKER.POOL_DEPOSIT": - return h.store.InsertPoolDeposit(ctx, chainEvent) - case "TRACKER.TOKEN_MINT": - return h.store.InsertTokenMint(ctx, chainEvent) - case "TRACKER.TOKEN_BURN": - return h.store.InsertTokenBurn(ctx, chainEvent) - case "TRACKER.QUOTER_PRICE_INDEX_UPDATED": - return h.store.InsertPriceQuoteUpdate(ctx, chainEvent) - } - - return nil -} diff --git a/internal/handler/pool_deposit.go b/internal/handler/pool_deposit.go new file mode 100644 index 0000000..50d4cc3 --- /dev/null +++ b/internal/handler/pool_deposit.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexPoolDeposit(ctx context.Context, event event.Event) error { + return h.store.InsertPoolDeposit(ctx, event) +} diff --git a/internal/handler/pool_swap.go b/internal/handler/pool_swap.go new file mode 100644 index 0000000..a19317b --- /dev/null +++ b/internal/handler/pool_swap.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexPoolSwap(ctx context.Context, event event.Event) error { + return h.store.InsertPoolSwap(ctx, event) +} diff --git a/internal/handler/token_burn.go b/internal/handler/token_burn.go new file mode 100644 index 0000000..ab15e6a --- /dev/null +++ b/internal/handler/token_burn.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexTokenBurn(ctx context.Context, event event.Event) error { + return h.store.InsertTokenBurn(ctx, event) +} diff --git a/internal/handler/token_mint.go b/internal/handler/token_mint.go new file mode 100644 index 0000000..f32fe9c --- /dev/null +++ b/internal/handler/token_mint.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexTokenMint(ctx context.Context, event event.Event) error { + return h.store.InsertTokenMint(ctx, event) +} diff --git a/internal/handler/transfer.go b/internal/handler/transfer.go new file mode 100644 index 0000000..323fbcf --- /dev/null +++ b/internal/handler/transfer.go @@ -0,0 +1,11 @@ +package handler + +import ( + "context" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" +) + +func (h *Handler) IndexTransfer(ctx context.Context, event event.Event) error { + return h.store.InsertTokenTransfer(ctx, event) +} diff --git a/internal/store/pg.go b/internal/store/pg.go index be30992..820ffc2 100644 --- a/internal/store/pg.go +++ b/internal/store/pg.go @@ -29,14 +29,15 @@ type ( } queries struct { - InsertTx string `query:"insert-tx"` - InsertTokenTransfer string `query:"insert-token-transfer"` - InsertTokenMint string `query:"insert-token-mint"` - InsertTokenBurn string `query:"insert-token-burn"` - InsertFaucetGive string `query:"insert-faucet-give"` - InsertPoolSwap string `query:"insert-pool-swap"` - InsertPoolDeposit string `query:"insert-pool-deposit"` - InsertPriceQuoteUpdate string `query:"insert-price-quote-update"` + InsertTx string `query:"insert-tx"` + InsertTokenTransfer string `query:"insert-token-transfer"` + InsertTokenMint string `query:"insert-token-mint"` + InsertTokenBurn string `query:"insert-token-burn"` + InsertFaucetGive string `query:"insert-faucet-give"` + InsertPoolSwap string `query:"insert-pool-swap"` + InsertPoolDeposit string `query:"insert-pool-deposit"` + InsertToken string `query:"insert-token"` + InsertPool string `query:"insert-pool"` } ) @@ -198,20 +199,29 @@ func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) e }) } -func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Event) error { +func (pg *Pg) InsertToken(ctx context.Context, contractAddress string, name string, symbol string, decimals uint8, sinkAddress string) error { return pg.executeTransaction(ctx, func(tx pgx.Tx) error { - txID, err := pg.insertTx(ctx, tx, eventPayload) - if err != nil { - return err - } - - _, err = tx.Exec( + _, err := tx.Exec( ctx, - pg.queries.InsertPriceQuoteUpdate, - txID, - eventPayload.Payload["token"].(string), - eventPayload.Payload["exchangeRate"].(string), - eventPayload.ContractAddress, + pg.queries.InsertToken, + contractAddress, + name, + symbol, + decimals, + sinkAddress, + ) + return err + }) +} + +func (pg *Pg) InsertPool(ctx context.Context, contractAddress string, name string, symbol string) error { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + _, err := tx.Exec( + ctx, + pg.queries.InsertPool, + contractAddress, + name, + symbol, ) return err }) diff --git a/internal/store/store.go b/internal/store/store.go index fe57a3d..8beab2a 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -15,7 +15,8 @@ type ( InsertFaucetGive(context.Context, event.Event) error InsertPoolSwap(context.Context, event.Event) error InsertPoolDeposit(context.Context, event.Event) error - InsertPriceQuoteUpdate(context.Context, event.Event) error + InsertToken(context.Context, string, string, string, uint8, string) error + InsertPool(context.Context, string, string, string) error Pool() *pgxpool.Pool Close() } diff --git a/internal/sub/jetstream.go b/internal/sub/jetstream.go index 147156e..126f2da 100644 --- a/internal/sub/jetstream.go +++ b/internal/sub/jetstream.go @@ -6,27 +6,24 @@ import ( "log/slog" "time" - "github.com/grassrootseconomics/eth-indexer/internal/handler" - "github.com/grassrootseconomics/eth-indexer/internal/store" + "github.com/grassrootseconomics/eth-indexer/pkg/router" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) type ( JetStreamOpts struct { - Store store.Store - Logg *slog.Logger - Handler *handler.Handler Endpoint string JetStreamID string + Logg *slog.Logger + Router *router.Router } JetStreamSub struct { jsConsumer jetstream.Consumer - store store.Store - handler *handler.Handler - natsConn *nats.Conn logg *slog.Logger + natsConn *nats.Conn + router *router.Router durableID string } ) @@ -66,8 +63,7 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) { return &JetStreamSub{ jsConsumer: consumer, - store: o.Store, - handler: o.Handler, + router: o.Router, natsConn: natsConn, logg: o.Logg, durableID: o.JetStreamID, @@ -81,10 +77,17 @@ func (s *JetStreamSub) Close() { } func (s *JetStreamSub) Process() error { + iter, err := s.jsConsumer.Messages(jetstream.WithMessagesErrOnMissingHeartbeat(false)) + if err != nil { + return err + } + defer iter.Stop() + for { - events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second)) + msg, err := iter.Next() if err != nil { if errors.Is(err, nats.ErrTimeout) { + s.logg.Error("jetstream: iter fetch timeout") continue } else if errors.Is(err, nats.ErrConnectionClosed) { return nil @@ -93,13 +96,8 @@ func (s *JetStreamSub) Process() error { } } - for msg := range events.Messages() { - if err := s.handler.Handle(context.Background(), msg.Subject(), msg.Data()); err != nil { - s.logg.Error("error processing nats message", "error", err) - msg.Nak() - } else { - msg.Ack() - } + if err := s.router.Handle(context.Background(), msg); err != nil { + s.logg.Error("router: error processing nats message", "error", err) } } } diff --git a/internal/sub/sub.go b/internal/sub/sub.go index 18a6067..6bb5fd5 100644 --- a/internal/sub/sub.go +++ b/internal/sub/sub.go @@ -1,8 +1,6 @@ package sub -type ( - Sub interface { - Process() error - Close() - } -) +type Sub interface { + Process() error + Close() +} diff --git a/internal/telegram/telegram.go b/internal/telegram/telegram.go new file mode 100644 index 0000000..17515f4 --- /dev/null +++ b/internal/telegram/telegram.go @@ -0,0 +1,37 @@ +package telegram + +import ( + "context" + + "github.com/mr-linch/go-tg" +) + +type ( + TelegramOpts struct { + BotToken string + NotificationChannel int64 + } + + Telegram struct { + client *tg.Client + notificaationChannel int64 + } +) + +const ( + NOTIFY_LOW_BALANCE_ON_GAS_FAUCET = ` + Gas faucet balance is low. Top is required soon!` +) + +func New(o TelegramOpts) *Telegram { + return &Telegram{ + client: tg.New(o.BotToken, nil), + notificaationChannel: o.NotificationChannel, + } +} + +func (t *Telegram) Notify(ctx context.Context, message string) error { + _, err := t.client.SendMessage(tg.ChatID(t.notificaationChannel), message).Do(ctx) + return err + +} diff --git a/migrations/001_indexer_base.sql b/migrations/001_indexer_base.sql index f490459..35a45e3 100644 --- a/migrations/001_indexer_base.sql +++ b/migrations/001_indexer_base.sql @@ -2,7 +2,6 @@ CREATE TABLE IF NOT EXISTS tx ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, tx_hash VARCHAR(66) NOT NULL UNIQUE, block_number INT NOT NULL, - contract_address VARCHAR(42) NOT NULL, date_block TIMESTAMP NOT NULL, success BOOLEAN NOT NULL ); @@ -12,6 +11,7 @@ CREATE TABLE IF NOT EXISTS token_transfer ( tx_id INT REFERENCES tx(id), sender_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', transfer_value NUMERIC NOT NULL ); @@ -20,6 +20,7 @@ CREATE TABLE IF NOT EXISTS token_mint ( tx_id INT REFERENCES tx(id), minter_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', mint_value NUMERIC NOT NULL ); @@ -27,6 +28,7 @@ CREATE TABLE IF NOT EXISTS token_burn ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, tx_id INT REFERENCES tx(id), burner_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', burn_value NUMERIC NOT NULL ); @@ -35,6 +37,7 @@ CREATE TABLE IF NOT EXISTS faucet_give ( tx_id INT REFERENCES tx(id), token_address VARCHAR(42) NOT NULL, recipient_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', give_value NUMERIC NOT NULL ); @@ -47,6 +50,7 @@ CREATE TABLE IF NOT EXISTS pool_swap ( token_out_address VARCHAR(42) NOT NULL, in_value NUMERIC NOT NULL, out_value NUMERIC NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', fee NUMERIC NOT NULL ); @@ -55,29 +59,22 @@ CREATE TABLE IF NOT EXISTS pool_deposit ( tx_id INT REFERENCES tx(id), initiator_address VARCHAR(42) NOT NULL, token_in_address VARCHAR(42) NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', in_value NUMERIC NOT NULL ); -CREATE TABLE IF NOT EXISTS price_index_updates ( - id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - tx_id INT REFERENCES tx(id), - token VARCHAR(42) NOT NULL, - exchange_rate NUMERIC NOT NULL -); - -CREATE TABLE IF NOT EXISTS contracts ( - id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - contract_address VARCHAR(42) UNIQUE NOT NULL, - contract_description TEXT NOT NULL, - is_token BOOLEAN NOT NULL -); - CREATE TABLE IF NOT EXISTS tokens ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, - contract_address VARCHAR(42) UNIQUE NOT NULL, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', token_name TEXT NOT NULL, token_symbol TEXT NOT NULL, token_decimals INT NOT NULL, - token_version TEXT NOT NULL, - token_type TEXT NOT NULL + sink_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000' +); + +CREATE TABLE IF NOT EXISTS pools ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + contract_address VARCHAR(42) NOT NULL DEFAULT '0x0000000000000000000000000000000000000000', + pool_name TEXT NOT NULL, + pool_symbol TEXT NOT NULL ); \ No newline at end of file diff --git a/migrations/002_fix_contract_address.sql b/migrations/002_fix_contract_address.sql deleted file mode 100644 index 64f9fe1..0000000 --- a/migrations/002_fix_contract_address.sql +++ /dev/null @@ -1,29 +0,0 @@ -ALTER TABLE tx DROP COLUMN contract_address; - -ALTER TABLE token_transfer ADD COLUMN contract_address VARCHAR(42); -UPDATE token_transfer SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE token_transfer ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE token_mint ADD COLUMN contract_address VARCHAR(42); -UPDATE token_mint SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE token_mint ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE token_burn ADD COLUMN contract_address VARCHAR(42); -UPDATE token_burn SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE token_burn ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE faucet_give ADD COLUMN contract_address VARCHAR(42); -UPDATE faucet_give SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE faucet_give ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE pool_swap ADD COLUMN contract_address VARCHAR(42); -UPDATE pool_swap SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE pool_swap ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE pool_deposit ADD COLUMN contract_address VARCHAR(42); -UPDATE pool_deposit SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE pool_deposit ALTER COLUMN contract_address SET NOT NULL; - -ALTER TABLE price_index_updates ADD COLUMN contract_address VARCHAR(42); -UPDATE price_index_updates SET contract_address = '0x0000000000000000000000000000000000000000'; -ALTER TABLE price_index_updates ALTER COLUMN contract_address SET NOT NULL; \ No newline at end of file diff --git a/migrations/003_fix_tokens_constraint.sql b/migrations/003_fix_tokens_constraint.sql deleted file mode 100644 index 4d838a0..0000000 --- a/migrations/003_fix_tokens_constraint.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE tokens ALTER token_version DROP NOT NULL, ALTER token_type DROP NOT NULL; \ No newline at end of file diff --git a/pkg/router/router.go b/pkg/router/router.go new file mode 100644 index 0000000..bcdfeeb --- /dev/null +++ b/pkg/router/router.go @@ -0,0 +1,56 @@ +package router + +import ( + "context" + "encoding/json" + "log/slog" + + "github.com/grassrootseconomics/eth-tracker/pkg/event" + "github.com/nats-io/nats.go/jetstream" + "github.com/sourcegraph/conc/pool" +) + +type ( + HandlerFunc func(context.Context, event.Event) error + + Router struct { + logg *slog.Logger + handlers map[string][]HandlerFunc + } +) + +func New() *Router { + return &Router{ + handlers: make(map[string][]HandlerFunc), + } +} + +func (r *Router) RegisterRoute(subject string, handlerFunc ...HandlerFunc) { + r.handlers[subject] = handlerFunc +} + +func (r *Router) Handle(ctx context.Context, msg jetstream.Msg) error { + handlers, ok := r.handlers[msg.Subject()] + if !ok { + return nil + } + + var chainEvent event.Event + if err := json.Unmarshal(msg.Data(), &chainEvent); err != nil { + return err + } + + p := pool.New().WithErrors() + + for _, handler := range handlers { + p.Go(func() error { + return handler(ctx, chainEvent) + }) + } + + if err := p.Wait(); err != nil { + return msg.Nak() + } + + return msg.Ack() +} diff --git a/queries.sql b/queries.sql index 2c7998d..842b213 100644 --- a/queries.sql +++ b/queries.sql @@ -104,14 +104,48 @@ INSERT INTO pool_deposit( contract_address ) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING ---name: insert-price-quote-update --- $1: tx_id --- $2: token --- $3: exchange_rate --- $4: contract_address -INSERT INTO price_index_updates( - tx_id, - token, - exchange_rate, - contract_address -) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING +--name: insert-token +-- $1: contract_address +-- $2: token_name +-- $3: token_symbol +-- $4: token_decimals +-- $5: sink_address +INSERT INTO tokens( + contract_address, + token_name, + token_symbol, + token_decimals, + sink_address +) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING + +--name: insert-pool +-- $1: contract_address +-- $2: pool_name +-- $3: pool_symbol +INSERT INTO tokens( + contract_address, + pool_name, + pool_symbol +) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING + +--name: last-10-tx +-- Fetches an account's last 10 transfers +-- $1: public_key +SELECT +token_transfer.sender_address AS sender, token_transfer.recipient_address AS recipient, token_transfer.transfer_value, token_transfer.contract_address, +tx.tx_hash, tx.date_block, +tokens.token_symbol, tokens.token_decimals +FROM token_transfer +INNER JOIN tx ON token_transfer.tx_id = tx.id +INNER JOIN tokens ON token_transfer.contract_address = tokens.contract_address +WHERE token_transfer.sender_address = $1 OR token_transfer.recipient_address = $1 +ORDER BY tx.date_block DESC +LIMIT 10; + +--name: token-holdings +-- Fetches an account's token holdings +-- $1: public_key +SELECT DISTINCT tokens.token_symbol, tokens.contract_address, tokens.token_decimals FROM tokens +INNER JOIN token_transfer on tokens.contract_address = token_transfer.contract_address +WHERE token_transfer.sender_address = $1 +OR token_transfer.recipient_address = $1; \ No newline at end of file