diff --git a/cmd/service/api.go b/cmd/service/api.go index 537e219..b634c73 100644 --- a/cmd/service/api.go +++ b/cmd/service/api.go @@ -2,18 +2,17 @@ package main import ( "net/http" - "time" "github.com/VictoriaMetrics/metrics" "github.com/go-playground/validator/v10" "github.com/grassrootseconomics/cic-custodial/internal/api" "github.com/grassrootseconomics/cic-custodial/internal/custodial" + "github.com/grassrootseconomics/cic-custodial/pkg/util" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ) const ( - contextTimeout = 5 * time.Second systemGlobalLockKey = "system:global_lock" ) @@ -31,7 +30,7 @@ func initApiServer(custodialContainer *custodial.Custodial) *echo.Echo { server.Use(middleware.Recover()) server.Use(middleware.BodyLimit("1M")) - server.Use(middleware.ContextTimeout(contextTimeout)) + server.Use(middleware.ContextTimeout(util.SLATimeout)) if ko.Bool("service.metrics") { server.GET("/metrics", func(c echo.Context) error { diff --git a/cmd/service/init.go b/cmd/service/init.go index 0286883..c745855 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -7,17 +7,12 @@ import ( "github.com/bsm/redislock" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/queries" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/sub" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/pkg/logg" - "github.com/grassrootseconomics/cic-custodial/pkg/postgres" "github.com/grassrootseconomics/cic-custodial/pkg/redis" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/knadh/goyesql/v2" "github.com/knadh/koanf/parsers/toml" "github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/file" @@ -84,21 +79,6 @@ func initCeloProvider() *celoutils.Provider { return provider } -// Load postgres pool. -func initPostgresPool() *pgxpool.Pool { - poolOpts := postgres.PostgresPoolOpts{ - DSN: ko.MustString("postgres.dsn"), - MigrationsFolderPath: migrationsFolderFlag, - } - - pool, err := postgres.NewPostgresPool(context.Background(), poolOpts) - if err != nil { - lo.Fatal("init: critical error connecting to postgres", "error", err) - } - - return pool -} - // Load separate redis connection for the tasker on a reserved db namespace. func initAsynqRedisPool() *redis.RedisPool { poolOpts := redis.RedisPoolOpts{ @@ -129,31 +109,6 @@ func initCommonRedisPool() *redis.RedisPool { return pool } -// Load SQL statements into struct. -func initQueries() *queries.Queries { - parsedQueries, err := goyesql.ParseFile(queriesFlag) - if err != nil { - lo.Fatal("init: critical error loading SQL queries", "error", err) - } - - loadedQueries, err := queries.LoadQueries(parsedQueries) - if err != nil { - lo.Fatal("init: critical error loading SQL queries", "error", err) - } - - return loadedQueries -} - -// Load postgres based keystore. -func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) keystore.Keystore { - keystore := keystore.NewPostgresKeytore(keystore.Opts{ - PostgresPool: postgresPool, - Queries: queries, - }) - - return keystore -} - // Load redis backed noncestore. func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore { return nonce.NewRedisNoncestore(nonce.Opts{ @@ -174,11 +129,17 @@ func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient { } // Load Postgres store. -func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store { - return store.NewPostgresStore(store.Opts{ - PostgresPool: postgresPool, - Queries: queries, +func initPgStore() store.Store { + store, err := store.NewPgStore(store.Opts{ + DSN: ko.MustString("postgres.dsn"), + MigrationsFolderPath: migrationsFolderFlag, + QueriesFolderPath: queriesFlag, }) + if err != nil { + lo.Fatal("init: critical error loading Postgres store", "error", err) + } + + return store } // Init JetStream context for both pub/sub. diff --git a/cmd/service/main.go b/cmd/service/main.go index bf7d5f5..e233f9d 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -46,14 +46,11 @@ func init() { func main() { lo.Info("main: starting cic-custodial", "build", build) - parsedQueries := initQueries() celoProvider := initCeloProvider() - postgresPool := initPostgresPool() asynqRedisPool := initAsynqRedisPool() redisPool := initCommonRedisPool() - postgresKeystore := initPostgresKeystore(postgresPool, parsedQueries) - pgStore := initPostgresStore(postgresPool, parsedQueries) + store := initPgStore() redisNoncestore := initRedisNoncestore(redisPool) lockProvider := initLockProvider(redisPool.Client) taskerClient := initTaskerClient(asynqRedisPool) @@ -62,10 +59,9 @@ func main() { custodial, err := custodial.NewCustodial(custodial.Opts{ CeloProvider: celoProvider, - Keystore: postgresKeystore, LockProvider: lockProvider, Noncestore: redisNoncestore, - PgStore: pgStore, + Store: store, RedisClient: redisPool.Client, RegistryAddress: ko.MustString("chain.registry_address"), SystemPrivateKey: ko.MustString("system.private_key"), diff --git a/internal/api/account.go b/internal/api/account.go index 8438a74..5b0943b 100644 --- a/internal/api/account.go +++ b/internal/api/account.go @@ -22,7 +22,7 @@ func HandleAccountCreate(cu *custodial.Custodial) func(echo.Context) error { return err } - id, err := cu.Keystore.WriteKeyPair(c.Request().Context(), generatedKeyPair) + id, err := cu.Store.WriteKeyPair(c.Request().Context(), generatedKeyPair) if err != nil { return err } diff --git a/internal/api/sign.go b/internal/api/sign.go index 2f3abd3..8cb9958 100644 --- a/internal/api/sign.go +++ b/internal/api/sign.go @@ -40,7 +40,7 @@ func HandleSignTransfer(cu *custodial.Custodial) func(echo.Context) error { return err } - accountActive, gasQuota, err := cu.PgStore.GetAccountStatusByAddress(c.Request().Context(), req.From) + accountActive, gasQuota, err := cu.Store.GetAccountStatus(c.Request().Context(), req.From) if err != nil { return err } diff --git a/internal/api/track.go b/internal/api/track.go index 6bff890..1463517 100644 --- a/internal/api/track.go +++ b/internal/api/track.go @@ -28,7 +28,7 @@ func HandleTrackTx(cu *custodial.Custodial) func(echo.Context) error { return err } - txs, err := cu.PgStore.GetTxStatusByTrackingId(c.Request().Context(), txStatusRequest.TrackingId) + txs, err := cu.Store.GetTxStatus(c.Request().Context(), txStatusRequest.TrackingId) if err != nil { return err } @@ -36,7 +36,7 @@ func HandleTrackTx(cu *custodial.Custodial) func(echo.Context) error { return c.JSON(http.StatusOK, OkResp{ Ok: true, Result: H{ - "transactions": txs, + "transaction": txs, }, }) } diff --git a/internal/custodial/custodial.go b/internal/custodial/custodial.go index c8ec15f..3697cee 100644 --- a/internal/custodial/custodial.go +++ b/internal/custodial/custodial.go @@ -3,16 +3,15 @@ package custodial import ( "context" "crypto/ecdsa" - "time" "github.com/bsm/redislock" "github.com/celo-org/celo-blockchain/common" eth_crypto "github.com/celo-org/celo-blockchain/crypto" "github.com/grassrootseconomics/celoutils" - "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" + "github.com/grassrootseconomics/cic-custodial/pkg/util" "github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/labstack/gommon/log" @@ -22,10 +21,9 @@ import ( type ( Opts struct { CeloProvider *celoutils.Provider - Keystore keystore.Keystore LockProvider *redislock.Client Noncestore nonce.Noncestore - PgStore store.Store + Store store.Store RedisClient *redis.Client RegistryAddress string SystemPrivateKey string @@ -36,10 +34,9 @@ type ( Custodial struct { Abis map[string]*w3.Func CeloProvider *celoutils.Provider - Keystore keystore.Keystore LockProvider *redislock.Client Noncestore nonce.Noncestore - PgStore store.Store + Store store.Store RedisClient *redis.Client RegistryMap map[string]common.Address SystemPrivateKey *ecdsa.PrivateKey @@ -49,7 +46,7 @@ type ( ) func NewCustodial(o Opts) (*Custodial, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), util.SLATimeout) defer cancel() registryMap, err := o.CeloProvider.RegistryMap(ctx, celoutils.HexToAddress(o.RegistryAddress)) @@ -86,10 +83,9 @@ func NewCustodial(o Opts) (*Custodial, error) { return &Custodial{ Abis: initAbis(), CeloProvider: o.CeloProvider, - Keystore: o.Keystore, LockProvider: o.LockProvider, Noncestore: o.Noncestore, - PgStore: o.PgStore, + Store: o.Store, RedisClient: o.RedisClient, RegistryMap: registryMap, SystemPrivateKey: privateKey, diff --git a/internal/keystore/keystore.go b/internal/keystore/keystore.go deleted file mode 100644 index 609723f..0000000 --- a/internal/keystore/keystore.go +++ /dev/null @@ -1,14 +0,0 @@ -package keystore - -import ( - "context" - "crypto/ecdsa" - - "github.com/grassrootseconomics/cic-custodial/pkg/keypair" -) - -// Keystore defines how keypairs should be stored and accessed from a storage backend. -type Keystore interface { - WriteKeyPair(context.Context, keypair.Key) (uint, error) - LoadPrivateKey(context.Context, string) (*ecdsa.PrivateKey, error) -} diff --git a/internal/keystore/postgres.go b/internal/keystore/postgres.go deleted file mode 100644 index d5cab2b..0000000 --- a/internal/keystore/postgres.go +++ /dev/null @@ -1,61 +0,0 @@ -package keystore - -import ( - "context" - "crypto/ecdsa" - - eth_crypto "github.com/celo-org/celo-blockchain/crypto" - "github.com/grassrootseconomics/cic-custodial/internal/queries" - "github.com/grassrootseconomics/cic-custodial/pkg/keypair" - "github.com/jackc/pgx/v5/pgxpool" -) - -type ( - Opts struct { - PostgresPool *pgxpool.Pool - Queries *queries.Queries - } - - PostgresKeystore struct { - db *pgxpool.Pool - queries *queries.Queries - } -) - -func NewPostgresKeytore(o Opts) Keystore { - return &PostgresKeystore{ - db: o.PostgresPool, - queries: o.Queries, - } -} - -// WriteKeyPair inserts a keypair into the db and returns the linked id. -func (ks *PostgresKeystore) WriteKeyPair(ctx context.Context, keypair keypair.Key) (uint, error) { - var ( - id uint - ) - - if err := ks.db.QueryRow(ctx, ks.queries.WriteKeyPair, keypair.Public, keypair.Private).Scan(&id); err != nil { - return id, err - } - - return id, nil -} - -// LoadPrivateKey loads a private key as a crypto primitive for direct use. An id is used to search for the private key. -func (ks *PostgresKeystore) LoadPrivateKey(ctx context.Context, publicKey string) (*ecdsa.PrivateKey, error) { - var ( - privateKeyString string - ) - - if err := ks.db.QueryRow(ctx, ks.queries.LoadKeyPair, publicKey).Scan(&privateKeyString); err != nil { - return nil, err - } - - privateKey, err := eth_crypto.HexToECDSA(privateKeyString) - if err != nil { - return nil, err - } - - return privateKey, nil -} diff --git a/internal/queries/queries.go b/internal/queries/queries.go deleted file mode 100644 index 82644bc..0000000 --- a/internal/queries/queries.go +++ /dev/null @@ -1,33 +0,0 @@ -package queries - -import ( - "fmt" - - "github.com/knadh/goyesql/v2" -) - -type Queries struct { - // Keystore - WriteKeyPair string `query:"write-key-pair"` - LoadKeyPair string `query:"load-key-pair"` - // Store - CreateOTX string `query:"create-otx"` - CreateDispatchStatus string `query:"create-dispatch-status"` - ActivateAccount string `query:"activate-account"` - UpdateChainStatus string `query:"update-chain-status"` - GetTxStatusByTrackingId string `query:"get-tx-status-by-tracking-id"` - GetAccountActivationQuorum string `query:"get-account-activation-quorum"` - GetAccountStatus string `query:"get-account-status-by-address"` - DecrGasQuota string `query:"decr-gas-quota"` - ResetGasQuota string `query:"reset-gas-quota"` -} - -func LoadQueries(q goyesql.Queries) (*Queries, error) { - loadedQueries := &Queries{} - - if err := goyesql.ScanToStruct(loadedQueries, q, nil); err != nil { - return nil, fmt.Errorf("failed to scan queries %v", err) - } - - return loadedQueries, nil -} diff --git a/internal/store/account.go b/internal/store/account.go index 7b95794..8f68c74 100644 --- a/internal/store/account.go +++ b/internal/store/account.go @@ -4,32 +4,48 @@ import ( "context" ) -func (s *PostgresStore) GetAccountStatusByAddress(ctx context.Context, publicAddress string) (bool, int, error) { +func (s *PgStore) ActivateAccount( + ctx context.Context, + publicAddress string, +) error { + if _, err := s.db.Exec( + ctx, + s.queries.ActivateAccount, + publicAddress, + ); err != nil { + return err + } + + return nil +} + +func (s *PgStore) GetAccountStatus( + ctx context.Context, + publicAddress string, +) (bool, int, error) { var ( accountActive bool gasQuota int ) - if err := s.db.QueryRow(ctx, s.queries.GetAccountStatus, publicAddress).Scan(&accountActive, &gasQuota); err != nil { + if err := s.db.QueryRow( + ctx, + s.queries.GetAccountStatus, + publicAddress, + ).Scan( + &accountActive, + &gasQuota, + ); err != nil { return false, 0, err } return accountActive, gasQuota, nil } -func (s *PostgresStore) GetAccountActivationQuorum(ctx context.Context, trackingId string) (int, error) { - var ( - quorum int - ) - - if err := s.db.QueryRow(ctx, s.queries.GetAccountActivationQuorum, trackingId).Scan(&quorum); err != nil { - return 0, err - } - - return quorum, nil -} - -func (s *PostgresStore) DecrGasQuota(ctx context.Context, publicAddress string) error { +func (s *PgStore) DecrGasQuota( + ctx context.Context, + publicAddress string, +) error { if _, err := s.db.Exec( ctx, s.queries.DecrGasQuota, @@ -41,7 +57,10 @@ func (s *PostgresStore) DecrGasQuota(ctx context.Context, publicAddress string) return nil } -func (s *PostgresStore) ResetGasQuota(ctx context.Context, publicAddress string) error { +func (s *PgStore) ResetGasQuota( + ctx context.Context, + publicAddress string, +) error { if _, err := s.db.Exec( ctx, s.queries.ResetGasQuota, @@ -52,15 +71,3 @@ func (s *PostgresStore) ResetGasQuota(ctx context.Context, publicAddress string) return nil } - -func (s *PostgresStore) ActivateAccount(ctx context.Context, publicAddress string) error { - if _, err := s.db.Exec( - ctx, - s.queries.ActivateAccount, - publicAddress, - ); err != nil { - return err - } - - return nil -} diff --git a/internal/store/keystore.go b/internal/store/keystore.go new file mode 100644 index 0000000..c3dfb68 --- /dev/null +++ b/internal/store/keystore.go @@ -0,0 +1,53 @@ +package store + +import ( + "context" + "crypto/ecdsa" + + eth_crypto "github.com/celo-org/celo-blockchain/crypto" + "github.com/grassrootseconomics/cic-custodial/pkg/keypair" +) + +func (s *PgStore) WriteKeyPair( + ctx context.Context, + keypair keypair.Key, +) (uint, error) { + var ( + id uint + ) + + if err := s.db.QueryRow( + ctx, + s.queries.WriteKeyPair, + keypair.Public, + keypair.Private, + ).Scan(&id); err != nil { + return id, err + } + + return id, nil +} + +func (s *PgStore) LoadPrivateKey( + ctx context.Context, + publicKey string, +) (*ecdsa.PrivateKey, error) { + var ( + privateKeyString string + ) + + if err := s.db.QueryRow( + ctx, + s.queries.LoadKeyPair, + publicKey, + ).Scan(&privateKeyString); err != nil { + return nil, err + } + + privateKey, err := eth_crypto.HexToECDSA(privateKeyString) + if err != nil { + return nil, err + } + + return privateKey, nil +} diff --git a/internal/store/otx.go b/internal/store/otx.go index 505fa2d..14fca24 100644 --- a/internal/store/otx.go +++ b/internal/store/otx.go @@ -2,21 +2,39 @@ package store import ( "context" + "math/big" "time" "github.com/georgysavva/scany/v2/pgxscan" "github.com/grassrootseconomics/cic-custodial/pkg/enum" ) -type TxStatus struct { - Type string `db:"type" json:"txType"` - TxHash string `db:"tx_hash" json:"txHash"` - TransferValue uint64 `db:"transfer_value" json:"transferValue"` - CreatedAt time.Time `db:"created_at" json:"createdAt"` - Status string `db:"status" json:"status"` -} +type ( + Otx struct { + TrackingId string + Type enum.OtxType + RawTx string + TxHash string + From string + Data string + GasLimit uint64 + TransferValue uint64 + GasPrice *big.Int + Nonce uint64 + } + txStatus struct { + CreatedAt time.Time `db:"created_at" json:"createdAt"` + Status string `db:"status" json:"status"` + TransferValue uint64 `db:"transfer_value" json:"transferValue"` + TxHash string `db:"tx_hash" json:"txHash"` + Type string `db:"type" json:"txType"` + } +) -func (s *PostgresStore) CreateOtx(ctx context.Context, otx OTX) (uint, error) { +func (s *PgStore) CreateOtx( + ctx context.Context, + otx Otx, +) (uint, error) { var ( id uint ) @@ -34,37 +52,52 @@ func (s *PostgresStore) CreateOtx(ctx context.Context, otx OTX) (uint, error) { otx.GasLimit, otx.TransferValue, otx.Nonce, - ).Scan(&id); err != nil { + ).Scan( + &id, + ); err != nil { return id, err } return id, nil } -func (s *PostgresStore) GetTxStatusByTrackingId(ctx context.Context, trackingId string) ([]*TxStatus, error) { +func (s *PgStore) GetTxStatus( + ctx context.Context, + trackingId string, +) (txStatus, error) { var ( - txs []*TxStatus + tx txStatus ) - if err := pgxscan.Select( + rows, err := s.db.Query( ctx, - s.db, - &txs, s.queries.GetTxStatusByTrackingId, trackingId, - ); err != nil { - return nil, err + ) + if err != nil { + return tx, err } - return txs, nil + if err := pgxscan.ScanOne( + &tx, + rows, + ); err != nil { + return tx, err + } + + return tx, nil } -func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error { +func (s *PgStore) CreateDispatchStatus( + ctx context.Context, + otxId uint, + otxStatus enum.OtxStatus, +) error { if _, err := s.db.Exec( ctx, s.queries.CreateDispatchStatus, - dispatch.OtxId, - dispatch.Status, + otxId, + otxStatus, ); err != nil { return err } @@ -72,21 +105,26 @@ func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch Dispa return nil } -func (s *PostgresStore) UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error { +func (s *PgStore) UpdateDispatchStatus( + ctx context.Context, + txSuccess bool, + txHash string, + txBlock uint64, +) error { var ( status = enum.SUCCESS ) - if !chainEvent.Success { + if !txSuccess { status = enum.REVERTED } if _, err := s.db.Exec( ctx, - s.queries.UpdateChainStatus, - chainEvent.TxHash, + s.queries.UpdateDispatchStatus, + txHash, status, - chainEvent.Block, + txBlock, ); err != nil { return err } diff --git a/internal/store/postgres.go b/internal/store/postgres.go deleted file mode 100644 index 97c4dfe..0000000 --- a/internal/store/postgres.go +++ /dev/null @@ -1,25 +0,0 @@ -package store - -import ( - "github.com/grassrootseconomics/cic-custodial/internal/queries" - "github.com/jackc/pgx/v5/pgxpool" -) - -type ( - Opts struct { - PostgresPool *pgxpool.Pool - Queries *queries.Queries - } - - PostgresStore struct { - db *pgxpool.Pool - queries *queries.Queries - } -) - -func NewPostgresStore(o Opts) Store { - return &PostgresStore{ - db: o.PostgresPool, - queries: o.Queries, - } -} diff --git a/internal/store/store.go b/internal/store/store.go index fe1e50a..8cad093 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -2,49 +2,127 @@ package store import ( "context" - "math/big" + "crypto/ecdsa" + "fmt" + "os" "github.com/grassrootseconomics/cic-custodial/pkg/enum" + "github.com/grassrootseconomics/cic-custodial/pkg/keypair" + "github.com/grassrootseconomics/cic-custodial/pkg/util" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/tern/v2/migrate" + "github.com/knadh/goyesql/v2" ) type ( - MinimalTxInfo struct { - Block uint64 `json:"block"` - From string `json:"from"` - To string `json:"to"` - ContractAddress string `json:"contractAddress"` - Success bool `json:"success"` - TxHash string `json:"transactionHash"` - TxIndex uint `json:"transactionIndex"` - Value uint64 `json:"value"` - } - OTX struct { - TrackingId string - Type enum.OtxType - RawTx string - TxHash string - From string - Data string - GasLimit uint64 - TransferValue uint64 - GasPrice *big.Int - Nonce uint64 - } - - DispatchStatus struct { - OtxId uint - Status enum.OtxStatus - } - Store interface { - CreateOtx(ctx context.Context, otx OTX) (id uint, err error) - CreateDispatchStatus(ctx context.Context, dispatch DispatchStatus) error - GetTxStatusByTrackingId(ctx context.Context, trackingId string) ([]*TxStatus, error) - UpdateOtxStatusFromChainEvent(ctx context.Context, chainEvent MinimalTxInfo) error - GetAccountStatusByAddress(ctx context.Context, publicAddress string) (bool, int, error) - GetAccountActivationQuorum(ctx context.Context, trackingId string) (int, error) - DecrGasQuota(ctx context.Context, publicAddress string) error - ResetGasQuota(ctx context.Context, publicAddress string) error - ActivateAccount(ctx context.Context, publicAddress string) error + // Keypair related actions. + LoadPrivateKey(context.Context, string) (*ecdsa.PrivateKey, error) + WriteKeyPair(context.Context, keypair.Key) (uint, error) + // Otx related actions. + CreateOtx(context.Context, Otx) (uint, error) + GetTxStatus(context.Context, string) (txStatus, error) + CreateDispatchStatus(context.Context, uint, enum.OtxStatus) error + UpdateDispatchStatus(context.Context, bool, string, uint64) error + // Account related actions. + ActivateAccount(context.Context, string) error + GetAccountStatus(context.Context, string) (bool, int, error) + // Gas quota related actions. + DecrGasQuota(context.Context, string) error + ResetGasQuota(context.Context, string) error + } + + Opts struct { + DSN string + MigrationsFolderPath string + QueriesFolderPath string + } + + PgStore struct { + db *pgxpool.Pool + queries *queries + } + + queries struct { + // Keystore related queries. + WriteKeyPair string `query:"write-key-pair"` + LoadKeyPair string `query:"load-key-pair"` + // Otx related queries. + CreateOTX string `query:"create-otx"` + GetTxStatusByTrackingId string `query:"get-tx-status-by-tracking-id"` + CreateDispatchStatus string `query:"create-dispatch-status"` + UpdateDispatchStatus string `query:"update-dispatch-status"` + // Account related queries. + ActivateAccount string `query:"activate-account"` + GetAccountStatus string `query:"get-account-status-by-address"` + DecrGasQuota string `query:"decr-gas-quota"` + ResetGasQuota string `query:"reset-gas-quota"` } ) + +func NewPgStore(o Opts) (Store, error) { + parsedConfig, err := pgxpool.ParseConfig(o.DSN) + if err != nil { + return nil, err + } + + dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig) + if err != nil { + return nil, err + } + + queries, err := loadQueries(o.QueriesFolderPath) + if err != nil { + return nil, err + } + + if err := runMigrations(context.Background(), dbPool, o.MigrationsFolderPath); err != nil { + return nil, err + } + + return &PgStore{ + db: dbPool, + queries: queries, + }, nil +} + +func loadQueries(queriesPath string) (*queries, error) { + parsedQueries, err := goyesql.ParseFile(queriesPath) + if err != nil { + return nil, err + } + + loadedQueries := &queries{} + + if err := goyesql.ScanToStruct(loadedQueries, parsedQueries, nil); err != nil { + return nil, fmt.Errorf("failed to scan queries %v", err) + } + + return loadedQueries, nil +} + +func runMigrations(ctx context.Context, dbPool *pgxpool.Pool, migrationsPath string) error { + ctx, cancel := context.WithTimeout(ctx, util.SLATimeout) + defer cancel() + + conn, err := dbPool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + migrator, err := migrate.NewMigrator(ctx, conn.Conn(), "schema_version") + if err != nil { + return err + } + + if err := migrator.LoadMigrations(os.DirFS(migrationsPath)); err != nil { + return err + } + + if err := migrator.Migrate(ctx); err != nil { + return err + } + + return nil +} diff --git a/internal/tasker/server.go b/internal/tasker/server.go index 44185bb..8a668a2 100644 --- a/internal/tasker/server.go +++ b/internal/tasker/server.go @@ -11,7 +11,7 @@ import ( ) const ( - retryRequeueInterval = 2 * time.Second + retryRequeueInterval = 1 * time.Second ) type TaskerServerOpts struct { diff --git a/internal/tasker/task/account_refill_gas.go b/internal/tasker/task/account_refill_gas.go index 7beeeb7..ea6fb7a 100644 --- a/internal/tasker/task/account_refill_gas.go +++ b/internal/tasker/task/account_refill_gas.go @@ -35,7 +35,7 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a return err } - _, gasQuota, err := cu.PgStore.GetAccountStatusByAddress(ctx, payload.PublicKey) + _, gasQuota, err := cu.Store.GetAccountStatus(ctx, payload.PublicKey) if err != nil { return err } @@ -142,7 +142,7 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a return err } - id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ + id, err := cu.Store.CreateOtx(ctx, store.Otx{ TrackingId: payload.TrackingId, Type: enum.REFILL_GAS, RawTx: hexutil.Encode(rawTx), diff --git a/internal/tasker/task/account_register.go b/internal/tasker/task/account_register.go index 5756806..6b77898 100644 --- a/internal/tasker/task/account_register.go +++ b/internal/tasker/task/account_register.go @@ -82,7 +82,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte return err } - id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ + id, err := cu.Store.CreateOtx(ctx, store.Otx{ TrackingId: payload.TrackingId, Type: enum.ACCOUNT_REGISTER, RawTx: hexutil.Encode(rawTx), @@ -96,7 +96,7 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte if err != nil { return err } - + disptachJobPayload, err := json.Marshal(TxPayload{ OtxId: id, Tx: builtTx, diff --git a/internal/tasker/task/dispatch_tx.go b/internal/tasker/task/dispatch_tx.go index 77aeabe..ae3612f 100644 --- a/internal/tasker/task/dispatch_tx.go +++ b/internal/tasker/task/dispatch_tx.go @@ -9,7 +9,6 @@ import ( "github.com/celo-org/celo-blockchain/core/types" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" - "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/hibiken/asynq" @@ -24,41 +23,37 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro return func(ctx context.Context, t *asynq.Task) error { var ( payload TxPayload - dispatchStatus store.DispatchStatus dispathchTx common.Hash + dispatchStatus enum.OtxStatus = enum.IN_NETWORK ) if err := json.Unmarshal(t.Payload(), &payload); err != nil { return err } - dispatchStatus.OtxId = payload.OtxId - if err := cu.CeloProvider.Client.CallCtx( ctx, eth.SendTx(payload.Tx).Returns(&dispathchTx), ); err != nil { switch err.Error() { case celoutils.ErrGasPriceLow: - dispatchStatus.Status = enum.FAIL_LOW_GAS_PRICE + dispatchStatus = enum.FAIL_LOW_GAS_PRICE case celoutils.ErrInsufficientGas: - dispatchStatus.Status = enum.FAIL_NO_GAS + dispatchStatus = enum.FAIL_NO_GAS case celoutils.ErrNonceLow: - dispatchStatus.Status = enum.FAIL_LOW_NONCE + dispatchStatus = enum.FAIL_LOW_NONCE default: - dispatchStatus.Status = enum.FAIL_UNKNOWN_RPC_ERROR + dispatchStatus = enum.FAIL_UNKNOWN_RPC_ERROR } - if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil { + if err := cu.Store.CreateDispatchStatus(ctx, payload.OtxId, dispatchStatus); err != nil { return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } - dispatchStatus.Status = enum.IN_NETWORK - - if err := cu.PgStore.CreateDispatchStatus(ctx, dispatchStatus); err != nil { + if err := cu.Store.CreateDispatchStatus(ctx, payload.OtxId, dispatchStatus); err != nil { return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) } diff --git a/internal/tasker/task/sign_transfer.go b/internal/tasker/task/sign_transfer.go index 463a726..d836785 100644 --- a/internal/tasker/task/sign_transfer.go +++ b/internal/tasker/task/sign_transfer.go @@ -47,7 +47,7 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er } defer lock.Release(ctx) - key, err := cu.Keystore.LoadPrivateKey(ctx, payload.From) + key, err := cu.Store.LoadPrivateKey(ctx, payload.From) if err != nil { return err } @@ -89,7 +89,7 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er return err } - id, err := cu.PgStore.CreateOtx(ctx, store.OTX{ + id, err := cu.Store.CreateOtx(ctx, store.Otx{ TrackingId: payload.TrackingId, Type: enum.TRANSFER_VOUCHER, RawTx: hexutil.Encode(rawTx), @@ -105,7 +105,7 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er return err } - if err := cu.PgStore.DecrGasQuota(ctx, payload.From); err != nil { + if err := cu.Store.DecrGasQuota(ctx, payload.From); err != nil { return err } diff --git a/pkg/postgres/pool.go b/pkg/postgres/pool.go deleted file mode 100644 index d9d05ee..0000000 --- a/pkg/postgres/pool.go +++ /dev/null @@ -1,56 +0,0 @@ -package postgres - -import ( - "context" - "os" - "time" - - "github.com/jackc/pgx/v5/pgxpool" - "github.com/jackc/tern/v2/migrate" -) - -const ( - schemaTable = "schema_version" -) - -type PostgresPoolOpts struct { - DSN string - MigrationsFolderPath string -} - -// NewPostgresPool creates a reusbale connection pool across the cic-custodial component. -func NewPostgresPool(ctx context.Context, o PostgresPoolOpts) (*pgxpool.Pool, error) { - parsedConfig, err := pgxpool.ParseConfig(o.DSN) - if err != nil { - return nil, err - } - - dbPool, err := pgxpool.NewWithConfig(ctx, parsedConfig) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - conn, err := dbPool.Acquire(ctx) - if err != nil { - return nil, err - } - defer conn.Release() - - migrator, err := migrate.NewMigrator(ctx, conn.Conn(), schemaTable) - if err != nil { - return nil, err - } - - if err := migrator.LoadMigrations(os.DirFS(o.MigrationsFolderPath)); err != nil { - return nil, err - } - - if err := migrator.Migrate(ctx); err != nil { - return nil, err - } - - return dbPool, nil -} diff --git a/queries.sql b/queries.sql index 4fb3065..8933ddf 100644 --- a/queries.sql +++ b/queries.sql @@ -9,11 +9,6 @@ INSERT INTO keystore(public_key, private_key) VALUES($1, $2) RETURNING id -- $1: public_key SELECT private_key FROM keystore WHERE public_key=$1 ---name: activate-account --- Activate an account following successful quorum --- $1: public_key -UPDATE keystore SET active = true WHERE public_key=$1 - --name: create-otx -- Create a new locally originating tx -- $1: tracking_id @@ -39,6 +34,13 @@ INSERT INTO otx_sign( nonce ) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id +--name: get-tx-status-by-tracking-id +-- Gets tx status's from possible multiple txs with the same tracking_id +-- $1: tracking_id +SELECT otx_sign.type, otx_sign.tx_hash, otx_sign.transfer_value, otx_sign.created_at, otx_dispatch.status FROM otx_sign +INNER JOIN otx_dispatch ON otx_sign.id = otx_dispatch.otx_id +WHERE otx_sign.tracking_id=$1 + --name: create-dispatch-status -- Create a new dispatch status -- $1: otx_id @@ -48,7 +50,7 @@ INSERT INTO otx_dispatch( "status" ) VALUES($1, $2) RETURNING id ---name: update-chain-status +--name: update-dispatch-status -- Updates the status of the dispatched tx with the chain mine status -- $1: tx_hash -- $2: status @@ -60,22 +62,10 @@ UPDATE otx_dispatch SET "status" = $2, "block" = $3 WHERE otx_dispatch.id = ( AND otx_dispatch.status = 'IN_NETWORK' ) ---name: get-tx-status-by-tracking-id --- Gets tx status's from possible multiple txs with the same tracking_id --- $1: tracking_id -SELECT otx_sign.type, otx_sign.tx_hash, otx_sign.transfer_value, otx_sign.created_at, otx_dispatch.status FROM otx_sign -INNER JOIN otx_dispatch ON otx_sign.id = otx_dispatch.otx_id -WHERE otx_sign.tracking_id=$1 - --- TODO: Scroll by status type with cursor pagination - ---name: get-account-activation-quorum --- Gets quorum of required and confirmed system transactions for the account --- $1: tracking_id -SELECT count(*) FROM otx_dispatch INNER JOIN -otx_sign ON otx_dispatch.otx_id = otx_sign.id -WHERE otx_sign.tracking_id=$1 -AND otx_dispatch.status = 'SUCCESS' +--name: activate-account +-- Activate an account following successful quorum +-- $1: public_key +UPDATE keystore SET active = true WHERE public_key=$1 --name: get-account-status-by-address -- Gets current gas quota for an individual account by address @@ -101,4 +91,4 @@ FROM gas_quota_meta WHERE key_id = ( SELECT id FROM keystore WHERE public_key=$1 -) \ No newline at end of file +)