diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 8c5e11f..42aeae3 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -42,16 +42,14 @@ func init() { } func main() { - var ( - wg sync.WaitGroup - ) + var wg sync.WaitGroup ctx, stop := notifyShutdown() store, err := store.NewPgStore(store.PgOpts{ + Logg: lo, DSN: ko.MustString("postgres.dsn"), MigrationsFolderPath: migrationsFolderFlag, QueriesFolderPath: queriesFlag, - Logg: lo, }) if err != nil { lo.Error("could not initialize postgres store", "error", err) @@ -59,9 +57,10 @@ func main() { } jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{ - Endpoint: ko.MustString("jetstream.endpoint"), - Logg: lo, - Store: store, + Logg: lo, + Store: store, + Endpoint: ko.MustString("jetstream.endpoint"), + JetStreamID: ko.MustString("jetstream.id"), }) if err != nil { lo.Error("could not initialize jetstream sub", "error", err) diff --git a/config.toml b/config.toml index 115ec48..2096fe3 100644 --- a/config.toml +++ b/config.toml @@ -9,3 +9,4 @@ dsn = "postgres://postgres:postgres@127.0.0.1:5432/ge_celo_data" [jetstream] endpoint = "nats://127.0.0.1:4222" +id = "celo-indexer-1" diff --git a/go.mod b/go.mod index 5948e64..578fa14 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,10 @@ module github.com/grassrootseconomics/celo-indexer -go 1.22.1 +go 1.22.3 require ( - github.com/jackc/pgx/v5 v5.5.5 + github.com/grassrootseconomics/celo-tracker v1.0.2-beta + github.com/jackc/pgx/v5 v5.6.0 github.com/jackc/tern/v2 v2.1.1 github.com/kamikazechaser/common v0.2.0 github.com/knadh/goyesql/v2 v2.2.0 @@ -36,8 +37,8 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/spf13/cast v1.5.0 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.16.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect ) diff --git a/go.sum b/go.sum index 6ced154..87c9d96 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grassrootseconomics/celo-tracker v1.0.2-beta h1:iB5JC/GQDMQrml9A85myvxXp8HC2aGiiLO7VXRfbH0E= +github.com/grassrootseconomics/celo-tracker v1.0.2-beta/go.mod h1:+byt6N9CSgaLgVfWfMxTKH9HguRFNW5uHv+jz668bFs= github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= @@ -30,8 +32,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= -github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/tern/v2 v2.1.1 h1:qDo41wTtDHrTgkN7lhcoMQ6oiAWqiD8xKgslxyoKHNQ= @@ -96,8 +98,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= -golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -106,8 +108,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -115,8 +117,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -125,14 +127,14 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618= +golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/event/event.go b/internal/event/event.go deleted file mode 100644 index 05aaa7c..0000000 --- a/internal/event/event.go +++ /dev/null @@ -1,36 +0,0 @@ -package event - -import "encoding/json" - -type ( - Event struct { - Block uint64 `json:"block"` - ContractAddress string `json:"contractAddress"` - Success bool `json:"success"` - Timestamp int64 `json:"timestamp"` - TxHash string `json:"transactionHash"` - TxType string `json:"transactionType"` - Payload map[string]any `json:"payload"` - } -) - -func (e Event) Serialize() ([]byte, error) { - jsonData, err := json.Marshal(e) - if err != nil { - return nil, err - } - - return jsonData, err -} - -func Deserialize(jsonData []byte) (Event, error) { - var ( - event Event - ) - - if err := json.Unmarshal(jsonData, &event); err != nil { - return event, err - } - - return event, nil -} diff --git a/internal/store/pg.go b/internal/store/pg.go index d9db5a2..3687fd4 100644 --- a/internal/store/pg.go +++ b/internal/store/pg.go @@ -7,7 +7,8 @@ import ( "os" "time" - "github.com/grassrootseconomics/celo-indexer/internal/event" + "github.com/grassrootseconomics/celo-tracker/pkg/event" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/tern/v2/migrate" "github.com/knadh/goyesql/v2" @@ -15,31 +16,29 @@ import ( type ( PgOpts struct { + Logg *slog.Logger DSN string MigrationsFolderPath string QueriesFolderPath string - Logg *slog.Logger } Pg struct { db *pgxpool.Pool queries *queries - logg *slog.Logger } queries struct { - InsertTx string `query:"insert-tx"` - InsertTokenTransfer string `query:"insert-token-transfer"` - InsertTokenMint string `query:"insert-token-mint"` - InsertPoolSwap string `query:"insert-pool-swap"` - InsertPoolDeposit string `query:"insert-pool-deposit"` + InsertTx string `query:"insert-tx"` + InsertTokenTransfer string `query:"insert-token-transfer"` + InsertTokenMint string `query:"insert-token-mint"` + InsertTokenBurn string `query:"insert-token-burn"` + InsertFaucetGive string `query:"insert-faucet-give"` + InsertPoolSwap string `query:"insert-pool-swap"` + InsertPoolDeposit string `query:"insert-pool-deposit"` + InsertPriceQuoteUpdate string `query:"insert-price-quote-update"` } ) -const ( - migratorTimeout = 5 * time.Second -) - func NewPgStore(o PgOpts) (Store, error) { parsedConfig, err := pgxpool.ParseConfig(o.DSN) if err != nil { @@ -59,150 +58,165 @@ func NewPgStore(o PgOpts) (Store, error) { if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil { return nil, err } + o.Logg.Info("migrations ran successfully") return &Pg{ db: dbPool, queries: queries, - logg: o.Logg, }, nil } func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error { - tx, err := pg.db.Begin(ctx) - if err != nil { - pg.logg.Error("ERR0") - return err - } - defer func() { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) if err != nil { - tx.Rollback(ctx) - } else { - tx.Commit(ctx) + return err } - }() - var ( - txID int - ) - if err := tx.QueryRow( - ctx, - pg.queries.InsertTx, - eventPayload.TxHash, - eventPayload.Block, - eventPayload.ContractAddress, - time.Unix(eventPayload.Timestamp, 0).UTC(), - eventPayload.Success, - ).Scan(&txID); err != nil { - pg.logg.Error("ERR1") + _, err = tx.Exec( + ctx, + pg.queries.InsertTokenTransfer, + txID, + eventPayload.Payload["from"].(string), + eventPayload.Payload["to"].(string), + eventPayload.Payload["value"].(string), + ) return err - } - - _, err = tx.Exec( - ctx, - pg.queries.InsertTokenTransfer, - txID, - eventPayload.Payload["from"].(string), - eventPayload.Payload["to"].(string), - eventPayload.Payload["value"].(string), - ) - if err != nil { - pg.logg.Error("ERR2") - return err - } - - return nil + }) } func (pg *Pg) InsertTokenMint(ctx context.Context, eventPayload event.Event) error { - tx, err := pg.db.Begin(ctx) - if err != nil { - return err - } - defer func() { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) if err != nil { - tx.Rollback(ctx) - } else { - tx.Commit(ctx) + return err } - }() - var ( - txID int - ) - if err := tx.QueryRow( - ctx, - pg.queries.InsertTx, - eventPayload.TxHash, - eventPayload.Block, - eventPayload.ContractAddress, - time.Unix(eventPayload.Timestamp, 0).UTC(), - eventPayload.Success, - ).Scan(&txID); err != nil { + _, err = tx.Exec( + ctx, + pg.queries.InsertTokenMint, + txID, + eventPayload.Payload["tokenMinter"].(string), + eventPayload.Payload["to"].(string), + eventPayload.Payload["value"].(string), + ) return err - } + }) +} - _, err = tx.Exec( - ctx, - pg.queries.InsertTokenMint, - txID, - eventPayload.Payload["tokenMinter"].(string), - eventPayload.Payload["to"].(string), - eventPayload.Payload["value"].(string), - ) - if err != nil { +func (pg *Pg) InsertTokenBurn(ctx context.Context, eventPayload event.Event) error { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) + if err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertTokenBurn, + txID, + eventPayload.Payload["tokenBurner"].(string), + eventPayload.Payload["value"].(string), + ) return err - } + }) +} - return nil +func (pg *Pg) InsertFaucetGive(ctx context.Context, eventPayload event.Event) error { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) + if err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertFaucetGive, + txID, + eventPayload.Payload["token"].(string), + eventPayload.Payload["recipient"].(string), + eventPayload.Payload["amount"].(string), + ) + return err + }) } func (pg *Pg) InsertPoolSwap(ctx context.Context, eventPayload event.Event) error { - tx, err := pg.db.Begin(ctx) - if err != nil { - return err - } - defer func() { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) if err != nil { - tx.Rollback(ctx) - } else { - tx.Commit(ctx) + return err } - }() - var ( - txID int - ) - if err := tx.QueryRow( - ctx, - pg.queries.InsertTx, - eventPayload.TxHash, - eventPayload.Block, - eventPayload.ContractAddress, - time.Unix(eventPayload.Timestamp, 0).UTC(), - eventPayload.Success, - ).Scan(&txID); err != nil { + _, err = tx.Exec( + ctx, + pg.queries.InsertPoolSwap, + txID, + eventPayload.Payload["initiator"].(string), + eventPayload.Payload["tokenIn"].(string), + eventPayload.Payload["tokenOut"].(string), + eventPayload.Payload["amountIn"].(string), + eventPayload.Payload["amountOut"].(string), + eventPayload.Payload["fee"].(string), + ) return err - } - - _, err = tx.Exec( - ctx, - pg.queries.InsertPoolSwap, - txID, - eventPayload.Payload["initiator"].(string), - eventPayload.Payload["tokenIn"].(string), - eventPayload.Payload["tokenOut"].(string), - eventPayload.Payload["amountIn"].(string), - eventPayload.Payload["amountOut"].(string), - eventPayload.Payload["fee"].(string), - ) - if err != nil { - return err - } - - return nil + }) } func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) error { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) + if err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertPoolDeposit, + txID, + eventPayload.Payload["initiator"].(string), + eventPayload.Payload["tokenIn"].(string), + eventPayload.Payload["amountIn"].(string), + ) + return err + }) +} + +func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Event) error { + return pg.executeTransaction(ctx, func(tx pgx.Tx) error { + txID, err := pg.insertTx(ctx, tx, eventPayload) + if err != nil { + return err + } + + _, err = tx.Exec( + ctx, + pg.queries.InsertPriceQuoteUpdate, + txID, + eventPayload.Payload["token"].(string), + eventPayload.Payload["exchangeRate"].(string), + ) + return err + }) +} + +func (pg *Pg) insertTx(ctx context.Context, tx pgx.Tx, eventPayload event.Event) (int, error) { + var txID int + if err := tx.QueryRow( + ctx, + pg.queries.InsertTx, + eventPayload.TxHash, + eventPayload.Block, + eventPayload.ContractAddress, + time.Unix(int64(eventPayload.Timestamp), 0).UTC(), + eventPayload.Success, + ).Scan(&txID); err != nil { + return 0, err + } + return txID, nil +} + +func (pg *Pg) executeTransaction(ctx context.Context, fn func(tx pgx.Tx) error) error { tx, err := pg.db.Begin(ctx) if err != nil { return err @@ -215,30 +229,7 @@ func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) e } }() - var ( - txID int - ) - if err := tx.QueryRow( - ctx, - pg.queries.InsertTx, - eventPayload.TxHash, - eventPayload.Block, - eventPayload.ContractAddress, - time.Unix(eventPayload.Timestamp, 0).UTC(), - eventPayload.Success, - ).Scan(&txID); err != nil { - return err - } - - _, err = tx.Exec( - ctx, - pg.queries.InsertPoolDeposit, - txID, - eventPayload.Payload["initiator"].(string), - eventPayload.Payload["tokenIn"].(string), - eventPayload.Payload["amountIn"].(string), - ) - if err != nil { + if err = fn(tx); err != nil { return err } @@ -261,6 +252,8 @@ func loadQueries(queriesPath string) (*queries, error) { } func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error { + const migratorTimeout = 5 * time.Second + ctx, cancel := context.WithTimeout(ctx, migratorTimeout) defer cancel() diff --git a/internal/store/store.go b/internal/store/store.go index fbca34c..33a2cd3 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -3,14 +3,17 @@ package store import ( "context" - "github.com/grassrootseconomics/celo-indexer/internal/event" + "github.com/grassrootseconomics/celo-tracker/pkg/event" ) type ( Store interface { InsertTokenTransfer(context.Context, event.Event) error InsertTokenMint(context.Context, event.Event) error + InsertTokenBurn(context.Context, event.Event) error + InsertFaucetGive(context.Context, event.Event) error InsertPoolSwap(context.Context, event.Event) error InsertPoolDeposit(context.Context, event.Event) error + InsertPriceQuoteUpdate(context.Context, event.Event) error } ) diff --git a/internal/sub/jetstream.go b/internal/sub/jetstream.go index e78e414..8bbaca6 100644 --- a/internal/sub/jetstream.go +++ b/internal/sub/jetstream.go @@ -5,59 +5,71 @@ import ( "encoding/json" "errors" "log/slog" + "time" - "github.com/grassrootseconomics/celo-indexer/internal/event" "github.com/grassrootseconomics/celo-indexer/internal/store" + "github.com/grassrootseconomics/celo-tracker/pkg/event" "github.com/nats-io/nats.go" -) - -const ( - durableId = "celo-indexer-6" - pullStream = "TRACKER" - pullSubject = "TRACKER.*" + "github.com/nats-io/nats.go/jetstream" ) type ( JetStreamOpts struct { - Logg *slog.Logger - Endpoint string - Store store.Store + Store store.Store + Logg *slog.Logger + Endpoint string + JetStreamID string } JetStreamSub struct { - natsConn *nats.Conn - jsCtx nats.JetStreamContext - store store.Store - logg *slog.Logger + jsConsumer jetstream.Consumer + store store.Store + natsConn *nats.Conn + logg *slog.Logger + durableID string } ) +const ( + pullStream = "TRACKER" + pullSubject = "TRACKER.*" +) + func NewJetStreamSub(o JetStreamOpts) (Sub, error) { natsConn, err := nats.Connect(o.Endpoint) if err != nil { return nil, err } - js, err := natsConn.JetStream() + js, err := jetstream.New(natsConn) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := js.Stream(ctx, pullStream) + if err != nil { + return nil, err + } + + consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: o.JetStreamID, + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: pullStream, + }) if err != nil { return nil, err } o.Logg.Info("successfully connected to NATS server") - _, err = js.AddConsumer(pullStream, &nats.ConsumerConfig{ - Durable: durableId, - AckPolicy: nats.AckExplicitPolicy, - FilterSubject: pullSubject, - }) - if err != nil { - return nil, err - } - return &JetStreamSub{ - natsConn: natsConn, - jsCtx: js, - store: o.Store, - logg: o.Logg, + jsConsumer: consumer, + store: o.Store, + natsConn: natsConn, + logg: o.Logg, + durableID: o.JetStreamID, }, nil } @@ -68,18 +80,8 @@ func (s *JetStreamSub) Close() { } func (s *JetStreamSub) Process() error { - subOpts := []nats.SubOpt{ - nats.ManualAck(), - nats.Bind(pullStream, durableId), - } - - natsSub, err := s.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...) - if err != nil { - return err - } - for { - events, err := natsSub.Fetch(1) + events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second)) if err != nil { if errors.Is(err, nats.ErrTimeout) { continue @@ -90,9 +92,8 @@ func (s *JetStreamSub) Process() error { } } - if len(events) > 0 { - msg := events[0] - if err := s.processEventHandler(context.Background(), msg); err != nil { + for msg := range events.Messages() { + if err := s.processEventHandler(context.Background(), msg.Subject(), msg.Data()); err != nil { s.logg.Error("error processing nats message", "error", err) msg.Nak() } else { @@ -102,16 +103,14 @@ func (s *JetStreamSub) Process() error { } } -func (s *JetStreamSub) processEventHandler(ctx context.Context, msg *nats.Msg) error { - var ( - chainEvent event.Event - ) +func (s *JetStreamSub) processEventHandler(ctx context.Context, msgSubject string, msgData []byte) error { + var chainEvent event.Event - if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { + if err := json.Unmarshal(msgData, &chainEvent); err != nil { return err } - switch msg.Subject { + switch msgSubject { case "TRACKER.TOKEN_TRANSFER": if err := s.store.InsertTokenTransfer(ctx, chainEvent); err != nil { return err diff --git a/migrations/001_indexer_base.sql b/migrations/001_indexer_base.sql index 2898c05..f66ff38 100644 --- a/migrations/001_indexer_base.sql +++ b/migrations/001_indexer_base.sql @@ -58,6 +58,14 @@ CREATE TABLE IF NOT EXISTS pool_deposit ( in_value NUMERIC NOT NULL ); +CREATE TABLE IF NOT EXISTS price_index_updates ( + id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + tx_id INT REFERENCES tx(id), + token VARCHAR(42) NOT NULL, + exchange_rate NUMERIC NOT NULL +); + + CREATE TABLE IF NOT EXISTS contracts ( id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, contract_address VARCHAR(42) UNIQUE NOT NULL, diff --git a/queries.sql b/queries.sql index 4ec39d4..2b2e4c5 100644 --- a/queries.sql +++ b/queries.sql @@ -42,6 +42,28 @@ INSERT INTO token_mint( mint_value ) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING +--name: insert-token-burn +-- $1: tx_id +-- $2: burner_address +-- $3: burn_value +INSERT INTO token_burn( + tx_id, + burner_address, + burn_value +) VALUES($1, $2, $3) ON CONFLICT DO NOTHING + +--name: insert-faucet-give +-- $1: tx_id +-- $2: token_address +-- $3: recipient_address +-- $4: give_value +INSERT INTO faucet_give( + tx_id, + token_address, + recipient_address, + give_value +) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING + --name: insert-pool-swap -- $1: tx_id -- $2: initiator_address @@ -73,4 +95,14 @@ INSERT INTO pool_deposit( initiator_address, token_in_address, in_value -) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING \ No newline at end of file +) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING + +--name: insert-price-quote-update +-- $1: tx_id +-- $2: token +-- $3: exchange_rate +INSERT INTO price_index_updates( + tx_id, + token, + exchange_rate, +) VALUES($1, $2, $3) ON CONFLICT DO NOTHING \ No newline at end of file