diff --git a/cmd/service/init.go b/cmd/service/init.go index b968b42..0286883 100644 --- a/cmd/service/init.go +++ b/cmd/service/init.go @@ -3,14 +3,12 @@ package main import ( "context" "strings" - "time" "github.com/bsm/redislock" "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/queries" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/sub" @@ -159,7 +157,7 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries) // Load redis backed noncestore. func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore { return nonce.NewRedisNoncestore(nonce.Opts{ - RedisPool: redisPool, + RedisPool: redisPool, }) } @@ -199,19 +197,6 @@ func initJetStream() (*nats.Conn, nats.JetStreamContext) { return natsConn, js } -func initPub(jsCtx nats.JetStreamContext) *pub.Pub { - pub, err := pub.NewPub(pub.PubOpts{ - DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour, - JsCtx: jsCtx, - PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour, - }) - if err != nil { - lo.Fatal("init: critical error bootstrapping pub", "error", err) - } - - return pub -} - func initSub(natsConn *nats.Conn, jsCtx nats.JetStreamContext, cu *custodial.Custodial) *sub.Sub { sub, err := sub.NewSub(sub.SubOpts{ CustodialContainer: cu, diff --git a/cmd/service/main.go b/cmd/service/main.go index 4117e8f..bf7d5f5 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -59,7 +59,6 @@ func main() { taskerClient := initTaskerClient(asynqRedisPool) natsConn, jsCtx := initJetStream() - jsPub := initPub(jsCtx) custodial, err := custodial.NewCustodial(custodial.Opts{ CeloProvider: celoProvider, @@ -67,7 +66,6 @@ func main() { LockProvider: lockProvider, Noncestore: redisNoncestore, PgStore: pgStore, - Pub: jsPub, RedisClient: redisPool.Client, RegistryAddress: ko.MustString("chain.registry_address"), SystemPrivateKey: ko.MustString("system.private_key"), diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 7c3209f..ce86eee 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -13,32 +13,32 @@ services: interval: 10s timeout: 5s retries: 5 - postgres: - image: postgres:14-alpine - restart: unless-stopped - user: postgres - environment: - - POSTGRES_PASSWORD=postgres - - POSTGRES_USER=postgres - volumes: - - cic-custodial-pg:/var/lib/postgresql/data - - ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql - ports: - - "127.0.0.1:5432:5432" - healthcheck: - test: ["CMD-SHELL", "pg_isready"] - interval: 10s - timeout: 5s - retries: 5 - nats: - image: nats:2.9 - restart: unless-stopped - command: "-js -sd /nats/data" - volumes: - - cic-custodial-nats:/nats/data - ports: - - "127.0.0.1:4222:4222" - - "127.0.0.1:8222:8222" + # postgres: + # image: postgres:14-alpine + # restart: unless-stopped + # user: postgres + # environment: + # - POSTGRES_PASSWORD=postgres + # - POSTGRES_USER=postgres + # volumes: + # - cic-custodial-pg:/var/lib/postgresql/data + # - ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql + # ports: + # - "127.0.0.1:5433:5432" + # healthcheck: + # test: ["CMD-SHELL", "pg_isready"] + # interval: 10s + # timeout: 5s + # retries: 5 + # nats: + # image: nats:2.9 + # restart: unless-stopped + # command: "-js -sd /nats/data" + # volumes: + # - cic-custodial-nats:/nats/data + # ports: + # - "127.0.0.1:4223:4222" + # - "127.0.0.1:8223:8222" asynqmon: image: hibiken/asynqmon restart: unless-stopped @@ -49,16 +49,16 @@ services: depends_on: redis: condition: service_healthy - cic-chain-events: - image: ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest - restart: unless-stopped - env_file: - - events.env - ports: - - '127.0.0.1:5001:5000' - depends_on: - postgres: - condition: service_healthy + # cic-chain-events: + # image: ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest + # restart: unless-stopped + # env_file: + # - events.env + # ports: + # - '127.0.0.1:5001:5000' + # depends_on: + # postgres: + # condition: service_healthy volumes: cic-custodial-pg: driver: local diff --git a/internal/custodial/custodial.go b/internal/custodial/custodial.go index 3f64413..c8ec15f 100644 --- a/internal/custodial/custodial.go +++ b/internal/custodial/custodial.go @@ -11,7 +11,6 @@ import ( "github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/nonce" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/w3-celo-patch" @@ -27,7 +26,6 @@ type ( LockProvider *redislock.Client Noncestore nonce.Noncestore PgStore store.Store - Pub *pub.Pub RedisClient *redis.Client RegistryAddress string SystemPrivateKey string @@ -42,7 +40,6 @@ type ( LockProvider *redislock.Client Noncestore nonce.Noncestore PgStore store.Store - Pub *pub.Pub RedisClient *redis.Client RegistryMap map[string]common.Address SystemPrivateKey *ecdsa.PrivateKey @@ -93,7 +90,6 @@ func NewCustodial(o Opts) (*Custodial, error) { LockProvider: o.LockProvider, Noncestore: o.Noncestore, PgStore: o.PgStore, - Pub: o.Pub, RedisClient: o.RedisClient, RegistryMap: registryMap, SystemPrivateKey: privateKey, diff --git a/internal/pub/js_pub.go b/internal/pub/js_pub.go deleted file mode 100644 index a3155a8..0000000 --- a/internal/pub/js_pub.go +++ /dev/null @@ -1,65 +0,0 @@ -package pub - -import ( - "encoding/json" - "time" - - "github.com/nats-io/nats.go" -) - -const ( - streamName string = "CUSTODIAL" - streamSubjects string = "CUSTODIAL.*" - AccountActivated string = "CUSTODIAL.accountActivated" - GasRefilled string = "CUSTODIAL.gasRefilled" -) - -type ( - PubOpts struct { - DedupDuration time.Duration - JsCtx nats.JetStreamContext - PersistDuration time.Duration - } - - Pub struct { - jsCtx nats.JetStreamContext - } - - EventPayload struct { - TxHash string `json:"txHash"` - } -) - -func NewPub(o PubOpts) (*Pub, error) { - stream, _ := o.JsCtx.StreamInfo(streamName) - if stream == nil { - _, err := o.JsCtx.AddStream(&nats.StreamConfig{ - Name: streamName, - MaxAge: o.PersistDuration, - Storage: nats.FileStorage, - Subjects: []string{streamSubjects}, - Duplicates: o.DedupDuration, - }) - if err != nil { - return nil, err - } - } - - return &Pub{ - jsCtx: o.JsCtx, - }, nil -} - -func (p *Pub) Publish(subject string, dedupId string, eventPayload interface{}) error { - jsonData, err := json.Marshal(eventPayload) - if err != nil { - return err - } - - _, err = p.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId)) - if err != nil { - return err - } - - return nil -} diff --git a/internal/sub/handler.go b/internal/sub/handler.go index 9047366..a627966 100644 --- a/internal/sub/handler.go +++ b/internal/sub/handler.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" - "github.com/grassrootseconomics/cic-custodial/internal/pub" "github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/nats-io/nats.go" ) @@ -28,36 +27,12 @@ func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error { if err := s.cu.PgStore.ActivateAccount(ctx, chainEvent.To); err != nil { return err } - - eventPayload := &pub.EventPayload{ - TxHash: chainEvent.TxHash, - } - - if err := s.cu.Pub.Publish( - pub.AccountActivated, - chainEvent.TxHash, - eventPayload, - ); err != nil { - return err - } } case "CHAIN.gas": if chainEvent.Success { if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil { return err } - - eventPayload := &pub.EventPayload{ - TxHash: chainEvent.TxHash, - } - - if err := s.cu.Pub.Publish( - pub.GasRefilled, - chainEvent.TxHash, - eventPayload, - ); err != nil { - return err - } } }