fix: queries
This commit is contained in:
parent
73d14ec021
commit
7e0fc95918
5
.env.example
Normal file
5
.env.example
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
REMOTE_DB_HOST=
|
||||||
|
REMOTE_DB_PORT=
|
||||||
|
REMOTE_DB_NAME=
|
||||||
|
REMOTE_DB_USER=
|
||||||
|
REMOTE_DB_PASSWORD=
|
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
tracker_db
|
||||||
|
db
|
||||||
|
.vscode
|
||||||
|
.idx
|
||||||
|
.env
|
@ -23,6 +23,7 @@ type (
|
|||||||
}
|
}
|
||||||
|
|
||||||
Pg struct {
|
Pg struct {
|
||||||
|
logg *slog.Logger
|
||||||
db *pgxpool.Pool
|
db *pgxpool.Pool
|
||||||
queries *queries
|
queries *queries
|
||||||
}
|
}
|
||||||
@ -62,6 +63,7 @@ func NewPgStore(o PgOpts) (Store, error) {
|
|||||||
o.Logg.Info("migrations ran successfully")
|
o.Logg.Info("migrations ran successfully")
|
||||||
|
|
||||||
return &Pg{
|
return &Pg{
|
||||||
|
logg: o.Logg,
|
||||||
db: dbPool,
|
db: dbPool,
|
||||||
queries: queries,
|
queries: queries,
|
||||||
}, nil
|
}, nil
|
||||||
@ -69,20 +71,21 @@ func NewPgStore(o PgOpts) (Store, error) {
|
|||||||
|
|
||||||
func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error {
|
func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error {
|
||||||
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
||||||
var addressExists bool
|
// var addressExists bool
|
||||||
|
|
||||||
if err := tx.QueryRow(
|
// if err := tx.QueryRow(
|
||||||
ctx,
|
// ctx,
|
||||||
pg.queries.CheckAddressExists,
|
// pg.queries.CheckAddressExists,
|
||||||
eventPayload.Payload["from"].(string),
|
// eventPayload.Payload["from"].(string),
|
||||||
eventPayload.Payload["to"].(string),
|
// eventPayload.Payload["to"].(string),
|
||||||
).Scan(&addressExists); err != nil {
|
// ).Scan(&addressExists); err != nil {
|
||||||
return err
|
// return err
|
||||||
}
|
// }
|
||||||
|
|
||||||
if !addressExists {
|
// if !addressExists {
|
||||||
return nil
|
// return nil
|
||||||
}
|
// }
|
||||||
|
// pg.logg.Debug("transfer address exists", "hash", eventPayload.TxHash)
|
||||||
|
|
||||||
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -278,7 +281,7 @@ func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath str
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const migratorTimeout = 5 * time.Second
|
const migratorTimeout = 15 * time.Second
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, migratorTimeout)
|
ctx, cancel := context.WithTimeout(ctx, migratorTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -55,9 +55,8 @@ func NewJetStreamSub(o JetStreamOpts) (Sub, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||||
Durable: o.JetStreamID,
|
Durable: o.JetStreamID,
|
||||||
AckPolicy: jetstream.AckExplicitPolicy,
|
AckPolicy: jetstream.AckExplicitPolicy,
|
||||||
FilterSubject: pullStream,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -4,9 +4,9 @@ CREATE EXTENSION IF NOT EXISTS postgres_fdw;
|
|||||||
-- link sarafu_network db
|
-- link sarafu_network db
|
||||||
CREATE SCHEMA IF NOT EXISTS sarafu_network;
|
CREATE SCHEMA IF NOT EXISTS sarafu_network;
|
||||||
CREATE SERVER IF NOT EXISTS sarafu_network_remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS
|
CREATE SERVER IF NOT EXISTS sarafu_network_remote FOREIGN DATA WRAPPER postgres_fdw OPTIONS
|
||||||
(host '{{.remote_db_host }}', port '{{.remote_db_port }}', dbname '{{.remote_db_name }}');
|
(host '{{env "REMOTE_DB_HOST" }}', port '{{env "REMOTE_DB_PORT" }}', dbname '{{env "REMOTE_DB_NAME" }}');
|
||||||
CREATE USER MAPPING IF NOT EXISTS FOR postgres SERVER sarafu_network_remote OPTIONS
|
CREATE USER MAPPING IF NOT EXISTS FOR postgres SERVER sarafu_network_remote OPTIONS
|
||||||
(user '{{.remote_db_user }}', password '{{.remote_db_password }}');
|
(user '{{env "REMOTE_DB_USER" }}', password '{{env "REMOTE_DB_PASSWORD" }}');
|
||||||
IMPORT FOREIGN SCHEMA public LIMIT TO (accounts) FROM SERVER sarafu_network_remote INTO sarafu_network;
|
IMPORT FOREIGN SCHEMA public LIMIT TO (accounts) FROM SERVER sarafu_network_remote INTO sarafu_network;
|
||||||
|
|
||||||
--
|
--
|
||||||
|
@ -1,6 +0,0 @@
|
|||||||
[data]
|
|
||||||
remote_db_host = {{env "REMOTE_DB_HOST"}}
|
|
||||||
remote_db_port = {{env "REMOTE_DB_PORT"}}
|
|
||||||
remote_db_name = {{env "REMOTE_DB_NAME"}}
|
|
||||||
remote_db_user = {{env "REMOTE_DB_USER"}}
|
|
||||||
remote_db_password = {{env "REMOTE_DB_PASSWORD"}}
|
|
@ -104,7 +104,7 @@ INSERT INTO pool_deposit(
|
|||||||
INSERT INTO price_index_updates(
|
INSERT INTO price_index_updates(
|
||||||
tx_id,
|
tx_id,
|
||||||
token,
|
token,
|
||||||
exchange_rate,
|
exchange_rate
|
||||||
) VALUES($1, $2, $3) ON CONFLICT DO NOTHING
|
) VALUES($1, $2, $3) ON CONFLICT DO NOTHING
|
||||||
|
|
||||||
--name: address-exists
|
--name: address-exists
|
||||||
|
Loading…
Reference in New Issue
Block a user