From d0f21b2fddcedd695055feadc29988d797f12113 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Tue, 23 Apr 2024 19:33:05 +0800 Subject: [PATCH] init: wip indexer --- cmd/indexer/init.go | 53 ++++++ cmd/indexer/main.go | 105 ++++++++++++ config.toml | 11 ++ dev/docker-compose.yaml | 22 +++ dev/init_db.sql | 1 + go.mod | 43 +++++ go.sum | 143 ++++++++++++++++ internal/event/event.go | 36 ++++ internal/store/pg.go | 287 ++++++++++++++++++++++++++++++++ internal/store/store.go | 16 ++ internal/sub/jetstream.go | 134 +++++++++++++++ internal/sub/sub.go | 8 + migrations/001_indexer_base.sql | 76 +++++++++ queries.sql | 76 +++++++++ 14 files changed, 1011 insertions(+) create mode 100644 cmd/indexer/init.go create mode 100644 cmd/indexer/main.go create mode 100644 config.toml create mode 100644 dev/docker-compose.yaml create mode 100644 dev/init_db.sql create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/event/event.go create mode 100644 internal/store/pg.go create mode 100644 internal/store/store.go create mode 100644 internal/sub/jetstream.go create mode 100644 internal/sub/sub.go create mode 100644 migrations/001_indexer_base.sql create mode 100644 queries.sql diff --git a/cmd/indexer/init.go b/cmd/indexer/init.go new file mode 100644 index 0000000..e8df0a8 --- /dev/null +++ b/cmd/indexer/init.go @@ -0,0 +1,53 @@ +package main + +import ( + "log/slog" + "os" + "strings" + + "github.com/kamikazechaser/common/logg" + "github.com/knadh/koanf/parsers/toml" + "github.com/knadh/koanf/providers/env" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/v2" +) + +func initLogger() *slog.Logger { + loggOpts := logg.LoggOpts{ + FormatType: logg.Logfmt, + LogLevel: slog.LevelInfo, + } + + if os.Getenv("DEBUG") != "" { + loggOpts.LogLevel = slog.LevelDebug + } + + if os.Getenv("DEV") != "" { + loggOpts.LogLevel = slog.LevelDebug + loggOpts.FormatType = logg.Human + } + + return logg.NewLogg(loggOpts) +} + +func initConfig() *koanf.Koanf { + var ( + ko = koanf.New(".") + ) + + confFile := file.Provider(confFlag) + if err := ko.Load(confFile, toml.Parser()); err != nil { + lo.Error("could not parse configuration file", "error", err) + os.Exit(1) + } + + if err := ko.Load(env.Provider("INDEXER_", ".", func(s string) string { + return strings.ReplaceAll(strings.ToLower( + strings.TrimPrefix(s, "INDEXER_")), "__", ".") + }), nil); err != nil { + lo.Error("could not override config from env vars", "error", err) + os.Exit(1) + } + + return ko +} diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go new file mode 100644 index 0000000..8c5e11f --- /dev/null +++ b/cmd/indexer/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "errors" + "flag" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/grassrootseconomics/celo-indexer/internal/store" + "github.com/grassrootseconomics/celo-indexer/internal/sub" + "github.com/knadh/koanf/v2" +) + +const defaultGracefulShutdownPeriod = time.Second * 20 + +var ( + build = "dev" + + confFlag string + migrationsFolderFlag string + queriesFlag string + + lo *slog.Logger + ko *koanf.Koanf +) + +func init() { + flag.StringVar(&confFlag, "config", "config.toml", "Config file location") + flag.StringVar(&migrationsFolderFlag, "migrations", "migrations/", "Migrations folder location") + flag.StringVar(&queriesFlag, "queries", "queries.sql", "Queries file location") + flag.Parse() + + lo = initLogger() + ko = initConfig() + + lo.Info("starting celo indexer", "build", build) +} + +func main() { + var ( + wg sync.WaitGroup + ) + ctx, stop := notifyShutdown() + + store, err := store.NewPgStore(store.PgOpts{ + DSN: ko.MustString("postgres.dsn"), + MigrationsFolderPath: migrationsFolderFlag, + QueriesFolderPath: queriesFlag, + Logg: lo, + }) + if err != nil { + lo.Error("could not initialize postgres store", "error", err) + os.Exit(1) + } + + jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{ + Endpoint: ko.MustString("jetstream.endpoint"), + Logg: lo, + Store: store, + }) + if err != nil { + lo.Error("could not initialize jetstream sub", "error", err) + os.Exit(1) + } + + wg.Add(1) + go func() { + defer wg.Done() + jetStreamSub.Process() + }() + + <-ctx.Done() + lo.Info("shutdown signal received") + shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod) + + wg.Add(1) + go func() { + defer wg.Done() + jetStreamSub.Close() + }() + + go func() { + wg.Wait() + stop() + cancel() + os.Exit(0) + }() + + <-shutdownCtx.Done() + if errors.Is(shutdownCtx.Err(), context.DeadlineExceeded) { + stop() + cancel() + lo.Error("graceful shutdown period exceeded, forcefully shutting down") + } + os.Exit(1) +} + +func notifyShutdown() (context.Context, context.CancelFunc) { + return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT) +} diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..115ec48 --- /dev/null +++ b/config.toml @@ -0,0 +1,11 @@ +[metrics] +go_process = true + +[api] +address = ":5001" + +[postgres] +dsn = "postgres://postgres:postgres@127.0.0.1:5432/ge_celo_data" + +[jetstream] +endpoint = "nats://127.0.0.1:4222" diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml new file mode 100644 index 0000000..be9ebec --- /dev/null +++ b/dev/docker-compose.yaml @@ -0,0 +1,22 @@ +services: + postgres: + image: postgres:16-alpine + restart: unless-stopped + user: postgres + environment: + - POSTGRES_PASSWORD=postgres + - POSTGRES_USER=postgres + volumes: + - ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql + - indexer-pg:/var/lib/postgresql/data + ports: + - "127.0.0.1:5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready"] + interval: 10s + timeout: 5s + retries: 5 + +volumes: + indexer-pg: + driver: local \ No newline at end of file diff --git a/dev/init_db.sql b/dev/init_db.sql new file mode 100644 index 0000000..beffa89 --- /dev/null +++ b/dev/init_db.sql @@ -0,0 +1 @@ +CREATE DATABASE ge_celo_data; \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5948e64 --- /dev/null +++ b/go.mod @@ -0,0 +1,43 @@ +module github.com/grassrootseconomics/celo-indexer + +go 1.22.1 + +require ( + github.com/jackc/pgx/v5 v5.5.5 + github.com/jackc/tern/v2 v2.1.1 + github.com/kamikazechaser/common v0.2.0 + github.com/knadh/goyesql/v2 v2.2.0 + github.com/knadh/koanf/parsers/toml v0.1.0 + github.com/knadh/koanf/providers/env v0.1.0 + github.com/knadh/koanf/providers/file v0.1.0 + github.com/knadh/koanf/v2 v2.1.1 + github.com/nats-io/nats.go v1.34.1 +) + +require ( + github.com/Masterminds/goutils v1.1.1 // indirect + github.com/Masterminds/semver/v3 v3.2.0 // indirect + github.com/Masterminds/sprig/v3 v3.2.3 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/huandu/xstrings v1.4.0 // indirect + github.com/imdario/mergo v0.3.13 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/klauspost/compress v1.17.2 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/lmittmann/tint v1.0.4 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pelletier/go-toml v1.9.5 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + github.com/spf13/cast v1.5.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6ced154 --- /dev/null +++ b/go.sum @@ -0,0 +1,143 @@ +github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= +github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= +github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g= +github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= +github.com/Masterminds/sprig/v3 v3.2.3 h1:eL2fZNezLomi0uOLqjQoN6BfsDD+fyLtgbJMAj9n6YA= +github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= +github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= +github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= +github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/tern/v2 v2.1.1 h1:qDo41wTtDHrTgkN7lhcoMQ6oiAWqiD8xKgslxyoKHNQ= +github.com/jackc/tern/v2 v2.1.1/go.mod h1:xnRalAguscgir18eW/wscn/QTEoWwFqrpW+5S+CREWM= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/kamikazechaser/common v0.2.0 h1:bqi5UaMTDm/wtZlJEvQDNhsLVJP4Beg+HKWeQ+dhpss= +github.com/kamikazechaser/common v0.2.0/go.mod h1:I1LEc8+W+g/KHZWARc1gMhuSa2STbQgfL4Hao6I/ZwY= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/knadh/goyesql/v2 v2.2.0 h1:DNQIzgITmMTXA+z+jDzbXCpgr7fGD6Hp0AJ7ZLEAem4= +github.com/knadh/goyesql/v2 v2.2.0/go.mod h1:is+wK/XQBukYK3DdKfpJRyDH9U/ZTMyX2u6DFijjRnI= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/parsers/toml v0.1.0 h1:S2hLqS4TgWZYj4/7mI5m1CQQcWurxUz6ODgOub/6LCI= +github.com/knadh/koanf/parsers/toml v0.1.0/go.mod h1:yUprhq6eo3GbyVXFFMdbfZSo928ksS+uo0FFqNMnO18= +github.com/knadh/koanf/providers/env v0.1.0 h1:LqKteXqfOWyx5Ab9VfGHmjY9BvRXi+clwyZozgVRiKg= +github.com/knadh/koanf/providers/env v0.1.0/go.mod h1:RE8K9GbACJkeEnkl8L/Qcj8p4ZyPXZIQ191HJi44ZaQ= +github.com/knadh/koanf/providers/file v0.1.0 h1:fs6U7nrV58d3CFAFh8VTde8TM262ObYf3ODrc//Lp+c= +github.com/knadh/koanf/providers/file v0.1.0/go.mod h1:rjJ/nHQl64iYCtAW2QQnF0eSmDEX/YZ/eNFj5yR6BvA= +github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= +github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lmittmann/tint v1.0.4 h1:LeYihpJ9hyGvE0w+K2okPTGUdVLfng1+nDNVR4vWISc= +github.com/lmittmann/tint v1.0.4/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= +github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= +github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..05aaa7c --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,36 @@ +package event + +import "encoding/json" + +type ( + Event struct { + Block uint64 `json:"block"` + ContractAddress string `json:"contractAddress"` + Success bool `json:"success"` + Timestamp int64 `json:"timestamp"` + TxHash string `json:"transactionHash"` + TxType string `json:"transactionType"` + Payload map[string]any `json:"payload"` + } +) + +func (e Event) Serialize() ([]byte, error) { + jsonData, err := json.Marshal(e) + if err != nil { + return nil, err + } + + return jsonData, err +} + +func Deserialize(jsonData []byte) (Event, error) { + var ( + event Event + ) + + if err := json.Unmarshal(jsonData, &event); err != nil { + return event, err + } + + return event, nil +} diff --git a/internal/store/pg.go b/internal/store/pg.go new file mode 100644 index 0000000..d9db5a2 --- /dev/null +++ b/internal/store/pg.go @@ -0,0 +1,287 @@ +package store + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "github.com/grassrootseconomics/celo-indexer/internal/event" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/tern/v2/migrate" + "github.com/knadh/goyesql/v2" +) + +type ( + PgOpts struct { + DSN string + MigrationsFolderPath string + QueriesFolderPath string + Logg *slog.Logger + } + + Pg struct { + db *pgxpool.Pool + queries *queries + logg *slog.Logger + } + + queries struct { + InsertTx string `query:"insert-tx"` + InsertTokenTransfer string `query:"insert-token-transfer"` + InsertTokenMint string `query:"insert-token-mint"` + InsertPoolSwap string `query:"insert-pool-swap"` + InsertPoolDeposit string `query:"insert-pool-deposit"` + } +) + +const ( + migratorTimeout = 5 * time.Second +) + +func NewPgStore(o PgOpts) (Store, error) { + parsedConfig, err := pgxpool.ParseConfig(o.DSN) + if err != nil { + return nil, err + } + + dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig) + if err != nil { + return nil, err + } + + queries, err := loadQueries(o.QueriesFolderPath) + if err != nil { + return nil, err + } + + if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil { + return nil, err + } + + return &Pg{ + db: dbPool, + queries: queries, + logg: o.Logg, + }, nil +} + +func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error { + tx, err := pg.db.Begin(ctx) + if err != nil { + pg.logg.Error("ERR0") + return err + } + defer func() { + if err != nil { + tx.Rollback(ctx) + } else { + tx.Commit(ctx) + } + }() + + var ( + txID int + ) + if err := tx.QueryRow( + ctx, + pg.queries.InsertTx, + eventPayload.TxHash, + eventPayload.Block, + eventPayload.ContractAddress, + time.Unix(eventPayload.Timestamp, 0).UTC(), + eventPayload.Success, + ).Scan(&txID); err != nil { + pg.logg.Error("ERR1") + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertTokenTransfer, + txID, + eventPayload.Payload["from"].(string), + eventPayload.Payload["to"].(string), + eventPayload.Payload["value"].(string), + ) + if err != nil { + pg.logg.Error("ERR2") + return err + } + + return nil +} + +func (pg *Pg) InsertTokenMint(ctx context.Context, eventPayload event.Event) error { + tx, err := pg.db.Begin(ctx) + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback(ctx) + } else { + tx.Commit(ctx) + } + }() + + var ( + txID int + ) + if err := tx.QueryRow( + ctx, + pg.queries.InsertTx, + eventPayload.TxHash, + eventPayload.Block, + eventPayload.ContractAddress, + time.Unix(eventPayload.Timestamp, 0).UTC(), + eventPayload.Success, + ).Scan(&txID); err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertTokenMint, + txID, + eventPayload.Payload["tokenMinter"].(string), + eventPayload.Payload["to"].(string), + eventPayload.Payload["value"].(string), + ) + if err != nil { + return err + } + + return nil +} + +func (pg *Pg) InsertPoolSwap(ctx context.Context, eventPayload event.Event) error { + tx, err := pg.db.Begin(ctx) + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback(ctx) + } else { + tx.Commit(ctx) + } + }() + + var ( + txID int + ) + if err := tx.QueryRow( + ctx, + pg.queries.InsertTx, + eventPayload.TxHash, + eventPayload.Block, + eventPayload.ContractAddress, + time.Unix(eventPayload.Timestamp, 0).UTC(), + eventPayload.Success, + ).Scan(&txID); err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertPoolSwap, + txID, + eventPayload.Payload["initiator"].(string), + eventPayload.Payload["tokenIn"].(string), + eventPayload.Payload["tokenOut"].(string), + eventPayload.Payload["amountIn"].(string), + eventPayload.Payload["amountOut"].(string), + eventPayload.Payload["fee"].(string), + ) + if err != nil { + return err + } + + return nil +} + +func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) error { + tx, err := pg.db.Begin(ctx) + if err != nil { + return err + } + defer func() { + if err != nil { + tx.Rollback(ctx) + } else { + tx.Commit(ctx) + } + }() + + var ( + txID int + ) + if err := tx.QueryRow( + ctx, + pg.queries.InsertTx, + eventPayload.TxHash, + eventPayload.Block, + eventPayload.ContractAddress, + time.Unix(eventPayload.Timestamp, 0).UTC(), + eventPayload.Success, + ).Scan(&txID); err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertPoolDeposit, + txID, + eventPayload.Payload["initiator"].(string), + eventPayload.Payload["tokenIn"].(string), + eventPayload.Payload["amountIn"].(string), + ) + if err != nil { + return err + } + + return nil +} + +func loadQueries(queriesPath string) (*queries, error) { + parsedQueries, err := goyesql.ParseFile(queriesPath) + if err != nil { + return nil, err + } + + loadedQueries := &queries{} + + if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil { + return nil, fmt.Errorf("failed to scan queries %v", err) + } + + return loadedQueries, nil +} + +func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error { + ctx, cancel := context.WithTimeout(ctx, migratorTimeout) + defer cancel() + + conn, err := dbPool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version") + if err != nil { + return err + } + + if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil { + return err + } + + if err := migrator.Migrate(ctx); err != nil { + return err + } + + return nil +} diff --git a/internal/store/store.go b/internal/store/store.go new file mode 100644 index 0000000..fbca34c --- /dev/null +++ b/internal/store/store.go @@ -0,0 +1,16 @@ +package store + +import ( + "context" + + "github.com/grassrootseconomics/celo-indexer/internal/event" +) + +type ( + Store interface { + InsertTokenTransfer(context.Context, event.Event) error + InsertTokenMint(context.Context, event.Event) error + InsertPoolSwap(context.Context, event.Event) error + InsertPoolDeposit(context.Context, event.Event) error + } +) diff --git a/internal/sub/jetstream.go b/internal/sub/jetstream.go new file mode 100644 index 0000000..e78e414 --- /dev/null +++ b/internal/sub/jetstream.go @@ -0,0 +1,134 @@ +package sub + +import ( + "context" + "encoding/json" + "errors" + "log/slog" + + "github.com/grassrootseconomics/celo-indexer/internal/event" + "github.com/grassrootseconomics/celo-indexer/internal/store" + "github.com/nats-io/nats.go" +) + +const ( + durableId = "celo-indexer-6" + pullStream = "TRACKER" + pullSubject = "TRACKER.*" +) + +type ( + JetStreamOpts struct { + Logg *slog.Logger + Endpoint string + Store store.Store + } + + JetStreamSub struct { + natsConn *nats.Conn + jsCtx nats.JetStreamContext + store store.Store + logg *slog.Logger + } +) + +func NewJetStreamSub(o JetStreamOpts) (Sub, error) { + natsConn, err := nats.Connect(o.Endpoint) + if err != nil { + return nil, err + } + + js, err := natsConn.JetStream() + if err != nil { + return nil, err + } + o.Logg.Info("successfully connected to NATS server") + + _, err = js.AddConsumer(pullStream, &nats.ConsumerConfig{ + Durable: durableId, + AckPolicy: nats.AckExplicitPolicy, + FilterSubject: pullSubject, + }) + if err != nil { + return nil, err + } + + return &JetStreamSub{ + natsConn: natsConn, + jsCtx: js, + store: o.Store, + logg: o.Logg, + }, nil +} + +func (s *JetStreamSub) Close() { + if s.natsConn != nil { + s.natsConn.Close() + } +} + +func (s *JetStreamSub) Process() error { + subOpts := []nats.SubOpt{ + nats.ManualAck(), + nats.Bind(pullStream, durableId), + } + + natsSub, err := s.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...) + if err != nil { + return err + } + + for { + events, err := natsSub.Fetch(1) + if err != nil { + if errors.Is(err, nats.ErrTimeout) { + continue + } else if errors.Is(err, nats.ErrConnectionClosed) { + return nil + } else { + return err + } + } + + if len(events) > 0 { + msg := events[0] + if err := s.processEventHandler(context.Background(), msg); err != nil { + s.logg.Error("error processing nats message", "error", err) + msg.Nak() + } else { + msg.Ack() + } + } + } +} + +func (s *JetStreamSub) processEventHandler(ctx context.Context, msg *nats.Msg) error { + var ( + chainEvent event.Event + ) + + if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { + return err + } + + switch msg.Subject { + case "TRACKER.TOKEN_TRANSFER": + if err := s.store.InsertTokenTransfer(ctx, chainEvent); err != nil { + return err + } + case "TRACKER.TOKEN_MINT": + if err := s.store.InsertTokenMint(ctx, chainEvent); err != nil { + return err + } + case "TRACKER.POOL_SWAP": + if err := s.store.InsertPoolSwap(ctx, chainEvent); err != nil { + return err + } + case "TRACKER.POOL_DEPOSIT": + if err := s.store.InsertPoolDeposit(ctx, chainEvent); err != nil { + return err + } + } + + return nil +} diff --git a/internal/sub/sub.go b/internal/sub/sub.go new file mode 100644 index 0000000..18a6067 --- /dev/null +++ b/internal/sub/sub.go @@ -0,0 +1,8 @@ +package sub + +type ( + Sub interface { + Process() error + Close() + } +) diff --git a/migrations/001_indexer_base.sql b/migrations/001_indexer_base.sql new file mode 100644 index 0000000..2898c05 --- /dev/null +++ b/migrations/001_indexer_base.sql @@ -0,0 +1,76 @@ +CREATE TABLE IF NOT EXISTS tx ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_hash VARCHAR(66) NOT NULL UNIQUE, + block_number INT NOT NULL, + contract_address VARCHAR(42) NOT NULL, + date_block TIMESTAMP NOT NULL, + success BOOLEAN NOT NULL +); + +CREATE TABLE IF NOT EXISTS token_transfer ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + sender_address VARCHAR(42) NOT NULL, + recipient_address VARCHAR(42) NOT NULL, + transfer_value NUMERIC NOT NULL +); + +CREATE TABLE IF NOT EXISTS token_mint ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + minter_address VARCHAR(42) NOT NULL, + recipient_address VARCHAR(42) NOT NULL, + mint_value NUMERIC NOT NULL +); + +CREATE TABLE IF NOT EXISTS token_burn ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + burner_address VARCHAR(42) NOT NULL, + burn_value NUMERIC NOT NULL +); + +CREATE TABLE IF NOT EXISTS faucet_give ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + token_address VARCHAR(42) NOT NULL, + recipient_address VARCHAR(42) NOT NULL, + give_value NUMERIC NOT NULL + +); + +CREATE TABLE IF NOT EXISTS pool_swap ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + initiator_address VARCHAR(42) NOT NULL, + token_in_address VARCHAR(42) NOT NULL, + token_out_address VARCHAR(42) NOT NULL, + in_value NUMERIC NOT NULL, + out_value NUMERIC NOT NULL, + fee NUMERIC NOT NULL +); + +CREATE TABLE IF NOT EXISTS pool_deposit ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + initiator_address VARCHAR(42) NOT NULL, + token_in_address VARCHAR(42) NOT NULL, + in_value NUMERIC NOT NULL +); + +CREATE TABLE IF NOT EXISTS contracts ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + contract_address VARCHAR(42) UNIQUE NOT NULL, + contract_description TEXT NOT NULL, + is_token BOOLEAN NOT NULL +); + +CREATE TABLE IF NOT EXISTS tokens ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + contract_address VARCHAR(42) UNIQUE NOT NULL, + token_name TEXT NOT NULL, + token_symbol TEXT NOT NULL, + token_decimals INT NOT NULL, + token_version TEXT NOT NULL, + token_type TEXT NOT NULL +); \ No newline at end of file diff --git a/queries.sql b/queries.sql new file mode 100644 index 0000000..4ec39d4 --- /dev/null +++ b/queries.sql @@ -0,0 +1,76 @@ +--name: insert-tx +-- $1: tx_hash +-- $2: block_number +-- $3: contract_address +-- $4: date_block +-- $5: success +WITH insert_tx AS ( + INSERT INTO tx( + tx_hash, + block_number, + contract_address, + date_block, + success + ) VALUES($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING RETURNING id +) +SELECT id FROM insert_tx +UNION ALL +SELECT id FROM tx WHERE tx_hash = $1 AND id IS NOT NULL +LIMIT 1 + +--name: insert-token-transfer +-- $1: tx_id +-- $2: sender_address +-- $3: recipient_address +-- $4: transfer_value +INSERT INTO token_transfer( + tx_id, + sender_address, + recipient_address, + transfer_value +) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING + +--name: insert-token-mint +-- $1: tx_id +-- $2: minter_address +-- $3: recipient_address +-- $4: mint_value +INSERT INTO token_mint( + tx_id, + minter_address, + recipient_address, + mint_value +) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING + +--name: insert-pool-swap +-- $1: tx_id +-- $2: initiator_address +-- $3: token_in_address +-- $4: token_out_address +-- $5: in_value +-- $6: out_value +-- $7: fee +INSERT INTO pool_swap( + tx_id, + initiator_address, + token_in_address, + token_out_address, + in_value, + out_value, + fee +) VALUES($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING + +--name: insert-pool-deposit +-- $1: tx_id +-- $2: initiator_address +-- $3: token_in_address +-- $4: token_out_address +-- $5: in_value +-- $6: out_value +-- $7: fee +INSERT INTO pool_deposit( + tx_id, + initiator_address, + token_in_address, + in_value +) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING \ No newline at end of file