feat: add all handlers, use new jetstream simplified client

This commit is contained in:
Mohamed Sohail 2024-05-30 15:55:24 +08:00
parent d0f21b2fdd
commit ccfc2843b2
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
10 changed files with 261 additions and 259 deletions

View File

@ -42,16 +42,14 @@ func init() {
} }
func main() { func main() {
var ( var wg sync.WaitGroup
wg sync.WaitGroup
)
ctx, stop := notifyShutdown() ctx, stop := notifyShutdown()
store, err := store.NewPgStore(store.PgOpts{ store, err := store.NewPgStore(store.PgOpts{
Logg: lo,
DSN: ko.MustString("postgres.dsn"), DSN: ko.MustString("postgres.dsn"),
MigrationsFolderPath: migrationsFolderFlag, MigrationsFolderPath: migrationsFolderFlag,
QueriesFolderPath: queriesFlag, QueriesFolderPath: queriesFlag,
Logg: lo,
}) })
if err != nil { if err != nil {
lo.Error("could not initialize postgres store", "error", err) lo.Error("could not initialize postgres store", "error", err)
@ -59,9 +57,10 @@ func main() {
} }
jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{ jetStreamSub, err := sub.NewJetStreamSub(sub.JetStreamOpts{
Endpoint: ko.MustString("jetstream.endpoint"), Logg: lo,
Logg: lo, Store: store,
Store: store, Endpoint: ko.MustString("jetstream.endpoint"),
JetStreamID: ko.MustString("jetstream.id"),
}) })
if err != nil { if err != nil {
lo.Error("could not initialize jetstream sub", "error", err) lo.Error("could not initialize jetstream sub", "error", err)

View File

@ -9,3 +9,4 @@ dsn = "postgres://postgres:postgres@127.0.0.1:5432/ge_celo_data"
[jetstream] [jetstream]
endpoint = "nats://127.0.0.1:4222" endpoint = "nats://127.0.0.1:4222"
id = "celo-indexer-1"

13
go.mod
View File

@ -1,9 +1,10 @@
module github.com/grassrootseconomics/celo-indexer module github.com/grassrootseconomics/celo-indexer
go 1.22.1 go 1.22.3
require ( require (
github.com/jackc/pgx/v5 v5.5.5 github.com/grassrootseconomics/celo-tracker v1.0.2-beta
github.com/jackc/pgx/v5 v5.6.0
github.com/jackc/tern/v2 v2.1.1 github.com/jackc/tern/v2 v2.1.1
github.com/kamikazechaser/common v0.2.0 github.com/kamikazechaser/common v0.2.0
github.com/knadh/goyesql/v2 v2.2.0 github.com/knadh/goyesql/v2 v2.2.0
@ -36,8 +37,8 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml v1.9.5 // indirect
github.com/shopspring/decimal v1.3.1 // indirect github.com/shopspring/decimal v1.3.1 // indirect
github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cast v1.5.0 // indirect
golang.org/x/crypto v0.18.0 // indirect golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sync v0.1.0 // indirect golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.16.0 // indirect golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.15.0 // indirect
) )

26
go.sum
View File

@ -20,6 +20,8 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grassrootseconomics/celo-tracker v1.0.2-beta h1:iB5JC/GQDMQrml9A85myvxXp8HC2aGiiLO7VXRfbH0E=
github.com/grassrootseconomics/celo-tracker v1.0.2-beta/go.mod h1:+byt6N9CSgaLgVfWfMxTKH9HguRFNW5uHv+jz668bFs=
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
@ -30,8 +32,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/tern/v2 v2.1.1 h1:qDo41wTtDHrTgkN7lhcoMQ6oiAWqiD8xKgslxyoKHNQ= github.com/jackc/tern/v2 v2.1.1 h1:qDo41wTtDHrTgkN7lhcoMQ6oiAWqiD8xKgslxyoKHNQ=
@ -96,8 +98,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -106,8 +108,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -115,8 +117,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
@ -125,14 +127,14 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df h1:5Pf6pFKu98ODmgnpvkJ3kFUOQGGLIzLIkbzUHp47618=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -1,36 +0,0 @@
package event
import "encoding/json"
type (
Event struct {
Block uint64 `json:"block"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
Timestamp int64 `json:"timestamp"`
TxHash string `json:"transactionHash"`
TxType string `json:"transactionType"`
Payload map[string]any `json:"payload"`
}
)
func (e Event) Serialize() ([]byte, error) {
jsonData, err := json.Marshal(e)
if err != nil {
return nil, err
}
return jsonData, err
}
func Deserialize(jsonData []byte) (Event, error) {
var (
event Event
)
if err := json.Unmarshal(jsonData, &event); err != nil {
return event, err
}
return event, nil
}

View File

@ -7,7 +7,8 @@ import (
"os" "os"
"time" "time"
"github.com/grassrootseconomics/celo-indexer/internal/event" "github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/tern/v2/migrate" "github.com/jackc/tern/v2/migrate"
"github.com/knadh/goyesql/v2" "github.com/knadh/goyesql/v2"
@ -15,31 +16,29 @@ import (
type ( type (
PgOpts struct { PgOpts struct {
Logg *slog.Logger
DSN string DSN string
MigrationsFolderPath string MigrationsFolderPath string
QueriesFolderPath string QueriesFolderPath string
Logg *slog.Logger
} }
Pg struct { Pg struct {
db *pgxpool.Pool db *pgxpool.Pool
queries *queries queries *queries
logg *slog.Logger
} }
queries struct { queries struct {
InsertTx string `query:"insert-tx"` InsertTx string `query:"insert-tx"`
InsertTokenTransfer string `query:"insert-token-transfer"` InsertTokenTransfer string `query:"insert-token-transfer"`
InsertTokenMint string `query:"insert-token-mint"` InsertTokenMint string `query:"insert-token-mint"`
InsertPoolSwap string `query:"insert-pool-swap"` InsertTokenBurn string `query:"insert-token-burn"`
InsertPoolDeposit string `query:"insert-pool-deposit"` InsertFaucetGive string `query:"insert-faucet-give"`
InsertPoolSwap string `query:"insert-pool-swap"`
InsertPoolDeposit string `query:"insert-pool-deposit"`
InsertPriceQuoteUpdate string `query:"insert-price-quote-update"`
} }
) )
const (
migratorTimeout = 5 * time.Second
)
func NewPgStore(o PgOpts) (Store, error) { func NewPgStore(o PgOpts) (Store, error) {
parsedConfig, err := pgxpool.ParseConfig(o.DSN) parsedConfig, err := pgxpool.ParseConfig(o.DSN)
if err != nil { if err != nil {
@ -59,150 +58,165 @@ func NewPgStore(o PgOpts) (Store, error) {
if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil { if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil {
return nil, err return nil, err
} }
o.Logg.Info("migrations ran successfully")
return &Pg{ return &Pg{
db: dbPool, db: dbPool,
queries: queries, queries: queries,
logg: o.Logg,
}, nil }, nil
} }
func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error { func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error {
tx, err := pg.db.Begin(ctx) return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
if err != nil { txID, err := pg.insertTx(ctx, tx, eventPayload)
pg.logg.Error("ERR0")
return err
}
defer func() {
if err != nil { if err != nil {
tx.Rollback(ctx) return err
} else {
tx.Commit(ctx)
} }
}()
var ( _, err = tx.Exec(
txID int ctx,
) pg.queries.InsertTokenTransfer,
if err := tx.QueryRow( txID,
ctx, eventPayload.Payload["from"].(string),
pg.queries.InsertTx, eventPayload.Payload["to"].(string),
eventPayload.TxHash, eventPayload.Payload["value"].(string),
eventPayload.Block, )
eventPayload.ContractAddress,
time.Unix(eventPayload.Timestamp, 0).UTC(),
eventPayload.Success,
).Scan(&txID); err != nil {
pg.logg.Error("ERR1")
return err return err
} })
_, err = tx.Exec(
ctx,
pg.queries.InsertTokenTransfer,
txID,
eventPayload.Payload["from"].(string),
eventPayload.Payload["to"].(string),
eventPayload.Payload["value"].(string),
)
if err != nil {
pg.logg.Error("ERR2")
return err
}
return nil
} }
func (pg *Pg) InsertTokenMint(ctx context.Context, eventPayload event.Event) error { func (pg *Pg) InsertTokenMint(ctx context.Context, eventPayload event.Event) error {
tx, err := pg.db.Begin(ctx) return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
if err != nil { txID, err := pg.insertTx(ctx, tx, eventPayload)
return err
}
defer func() {
if err != nil { if err != nil {
tx.Rollback(ctx) return err
} else {
tx.Commit(ctx)
} }
}()
var ( _, err = tx.Exec(
txID int ctx,
) pg.queries.InsertTokenMint,
if err := tx.QueryRow( txID,
ctx, eventPayload.Payload["tokenMinter"].(string),
pg.queries.InsertTx, eventPayload.Payload["to"].(string),
eventPayload.TxHash, eventPayload.Payload["value"].(string),
eventPayload.Block, )
eventPayload.ContractAddress,
time.Unix(eventPayload.Timestamp, 0).UTC(),
eventPayload.Success,
).Scan(&txID); err != nil {
return err return err
} })
}
_, err = tx.Exec( func (pg *Pg) InsertTokenBurn(ctx context.Context, eventPayload event.Event) error {
ctx, return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
pg.queries.InsertTokenMint, txID, err := pg.insertTx(ctx, tx, eventPayload)
txID, if err != nil {
eventPayload.Payload["tokenMinter"].(string), return err
eventPayload.Payload["to"].(string), }
eventPayload.Payload["value"].(string),
) _, err = tx.Exec(
if err != nil { ctx,
pg.queries.InsertTokenBurn,
txID,
eventPayload.Payload["tokenBurner"].(string),
eventPayload.Payload["value"].(string),
)
return err return err
} })
}
return nil func (pg *Pg) InsertFaucetGive(ctx context.Context, eventPayload event.Event) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
txID, err := pg.insertTx(ctx, tx, eventPayload)
if err != nil {
return err
}
_, err = tx.Exec(
ctx,
pg.queries.InsertFaucetGive,
txID,
eventPayload.Payload["token"].(string),
eventPayload.Payload["recipient"].(string),
eventPayload.Payload["amount"].(string),
)
return err
})
} }
func (pg *Pg) InsertPoolSwap(ctx context.Context, eventPayload event.Event) error { func (pg *Pg) InsertPoolSwap(ctx context.Context, eventPayload event.Event) error {
tx, err := pg.db.Begin(ctx) return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
if err != nil { txID, err := pg.insertTx(ctx, tx, eventPayload)
return err
}
defer func() {
if err != nil { if err != nil {
tx.Rollback(ctx) return err
} else {
tx.Commit(ctx)
} }
}()
var ( _, err = tx.Exec(
txID int ctx,
) pg.queries.InsertPoolSwap,
if err := tx.QueryRow( txID,
ctx, eventPayload.Payload["initiator"].(string),
pg.queries.InsertTx, eventPayload.Payload["tokenIn"].(string),
eventPayload.TxHash, eventPayload.Payload["tokenOut"].(string),
eventPayload.Block, eventPayload.Payload["amountIn"].(string),
eventPayload.ContractAddress, eventPayload.Payload["amountOut"].(string),
time.Unix(eventPayload.Timestamp, 0).UTC(), eventPayload.Payload["fee"].(string),
eventPayload.Success, )
).Scan(&txID); err != nil {
return err return err
} })
_, err = tx.Exec(
ctx,
pg.queries.InsertPoolSwap,
txID,
eventPayload.Payload["initiator"].(string),
eventPayload.Payload["tokenIn"].(string),
eventPayload.Payload["tokenOut"].(string),
eventPayload.Payload["amountIn"].(string),
eventPayload.Payload["amountOut"].(string),
eventPayload.Payload["fee"].(string),
)
if err != nil {
return err
}
return nil
} }
func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) error { func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
txID, err := pg.insertTx(ctx, tx, eventPayload)
if err != nil {
return err
}
_, err = tx.Exec(
ctx,
pg.queries.InsertPoolDeposit,
txID,
eventPayload.Payload["initiator"].(string),
eventPayload.Payload["tokenIn"].(string),
eventPayload.Payload["amountIn"].(string),
)
return err
})
}
func (pg *Pg) InsertPriceQuoteUpdate(ctx context.Context, eventPayload event.Event) error {
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
txID, err := pg.insertTx(ctx, tx, eventPayload)
if err != nil {
return err
}
_, err = tx.Exec(
ctx,
pg.queries.InsertPriceQuoteUpdate,
txID,
eventPayload.Payload["token"].(string),
eventPayload.Payload["exchangeRate"].(string),
)
return err
})
}
func (pg *Pg) insertTx(ctx context.Context, tx pgx.Tx, eventPayload event.Event) (int, error) {
var txID int
if err := tx.QueryRow(
ctx,
pg.queries.InsertTx,
eventPayload.TxHash,
eventPayload.Block,
eventPayload.ContractAddress,
time.Unix(int64(eventPayload.Timestamp), 0).UTC(),
eventPayload.Success,
).Scan(&txID); err != nil {
return 0, err
}
return txID, nil
}
func (pg *Pg) executeTransaction(ctx context.Context, fn func(tx pgx.Tx) error) error {
tx, err := pg.db.Begin(ctx) tx, err := pg.db.Begin(ctx)
if err != nil { if err != nil {
return err return err
@ -215,30 +229,7 @@ func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) e
} }
}() }()
var ( if err = fn(tx); err != nil {
txID int
)
if err := tx.QueryRow(
ctx,
pg.queries.InsertTx,
eventPayload.TxHash,
eventPayload.Block,
eventPayload.ContractAddress,
time.Unix(eventPayload.Timestamp, 0).UTC(),
eventPayload.Success,
).Scan(&txID); err != nil {
return err
}
_, err = tx.Exec(
ctx,
pg.queries.InsertPoolDeposit,
txID,
eventPayload.Payload["initiator"].(string),
eventPayload.Payload["tokenIn"].(string),
eventPayload.Payload["amountIn"].(string),
)
if err != nil {
return err return err
} }
@ -261,6 +252,8 @@ func loadQueries(queriesPath string) (*queries, error) {
} }
func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error { func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error {
const migratorTimeout = 5 * time.Second
ctx, cancel := context.WithTimeout(ctx, migratorTimeout) ctx, cancel := context.WithTimeout(ctx, migratorTimeout)
defer cancel() defer cancel()

View File

@ -3,14 +3,17 @@ package store
import ( import (
"context" "context"
"github.com/grassrootseconomics/celo-indexer/internal/event" "github.com/grassrootseconomics/celo-tracker/pkg/event"
) )
type ( type (
Store interface { Store interface {
InsertTokenTransfer(context.Context, event.Event) error InsertTokenTransfer(context.Context, event.Event) error
InsertTokenMint(context.Context, event.Event) error InsertTokenMint(context.Context, event.Event) error
InsertTokenBurn(context.Context, event.Event) error
InsertFaucetGive(context.Context, event.Event) error
InsertPoolSwap(context.Context, event.Event) error InsertPoolSwap(context.Context, event.Event) error
InsertPoolDeposit(context.Context, event.Event) error InsertPoolDeposit(context.Context, event.Event) error
InsertPriceQuoteUpdate(context.Context, event.Event) error
} }
) )

View File

@ -5,59 +5,71 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"log/slog" "log/slog"
"time"
"github.com/grassrootseconomics/celo-indexer/internal/event"
"github.com/grassrootseconomics/celo-indexer/internal/store" "github.com/grassrootseconomics/celo-indexer/internal/store"
"github.com/grassrootseconomics/celo-tracker/pkg/event"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) "github.com/nats-io/nats.go/jetstream"
const (
durableId = "celo-indexer-6"
pullStream = "TRACKER"
pullSubject = "TRACKER.*"
) )
type ( type (
JetStreamOpts struct { JetStreamOpts struct {
Logg *slog.Logger Store store.Store
Endpoint string Logg *slog.Logger
Store store.Store Endpoint string
JetStreamID string
} }
JetStreamSub struct { JetStreamSub struct {
natsConn *nats.Conn jsConsumer jetstream.Consumer
jsCtx nats.JetStreamContext store store.Store
store store.Store natsConn *nats.Conn
logg *slog.Logger logg *slog.Logger
durableID string
} }
) )
const (
pullStream = "TRACKER"
pullSubject = "TRACKER.*"
)
func NewJetStreamSub(o JetStreamOpts) (Sub, error) { func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
natsConn, err := nats.Connect(o.Endpoint) natsConn, err := nats.Connect(o.Endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
} }
js, err := natsConn.JetStream() js, err := jetstream.New(natsConn)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := js.Stream(ctx, pullStream)
if err != nil {
return nil, err
}
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: o.JetStreamID,
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: pullStream,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
o.Logg.Info("successfully connected to NATS server") o.Logg.Info("successfully connected to NATS server")
_, err = js.AddConsumer(pullStream, &nats.ConsumerConfig{
Durable: durableId,
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: pullSubject,
})
if err != nil {
return nil, err
}
return &JetStreamSub{ return &JetStreamSub{
natsConn: natsConn, jsConsumer: consumer,
jsCtx: js, store: o.Store,
store: o.Store, natsConn: natsConn,
logg: o.Logg, logg: o.Logg,
durableID: o.JetStreamID,
}, nil }, nil
} }
@ -68,18 +80,8 @@ func (s *JetStreamSub) Close() {
} }
func (s *JetStreamSub) Process() error { func (s *JetStreamSub) Process() error {
subOpts := []nats.SubOpt{
nats.ManualAck(),
nats.Bind(pullStream, durableId),
}
natsSub, err := s.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...)
if err != nil {
return err
}
for { for {
events, err := natsSub.Fetch(1) events, err := s.jsConsumer.Fetch(100, jetstream.FetchMaxWait(1*time.Second))
if err != nil { if err != nil {
if errors.Is(err, nats.ErrTimeout) { if errors.Is(err, nats.ErrTimeout) {
continue continue
@ -90,9 +92,8 @@ func (s *JetStreamSub) Process() error {
} }
} }
if len(events) > 0 { for msg := range events.Messages() {
msg := events[0] if err := s.processEventHandler(context.Background(), msg.Subject(), msg.Data()); err != nil {
if err := s.processEventHandler(context.Background(), msg); err != nil {
s.logg.Error("error processing nats message", "error", err) s.logg.Error("error processing nats message", "error", err)
msg.Nak() msg.Nak()
} else { } else {
@ -102,16 +103,14 @@ func (s *JetStreamSub) Process() error {
} }
} }
func (s *JetStreamSub) processEventHandler(ctx context.Context, msg *nats.Msg) error { func (s *JetStreamSub) processEventHandler(ctx context.Context, msgSubject string, msgData []byte) error {
var ( var chainEvent event.Event
chainEvent event.Event
)
if err := json.Unmarshal(msg.Data, &chainEvent); err != nil { if err := json.Unmarshal(msgData, &chainEvent); err != nil {
return err return err
} }
switch msg.Subject { switch msgSubject {
case "TRACKER.TOKEN_TRANSFER": case "TRACKER.TOKEN_TRANSFER":
if err := s.store.InsertTokenTransfer(ctx, chainEvent); err != nil { if err := s.store.InsertTokenTransfer(ctx, chainEvent); err != nil {
return err return err

View File

@ -58,6 +58,14 @@ CREATE TABLE IF NOT EXISTS pool_deposit (
in_value NUMERIC NOT NULL in_value NUMERIC NOT NULL
); );
CREATE TABLE IF NOT EXISTS price_index_updates (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tx_id INT REFERENCES tx(id),
token VARCHAR(42) NOT NULL,
exchange_rate NUMERIC NOT NULL
);
CREATE TABLE IF NOT EXISTS contracts ( CREATE TABLE IF NOT EXISTS contracts (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
contract_address VARCHAR(42) UNIQUE NOT NULL, contract_address VARCHAR(42) UNIQUE NOT NULL,

View File

@ -42,6 +42,28 @@ INSERT INTO token_mint(
mint_value mint_value
) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING ) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING
--name: insert-token-burn
-- $1: tx_id
-- $2: burner_address
-- $3: burn_value
INSERT INTO token_burn(
tx_id,
burner_address,
burn_value
) VALUES($1, $2, $3) ON CONFLICT DO NOTHING
--name: insert-faucet-give
-- $1: tx_id
-- $2: token_address
-- $3: recipient_address
-- $4: give_value
INSERT INTO faucet_give(
tx_id,
token_address,
recipient_address,
give_value
) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING
--name: insert-pool-swap --name: insert-pool-swap
-- $1: tx_id -- $1: tx_id
-- $2: initiator_address -- $2: initiator_address
@ -73,4 +95,14 @@ INSERT INTO pool_deposit(
initiator_address, initiator_address,
token_in_address, token_in_address,
in_value in_value
) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING ) VALUES($1, $2, $3, $4) ON CONFLICT DO NOTHING
--name: insert-price-quote-update
-- $1: tx_id
-- $2: token
-- $3: exchange_rate
INSERT INTO price_index_updates(
tx_id,
token,
exchange_rate,
) VALUES($1, $2, $3) ON CONFLICT DO NOTHING