refactor: remove unecessary publisher component

* clients should now rely on the CHAIn.* subjects
This commit is contained in:
Mohamed Sohail 2023-04-06 06:08:38 +00:00
parent e203c49049
commit a98fe958a3
Signed by: kamikazechaser
GPG Key ID: 7DD45520C01CD85D
6 changed files with 37 additions and 148 deletions

View File

@ -3,14 +3,12 @@ package main
import ( import (
"context" "context"
"strings" "strings"
"time"
"github.com/bsm/redislock" "github.com/bsm/redislock"
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/custodial" "github.com/grassrootseconomics/cic-custodial/internal/custodial"
"github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/queries" "github.com/grassrootseconomics/cic-custodial/internal/queries"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/sub" "github.com/grassrootseconomics/cic-custodial/internal/sub"
@ -159,7 +157,7 @@ func initPostgresKeystore(postgresPool *pgxpool.Pool, queries *queries.Queries)
// Load redis backed noncestore. // Load redis backed noncestore.
func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore { func initRedisNoncestore(redisPool *redis.RedisPool) nonce.Noncestore {
return nonce.NewRedisNoncestore(nonce.Opts{ return nonce.NewRedisNoncestore(nonce.Opts{
RedisPool: redisPool, RedisPool: redisPool,
}) })
} }
@ -199,19 +197,6 @@ func initJetStream() (*nats.Conn, nats.JetStreamContext) {
return natsConn, js return natsConn, js
} }
func initPub(jsCtx nats.JetStreamContext) *pub.Pub {
pub, err := pub.NewPub(pub.PubOpts{
DedupDuration: time.Duration(ko.MustInt("jetstream.dedup_duration_hrs")) * time.Hour,
JsCtx: jsCtx,
PersistDuration: time.Duration(ko.MustInt("jetstream.persist_duration_hrs")) * time.Hour,
})
if err != nil {
lo.Fatal("init: critical error bootstrapping pub", "error", err)
}
return pub
}
func initSub(natsConn *nats.Conn, jsCtx nats.JetStreamContext, cu *custodial.Custodial) *sub.Sub { func initSub(natsConn *nats.Conn, jsCtx nats.JetStreamContext, cu *custodial.Custodial) *sub.Sub {
sub, err := sub.NewSub(sub.SubOpts{ sub, err := sub.NewSub(sub.SubOpts{
CustodialContainer: cu, CustodialContainer: cu,

View File

@ -59,7 +59,6 @@ func main() {
taskerClient := initTaskerClient(asynqRedisPool) taskerClient := initTaskerClient(asynqRedisPool)
natsConn, jsCtx := initJetStream() natsConn, jsCtx := initJetStream()
jsPub := initPub(jsCtx)
custodial, err := custodial.NewCustodial(custodial.Opts{ custodial, err := custodial.NewCustodial(custodial.Opts{
CeloProvider: celoProvider, CeloProvider: celoProvider,
@ -67,7 +66,6 @@ func main() {
LockProvider: lockProvider, LockProvider: lockProvider,
Noncestore: redisNoncestore, Noncestore: redisNoncestore,
PgStore: pgStore, PgStore: pgStore,
Pub: jsPub,
RedisClient: redisPool.Client, RedisClient: redisPool.Client,
RegistryAddress: ko.MustString("chain.registry_address"), RegistryAddress: ko.MustString("chain.registry_address"),
SystemPrivateKey: ko.MustString("system.private_key"), SystemPrivateKey: ko.MustString("system.private_key"),

View File

@ -13,32 +13,32 @@ services:
interval: 10s interval: 10s
timeout: 5s timeout: 5s
retries: 5 retries: 5
postgres: # postgres:
image: postgres:14-alpine # image: postgres:14-alpine
restart: unless-stopped # restart: unless-stopped
user: postgres # user: postgres
environment: # environment:
- POSTGRES_PASSWORD=postgres # - POSTGRES_PASSWORD=postgres
- POSTGRES_USER=postgres # - POSTGRES_USER=postgres
volumes: # volumes:
- cic-custodial-pg:/var/lib/postgresql/data # - cic-custodial-pg:/var/lib/postgresql/data
- ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql # - ./init_db.sql:/docker-entrypoint-initdb.d/init_db.sql
ports: # ports:
- "127.0.0.1:5432:5432" # - "127.0.0.1:5433:5432"
healthcheck: # healthcheck:
test: ["CMD-SHELL", "pg_isready"] # test: ["CMD-SHELL", "pg_isready"]
interval: 10s # interval: 10s
timeout: 5s # timeout: 5s
retries: 5 # retries: 5
nats: # nats:
image: nats:2.9 # image: nats:2.9
restart: unless-stopped # restart: unless-stopped
command: "-js -sd /nats/data" # command: "-js -sd /nats/data"
volumes: # volumes:
- cic-custodial-nats:/nats/data # - cic-custodial-nats:/nats/data
ports: # ports:
- "127.0.0.1:4222:4222" # - "127.0.0.1:4223:4222"
- "127.0.0.1:8222:8222" # - "127.0.0.1:8223:8222"
asynqmon: asynqmon:
image: hibiken/asynqmon image: hibiken/asynqmon
restart: unless-stopped restart: unless-stopped
@ -49,16 +49,16 @@ services:
depends_on: depends_on:
redis: redis:
condition: service_healthy condition: service_healthy
cic-chain-events: # cic-chain-events:
image: ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest # image: ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest
restart: unless-stopped # restart: unless-stopped
env_file: # env_file:
- events.env # - events.env
ports: # ports:
- '127.0.0.1:5001:5000' # - '127.0.0.1:5001:5000'
depends_on: # depends_on:
postgres: # postgres:
condition: service_healthy # condition: service_healthy
volumes: volumes:
cic-custodial-pg: cic-custodial-pg:
driver: local driver: local

View File

@ -11,7 +11,6 @@ import (
"github.com/grassrootseconomics/celoutils" "github.com/grassrootseconomics/celoutils"
"github.com/grassrootseconomics/cic-custodial/internal/keystore" "github.com/grassrootseconomics/cic-custodial/internal/keystore"
"github.com/grassrootseconomics/cic-custodial/internal/nonce" "github.com/grassrootseconomics/cic-custodial/internal/nonce"
"github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/grassrootseconomics/cic-custodial/internal/tasker" "github.com/grassrootseconomics/cic-custodial/internal/tasker"
"github.com/grassrootseconomics/w3-celo-patch" "github.com/grassrootseconomics/w3-celo-patch"
@ -27,7 +26,6 @@ type (
LockProvider *redislock.Client LockProvider *redislock.Client
Noncestore nonce.Noncestore Noncestore nonce.Noncestore
PgStore store.Store PgStore store.Store
Pub *pub.Pub
RedisClient *redis.Client RedisClient *redis.Client
RegistryAddress string RegistryAddress string
SystemPrivateKey string SystemPrivateKey string
@ -42,7 +40,6 @@ type (
LockProvider *redislock.Client LockProvider *redislock.Client
Noncestore nonce.Noncestore Noncestore nonce.Noncestore
PgStore store.Store PgStore store.Store
Pub *pub.Pub
RedisClient *redis.Client RedisClient *redis.Client
RegistryMap map[string]common.Address RegistryMap map[string]common.Address
SystemPrivateKey *ecdsa.PrivateKey SystemPrivateKey *ecdsa.PrivateKey
@ -93,7 +90,6 @@ func NewCustodial(o Opts) (*Custodial, error) {
LockProvider: o.LockProvider, LockProvider: o.LockProvider,
Noncestore: o.Noncestore, Noncestore: o.Noncestore,
PgStore: o.PgStore, PgStore: o.PgStore,
Pub: o.Pub,
RedisClient: o.RedisClient, RedisClient: o.RedisClient,
RegistryMap: registryMap, RegistryMap: registryMap,
SystemPrivateKey: privateKey, SystemPrivateKey: privateKey,

View File

@ -1,65 +0,0 @@
package pub
import (
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
const (
streamName string = "CUSTODIAL"
streamSubjects string = "CUSTODIAL.*"
AccountActivated string = "CUSTODIAL.accountActivated"
GasRefilled string = "CUSTODIAL.gasRefilled"
)
type (
PubOpts struct {
DedupDuration time.Duration
JsCtx nats.JetStreamContext
PersistDuration time.Duration
}
Pub struct {
jsCtx nats.JetStreamContext
}
EventPayload struct {
TxHash string `json:"txHash"`
}
)
func NewPub(o PubOpts) (*Pub, error) {
stream, _ := o.JsCtx.StreamInfo(streamName)
if stream == nil {
_, err := o.JsCtx.AddStream(&nats.StreamConfig{
Name: streamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{streamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
return &Pub{
jsCtx: o.JsCtx,
}, nil
}
func (p *Pub) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = p.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/grassrootseconomics/cic-custodial/internal/pub"
"github.com/grassrootseconomics/cic-custodial/internal/store" "github.com/grassrootseconomics/cic-custodial/internal/store"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
@ -28,36 +27,12 @@ func (s *Sub) handler(ctx context.Context, msg *nats.Msg) error {
if err := s.cu.PgStore.ActivateAccount(ctx, chainEvent.To); err != nil { if err := s.cu.PgStore.ActivateAccount(ctx, chainEvent.To); err != nil {
return err return err
} }
eventPayload := &pub.EventPayload{
TxHash: chainEvent.TxHash,
}
if err := s.cu.Pub.Publish(
pub.AccountActivated,
chainEvent.TxHash,
eventPayload,
); err != nil {
return err
}
} }
case "CHAIN.gas": case "CHAIN.gas":
if chainEvent.Success { if chainEvent.Success {
if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil { if err := s.cu.PgStore.ResetGasQuota(ctx, chainEvent.To); err != nil {
return err return err
} }
eventPayload := &pub.EventPayload{
TxHash: chainEvent.TxHash,
}
if err := s.cu.Pub.Publish(
pub.GasRefilled,
chainEvent.TxHash,
eventPayload,
); err != nil {
return err
}
} }
} }