307 lines
6.8 KiB
Go
307 lines
6.8 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/grassrootseconomics/eth-tracker/pkg/event"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/jackc/tern/v2/migrate"
|
|
"github.com/knadh/goyesql/v2"
|
|
)
|
|
|
|
type (
|
|
PgOpts struct {
|
|
Logg *slog.Logger
|
|
DSN string
|
|
MigrationsFolderPath string
|
|
QueriesFolderPath string
|
|
}
|
|
|
|
Pg struct {
|
|
logg *slog.Logger
|
|
db *pgxpool.Pool
|
|
queries *queries
|
|
}
|
|
|
|
queries struct {
|
|
InsertTx string `query:"insert-tx"`
|
|
InsertTokenTransfer string `query:"insert-token-transfer"`
|
|
InsertTokenMint string `query:"insert-token-mint"`
|
|
InsertTokenBurn string `query:"insert-token-burn"`
|
|
InsertFaucetGive string `query:"insert-faucet-give"`
|
|
InsertPoolSwap string `query:"insert-pool-swap"`
|
|
InsertPoolDeposit string `query:"insert-pool-deposit"`
|
|
InsertToken string `query:"insert-token"`
|
|
InsertPool string `query:"insert-pool"`
|
|
}
|
|
)
|
|
|
|
func NewPgStore(o PgOpts) (Store, error) {
|
|
parsedConfig, err := pgxpool.ParseConfig(o.DSN)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queries, err := loadQueries(o.QueriesFolderPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil {
|
|
return nil, err
|
|
}
|
|
o.Logg.Info("migrations ran successfully")
|
|
|
|
return &Pg{
|
|
logg: o.Logg,
|
|
db: dbPool,
|
|
queries: queries,
|
|
}, nil
|
|
}
|
|
|
|
func (pg *Pg) Close() {
|
|
pg.db.Close()
|
|
}
|
|
|
|
func (pg *Pg) Pool() *pgxpool.Pool {
|
|
return pg.db
|
|
}
|
|
|
|
func (pg *Pg) InsertTokenTransfer(ctx context.Context, eventPayload event.Event) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertTokenTransfer,
|
|
txID,
|
|
eventPayload.Payload["from"].(string),
|
|
eventPayload.Payload["to"].(string),
|
|
eventPayload.Payload["value"].(string),
|
|
eventPayload.ContractAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertTokenMint(ctx context.Context, eventPayload event.Event) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertTokenMint,
|
|
txID,
|
|
eventPayload.Payload["tokenMinter"].(string),
|
|
eventPayload.Payload["to"].(string),
|
|
eventPayload.Payload["value"].(string),
|
|
eventPayload.ContractAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertTokenBurn(ctx context.Context, eventPayload event.Event) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertTokenBurn,
|
|
txID,
|
|
eventPayload.Payload["tokenBurner"].(string),
|
|
eventPayload.Payload["value"].(string),
|
|
eventPayload.ContractAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertFaucetGive(ctx context.Context, eventPayload event.Event) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertFaucetGive,
|
|
txID,
|
|
eventPayload.Payload["token"].(string),
|
|
eventPayload.Payload["recipient"].(string),
|
|
eventPayload.Payload["amount"].(string),
|
|
eventPayload.ContractAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertPoolSwap(ctx context.Context, eventPayload event.Event) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertPoolSwap,
|
|
txID,
|
|
eventPayload.Payload["initiator"].(string),
|
|
eventPayload.Payload["tokenIn"].(string),
|
|
eventPayload.Payload["tokenOut"].(string),
|
|
eventPayload.Payload["amountIn"].(string),
|
|
eventPayload.Payload["amountOut"].(string),
|
|
eventPayload.Payload["fee"].(string),
|
|
eventPayload.ContractAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertPoolDeposit(ctx context.Context, eventPayload event.Event) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
txID, err := pg.insertTx(ctx, tx, eventPayload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertPoolDeposit,
|
|
txID,
|
|
eventPayload.Payload["initiator"].(string),
|
|
eventPayload.Payload["tokenIn"].(string),
|
|
eventPayload.Payload["amountIn"].(string),
|
|
eventPayload.ContractAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertToken(ctx context.Context, contractAddress string, name string, symbol string, decimals uint8, sinkAddress string) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
_, err := tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertToken,
|
|
contractAddress,
|
|
name,
|
|
symbol,
|
|
decimals,
|
|
sinkAddress,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) InsertPool(ctx context.Context, contractAddress string, name string, symbol string) error {
|
|
return pg.executeTransaction(ctx, func(tx pgx.Tx) error {
|
|
_, err := tx.Exec(
|
|
ctx,
|
|
pg.queries.InsertPool,
|
|
contractAddress,
|
|
name,
|
|
symbol,
|
|
)
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (pg *Pg) insertTx(ctx context.Context, tx pgx.Tx, eventPayload event.Event) (int, error) {
|
|
var txID int
|
|
if err := tx.QueryRow(
|
|
ctx,
|
|
pg.queries.InsertTx,
|
|
eventPayload.TxHash,
|
|
eventPayload.Block,
|
|
time.Unix(int64(eventPayload.Timestamp), 0).UTC(),
|
|
eventPayload.Success,
|
|
).Scan(&txID); err != nil {
|
|
return 0, err
|
|
}
|
|
return txID, nil
|
|
}
|
|
|
|
func (pg *Pg) executeTransaction(ctx context.Context, fn func(tx pgx.Tx) error) error {
|
|
tx, err := pg.db.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
tx.Rollback(ctx)
|
|
} else {
|
|
tx.Commit(ctx)
|
|
}
|
|
}()
|
|
|
|
if err = fn(tx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func loadQueries(queriesPath string) (*queries, error) {
|
|
parsedQueries, err := goyesql.ParseFile(queriesPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
loadedQueries := &queries{}
|
|
|
|
if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil {
|
|
return nil, fmt.Errorf("failed to scan queries %v", err)
|
|
}
|
|
|
|
return loadedQueries, nil
|
|
}
|
|
|
|
func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error {
|
|
const migratorTimeout = 15 * time.Second
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, migratorTimeout)
|
|
defer cancel()
|
|
|
|
conn, err := dbPool.Acquire(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Release()
|
|
|
|
migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := migrator.Migrate(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|