xnapshot: 12-02

This commit is contained in:
Mohamed Sohail 2023-02-12 12:50:43 +03:00
parent 773474cad9
commit 4f7909e4ee
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
16 changed files with 285 additions and 90 deletions

View File

@ -22,6 +22,10 @@ func initAbis() map[string]*w3.Func {
"transfer": w3.MustNewFunc("transfer(address,uint256)", "bool"),
// Keccak hash -> 0x23b872dd
"transferFrom": w3.MustNewFunc("transferFrom(address, address, uint256)", "bool"),
// Add to account index
"add": w3.MustNewFunc("add(address)", "bool"),
// giveTo gas refill
"giveTo": w3.MustNewFunc("giveTo(address)", "uint256"),
}
}
@ -31,6 +35,8 @@ func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) (*tas
// Some custodial system defaults loaded from the config file.
systemContainer := &tasker.SystemContainer{
Abis: initAbis(),
AccountIndexContract: w3.A(ko.MustString("system.account_index")),
GasFaucetContract: w3.A(ko.MustString("system.gas_faucet")),
GasRefillThreshold: big.NewInt(ko.MustInt64("system.gas_refill_threshold")),
GasRefillValue: big.NewInt(ko.MustInt64("system.gas_refill_value")),
GiftableGasValue: big.NewInt(ko.MustInt64("system.giftable_gas_value")),

View File

@ -66,7 +66,8 @@ func initCeloProvider() (*celo.Provider, error) {
}
if ko.Bool("chain.testnet") {
providerOpts.ChainId = celo.TestnetChainId
// Devnet = 1337
providerOpts.ChainId = 1337
} else {
providerOpts.ChainId = celo.MainnetChainId
}

View File

@ -18,6 +18,7 @@ func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *task
taskerServerOpts := tasker.TaskerServerOpts{
Concurrency: ko.MustInt("asynq.worker_count"),
Logg: lo,
LogLevel: asynq.ErrorLevel,
RedisPool: redisPool,
SystemContainer: custodialContainer.systemContainer,
TaskerClient: custodialContainer.taskerClient,
@ -34,6 +35,15 @@ func initTasker(custodialContainer *custodial, redisPool *redis.RedisPool) *task
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.RegisterAccountOnChain, task.RegisterAccountOnChainProcessor(
custodialContainer.celoProvider,
custodialContainer.lockProvider,
custodialContainer.noncestore,
custodialContainer.pgStore,
custodialContainer.systemContainer,
custodialContainer.taskerClient,
js,
))
taskerServer.RegisterHandlers(tasker.GiftGasTask, task.GiftGasProcessor(
custodialContainer.celoProvider,
custodialContainer.lockProvider,

View File

@ -1,5 +1,5 @@
[service]
address = ":5000"
address = ":5005"
# Exposes Prometheus metrics
# /metrics endpoint
metrics = true
@ -7,26 +7,26 @@ metrics = true
# System default values
# Valus are in wei unless otherwise stated
[system]
# Any account below 1 KES equivalent of CELO is topped up again
# 10000000000000000 = 1 KES
gas_refill_threshold = 10000000000000000
gas_refill_value = 10000000000000000
# Every custodial account is given 2 KES worth of CELO
giftable_gas_value = 20000000000000000
# The giftable token is a training voucher
# Every new user is given 5 DGFT
giftable_token_address = "0x486aD10d70107900546455F7a0e022c300F157Bf"
gas_faucet = "0x6dE38F79Cf455339e8141D15E208ba09eea634e1"
giftable_token_address = "0x4091fc149522d5FE31d0970687078B1aE0625892"
giftable_token_value = 5000000
gas_refill_threshold = 100000000000000000
gas_refill_value = 100000000000000000
# Every custodial account is given 2 KES worth of CELO
giftable_gas_value = 2000000000000000000
# System private key
# Should always be toped up
private_key = "a6af6c597c614e3c8ee4b7638ab7c3f737aece3773a5413ca8caf4338e6b06d1"
private_key = "d87f322629cf071ccd3ddf17ab5e1abf098a25c4c5d791d7535d265379b2ae37"
lock_prefix = "lock:"
public_key = "0x80097c773B3E83472FC7952c5206a7DB35d42bEF"
public_key = "0xe5ab7A5af1f28aA8E9658AC33a6ebF2a8641d948"
token_decimals = 6
token_transfer_gas_limit = 100000
token_transfer_gas_limit = 200000
account_index = "0x70138F458Fa56C034acb19E38d082843327F18A4"
[chain]
rpc_endpoint = "https://alfajores-forno.celo-testnet.org"
rpc_endpoint = "http://192.168.0.101:8545"
testnet = true
[postgres]
@ -43,3 +43,22 @@ worker_count = 15
debug = false
dsn = "redis://localhost:6379/0"
task_retention_hrs = 24
# https://docs.nats.io/
[jetstream]
endpoint = "nats://localhost:4222"
stream_name = "CUSTODIAL"
# Duration JetStream should keep the message before remocing it from the persistent store
persist_duration_hours = 48
# Duration to ignore duplicate transactions (e.g. due to restart)
dedup_duration_hours = 6
# Stream subjects
stream_subjects = [
"CUSTODIAL.accountNewNonce",
"CUSTODIAL.accountRegister",
"CUSTODIAL.giftNewAccountGas",
"CUSTODIAL.giftNewAccountVoucher",
"CUSTODIAL.dispatchFail",
"CUSTODIAL.dispatchSuccess",
"CUSTODIAL.transferSign"
]

View File

@ -30,6 +30,15 @@ services:
interval: 10s
timeout: 5s
retries: 5
nats:
image: nats:2.9
restart: unless-stopped
command: "-js -sd /nats/data"
volumes:
- cic-custodial-nats:/nats/data
ports:
- "4222:4222"
- "8222:8222"
asynqmon:
image: hibiken/asynqmon
restart: unless-stopped
@ -45,3 +54,5 @@ volumes:
driver: local
cic-custodial-redis:
driver: local
cic-custodial-nats:
driver: local

View File

@ -24,7 +24,7 @@ func SignTransferHandler(
return func(c echo.Context) error {
var transferRequest struct {
TrackingId string `json:"trackingId" validate:"required"`
From string `json:"from" validate:"required,eth_address"`
From string `json:"from" validate:"required,eth_addr"`
To string `json:"to" validate:"required,eth_addr"`
VoucherAddress string `json:"voucherAddress" validate:"required,eth_addr"`
Amount int64 `json:"amount" validate:"required,numeric"`

View File

@ -16,7 +16,6 @@ func (s *PostgresStore) CreateDispatchStatus(ctx context.Context, dispatch Dispa
s.queries.CreateDispatchStatus,
dispatch.OtxId,
dispatch.Status,
dispatch.TrackingId,
).Scan(&id); err != nil {
return id, err
}

View File

@ -10,13 +10,14 @@ func (s *PostgresStore) CreateOTX(ctx context.Context, otx OTX) (uint, error) {
if err := s.db.QueryRow(
ctx,
s.queries.CreateOTX,
otx.TrackingId,
otx.Type,
otx.RawTx,
otx.TxHash,
otx.From,
otx.Data,
otx.GasPrice,
otx.Nonce,
otx.TrackingId,
).Scan(&id); err != nil {
return id, err
}

View File

@ -8,19 +8,19 @@ import (
type (
OTX struct {
TrackingId string
Type string
RawTx string
TxHash string
From string
Data string
GasPrice uint64
Nonce uint64
TrackingId string
}
DispatchStatus struct {
OtxId uint
Status status.Status
TrackingId string
OtxId uint
Status status.Status
}
Store interface {

View File

@ -2,12 +2,12 @@ package task
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"github.com/bsm/redislock"
"github.com/celo-org/celo-blockchain/common/hexutil"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/store"
@ -30,9 +30,9 @@ type (
)
func PrepareAccount(
js nats.JetStreamContext,
noncestore nonce.Noncestore,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
@ -48,7 +48,7 @@ func PrepareAccount(
}
_, err := taskerClient.CreateTask(
tasker.GiftGasTask,
tasker.RegisterAccountOnChain,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
@ -87,14 +87,153 @@ func PrepareAccount(
}
}
func GiftGasProcessor(
func RegisterAccountOnChainProcessor(
celoProvider *celo.Provider,
js nats.JetStreamContext,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
p AccountPayload
)
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
lock, err := lockProvider.Obtain(ctx, system.LockPrefix+system.PublicKey, system.LockTimeout, nil)
if err != nil {
return err
}
defer lock.Release(ctx)
nonce, err := noncestore.Acquire(ctx, system.PublicKey)
if err != nil {
return err
}
input, err := system.Abis["add"].EncodeArgs(w3.A(p.PublicKey))
if err != nil {
return err
}
// TODO: Review gas params.
builtTx, err := celoProvider.SignContractExecutionTx(
system.PrivateKey,
celo.ContractExecutionTxOpts{
ContractAddress: system.AccountIndexContract,
InputData: input,
GasPrice: big.NewInt(20000000000),
GasLimit: system.TokenTransferGasLimit,
Nonce: nonce,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
rawTx, err := builtTx.MarshalBinary()
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
id, err := pg.CreateOTX(ctx, store.OTX{
TrackingId: p.TrackingId,
Type: "ACCOUNT_REGISTER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
_, err = taskerClient.CreateTask(
tasker.TxDispatchTask,
tasker.HighPriority,
&tasker.Task{
Payload: disptachJobPayload,
},
)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
_, err = taskerClient.CreateTask(
tasker.GiftGasTask,
tasker.DefaultPriority,
&tasker.Task{
Payload: t.Payload(),
},
)
if err != nil {
return err
}
eventPayload := &accountEventPayload{
TrackingId: p.TrackingId,
}
eventJson, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.Publish("CUSTODIAL.accountRegister", eventJson)
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
return err
}
return err
}
return nil
}
}
func GiftGasProcessor(
celoProvider *celo.Provider,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
@ -144,12 +283,14 @@ func GiftGasProcessor(
}
id, err := pg.CreateOTX(ctx, store.OTX{
RawTx: hex.EncodeToString(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: string(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
TrackingId: p.TrackingId,
Type: "GIFT_GAS",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
@ -160,9 +301,8 @@ func GiftGasProcessor(
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
TrackingId: p.TrackingId,
Tx: builtTx,
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
@ -211,12 +351,12 @@ func GiftGasProcessor(
func GiftTokenProcessor(
celoProvider *celo.Provider,
js nats.JetStreamContext,
lockProvider *redislock.Client,
noncestore nonce.Noncestore,
pg store.Store,
system *tasker.SystemContainer,
taskerClient *tasker.TaskerClient,
js nats.JetStreamContext,
) func(context.Context, *asynq.Task) error {
return func(ctx context.Context, t *asynq.Task) error {
var (
@ -271,12 +411,14 @@ func GiftTokenProcessor(
}
id, err := pg.CreateOTX(ctx, store.OTX{
RawTx: hex.EncodeToString(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: string(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
TrackingId: p.TrackingId,
Type: "GIFT_VOUCHER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: system.PublicKey,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {
@ -287,9 +429,8 @@ func GiftTokenProcessor(
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
TrackingId: p.TrackingId,
Tx: builtTx,
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, system.PublicKey); err != nil {

View File

@ -16,14 +16,13 @@ import (
type (
TxPayload struct {
OtxId uint `json:"otxId"`
TrackingId string `json:"trackingId"`
Tx *types.Transaction `json:"tx"`
OtxId uint `json:"otxId"`
Tx *types.Transaction `json:"tx"`
}
dispatchEventPayload struct {
TrackingId string
TxHash string
OtxId uint
TxHash string
}
)
@ -44,12 +43,11 @@ func TxDispatch(
}
dispatchStatus := store.DispatchStatus{
OtxId: p.OtxId,
TrackingId: p.TrackingId,
OtxId: p.OtxId,
}
eventPayload := &dispatchEventPayload{
TrackingId: p.TrackingId,
OtxId: p.OtxId,
}
if err := celoProvider.Client.CallCtx(
@ -77,7 +75,12 @@ func TxDispatch(
return err
}
dispatchStatus.TrackingId = status.Successful
dispatchStatus.Status = status.Successful
_, err := pg.CreateDispatchStatus(ctx, dispatchStatus)
if err != nil {
return err
}
eventPayload.TxHash = txHash.Hex()
eventJson, err := json.Marshal(eventPayload)
@ -85,7 +88,7 @@ func TxDispatch(
return err
}
_, err = js.Publish("CUSTODIAL.dispatchSuccessful", eventJson, nats.MsgId(txHash.Hex()))
_, err = js.Publish("CUSTODIAL.dispatchSuccess", eventJson, nats.MsgId(txHash.Hex()))
if err != nil {
return err
}

View File

@ -2,11 +2,11 @@ package task
import (
"context"
"encoding/hex"
"encoding/json"
"math/big"
"github.com/bsm/redislock"
"github.com/celo-org/celo-blockchain/common/hexutil"
celo "github.com/grassrootseconomics/cic-celo-sdk"
"github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce"
@ -108,12 +108,14 @@ func SignTransfer(
}
id, err := pg.CreateOTX(ctx, store.OTX{
RawTx: hex.EncodeToString(rawTx),
TxHash: builtTx.Hash().Hex(),
From: p.From,
Data: string(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
TrackingId: p.TrackingId,
Type: "TRANSFER",
RawTx: hexutil.Encode(rawTx),
TxHash: builtTx.Hash().Hex(),
From: p.From,
Data: hexutil.Encode(builtTx.Data()),
GasPrice: builtTx.GasPrice().Uint64(),
Nonce: builtTx.Nonce(),
})
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {
@ -124,9 +126,8 @@ func SignTransfer(
}
disptachJobPayload, err := json.Marshal(TxPayload{
OtxId: id,
TrackingId: p.TrackingId,
Tx: builtTx,
OtxId: id,
Tx: builtTx,
})
if err != nil {
if err := noncestore.Return(ctx, p.From); err != nil {

View File

@ -17,6 +17,8 @@ type (
type SystemContainer struct {
Abis map[string]*w3.Func
AccountIndexContract common.Address
GasFaucetContract common.Address
GasRefillThreshold *big.Int
GasRefillValue *big.Int
GiftableGasValue *big.Int
@ -37,6 +39,7 @@ type Task struct {
const (
PrepareAccountTask TaskName = "sys:prepare_account"
RegisterAccountOnChain TaskName = "sys:register_account"
GiftGasTask TaskName = "sys:gift_gas"
GiftTokenTask TaskName = "sys:gift_token"
RefillGasTask TaskName = "admin:refill_gas"

View File

@ -5,4 +5,4 @@ CREATE TABLE IF NOT EXISTS keystore (
private_key TEXT NOT NULL,
active BOOLEAN DEFAULT true,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
);

View File

@ -2,22 +2,22 @@
CREATE TABLE IF NOT EXISTS otx (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
tracking_id TEXT NOT NULL,
"type" TEXT NOT NULL,
raw_tx TEXT NOT NULL,
tx_hash TEXT NOT NULL,
from TEXT NOT NULL,
data TEXT NOT NULL,
"from" TEXT NOT NULL,
"data" TEXT NOT NULL,
gas_price bigint NOT NULL,
nonce int NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
);
CREATE INDEX IF NOT EXISTS tx_hash_idx ON otx USING hash(tx_hash);
CREATE INDEX IF NOT EXISTS from_idx ON otx USING hash(from);
CREATE INDEX IF NOT EXISTS from_idx ON otx USING hash("from");
-- Dispatch status table
CREATE TABLE IF NOT EXISTS dispatch (
id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
otx_id INT REFERENCES otx(id),
status TEXT NOT NULL,
"status" TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
CREATE INDEX IF NOT EXISTS dispatch_receipt_idx ON dispatch USING hash(dispatch_receipt);
);

View File

@ -9,28 +9,30 @@ INSERT INTO keystore(public_key, private_key) VALUES($1, $2) RETURNING id
--name: load-key-pair
-- Load saved key pair
-- $1: public_key
SELECT private_key FROM keystore WHERE id=$1
SELECT private_key FROM keystore WHERE public_key=$1
-- OTX queries
--name: create-otx
-- Create a new locally originating tx
-- $1: raw_tx
-- $2: tx_hash
-- $3: from
-- $4: data
-- $5: gas_price
-- $6: nonce
-- $7: tracking_id
-- $1: tracking_id
-- $2: type
-- $3: raw_tx
-- $4: tx_hash
-- $5: from
-- $6: data
-- $7: gas_price
-- $8: nonce
INSERT INTO otx(
tracking_id,
"type",
raw_tx,
tx_hash,
from,
data,
"from",
"data",
gas_price,
nonce,
tracking_id
) VALUES($1, $2, $3, $4, $5, $6, $7) RETURNING id
nonce
) VALUES($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id
-- Dispatch status queries
@ -39,9 +41,7 @@ INSERT INTO otx(
-- Create a new dispatch status
-- $1: otx_id
-- $2: status
-- £3: tracking_id
INSERT INTO otx(
INSERT INTO dispatch(
otx_id,
status,
tracking_id
) VALUES($1, $2, $3) RETURNING id
"status"
) VALUES($1, $2) RETURNING id