major refactor: sig ch, remove conf settings, jetstream pub, ci

* This is a major refactor and includes general improvements around

- context cancellation
- build settings
- jetstream pub sub
- logging
- docker builds
- conf loading
This commit is contained in:
2023-03-08 14:30:40 +00:00
parent 661e6cf1f1
commit 2bbc05bb45
30 changed files with 688 additions and 489 deletions

View File

@@ -1,17 +0,0 @@
package events
type EventEmitter interface {
Close()
Publish(subject string, dedupId string, eventPayload interface{}) error
}
type MinimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
To string `json:"to"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}

View File

@@ -1,78 +0,0 @@
package events
import (
"time"
"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
)
const (
StreamName string = "CHAIN"
StreamSubjects string = "CHAIN.*"
)
type JetStreamOpts struct {
ServerUrl string
PersistDuration time.Duration
DedupDuration time.Duration
}
type JetStream struct {
jsCtx nats.JetStreamContext
nc *nats.Conn
}
func NewJetStreamEventEmitter(o JetStreamOpts) (EventEmitter, error) {
natsConn, err := nats.Connect(o.ServerUrl)
if err != nil {
return nil, err
}
js, err := natsConn.JetStream()
if err != nil {
return nil, err
}
// Bootstrap stream if it doesn't exist.
stream, _ := js.StreamInfo(StreamName)
if stream == nil {
_, err = js.AddStream(&nats.StreamConfig{
Name: StreamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{StreamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
return &JetStream{
jsCtx: js,
nc: natsConn,
}, nil
}
// Close gracefully shutdowns the JetStream connection.
func (js *JetStream) Close() {
if js.nc != nil {
js.nc.Close()
}
}
// Publish publishes the JSON data to the NATS stream.
func (js *JetStream) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = js.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}

View File

@@ -8,17 +8,19 @@ import (
"github.com/zerodha/logf"
)
type AddressFilterOpts struct {
Cache *sync.Map
Logg logf.Logger
SystemAddress string
}
type (
AddressFilterOpts struct {
Cache *sync.Map
Logg logf.Logger
SystemAddress string
}
type AddressFilter struct {
cache *sync.Map
logg logf.Logger
systemAddress string
}
AddressFilter struct {
cache *sync.Map
logg logf.Logger
systemAddress string
}
)
func NewAddressFilter(o AddressFilterOpts) Filter {
return &AddressFilter{

View File

@@ -4,7 +4,7 @@ import (
"context"
"github.com/celo-org/celo-blockchain/common/hexutil"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/zerodha/logf"
)
@@ -13,22 +13,24 @@ const (
gasFilterEventSubject = "CHAIN.gas"
)
type GasFilterOpts struct {
EventEmitter events.EventEmitter
Logg logf.Logger
SystemAddress string
}
type (
GasFilterOpts struct {
Logg logf.Logger
Pub *pub.Pub
SystemAddress string
}
type GasFilter struct {
eventEmitter events.EventEmitter
logg logf.Logger
systemAddress string
}
GasFilter struct {
logg logf.Logger
pub *pub.Pub
systemAddress string
}
)
func NewGasFilter(o GasFilterOpts) Filter {
return &GasFilter{
eventEmitter: o.EventEmitter,
logg: o.Logg,
pub: o.Pub,
systemAddress: o.SystemAddress,
}
}
@@ -41,7 +43,7 @@ func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (b
// TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract.
if transaction.From.Address == f.systemAddress && transferValue > 0 {
transferEvent := &events.MinimalTxInfo{
transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
To: transaction.To.Address,
TxHash: transaction.Hash,
@@ -53,7 +55,7 @@ func (f *GasFilter) Execute(_ context.Context, transaction fetch.Transaction) (b
transferEvent.Success = true
}
if err := f.eventEmitter.Publish(
if err := f.pub.Publish(
gasFilterEventSubject,
transaction.Hash,
transferEvent,

View File

@@ -4,7 +4,7 @@ import (
"context"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/zerodha/logf"
@@ -18,20 +18,22 @@ var (
addSig = w3.MustNewFunc("add(address)", "bool")
)
type RegisterFilterOpts struct {
EventEmitter events.EventEmitter
Logg logf.Logger
}
type (
RegisterFilterOpts struct {
Logg logf.Logger
Pub *pub.Pub
}
type RegisterFilter struct {
eventEmitter events.EventEmitter
logg logf.Logger
}
RegisterFilter struct {
logg logf.Logger
pub *pub.Pub
}
)
func NewRegisterFilter(o RegisterFilterOpts) Filter {
return &RegisterFilter{
eventEmitter: o.EventEmitter,
logg: o.Logg,
logg: o.Logg,
pub: o.Pub,
}
}
@@ -47,7 +49,7 @@ func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transactio
return false, err
}
addEvent := &events.MinimalTxInfo{
addEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
ContractAddress: transaction.To.Address,
To: transaction.To.Address,
@@ -59,7 +61,7 @@ func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transactio
addEvent.Success = true
}
if err := f.eventEmitter.Publish(
if err := f.pub.Publish(
registerEventSubject,
transaction.Hash,
addEvent,
@@ -69,6 +71,6 @@ func (f *RegisterFilter) Execute(_ context.Context, transaction fetch.Transactio
return true, nil
}
return true, nil
}

View File

@@ -5,7 +5,7 @@ import (
"math/big"
"github.com/celo-org/celo-blockchain/common"
"github.com/grassrootseconomics/cic-chain-events/internal/events"
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
"github.com/grassrootseconomics/w3-celo-patch"
"github.com/zerodha/logf"
@@ -21,20 +21,22 @@ var (
mintToSig = w3.MustNewFunc("mintTo(address, uint256)", "bool")
)
type TransferFilterOpts struct {
EventEmitter events.EventEmitter
Logg logf.Logger
}
type (
TransferFilterOpts struct {
Logg logf.Logger
Pub *pub.Pub
}
type TransferFilter struct {
eventEmitter events.EventEmitter
logg logf.Logger
}
TransferFilter struct {
logg logf.Logger
pub *pub.Pub
}
)
func NewTransferFilter(o TransferFilterOpts) Filter {
return &TransferFilter{
eventEmitter: o.EventEmitter,
logg: o.Logg,
logg: o.Logg,
pub: o.Pub,
}
}
@@ -56,7 +58,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
f.logg.Debug("transfer_filter: new reg", "transfer", to)
transferEvent := &events.MinimalTxInfo{
transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
From: transaction.From.Address,
To: to.Hex(),
@@ -70,7 +72,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
transferEvent.Success = true
}
if err := f.eventEmitter.Publish(
if err := f.pub.Publish(
transferFilterEventSubject,
transaction.Hash,
transferEvent,
@@ -92,7 +94,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
f.logg.Debug("transfer_filter: new reg", "transferFrom", to)
transferEvent := &events.MinimalTxInfo{
transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
From: from.Hex(),
To: to.Hex(),
@@ -106,7 +108,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
transferEvent.Success = true
}
if err := f.eventEmitter.Publish(
if err := f.pub.Publish(
transferFilterEventSubject,
transaction.Hash,
transferEvent,
@@ -127,7 +129,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
f.logg.Debug("transfer_filter: new reg", "mintTo", to)
transferEvent := &events.MinimalTxInfo{
transferEvent := &pub.MinimalTxInfo{
Block: transaction.Block.Number,
From: transaction.From.Address,
To: to.Hex(),
@@ -141,7 +143,7 @@ func (f *TransferFilter) Execute(_ context.Context, transaction fetch.Transactio
transferEvent.Success = true
}
if err := f.eventEmitter.Publish(
if err := f.pub.Publish(
transferFilterEventSubject,
transaction.Hash,
transferEvent,

View File

@@ -10,19 +10,21 @@ import (
"github.com/zerodha/logf"
)
type PipelineOpts struct {
BlockFetcher fetch.Fetch
Filters []filter.Filter
Logg logf.Logger
Store store.Store[pgx.Rows]
}
type (
PipelineOpts struct {
BlockFetcher fetch.Fetch
Filters []filter.Filter
Logg logf.Logger
Store store.Store[pgx.Rows]
}
type Pipeline struct {
fetch fetch.Fetch
filters []filter.Filter
logg logf.Logger
store store.Store[pgx.Rows]
}
Pipeline struct {
fetch fetch.Fetch
filters []filter.Filter
logg logf.Logger
store store.Store[pgx.Rows]
}
)
func NewPipeline(o PipelineOpts) *Pipeline {
return &Pipeline{

View File

@@ -7,6 +7,10 @@ import (
"github.com/alitto/pond"
)
const (
idleTimeout = 1 * time.Second
)
type Opts struct {
Concurrency int
QueueSize int
@@ -18,7 +22,7 @@ func NewPool(ctx context.Context, o Opts) *pond.WorkerPool {
o.Concurrency,
o.QueueSize,
pond.MinWorkers(o.Concurrency),
pond.IdleTimeout(time.Second*1),
pond.IdleTimeout(idleTimeout),
pond.Context(ctx),
)
}

81
internal/pub/jetstream.go Normal file
View File

@@ -0,0 +1,81 @@
package pub
import (
"time"
"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
)
const (
streamName string = "CHAIN"
streamSubjects string = "CHAIN.*"
)
type (
PubOpts struct {
DedupDuration time.Duration
JsCtx nats.JetStreamContext
NatsConn *nats.Conn
PersistDuration time.Duration
}
Pub struct {
natsConn *nats.Conn
jsCtx nats.JetStreamContext
}
MinimalTxInfo struct {
Block uint64 `json:"block"`
From string `json:"from"`
To string `json:"to"`
ContractAddress string `json:"contractAddress"`
Success bool `json:"success"`
TxHash string `json:"transactionHash"`
TxIndex uint `json:"transactionIndex"`
Value uint64 `json:"value"`
}
)
func NewPub(o PubOpts) (*Pub, error) {
stream, _ := o.JsCtx.StreamInfo(streamName)
if stream == nil {
_, err := o.JsCtx.AddStream(&nats.StreamConfig{
Name: streamName,
MaxAge: o.PersistDuration,
Storage: nats.FileStorage,
Subjects: []string{streamSubjects},
Duplicates: o.DedupDuration,
})
if err != nil {
return nil, err
}
}
return &Pub{
jsCtx: o.JsCtx,
natsConn: o.NatsConn,
}, nil
}
// Close gracefully shutdowns the JetStream connection.
func (p *Pub) Close() {
if p.natsConn != nil {
p.natsConn.Close()
}
}
// Publish publishes the JSON data to the NATS stream.
func (p *Pub) Publish(subject string, dedupId string, eventPayload interface{}) error {
jsonData, err := json.Marshal(eventPayload)
if err != nil {
return err
}
_, err = p.jsCtx.Publish(subject, jsonData, nats.MsgId(dedupId))
if err != nil {
return err
}
return nil
}

View File

@@ -3,27 +3,37 @@ package store
import (
"context"
"fmt"
"os"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/tern/v2/migrate"
"github.com/knadh/goyesql/v2"
"github.com/zerodha/logf"
)
type queries struct {
CommitBlock string `query:"commit-block"`
GetMissingBlocks string `query:"get-missing-blocks"`
GetSearchBounds string `query:"get-search-bounds"`
InitSyncerMeta string `query:"init-syncer-meta"`
SetSearchLowerBound string `query:"set-search-lower-bound"`
}
const (
schemaTable = "schema_version"
)
type PostgresStoreOpts struct {
DSN string
InitialLowerBound uint64
Logg logf.Logger
Queries goyesql.Queries
}
type (
queries struct {
CommitBlock string `query:"commit-block"`
GetMissingBlocks string `query:"get-missing-blocks"`
GetSearchBounds string `query:"get-search-bounds"`
InitSyncerMeta string `query:"init-syncer-meta"`
SetSearchLowerBound string `query:"set-search-lower-bound"`
}
PostgresStoreOpts struct {
DSN string
MigrationsFolderPath string
InitialLowerBound uint64
Logg logf.Logger
Queries goyesql.Queries
}
)
type PostgresStore struct {
logg logf.Logger
@@ -45,12 +55,34 @@ func NewPostgresStore(o PostgresStoreOpts) (Store[pgx.Rows], error) {
return nil, err
}
dbPool, err := pgxpool.NewWithConfig(context.Background(), parsedConfig)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dbPool, err := pgxpool.NewWithConfig(ctx, parsedConfig)
if err != nil {
return nil, err
}
_, err = dbPool.Exec(context.Background(), postgresStore.queries.InitSyncerMeta, o.InitialLowerBound)
conn, err := dbPool.Acquire(ctx)
if err != nil {
return nil, err
}
defer conn.Release()
migrator, err := migrate.NewMigrator(ctx, conn.Conn(), schemaTable)
if err != nil {
return nil, err
}
if err := migrator.LoadMigrations(os.DirFS(o.MigrationsFolderPath)); err != nil {
return nil, err
}
if err := migrator.Migrate(ctx); err != nil {
return nil, err
}
_, err = dbPool.Exec(ctx, postgresStore.queries.InitSyncerMeta, o.InitialLowerBound)
if err != nil {
return nil, err
}

View File

@@ -2,6 +2,7 @@ package syncer
import (
"context"
"time"
"github.com/alitto/pond"
"github.com/celo-org/celo-blockchain/core/types"
@@ -10,21 +11,27 @@ import (
"github.com/zerodha/logf"
)
type HeadSyncerOpts struct {
Stats *Stats
Pipeline *pipeline.Pipeline
Logg logf.Logger
Pool *pond.WorkerPool
WsEndpoint string
}
const (
jobTimeout = 5 * time.Second
)
type HeadSyncer struct {
stats *Stats
pipeline *pipeline.Pipeline
logg logf.Logger
ethClient *ethclient.Client
pool *pond.WorkerPool
}
type (
HeadSyncerOpts struct {
Logg logf.Logger
Pipeline *pipeline.Pipeline
Pool *pond.WorkerPool
Stats *Stats
WsEndpoint string
}
HeadSyncer struct {
ethClient *ethclient.Client
logg logf.Logger
pipeline *pipeline.Pipeline
pool *pond.WorkerPool
stats *Stats
}
)
func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) {
ethClient, err := ethclient.Dial(o.WsEndpoint)
@@ -33,11 +40,11 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) {
}
return &HeadSyncer{
stats: o.Stats,
pipeline: o.Pipeline,
logg: o.Logg,
ethClient: ethClient,
logg: o.Logg,
pipeline: o.Pipeline,
pool: o.Pool,
stats: o.Stats,
}, nil
}
@@ -46,7 +53,7 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) {
func (hs *HeadSyncer) Start(ctx context.Context) error {
headerReceiver := make(chan *types.Header, 1)
sub, err := hs.ethClient.SubscribeNewHead(ctx, headerReceiver)
sub, err := hs.ethClient.SubscribeNewHead(context.Background(), headerReceiver)
if err != nil {
return err
}
@@ -62,10 +69,12 @@ func (hs *HeadSyncer) Start(ctx context.Context) error {
case header := <-headerReceiver:
blockNumber := header.Number.Uint64()
hs.logg.Debug("head syncer: received new block", "block", blockNumber)
hs.stats.UpdateHeadCursor(blockNumber)
hs.pool.Submit(func() {
if err := hs.pipeline.Run(context.Background(), blockNumber); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), jobTimeout)
defer cancel()
if err := hs.pipeline.Run(ctx, blockNumber); err != nil {
hs.logg.Error("head syncer: piepline run error", "error", err)
}
})

View File

@@ -15,25 +15,27 @@ const (
headBlockLag = 5
)
type JanitorOpts struct {
BatchSize uint64
Logg logf.Logger
Pipeline *pipeline.Pipeline
Pool *pond.WorkerPool
Stats *Stats
Store store.Store[pgx.Rows]
SweepInterval time.Duration
}
type (
JanitorOpts struct {
BatchSize uint64
Logg logf.Logger
Pipeline *pipeline.Pipeline
Pool *pond.WorkerPool
Stats *Stats
Store store.Store[pgx.Rows]
SweepInterval time.Duration
}
type Janitor struct {
batchSize uint64
pipeline *pipeline.Pipeline
logg logf.Logger
pool *pond.WorkerPool
stats *Stats
store store.Store[pgx.Rows]
sweepInterval time.Duration
}
Janitor struct {
batchSize uint64
pipeline *pipeline.Pipeline
logg logf.Logger
pool *pond.WorkerPool
stats *Stats
store store.Store[pgx.Rows]
sweepInterval time.Duration
}
)
func NewJanitor(o JanitorOpts) *Janitor {
return &Janitor{
@@ -56,8 +58,11 @@ func (j *Janitor) Start(ctx context.Context) error {
j.logg.Info("janitor: shutdown signal received")
return nil
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), jobTimeout)
defer cancel()
j.logg.Debug("janitor: starting sweep")
if err := j.QueueMissingBlocks(context.Background()); err != nil {
if err := j.QueueMissingBlocks(ctx); err != nil {
j.logg.Error("janitor: queue missing blocks error", "error", err)
}
}