refactor (store): consolidate all pg store related actions

* All postgres related functions now live in internal/store.
* Updated queries.sql file to match struct order (readibility)
* Moved keystore -> store
* Moved queries -> store
* Removed pkg/postgres
This commit is contained in:
Mohamed Sohail 2023-04-11 10:14:49 +00:00
parent 82294b96f8
commit eba329eefa
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
22 changed files with 318 additions and 394 deletions

View File

@ -2,18 +2,17 @@ package main
import ( import (
"net/http" "net/http"
"time"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
"github.com/grassrootseconomics/cic-custodial/internal/api" "github.com/grassrootseconomics/cic-custodial/internal/api"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/pkg/util"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware" "github.com/labstack/echo/v4/middleware"
) )
const ( const (
contextTimeout = 5 * time.Second
systemGlobalLockKey = "system:global_lock" systemGlobalLockKey = "system:global_lock"
) )
@ -31,7 +30,7 @@ func initApiServer(custodialContainer *custodial.Custodial) *echo.Echo {
server.Use(middleware.Recover()) server.Use(middleware.Recover())
server.Use(middleware.BodyLimit("1M")) server.Use(middleware.BodyLimit("1M"))
server.Use(middleware.ContextTimeout(contextTimeout)) server.Use(middleware.ContextTimeout(util.SLATimeout))
if ko.Bool("service.metrics") { if ko.Bool("service.metrics") {
server.GET("/metrics", func(c echo.Context) error { server.GET("/metrics", func(c echo.Context) error {

View File

@ -7,17 +7,12 @@ import (
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/queries"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/sub" "github.com/grassrootseconomics/cic-custodial/internal/sub"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/logg" "github.com/grassrootseconomics/cic-custodial/pkg/logg"
"github.com/grassrootseconomics/cic-custodial/pkg/postgres"
"github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/knadh/goyesql/v2"
"github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/file"
@ -84,21 +79,6 @@ func initCeloProvider() *celoutils.Provider {
return provider return provider
} }
// Load postgres pool.
func initPostgresPool() *pgxpool.Pool {
poolOpts := postgres.PostgresPoolOpts{
DSN: ko.MustString("postgres.dsn"),
MigrationsFolderPath: migrationsFolderFlag,
}
pool, err := postgres.NewPostgresPool(context.Background(), poolOpts)
if err != nil {
lo.Fatal("init: critical error connecting to postgres", "error", err)
}
return pool
}
// Load separate redis connection for the tasker on a reserved db namespace. // Load separate redis connection for the tasker on a reserved db namespace.
func initAsynqRedisPool() *redis.RedisPool { func initAsynqRedisPool() *redis.RedisPool {
poolOpts := redis.RedisPoolOpts{ poolOpts := redis.RedisPoolOpts{
@ -129,31 +109,6 @@ func initCommonRedisPool() *redis.RedisPool {
return pool return pool
} }
// Load SQL statements into struct.
func initQueries() *queries.Queries {
parsedQueries, err := goyesql.ParseFile(queriesFlag)
if err != nil {
lo.Fatal("init: critical error loading SQL queries", "error", err)
}
loadedQueries, err := queries.LoadQueries(parsedQueries)
if err != nil {
lo.Fatal("init: critical error loading SQL queries", "error", err)
}
return loadedQueries
}
// Load postgres based keystore.
func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) keystore.Keystore {
keystore := keystore.NewPostgresKeytore(keystore.Opts{
PostgresPool: postgresPool,
Queries: queries,
})
return keystore
}
// Load redis backed noncestore. // Load redis backed noncestore.
func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore { func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore {
return nonce.NewRedisNoncestore(nonce.Opts{ return nonce.NewRedisNoncestore(nonce.Opts{
@ -174,11 +129,17 @@ func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient {
} }
// Load Postgres store. // Load Postgres store.
func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store { func initPgStore() store.Store {
return store.NewPostgresStore(store.Opts{ store, err := store.NewPgStore(store.Opts{
PostgresPool: postgresPool, DSN: ko.MustString("postgres.dsn"),
Queries: queries, MigrationsFolderPath: migrationsFolderFlag,
QueriesFolderPath: queriesFlag,
}) })
if err != nil {
lo.Fatal("init: critical error loading Postgres store", "error", err)
}
return store
} }
// Init JetStream context for both pub/sub. // Init JetStream context for both pub/sub.

View File

@ -46,14 +46,11 @@ func init() {
func main() { func main() {
lo.Info("main: starting cic-custodial", "build", build) lo.Info("main: starting cic-custodial", "build", build)
parsedQueries := initQueries()
celoProvider := initCeloProvider() celoProvider := initCeloProvider()
postgresPool := initPostgresPool()
asynqRedisPool := initAsynqRedisPool() asynqRedisPool := initAsynqRedisPool()
redisPool := initCommonRedisPool() redisPool := initCommonRedisPool()
postgresKeystore := initPostgresKeystore(postgresPool, parsedQueries) store := initPgStore()
pgStore := initPostgresStore(postgresPool, parsedQueries)
redisNoncestore := initRedisNoncestore(redisPool) redisNoncestore := initRedisNoncestore(redisPool)
lockProvider := initLockProvider(redisPool.Client) lockProvider := initLockProvider(redisPool.Client)
taskerClient := initTaskerClient(asynqRedisPool) taskerClient := initTaskerClient(asynqRedisPool)
@ -62,10 +59,9 @@ func main() {
custodial, err := custodial.NewCustodial(custodial.Opts{ custodial, err := custodial.NewCustodial(custodial.Opts{
CeloProvider: celoProvider, CeloProvider: celoProvider,
Keystore: postgresKeystore,
LockProvider: lockProvider, LockProvider: lockProvider,
Noncestore: redisNoncestore, Noncestore: redisNoncestore,
PgStore: pgStore, Store: store,
RedisClient: redisPool.Client, RedisClient: redisPool.Client,
RegistryAddress: ko.MustString("chain.registry_address"), RegistryAddress: ko.MustString("chain.registry_address"),
SystemPrivateKey: ko.MustString("system.private_key"), SystemPrivateKey: ko.MustString("system.private_key"),

View File

@ -22,7 +22,7 @@ func HandleAccountCreate(cu *custodial.Custodial) func(echo.Context) error {
return err return err
} }
id, err := cu.Keystore.WriteKeyPair(c.Request().Context(), generatedKeyPair) id, err := cu.Store.WriteKeyPair(c.Request().Context(), generatedKeyPair)
if err != nil { if err != nil {
return err return err
} }

View File

@ -40,7 +40,7 @@ func HandleSignTransfer(cu *custodial.Custodial) func(echo.Context) error {
return err return err
} }
accountActive, gasQuota, err := cu.PgStore.GetAccountStatusByAddress(c.Request().Context(), req.From) accountActive, gasQuota, err := cu.Store.GetAccountStatus(c.Request().Context(), req.From)
if err != nil { if err != nil {
return err return err
} }

View File

@ -28,7 +28,7 @@ func HandleTrackTx(cu *custodial.Custodial) func(echo.Context) error {
return err return err
} }
txs, err := cu.PgStore.GetTxStatusByTrackingId(c.Request().Context(), txStatusRequest.TrackingId) txs, err := cu.Store.GetTxStatus(c.Request().Context(), txStatusRequest.TrackingId)
if err != nil { if err != nil {
return err return err
} }
@ -36,7 +36,7 @@ func HandleTrackTx(cu *custodial.Custodial) func(echo.Context) error {
return c.JSON(http.StatusOK, OkResp{ return c.JSON(http.StatusOK, OkResp{
Ok: true, Ok: true,
Result: H{ Result: H{
"transactions": txs, "transaction": txs,
}, },
}) })
} }

View File

@ -3,16 +3,15 @@ package custodial
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"time"
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/common"
eth_crypto "github.com/celo-org/celo-blockchain/crypto" eth_crypto "github.com/celo-org/celo-blockchain/crypto"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/util"
"github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch"
"github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/labstack/gommon/log" "github.com/labstack/gommon/log"
@ -22,10 +21,9 @@ import (
type ( type (
Opts struct { Opts struct {
CeloProvider *celoutils.Provider CeloProvider *celoutils.Provider
Keystore keystore.Keystore
LockProvider *redislock.Client LockProvider *redislock.Client
Noncestore nonce.Noncestore Noncestore nonce.Noncestore
PgStore store.Store Store store.Store
RedisClient *redis.Client RedisClient *redis.Client
RegistryAddress string RegistryAddress string
SystemPrivateKey string SystemPrivateKey string
@ -36,10 +34,9 @@ type (
Custodial struct { Custodial struct {
Abis map[string]*w3.Func Abis map[string]*w3.Func
CeloProvider *celoutils.Provider CeloProvider *celoutils.Provider
Keystore keystore.Keystore
LockProvider *redislock.Client LockProvider *redislock.Client
Noncestore nonce.Noncestore Noncestore nonce.Noncestore
PgStore store.Store Store store.Store
RedisClient *redis.Client RedisClient *redis.Client
RegistryMap map[string]common.Address RegistryMap map[string]common.Address
SystemPrivateKey *ecdsa.PrivateKey SystemPrivateKey *ecdsa.PrivateKey
@ -49,7 +46,7 @@ type (
) )
func NewCustodial(o Opts) (*Custodial, error) { func NewCustodial(o Opts) (*Custodial, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), util.SLATimeout)
defer cancel() defer cancel()
registryMap, err := o.CeloProvider.RegistryMap(ctx, celoutils.HexToAddress(o.RegistryAddress)) registryMap, err := o.CeloProvider.RegistryMap(ctx, celoutils.HexToAddress(o.RegistryAddress))
@ -86,10 +83,9 @@ func NewCustodial(o Opts) (*Custodial, error) {
return &Custodial{ return &Custodial{
Abis: initAbis(), Abis: initAbis(),
CeloProvider: o.CeloProvider, CeloProvider: o.CeloProvider,
Keystore: o.Keystore,
LockProvider: o.LockProvider, LockProvider: o.LockProvider,
Noncestore: o.Noncestore, Noncestore: o.Noncestore,
PgStore: o.PgStore, Store: o.Store,
RedisClient: o.RedisClient, RedisClient: o.RedisClient,
RegistryMap: registryMap, RegistryMap: registryMap,
SystemPrivateKey: privateKey, SystemPrivateKey: privateKey,

View File

@ -1,14 +0,0 @@
package keystore
import (
"context"
"crypto/ecdsa"
"github.com/grassrootseconomics/cic-custodial/pkg/keypair"
)
// Keystore defines how keypairs should be stored and accessed from a storage backend.
type Keystore interface {
WriteKeyPair(context.Context, keypair.Key) (uint, error)
LoadPrivateKey(context.Context, string) (*ecdsa.PrivateKey, error)
}

View File

@ -1,61 +0,0 @@
package keystore
import (
"context"
"crypto/ecdsa"
eth_crypto "github.com/celo-org/celo-blockchain/crypto"
"github.com/grassrootseconomics/cic-custodial/internal/queries"
"github.com/grassrootseconomics/cic-custodial/pkg/keypair"
"github.com/jackc/pgx/v5/pgxpool"
)
type (
Opts struct {
PostgresPool *pgxpool.Pool
Queries *queries.Queries
}
PostgresKeystore struct {
db *pgxpool.Pool
queries *queries.Queries
}
)
func NewPostgresKeytore(o Opts) Keystore {
return &PostgresKeystore{
db: o.PostgresPool,
queries: o.Queries,
}
}
// WriteKeyPair inserts a keypair into the db and returns the linked id.
func (ks *PostgresKeystore) WriteKeyPair(ctx context.Context, keypair keypair.Key) (uint, error) {
var (
id uint
)
if err := ks.db.QueryRow(ctx, ks.queries.WriteKeyPair, keypair.Public, keypair.Private).Scan(&id); err != nil {
return id, err
}
return id, nil
}
// LoadPrivateKey loads a private key as a crypto primitive for direct use. An id is used to search for the private key.
func (ks *PostgresKeystore) LoadPrivateKey(ctx context.Context, publicKey string) (*ecdsa.PrivateKey, error) {
var (
privateKeyString string
)
if err := ks.db.QueryRow(ctx, ks.queries.LoadKeyPair, publicKey).Scan(&privateKeyString); err != nil {
return nil, err
}
privateKey, err := eth_crypto.HexToECDSA(privateKeyString)
if err != nil {
return nil, err
}
return privateKey, nil
}

View File

@ -1,33 +0,0 @@
package queries
import (
"fmt"
"github.com/knadh/goyesql/v2"
)
type Queries struct {
// Keystore
WriteKeyPair string `query:"write-key-pair"`
LoadKeyPair string `query:"load-key-pair"`
// Store
CreateOTX string `query:"create-otx"`
CreateDispatchStatus string `query:"create-dispatch-status"`
ActivateAccount string `query:"activate-account"`
UpdateChainStatus string `query:"update-chain-status"`
GetTxStatusByTrackingId string `query:"get-tx-status-by-tracking-id"`
GetAccountActivationQuorum string `query:"get-account-activation-quorum"`
GetAccountStatus string `query:"get-account-status-by-address"`
DecrGasQuota string `query:"decr-gas-quota"`
ResetGasQuota string `query:"reset-gas-quota"`
}
func LoadQueries(q goyesql.Queries) (*Queries, error) {
loadedQueries := &Queries{}
if err := goyesql.ScanToStruct(loadedQueries, q, nil); err != nil {
return nil, fmt.Errorf("failed to scan queries %v", err)
}
return loadedQueries, nil
}

View File

@ -4,32 +4,48 @@ import (
"context" "context"
) )
func (s *PostgresStore) GetAccountStatusByAddress(ctx context.Context, publicAddress string) (bool, int, error) { func (s *PgStore) ActivateAccount(
ctx context.Context,
publicAddress string,
) error {
if _, err := s.db.Exec(
ctx,
s.queries.ActivateAccount,
publicAddress,
); err != nil {
return err
}
return nil
}
func (s *PgStore) GetAccountStatus(
ctx context.Context,
publicAddress string,
) (bool, int, error) {
var ( var (
accountActive bool accountActive bool
gasQuota int gasQuota int
) )
if err := s.db.QueryRow(ctx, s.queries.GetAccountStatus, publicAddress).Scan(&accountActive, &gasQuota); err != nil { if err := s.db.QueryRow(
ctx,
s.queries.GetAccountStatus,
publicAddress,
).Scan(
&accountActive,
&gasQuota,
); err != nil {
return false, 0, err return false, 0, err
} }
return accountActive, gasQuota, nil return accountActive, gasQuota, nil
} }
func (s *PostgresStore) GetAccountActivationQuorum(ctx context.Context, trackingId string) (int, error) { func (s *PgStore) DecrGasQuota(
var ( ctx context.Context,
quorum int publicAddress string,
) ) error {
if err := s.db.QueryRow(ctx, s.queries.GetAccountActivationQuorum, trackingId).Scan(&quorum); err != nil {
return 0, err
}
return quorum, nil
}
func (s *PostgresStore) DecrGasQuota(ctx context.Context, publicAddress string) error {
if _, err := s.db.Exec( if _, err := s.db.Exec(
ctx, ctx,
s.queries.DecrGasQuota, s.queries.DecrGasQuota,
@ -41,7 +57,10 @@ func (s *PostgresStore) DecrGasQuota(ctx context.Context, publicAddress string)
return nil return nil
} }
func (s *PostgresStore) ResetGasQuota(ctx context.Context, publicAddress string) error { func (s *PgStore) ResetGasQuota(
ctx context.Context,
publicAddress string,
) error {
if _, err := s.db.Exec( if _, err := s.db.Exec(
ctx, ctx,
s.queries.ResetGasQuota, s.queries.ResetGasQuota,
@ -52,15 +71,3 @@ func (s *PostgresStore) ResetGasQuota(ctx context.Context, publicAddress string)
return nil return nil
} }
func (s *PostgresStore) ActivateAccount(ctx context.Context, publicAddress string) error {
if _, err := s.db.Exec(
ctx,
s.queries.ActivateAccount,
publicAddress,
); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,53 @@
package store
import (
"context"
"crypto/ecdsa"
eth_crypto "github.com/celo-org/celo-blockchain/crypto"
"github.com/grassrootseconomics/cic-custodial/pkg/keypair"
)
func (s *PgStore) WriteKeyPair(
ctx context.Context,
keypair keypair.Key,
) (uint, error) {
var (
id uint
)
if err := s.db.QueryRow(
ctx,
s.queries.WriteKeyPair,
keypair.Public,
keypair.Private,
).Scan(&id); err != nil {
return id, err
}
return id, nil
}
func (s *PgStore) LoadPrivateKey(
ctx context.Context,
publicKey string,
) (*ecdsa.PrivateKey, error) {
var (
privateKeyString string
)
if err := s.db.QueryRow(
ctx,
s.queries.LoadKeyPair,
publicKey,
).Scan(&privateKeyString); err != nil {
return nil, err
}
privateKey, err := eth_crypto.HexToECDSA(privateKeyString)
if err != nil {
return nil, err
}
return privateKey, nil
}

View File

@ -2,21 +2,39 @@ package store
import ( import (
"context" "context"
"math/big"
"time" "time"
"github.com/georgysavva/scany/v2/pgxscan" "github.com/georgysavva/scany/v2/pgxscan"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
) )
type TxStatus struct { type (
Type string `db:"type" json:"txType"` Otx struct {
TxHash string `db:"tx_hash" json:"txHash"` TrackingId string
TransferValue uint64 `db:"transfer_value" json:"transferValue"` Type enum.OtxType
CreatedAt time.Time `db:"created_at" json:"createdAt"` RawTx string
Status string `db:"status" json:"status"` TxHash string
} From string
Data string
GasLimit uint64
TransferValue uint64
GasPrice *big.Int
Nonce uint64
}
txStatus struct {
CreatedAt time.Time `db:"created_at" json:"createdAt"`
Status string `db:"status" json:"status"`
TransferValue uint64 `db:"transfer_value" json:"transferValue"`
TxHash string `db:"tx_hash" json:"txHash"`
Type string `db:"type" json:"txType"`
}
)
func (s *PostgresStore) CreateOtx(ctx context.Context, otx OTX) (uint, error) { func (s *PgStore) CreateOtx(
ctx context.Context,
otx Otx,
) (uint, error) {
var ( var (
id uint id uint
) )
@ -34,37 +52,52 @@ func (s *PostgresStore) CreateOtx(ctx context.Context, otx OTX) (uint, error) {
otx.GasLimit, otx.GasLimit,
otx.TransferValue, otx.TransferValue,
otx.Nonce, otx.Nonce,
).Scan(&id); err != nil { ).Scan(
&id,
); err != nil {
return id, err return id, err
} }
return id, nil return id, nil
} }
func (s *PostgresStore) GetTxStatusByTrackingId(ctx context.Context, trackingId string) ([]*TxStatus, error) { func (s *PgStore) GetTxStatus(
ctx context.Context,
trackingId string,
) (txStatus, error) {
var ( var (
txs []*TxStatus tx txStatus
) )
if err := pgxscan.Select( rows, err := s.db.Query(
ctx, ctx,
s.db,
&txs,
s.queries.GetTxStatusByTrackingId, s.queries.GetTxStatusByTrackingId,
trackingId, trackingId,
); err != nil { )
return nil, err if err != nil {
return tx, err
} }
return txs, nil if err := pgxscan.ScanOne(
&tx,
rows,
); err != nil {
return tx, err
}
return tx, nil
} }
func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error { func (s *PgStore) CreateDispatchStatus(
ctx context.Context,
otxId uint,
otxStatus enum.OtxStatus,
) error {
if _, err := s.db.Exec( if _, err := s.db.Exec(
ctx, ctx,
s.queries.CreateDispatchStatus, s.queries.CreateDispatchStatus,
dispatch.OtxId, otxId,
dispatch.Status, otxStatus,
); err != nil { ); err != nil {
return err return err
} }
@ -72,21 +105,26 @@ func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch Dispa
return nil return nil
} }
func (s *PostgresStore) UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error { func (s *PgStore) UpdateDispatchStatus(
ctx context.Context,
txSuccess bool,
txHash string,
txBlock uint64,
) error {
var ( var (
status = enum.SUCCESS status = enum.SUCCESS
) )
if !chainEvent.Success { if !txSuccess {
status = enum.REVERTED status = enum.REVERTED
} }
if _, err := s.db.Exec( if _, err := s.db.Exec(
ctx, ctx,
s.queries.UpdateChainStatus, s.queries.UpdateDispatchStatus,
chainEvent.TxHash, txHash,
status, status,
chainEvent.Block, txBlock,
); err != nil { ); err != nil {
return err return err
} }

View File

@ -1,25 +0,0 @@
package store
import (
"github.com/grassrootseconomics/cic-custodial/internal/queries"
"github.com/jackc/pgx/v5/pgxpool"
)
type (
Opts struct {
PostgresPool *pgxpool.Pool
Queries *queries.Queries
}
PostgresStore struct {
db *pgxpool.Pool
queries *queries.Queries
}
)
func NewPostgresStore(o Opts) Store {
return &PostgresStore{
db: o.PostgresPool,
queries: o.Queries,
}
}

View File

@ -2,49 +2,127 @@ package store
import ( import (
"context" "context"
"math/big" "crypto/ecdsa"
"fmt"
"os"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
"github.com/grassrootseconomics/cic-custodial/pkg/keypair"
"github.com/grassrootseconomics/cic-custodial/pkg/util"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/tern/v2/migrate"
"github.com/knadh/goyesql/v2"
) )
type ( type (
MinimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
To string `json:"to"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}
OTX struct {
TrackingId string
Type enum.OtxType
RawTx string
TxHash string
From string
Data string
GasLimit uint64
TransferValue uint64
GasPrice *big.Int
Nonce uint64
}
DispatchStatus struct {
OtxId uint
Status enum.OtxStatus
}
Store interface { Store interface {
CreateOtx(ctx context.Context, otx OTX) (id uint, err error) // Keypair related actions.
CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error LoadPrivateKey(context.Context, string) (*ecdsa.PrivateKey, error)
GetTxStatusByTrackingId(ctx context.Context, trackingId string) ([]*TxStatus, error) WriteKeyPair(context.Context, keypair.Key) (uint, error)
UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error // Otx related actions.
GetAccountStatusByAddress(ctx context.Context, publicAddress string) (bool, int, error) CreateOtx(context.Context, Otx) (uint, error)
GetAccountActivationQuorum(ctx context.Context, trackingId string) (int, error) GetTxStatus(context.Context, string) (txStatus, error)
DecrGasQuota(ctx context.Context, publicAddress string) error CreateDispatchStatus(context.Context, uint, enum.OtxStatus) error
ResetGasQuota(ctx context.Context, publicAddress string) error UpdateDispatchStatus(context.Context, bool, string, uint64) error
ActivateAccount(ctx context.Context, publicAddress string) error // Account related actions.
ActivateAccount(context.Context, string) error
GetAccountStatus(context.Context, string) (bool, int, error)
// Gas quota related actions.
DecrGasQuota(context.Context, string) error
ResetGasQuota(context.Context, string) error
}
Opts struct {
DSN string
MigrationsFolderPath string
QueriesFolderPath string
}
PgStore struct {
db *pgxpool.Pool
queries *queries
}
queries struct {
// Keystore related queries.
WriteKeyPair string `query:"write-key-pair"`
LoadKeyPair string `query:"load-key-pair"`
// Otx related queries.
CreateOTX string `query:"create-otx"`
GetTxStatusByTrackingId string `query:"get-tx-status-by-tracking-id"`
CreateDispatchStatus string `query:"create-dispatch-status"`
UpdateDispatchStatus string `query:"update-dispatch-status"`
// Account related queries.
ActivateAccount string `query:"activate-account"`
GetAccountStatus string `query:"get-account-status-by-address"`
DecrGasQuota string `query:"decr-gas-quota"`
ResetGasQuota string `query:"reset-gas-quota"`
} }
) )
func NewPgStore(o Opts) (Store, error) {
parsedConfig, err := pgxpool.ParseConfig(o.DSN)
if err != nil {
return nil, err
}
dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig)
if err != nil {
return nil, err
}
queries, err := loadQueries(o.QueriesFolderPath)
if err != nil {
return nil, err
}
if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil {
return nil, err
}
return &PgStore{
db: dbPool,
queries: queries,
}, nil
}
func loadQueries(queriesPath string) (*queries, error) {
parsedQueries, err := goyesql.ParseFile(queriesPath)
if err != nil {
return nil, err
}
loadedQueries := &queries{}
if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil {
return nil, fmt.Errorf("failed to scan queries %v", err)
}
return loadedQueries, nil
}
func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error {
ctx, cancel := context.WithTimeout(ctx, util.SLATimeout)
defer cancel()
conn, err := dbPool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version")
if err != nil {
return err
}
if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil {
return err
}
if err := migrator.Migrate(ctx); err != nil {
return err
}
return nil
}

View File

@ -11,7 +11,7 @@ import (
) )
const ( const (
retryRequeueInterval = 2 * time.Second retryRequeueInterval = 1 * time.Second
) )
type TaskerServerOpts struct { type TaskerServerOpts struct {

View File

@ -35,7 +35,7 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a
return err return err
} }
_, gasQuota, err := cu.PgStore.GetAccountStatusByAddress(ctx, payload.PublicKey) _, gasQuota, err := cu.Store.GetAccountStatus(ctx, payload.PublicKey)
if err != nil { if err != nil {
return err return err
} }
@ -142,7 +142,7 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a
return err return err
} }
id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ id, err := cu.Store.CreateOtx(ctx, store.Otx{
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
Type: enum.REFILL_GAS, Type: enum.REFILL_GAS,
RawTx: hexutil.Encode(rawTx), RawTx: hexutil.Encode(rawTx),

View File

@ -82,7 +82,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte
return err return err
} }
id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ id, err := cu.Store.CreateOtx(ctx, store.Otx{
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
Type: enum.ACCOUNT_REGISTER, Type: enum.ACCOUNT_REGISTER,
RawTx: hexutil.Encode(rawTx), RawTx: hexutil.Encode(rawTx),
@ -96,7 +96,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte
if err != nil { if err != nil {
return err return err
} }
disptachJobPayload, err := json.Marshal(TxPayload{ disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id, OtxId: id,
Tx: builtTx, Tx: builtTx,

View File

@ -9,7 +9,6 @@ import (
"github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
"github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/grassrootseconomics/w3-celo-patch/module/eth"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
@ -24,41 +23,37 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro
return func(ctx context.Context, t *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error {
var ( var (
payload TxPayload payload TxPayload
dispatchStatus store.DispatchStatus
dispathchTx common.Hash dispathchTx common.Hash
dispatchStatus enum.OtxStatus = enum.IN_NETWORK
) )
if err := json.Unmarshal(t.Payload(), &payload); err != nil { if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return err return err
} }
dispatchStatus.OtxId = payload.OtxId
if err := cu.CeloProvider.Client.CallCtx( if err := cu.CeloProvider.Client.CallCtx(
ctx, ctx,
eth.SendTx(payload.Tx).Returns(&dispathchTx), eth.SendTx(payload.Tx).Returns(&dispathchTx),
); err != nil { ); err != nil {
switch err.Error() { switch err.Error() {
case celoutils.ErrGasPriceLow: case celoutils.ErrGasPriceLow:
dispatchStatus.Status = enum.FAIL_LOW_GAS_PRICE dispatchStatus = enum.FAIL_LOW_GAS_PRICE
case celoutils.ErrInsufficientGas: case celoutils.ErrInsufficientGas:
dispatchStatus.Status = enum.FAIL_NO_GAS dispatchStatus = enum.FAIL_NO_GAS
case celoutils.ErrNonceLow: case celoutils.ErrNonceLow:
dispatchStatus.Status = enum.FAIL_LOW_NONCE dispatchStatus = enum.FAIL_LOW_NONCE
default: default:
dispatchStatus.Status = enum.FAIL_UNKNOWN_RPC_ERROR dispatchStatus = enum.FAIL_UNKNOWN_RPC_ERROR
} }
if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil { if err := cu.Store.CreateDispatchStatus(ctx, payload.OtxId, dispatchStatus); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }
dispatchStatus.Status = enum.IN_NETWORK if err := cu.Store.CreateDispatchStatus(ctx, payload.OtxId, dispatchStatus); err != nil {
if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }

View File

@ -47,7 +47,7 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er
} }
defer lock.Release(ctx) defer lock.Release(ctx)
key, err := cu.Keystore.LoadPrivateKey(ctx, payload.From) key, err := cu.Store.LoadPrivateKey(ctx, payload.From)
if err != nil { if err != nil {
return err return err
} }
@ -89,7 +89,7 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er
return err return err
} }
id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ id, err := cu.Store.CreateOtx(ctx, store.Otx{
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
Type: enum.TRANSFER_VOUCHER, Type: enum.TRANSFER_VOUCHER,
RawTx: hexutil.Encode(rawTx), RawTx: hexutil.Encode(rawTx),
@ -105,7 +105,7 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er
return err return err
} }
if err := cu.PgStore.DecrGasQuota(ctx, payload.From); err != nil { if err := cu.Store.DecrGasQuota(ctx, payload.From); err != nil {
return err return err
} }

View File

@ -1,56 +0,0 @@
package postgres
import (
"context"
"os"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/tern/v2/migrate"
)
const (
schemaTable = "schema_version"
)
type PostgresPoolOpts struct {
DSN string
MigrationsFolderPath string
}
// NewPostgresPool creates a reusbale connection pool across the cic-custodial component.
func NewPostgresPool(ctx context.Context, o PostgresPoolOpts) (*pgxpool.Pool, error) {
parsedConfig, err := pgxpool.ParseConfig(o.DSN)
if err != nil {
return nil, err
}
dbPool, err := pgxpool.NewWithConfig(ctx, parsedConfig)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
conn, err := dbPool.Acquire(ctx)
if err != nil {
return nil, err
}
defer conn.Release()
migrator, err := migrate.NewMigrator(ctx, conn.Conn(), schemaTable)
if err != nil {
return nil, err
}
if err := migrator.LoadMigrations(os.DirFS(o.MigrationsFolderPath)); err != nil {
return nil, err
}
if err := migrator.Migrate(ctx); err != nil {
return nil, err
}
return dbPool, nil
}

View File

@ -9,11 +9,6 @@ INSERT INTO keystore(public_key, private_key) VALUES($1, $2) RETURNING id
-- $1: public_key -- $1: public_key
SELECT private_key FROM keystore WHERE public_key=$1 SELECT private_key FROM keystore WHERE public_key=$1
--name: activate-account
-- Activate an account following successful quorum
-- $1: public_key
UPDATE keystore SET active = true WHERE public_key=$1
--name: create-otx --name: create-otx
-- Create a new locally originating tx -- Create a new locally originating tx
-- $1: tracking_id -- $1: tracking_id
@ -39,6 +34,13 @@ INSERT INTO otx_sign(
nonce nonce
) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id ) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id
--name: get-tx-status-by-tracking-id
-- Gets tx status's from possible multiple txs with the same tracking_id
-- $1: tracking_id
SELECT otx_sign.type, otx_sign.tx_hash, otx_sign.transfer_value, otx_sign.created_at, otx_dispatch.status FROM otx_sign
INNER JOIN otx_dispatch ON otx_sign.id = otx_dispatch.otx_id
WHERE otx_sign.tracking_id=$1
--name: create-dispatch-status --name: create-dispatch-status
-- Create a new dispatch status -- Create a new dispatch status
-- $1: otx_id -- $1: otx_id
@ -48,7 +50,7 @@ INSERT INTO otx_dispatch(
"status" "status"
) VALUES($1, $2) RETURNING id ) VALUES($1, $2) RETURNING id
--name: update-chain-status --name: update-dispatch-status
-- Updates the status of the dispatched tx with the chain mine status -- Updates the status of the dispatched tx with the chain mine status
-- $1: tx_hash -- $1: tx_hash
-- $2: status -- $2: status
@ -60,22 +62,10 @@ UPDATE otx_dispatch SET "status" = $2, "block" = $3 WHERE otx_dispatch.id = (
AND otx_dispatch.status = 'IN_NETWORK' AND otx_dispatch.status = 'IN_NETWORK'
) )
--name: get-tx-status-by-tracking-id --name: activate-account
-- Gets tx status's from possible multiple txs with the same tracking_id -- Activate an account following successful quorum
-- $1: tracking_id -- $1: public_key
SELECT otx_sign.type, otx_sign.tx_hash, otx_sign.transfer_value, otx_sign.created_at, otx_dispatch.status FROM otx_sign UPDATE keystore SET active = true WHERE public_key=$1
INNER JOIN otx_dispatch ON otx_sign.id = otx_dispatch.otx_id
WHERE otx_sign.tracking_id=$1
-- TODO: Scroll by status type with cursor pagination
--name: get-account-activation-quorum
-- Gets quorum of required and confirmed system transactions for the account
-- $1: tracking_id
SELECT count(*) FROM otx_dispatch INNER JOIN
otx_sign ON otx_dispatch.otx_id = otx_sign.id
WHERE otx_sign.tracking_id=$1
AND otx_dispatch.status = 'SUCCESS'
--name: get-account-status-by-address --name: get-account-status-by-address
-- Gets current gas quota for an individual account by address -- Gets current gas quota for an individual account by address
@ -101,4 +91,4 @@ FROM gas_quota_meta
WHERE key_id = ( WHERE key_id = (
SELECT id FROM keystore SELECT id FROM keystore
WHERE public_key=$1 WHERE public_key=$1
) )