refactor: (breaking) decouple js pub/sub, improve tasker server

This commit is contained in:
Mohamed Sohail 2023-03-06 08:18:41 +00:00
parent a47e44f262
commit 70419a9b19
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
20 changed files with 415 additions and 312 deletions

View File

@ -7,8 +7,8 @@ import (
eth_crypto "github.com/celo-org/celo-blockchain/crypto" eth_crypto "github.com/celo-org/celo-blockchain/crypto"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch"
) )
@ -31,12 +31,12 @@ func initAbis() map[string]*w3.Func {
// Bootstrap the internal custodial system configs and system signer key. // Bootstrap the internal custodial system configs and system signer key.
// This container is passed down to individual tasker and API handlers. // This container is passed down to individual tasker and API handlers.
func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *tasker.SystemContainer { func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *custodial.SystemContainer {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
// Some custodial system defaults loaded from the config file. // Some custodial system defaults loaded from the config file.
systemContainer := &tasker.SystemContainer{ systemContainer := &custodial.SystemContainer{
Abis: initAbis(), Abis: initAbis(),
AccountIndexContract: w3.A(ko.MustString("system.account_index_address")), AccountIndexContract: w3.A(ko.MustString("system.account_index_address")),
GasFaucetContract: w3.A(ko.MustString("system.gas_faucet_address")), GasFaucetContract: w3.A(ko.MustString("system.gas_faucet_address")),
@ -55,10 +55,10 @@ func initSystemContainer(ctx context.Context, noncestore nonce.Noncestore) *task
// Check if system signer account nonce is present. // Check if system signer account nonce is present.
// If not (first boot), we bootstrap it from the network. // If not (first boot), we bootstrap it from the network.
currentSystemNonce, err := noncestore.Peek(ctx, ko.MustString("system.public_key")) currentSystemNonce, err := noncestore.Peek(ctx, ko.MustString("system.public_key"))
lo.Info("custodial: loaded (noncestore) system nonce", "nonce", currentSystemNonce) lo.Info("custodial: loaded system nonce from noncestore", "nonce", currentSystemNonce)
if err == redis.Nil { if err == redis.Nil {
nonce, err := noncestore.SyncNetworkNonce(ctx, ko.MustString("system.public_key")) nonce, err := noncestore.SyncNetworkNonce(ctx, ko.MustString("system.public_key"))
lo.Info("custodial: syncing system nonce", "nonce", nonce) lo.Info("custodial: syncing system nonce from network", "nonce", nonce)
if err != nil { if err != nil {
lo.Fatal("custodial: critical error bootstrapping system container", "error", err) lo.Fatal("custodial: critical error bootstrapping system container", "error", err)
} }

View File

@ -7,11 +7,13 @@ import (
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/queries" "github.com/grassrootseconomics/cic-custodial/internal/queries"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/sub"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/logg" "github.com/grassrootseconomics/cic-custodial/pkg/logg"
"github.com/grassrootseconomics/cic-custodial/pkg/postgres" "github.com/grassrootseconomics/cic-custodial/pkg/postgres"
@ -22,6 +24,7 @@ import (
"github.com/knadh/koanf/providers/env" "github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file" "github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/v2" "github.com/knadh/koanf/v2"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
@ -46,14 +49,14 @@ func initConfig() *koanf.Koanf {
confFile := file.Provider(confFlag) confFile := file.Provider(confFlag)
if err := ko.Load(confFile, toml.Parser()); err != nil { if err := ko.Load(confFile, toml.Parser()); err != nil {
lo.Fatal("Could not load config file", "error", err) lo.Fatal("init: could not load config file", "error", err)
} }
if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string { if err := ko.Load(env.Provider("CUSTODIAL_", ".", func(s string) string {
return strings.ReplaceAll(strings.ToLower( return strings.ReplaceAll(strings.ToLower(
strings.TrimPrefix(s, "CUSTODIAL_")), "__", ".") strings.TrimPrefix(s, "CUSTODIAL_")), "__", ".")
}), nil); err != nil { }), nil); err != nil {
lo.Fatal("Could not override config from env vars", "error", err) lo.Fatal("init: could not override config from env vars", "error", err)
} }
if debugFlag { if debugFlag {
@ -170,11 +173,10 @@ func initLockProvider(redisPool redislock.RedisClient) *redislock.Client {
func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient { func initTaskerClient(redisPool *redis.RedisPool) *tasker.TaskerClient {
return tasker.NewTaskerClient(tasker.TaskerClientOpts{ return tasker.NewTaskerClient(tasker.TaskerClientOpts{
RedisPool: redisPool, RedisPool: redisPool,
TaskRetention: time.Duration(ko.MustInt64("asynq.task_retention_hrs")) * time.Hour,
}) })
} }
// Load Postgres store // Load Postgres store.
func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store { func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) store.Store {
return store.NewPostgresStore(store.Opts{ return store.NewPostgresStore(store.Opts{
PostgresPool: postgresPool, PostgresPool: postgresPool,
@ -182,20 +184,44 @@ func initPostgresStore(postgresPool *pgxpool.Pool, queries *queries.Queries) sto
}) })
} }
// Init JetStream context for tasker events. // Init JetStream context for both pub/sub.
func initJetStream(pgStore store.Store) *events.JetStream { func initJetStream() (*nats.Conn, nats.JetStreamContext) {
jsEmitter, err := events.NewJetStreamEventEmitter(events.JetStreamOpts{ natsConn, err := nats.Connect(ko.MustString("jetstream.endpoint"))
Logg: lo,
PgStore: pgStore,
ServerUrl: ko.MustString("jetstream.endpoint"),
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour,
})
if err != nil { if err != nil {
lo.Fatal("main: critical error loading jetstream event emitter") lo.Fatal("init: critical error connecting to NATS", "error", err)
}
js, err := natsConn.JetStream()
if err != nil {
lo.Fatal("init: bad JetStream opts", "error", err)
} }
return jsEmitter return natsConn, js
}
func initPub(jsCtx nats.JetStreamContext) *pub.Pub {
pub, err := pub.NewPub(pub.PubOpts{
DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour,
JsCtx: jsCtx,
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
})
if err != nil {
lo.Fatal("init: critical error bootstrapping pub", "error", err)
}
return pub
}
func initSub(natsConn *nats.Conn, jsCtx nats.JetStreamContext, cu *custodial.Custodial) *sub.Sub {
sub, err := sub.NewSub(sub.SubOpts{
CustodialContainer: cu,
JsCtx: jsCtx,
Logg: lo,
NatsConn: natsConn,
})
if err != nil {
lo.Fatal("init: critical error bootstrapping sub", "error", err)
}
return sub
} }

View File

@ -7,7 +7,7 @@ import (
"sync" "sync"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/sub"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/knadh/koanf/v2" "github.com/knadh/koanf/v2"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
@ -17,7 +17,7 @@ import (
type ( type (
internalServiceContainer struct { internalServiceContainer struct {
apiService *echo.Echo apiService *echo.Echo
jetstreamSub *events.JetStream jetstreamSub *sub.Sub
taskerService *tasker.TaskerServer taskerService *tasker.TaskerServer
} }
) )
@ -57,15 +57,16 @@ func main() {
taskerClient := initTaskerClient(asynqRedisPool) taskerClient := initTaskerClient(asynqRedisPool)
systemContainer := initSystemContainer(context.Background(), redisNoncestore) systemContainer := initSystemContainer(context.Background(), redisNoncestore)
jsEventEmitter := initJetStream(pgStore) natsConn, jsCtx := initJetStream()
jsPub := initPub(jsCtx)
custodial := &custodial.Custodial{ custodial := &custodial.Custodial{
CeloProvider: celoProvider, CeloProvider: celoProvider,
EventEmitter: jsEventEmitter,
Keystore: postgresKeystore, Keystore: postgresKeystore,
LockProvider: lockProvider, LockProvider: lockProvider,
Noncestore: redisNoncestore, Noncestore: redisNoncestore,
PgStore: pgStore, PgStore: pgStore,
Pub: jsPub,
SystemContainer: systemContainer, SystemContainer: systemContainer,
TaskerClient: taskerClient, TaskerClient: taskerClient,
} }
@ -80,8 +81,9 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
lo.Info("main: starting API server") host := ko.MustString("service.address")
if err := internalServices.apiService.Start(ko.MustString("service.address")); err != nil { lo.Info("main: starting API server", "host", host)
if err := internalServices.apiService.Start(host); err != nil {
if strings.Contains(err.Error(), "Server closed") { if strings.Contains(err.Error(), "Server closed") {
lo.Info("main: shutting down server") lo.Info("main: shutting down server")
} else { } else {
@ -94,23 +96,23 @@ func main() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
lo.Info("Starting tasker") lo.Info("main: starting tasker")
if err := internalServices.taskerService.Start(); err != nil { if err := internalServices.taskerService.Start(); err != nil {
lo.Fatal("main: could not start task server", "err", err) lo.Fatal("main: could not start task server", "err", err)
} }
}() }()
internalServices.jetstreamSub = jsEventEmitter internalServices.jetstreamSub = initSub(natsConn, jsCtx, custodial)
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
lo.Info("Starting jetstream sub") lo.Info("main: starting jetstream sub")
if err := internalServices.jetstreamSub.Subscriber(); err != nil { if err := internalServices.jetstreamSub.Process(); err != nil {
lo.Fatal("main: error running jetstream sub", "err", err) lo.Fatal("main: error running jetstream sub", "err", err)
} }
}() }()
<-signalCh lo.Info("main: graceful shutdown triggered", "signal", <-signalCh)
startGracefulShutdown(context.Background(), internalServices) startGracefulShutdown(context.Background(), internalServices)
wg.Wait() wg.Wait()

View File

@ -1,24 +1,40 @@
package main package main
import ( import (
"context"
"time"
"github.com/bsm/redislock"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/internal/tasker/task" "github.com/grassrootseconomics/cic-custodial/internal/tasker/task"
"github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/zerodha/logf"
)
const (
fixedRetryCount = 25
fixedRetryPeriod = time.Second * 1
) )
// Load tasker handlers, injecting any necessary handler dependencies from the system container. // Load tasker handlers, injecting any necessary handler dependencies from the system container.
func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisPool) *tasker.TaskerServer { func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisPool) *tasker.TaskerServer {
taskerServerOpts := tasker.TaskerServerOpts{ taskerServerOpts := tasker.TaskerServerOpts{
Concurrency: ko.MustInt("asynq.worker_count"), Concurrency: ko.MustInt("asynq.worker_count"),
IsFailureHandler: isFailureHandler,
Logg: lo, Logg: lo,
LogLevel: asynq.InfoLevel, LogLevel: asynq.InfoLevel,
RedisPool: redisPool, RedisPool: redisPool,
RetryHandler: retryHandler,
} }
taskerServer := tasker.NewTaskerServer(taskerServerOpts) taskerServer := tasker.NewTaskerServer(taskerServerOpts)
taskerServer.RegisterMiddlewareStack([]asynq.MiddlewareFunc{
observibilityMiddleware(lo),
})
taskerServer.RegisterHandlers(tasker.AccountPrepareTask, task.AccountPrepare(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountPrepareTask, task.AccountPrepare(custodialContainer))
taskerServer.RegisterHandlers(tasker.AccountRegisterTask, task.AccountRegisterOnChainProcessor(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountRegisterTask, task.AccountRegisterOnChainProcessor(custodialContainer))
taskerServer.RegisterHandlers(tasker.AccountGiftGasTask, task.AccountGiftGasProcessor(custodialContainer)) taskerServer.RegisterHandlers(tasker.AccountGiftGasTask, task.AccountGiftGasProcessor(custodialContainer))
@ -29,3 +45,38 @@ func initTasker(custodialContainer *custodial.Custodial, redisPool *redis.RedisP
return taskerServer return taskerServer
} }
func isFailureHandler(err error) bool {
switch err {
// Ignore lock contention errors; retry until lock obtain.
case redislock.ErrNotObtained:
return false
default:
return true
}
}
func retryHandler(count int, err error, task *asynq.Task) time.Duration {
if count < fixedRetryCount {
return fixedRetryPeriod
} else {
return asynq.DefaultRetryDelayFunc(count, err, task)
}
}
func observibilityMiddleware(logg logf.Logger) asynq.MiddlewareFunc {
return func(handler asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, task *asynq.Task) error {
taskId, _ := asynq.GetTaskID(ctx)
err := handler.ProcessTask(ctx, task)
if err != nil && isFailureHandler(err) {
lo.Error("tasker: handler error", "err", err, "task_type", task.Type(), "task_id", taskId)
} else {
lo.Info("tasker: process task", "task_type", task.Type(), "task_id", taskId)
}
return err
})
}
}

View File

@ -1,22 +1,46 @@
package custodial package custodial
import ( import (
"crypto/ecdsa"
"math/big"
"time"
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/events"
"github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch"
) )
type Custodial struct { type (
SystemContainer struct {
Abis map[string]*w3.Func
AccountIndexContract common.Address
GasFaucetContract common.Address
GasRefillThreshold *big.Int
GasRefillValue *big.Int
GiftableGasValue *big.Int
GiftableToken common.Address
GiftableTokenValue *big.Int
LockPrefix string
LockTimeout time.Duration
PrivateKey *ecdsa.PrivateKey
PublicKey string
TokenDecimals int
TokenTransferGasLimit uint64
}
Custodial struct {
CeloProvider *celoutils.Provider CeloProvider *celoutils.Provider
EventEmitter *events.JetStream
Keystore keystore.Keystore Keystore keystore.Keystore
LockProvider *redislock.Client LockProvider *redislock.Client
Noncestore nonce.Noncestore Noncestore nonce.Noncestore
PgStore store.Store PgStore store.Store
SystemContainer *tasker.SystemContainer Pub *pub.Pub
SystemContainer *SystemContainer
TaskerClient *tasker.TaskerClient TaskerClient *tasker.TaskerClient
} }
)

View File

@ -1,169 +0,0 @@
package events
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf"
)
const (
// Pub
StreamName string = "CUSTODIAL"
StreamSubjects string = "CUSTODIAL.*"
AccountNewNonce string = "CUSTODIAL.accountNewNonce"
AccountRegister string = "CUSTODIAL.accountRegister"
AccountGiftGas string = "CUSTODIAL.systemNewAccountGas"
AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher"
AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas"
DispatchFail string = "CUSTODIAL.dispatchFail"
DispatchSuccess string = "CUSTODIAL.dispatchSuccess"
SignTransfer string = "CUSTODIAL.signTransfer"
// Sub
durableId = "cic-custodial"
pullStream = "CHAIN"
pullSubject = "CHAIN.*"
actionTimeout = 5 * time.Second
)
type (
JetStreamOpts struct {
Logg logf.Logger
ServerUrl string
PersistDuration time.Duration
PgStore store.Store
DedupDuration time.Duration
}
JetStream struct {
logg logf.Logger
jsCtx nats.JetStreamContext
pgStore store.Store
natsConn *nats.Conn
}
EventPayload struct {
OtxId uint `json:"otxId"`
TrackingId string `json:"trackingId"`
TxHash string `json:"txHash"`
}
)
func NewJetStreamEventEmitter(o JetStreamOpts) (*JetStream, error) {
natsConn, err := nats.Connect(o.ServerUrl)
if err != nil {
return nil, err
}
js, err := natsConn.JetStream()
if err != nil {
return nil, err
}
// Bootstrap stream if it doesn't exist.
stream, _ := js.StreamInfo(StreamName)
if stream == nil {
_, err = js.AddStream(&nats.StreamConfig{
Name: StreamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{StreamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
// Add a durable consumer
_, err = js.AddConsumer(pullStream, &nats.ConsumerConfig{
Durable: durableId,
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: pullSubject,
})
if err != nil {
return nil, err
}
return &JetStream{
logg: o.Logg,
jsCtx: js,
pgStore: o.PgStore,
natsConn: natsConn,
}, nil
}
// Close gracefully shutdowns the JetStream connection.
func (js *JetStream) Close() {
if js.natsConn != nil {
js.natsConn.Close()
}
}
// Publish publishes the JSON data to the NATS stream.
func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}
func (js *JetStream) Subscriber() error {
subOpts := []nats.SubOpt{
nats.ManualAck(),
nats.Bind(pullStream, durableId),
}
natsSub, err := js.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...)
if err != nil {
return err
}
for {
events, err := natsSub.Fetch(1)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
continue
} else if errors.Is(err, nats.ErrConnectionClosed) {
return nil
} else {
return err
}
}
if len(events) > 0 {
var (
chainEvent store.MinimalTxInfo
msg = events[0]
)
if err := json.Unmarshal(msg.Data, &chainEvent); err != nil {
msg.Nak()
js.logg.Error("jetstream sub: json unmarshal fail", "error", err)
} else {
ctx, cancel := context.WithTimeout(context.Background(), actionTimeout)
if err := js.pgStore.UpdateOtxStatusFromChainEvent(ctx, chainEvent); err != nil {
msg.Nak()
js.logg.Error("jetstream sub: otx marker failed to update state", "error", err)
} else {
msg.Ack()
}
cancel()
}
}
}
}

73
internal/pub/js_pub.go Normal file
View File

@ -0,0 +1,73 @@
package pub
import (
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
const (
StreamName string = "CUSTODIAL"
StreamSubjects string = "CUSTODIAL.*"
AccountNewNonce string = "CUSTODIAL.accountNewNonce"
AccountRegister string = "CUSTODIAL.accountRegister"
AccountGiftGas string = "CUSTODIAL.systemNewAccountGas"
AccountGiftVoucher string = "CUSTODIAL.systemNewAccountVoucher"
AccountRefillGas string = "CUSTODIAL.systemRefillAccountGas"
DispatchFail string = "CUSTODIAL.dispatchFail"
DispatchSuccess string = "CUSTODIAL.dispatchSuccess"
SignTransfer string = "CUSTODIAL.signTransfer"
)
type (
PubOpts struct {
DedupDuration time.Duration
JsCtx nats.JetStreamContext
PersistDuration time.Duration
}
Pub struct {
jsCtx nats.JetStreamContext
}
EventPayload struct {
OtxId uint `json:"otxId"`
TrackingId string `json:"trackingId"`
TxHash string `json:"txHash"`
}
)
func NewPub(o PubOpts) (*Pub, error) {
stream, _ := o.JsCtx.StreamInfo(StreamName)
if stream == nil {
_, err := o.JsCtx.AddStream(&nats.StreamConfig{
Name: StreamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{StreamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
return &Pub{
jsCtx: o.JsCtx,
}, nil
}
func (p *Pub) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = p.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}

30
internal/sub/handler.go Normal file
View File

@ -0,0 +1,30 @@
package sub
import (
"context"
"encoding/json"
"github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/nats-io/nats.go"
)
func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error {
var (
chainEvent store.MinimalTxInfo
)
if err := json.Unmarshal(msg.Data, &chainEvent); err != nil {
return err
}
if err := s.cu.PgStore.UpdateOtxStatusFromChainEvent(ctx, chainEvent); err != nil {
return err
}
switch msg.Subject {
case "CHAIN.gas":
//
}
return nil
}

100
internal/sub/js_sub.go Normal file
View File

@ -0,0 +1,100 @@
package sub
import (
"context"
"errors"
"time"
"github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/nats-io/nats.go"
"github.com/zerodha/logf"
)
const (
durableId = "cic-custodial"
pullStream = "CHAIN"
pullSubject = "CHAIN.*"
actionTimeout = 5 * time.Second
waitDelay = 1 * time.Second
)
type (
SubOpts struct {
CustodialContainer *custodial.Custodial
JsCtx nats.JetStreamContext
Logg logf.Logger
NatsConn *nats.Conn
}
Sub struct {
cu *custodial.Custodial
jsCtx nats.JetStreamContext
logg logf.Logger
natsConn *nats.Conn
}
)
func NewSub(o SubOpts) (*Sub, error) {
_, err := o.JsCtx.AddConsumer(pullStream, &nats.ConsumerConfig{
Durable: durableId,
AckPolicy: nats.AckExplicitPolicy,
FilterSubject: pullSubject,
})
if err != nil {
return nil, err
}
return &Sub{
cu: o.CustodialContainer,
jsCtx: o.JsCtx,
logg: o.Logg,
natsConn: o.NatsConn,
}, nil
}
func (s *Sub) Process() error {
subOpts := []nats.SubOpt{
nats.ManualAck(),
nats.Bind(pullStream, durableId),
}
natsSub, err := s.jsCtx.PullSubscribe(pullSubject, durableId, subOpts...)
if err != nil {
return err
}
for {
events, err := natsSub.Fetch(1)
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
s.logg.Debug("sub: no msg to pull")
time.Sleep(waitDelay)
continue
} else if errors.Is(err, nats.ErrConnectionClosed) {
return nil
} else {
return err
}
}
if len(events) > 0 {
msg := events[0]
ctx, cancel := context.WithTimeout(context.Background(), actionTimeout)
if err := s.handler(ctx, msg); err != nil {
s.logg.Error("sub: handler error", "error", err)
msg.Nak()
} else {
s.logg.Debug("sub: processed msg", "subject", msg.Subject)
msg.Ack()
}
cancel()
}
}
}
func (s *Sub) Close() {
if s.natsConn != nil {
s.natsConn.Close()
}
}

View File

@ -10,17 +10,16 @@ import (
) )
const ( const (
taskTimeout = 60 taskTimeout = 60 * time.Second
taskRetention = 48 * time.Hour
) )
type TaskerClientOpts struct { type TaskerClientOpts struct {
RedisPool *redis.RedisPool RedisPool *redis.RedisPool
TaskRetention time.Duration
} }
type TaskerClient struct { type TaskerClient struct {
Client *asynq.Client Client *asynq.Client
taskRetention time.Duration
} }
func NewTaskerClient(o TaskerClientOpts) *TaskerClient { func NewTaskerClient(o TaskerClientOpts) *TaskerClient {
@ -39,8 +38,8 @@ func (c *TaskerClient) CreateTask(ctx context.Context, taskName TaskName, queueN
task.Payload, task.Payload,
asynq.Queue(string(queueName)), asynq.Queue(string(queueName)),
asynq.TaskID(task.Id), asynq.TaskID(task.Id),
asynq.Retention(c.taskRetention), asynq.Retention(taskRetention),
asynq.Timeout(taskTimeout*time.Second), asynq.Timeout(taskTimeout),
) )
taskInfo, err := c.Client.EnqueueContext(ctx, qTask) taskInfo, err := c.Client.EnqueueContext(ctx, qTask)

View File

@ -4,22 +4,24 @@ import (
"context" "context"
"time" "time"
"github.com/bsm/redislock" "github.com/grassrootseconomics/cic-custodial/pkg/logg"
"github.com/grassrootseconomics/cic-custodial/pkg/redis" "github.com/grassrootseconomics/cic-custodial/pkg/redis"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/zerodha/logf" "github.com/zerodha/logf"
) )
const ( const (
fixedRetryCount = 25 retryRequeueInterval = 2 * time.Second
fixedRetryPeriod = time.Second * 1
) )
type TaskerServerOpts struct { type TaskerServerOpts struct {
Concurrency int Concurrency int
ErrorHandler asynq.ErrorHandler
IsFailureHandler func(error) bool
Logg logf.Logger Logg logf.Logger
LogLevel asynq.LogLevel LogLevel asynq.LogLevel
RedisPool *redis.RedisPool RedisPool *redis.RedisPool
RetryHandler asynq.RetryDelayFunc
} }
type TaskerServer struct { type TaskerServer struct {
@ -32,13 +34,16 @@ func NewTaskerServer(o TaskerServerOpts) *TaskerServer {
o.RedisPool, o.RedisPool,
asynq.Config{ asynq.Config{
Concurrency: o.Concurrency, Concurrency: o.Concurrency,
IsFailure: expectedFailures, DelayedTaskCheckInterval: retryRequeueInterval,
ErrorHandler: o.ErrorHandler,
IsFailure: o.IsFailureHandler,
LogLevel: o.LogLevel, LogLevel: o.LogLevel,
Logger: logg.AsynqCompatibleLogger(o.Logg),
Queues: map[string]int{ Queues: map[string]int{
string(HighPriority): 5, string(HighPriority): 7,
string(DefaultPriority): 2, string(DefaultPriority): 3,
}, },
RetryDelayFunc: retryDelay, RetryDelayFunc: o.RetryHandler,
}, },
) )
@ -50,6 +55,10 @@ func NewTaskerServer(o TaskerServerOpts) *TaskerServer {
} }
} }
func (ts *TaskerServer) RegisterMiddlewareStack(middlewareStack []asynq.MiddlewareFunc) {
ts.mux.Use(middlewareStack...)
}
func (ts *TaskerServer) RegisterHandlers(taskName TaskName, taskHandler func(context.Context, *asynq.Task) error) { func (ts *TaskerServer) RegisterHandlers(taskName TaskName, taskHandler func(context.Context, *asynq.Task) error) {
ts.mux.HandleFunc(string(taskName), taskHandler) ts.mux.HandleFunc(string(taskName), taskHandler)
} }
@ -66,22 +75,3 @@ func (ts *TaskerServer) Stop() {
ts.server.Stop() ts.server.Stop()
ts.server.Shutdown() ts.server.Shutdown()
} }
func expectedFailures(err error) bool {
switch err {
// Ignore lock contention errors; retry until lock obtain.
case redislock.ErrNotObtained:
return false
default:
return true
}
}
// Immidiatel
func retryDelay(count int, err error, task *asynq.Task) time.Duration {
if count < fixedRetryCount {
return fixedRetryPeriod
} else {
return asynq.DefaultRetryDelayFunc(count, err, task)
}
}

View File

@ -8,7 +8,7 @@ import (
"github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
@ -105,14 +105,14 @@ func AccountGiftGasProcessor(cu *custodial.Custodial) func(context.Context, *asy
return err return err
} }
eventPayload := &events.EventPayload{ eventPayload := &pub.EventPayload{
OtxId: id, OtxId: id,
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(), TxHash: builtTx.Hash().Hex(),
} }
if err := cu.EventEmitter.Publish( if err := cu.Pub.Publish(
events.AccountGiftGas, pub.AccountGiftGas,
builtTx.Hash().Hex(), builtTx.Hash().Hex(),
eventPayload, eventPayload,
); err != nil { ); err != nil {

View File

@ -7,7 +7,7 @@ import (
"github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
@ -115,14 +115,14 @@ func GiftVoucherProcessor(cu *custodial.Custodial) func(context.Context, *asynq.
return err return err
} }
eventPayload := &events.EventPayload{ eventPayload := &pub.EventPayload{
OtxId: id, OtxId: id,
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(), TxHash: builtTx.Hash().Hex(),
} }
if err := cu.EventEmitter.Publish( if err := cu.Pub.Publish(
events.AccountGiftVoucher, pub.AccountGiftVoucher,
builtTx.Hash().Hex(), builtTx.Hash().Hex(),
eventPayload, eventPayload,
); err != nil { ); err != nil {

View File

@ -6,7 +6,7 @@ import (
"fmt" "fmt"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
@ -64,12 +64,12 @@ func AccountPrepare(cu *custodial.Custodial) func(context.Context, *asynq.Task)
return err return err
} }
eventPayload := events.EventPayload{ eventPayload := pub.EventPayload{
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
} }
if err := cu.EventEmitter.Publish( if err := cu.Pub.Publish(
events.AccountNewNonce, pub.AccountNewNonce,
payload.PublicKey, payload.PublicKey,
eventPayload, eventPayload,
); err != nil { ); err != nil {

View File

@ -9,7 +9,7 @@ import (
"github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
@ -120,14 +120,14 @@ func AccountRefillGasProcessor(cu *custodial.Custodial) func(context.Context, *a
return err return err
} }
eventPayload := &events.EventPayload{ eventPayload := &pub.EventPayload{
OtxId: id, OtxId: id,
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(), TxHash: builtTx.Hash().Hex(),
} }
if err := cu.EventEmitter.Publish( if err := cu.Pub.Publish(
events.AccountRefillGas, pub.AccountRefillGas,
builtTx.Hash().Hex(), builtTx.Hash().Hex(),
eventPayload, eventPayload,
); err != nil { ); err != nil {

View File

@ -8,7 +8,7 @@ import (
"github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
@ -113,14 +113,14 @@ func AccountRegisterOnChainProcessor(cu *custodial.Custodial) func(context.Conte
return err return err
} }
eventPayload := &events.EventPayload{ eventPayload := &pub.EventPayload{
OtxId: id, OtxId: id,
TrackingId: payload.TrackingId, TrackingId: payload.TrackingId,
TxHash: builtTx.Hash().Hex(), TxHash: builtTx.Hash().Hex(),
} }
if err := cu.EventEmitter.Publish( if err := cu.Pub.Publish(
events.AccountRegister, pub.AccountRegister,
builtTx.Hash().Hex(), builtTx.Hash().Hex(),
eventPayload, eventPayload,
); err != nil { ); err != nil {

View File

@ -9,7 +9,7 @@ import (
"github.com/celo-org/celo-blockchain/core/types" "github.com/celo-org/celo-blockchain/core/types"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
"github.com/grassrootseconomics/w3-celo-patch/module/eth" "github.com/grassrootseconomics/w3-celo-patch/module/eth"
@ -26,7 +26,7 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro
var ( var (
payload TxPayload payload TxPayload
dispatchStatus store.DispatchStatus dispatchStatus store.DispatchStatus
eventPayload events.EventPayload eventPayload pub.EventPayload
dispathchTx common.Hash dispathchTx common.Hash
) )
@ -58,7 +58,7 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }
if err := cu.EventEmitter.Publish(events.DispatchFail, txHash, eventPayload); err != nil { if err := cu.Pub.Publish(pub.DispatchFail, txHash, eventPayload); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }
@ -71,7 +71,7 @@ func DispatchTx(cu *custodial.Custodial) func(context.Context, *asynq.Task) erro
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }
if err := cu.EventEmitter.Publish(events.DispatchSuccess, txHash, eventPayload); err != nil { if err := cu.Pub.Publish(pub.DispatchSuccess, txHash, eventPayload); err != nil {
return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry) return fmt.Errorf("dispatch: failed %v: %w", err, asynq.SkipRetry)
} }

View File

@ -9,7 +9,7 @@ import (
"github.com/celo-org/celo-blockchain/common/hexutil" "github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/events" "github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/cic-custodial/pkg/enum" "github.com/grassrootseconomics/cic-custodial/pkg/enum"
@ -160,8 +160,8 @@ func SignTransfer(cu *custodial.Custodial) func(context.Context, *asynq.Task) er
TxHash: builtTx.Hash().Hex(), TxHash: builtTx.Hash().Hex(),
} }
if err := cu.EventEmitter.Publish( if err := cu.Pub.Publish(
events.SignTransfer, pub.SignTransfer,
builtTx.Hash().Hex(), builtTx.Hash().Hex(),
eventPayload, eventPayload,
); err != nil { ); err != nil {

View File

@ -1,13 +1,7 @@
package tasker package tasker
import ( import (
"crypto/ecdsa"
"encoding/json" "encoding/json"
"math/big"
"time"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/w3-celo-patch"
) )
type ( type (
@ -15,23 +9,6 @@ type (
TaskName string TaskName string
) )
type SystemContainer struct {
Abis map[string]*w3.Func
AccountIndexContract common.Address
GasFaucetContract common.Address
GasRefillThreshold *big.Int
GasRefillValue *big.Int
GiftableGasValue *big.Int
GiftableToken common.Address
GiftableTokenValue *big.Int
LockPrefix string
LockTimeout time.Duration
PrivateKey *ecdsa.PrivateKey
PublicKey string
TokenDecimals int
TokenTransferGasLimit uint64
}
type Task struct { type Task struct {
Id string `json:"id"` Id string `json:"id"`
Payload json.RawMessage `json:"payload"` Payload json.RawMessage `json:"payload"`

View File

@ -13,21 +13,21 @@ func AsynqCompatibleLogger(lo logf.Logger) AsynqLogger {
} }
func (l AsynqLogger) Debug(args ...interface{}) { func (l AsynqLogger) Debug(args ...interface{}) {
l.Lo.Debug("asynq", "debug", args[0]) l.Lo.Debug("asynq: server", "msg", args[0])
} }
func (l AsynqLogger) Info(args ...interface{}) { func (l AsynqLogger) Info(args ...interface{}) {
l.Lo.Info("asynq", "info", args[0]) l.Lo.Info("asynq: server", "msg", args[0])
} }
func (l AsynqLogger) Warn(args ...interface{}) { func (l AsynqLogger) Warn(args ...interface{}) {
l.Lo.Warn("asynq", "warn", args[0]) l.Lo.Warn("asynq: server", "msg", args[0])
} }
func (l AsynqLogger) Error(args ...interface{}) { func (l AsynqLogger) Error(args ...interface{}) {
l.Lo.Error("asynq", "error", args[0]) l.Lo.Error("asynq: server", "msg", args[0])
} }
func (l AsynqLogger) Fatal(args ...interface{}) { func (l AsynqLogger) Fatal(args ...interface{}) {
l.Lo.Fatal("asynq", "fatal", args[0]) l.Lo.Fatal("asynq: server", "msg", args[0])
} }