mirror of
https://github.com/grassrootseconomics/cic-chain-events.git
synced 2024-12-22 11:57:31 +01:00
Merge branch 'sohail/proxy-contract'
This commit is contained in:
commit
e6e942278b
@ -1,33 +1,71 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/celo-org/celo-blockchain/common"
|
||||
"github.com/grassrootseconomics/celoutils"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/filter"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
|
||||
"github.com/grassrootseconomics/w3-celo-patch"
|
||||
"github.com/grassrootseconomics/w3-celo-patch/module/eth"
|
||||
"github.com/grassrootseconomics/w3-celo-patch/w3types"
|
||||
)
|
||||
|
||||
var (
|
||||
systemAddress string
|
||||
)
|
||||
func initAddressFilter(celoProvider *celoutils.Provider, cache *sync.Map) filter.Filter {
|
||||
var (
|
||||
tokenIndexEntryCount big.Int
|
||||
)
|
||||
|
||||
func initAddressFilter() filter.Filter {
|
||||
// TODO: Temporary shortcut
|
||||
systemAddress = strings.ToLower(ko.MustString("chain.system_address"))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// TODO: Bootstrap addresses from smart contract
|
||||
// TODO: Add route to update cache
|
||||
cache := &sync.Map{}
|
||||
registryMap, err := celoProvider.RegistryMap(ctx, celoutils.HexToAddress(ko.MustString("chain.registry_address")))
|
||||
if err != nil {
|
||||
lo.Fatal("init: critical error creating address filter", "error", err)
|
||||
}
|
||||
|
||||
cache.Store(strings.ToLower(ko.MustString("chain.token_index_address")), "TokenIndex")
|
||||
cache.Store(strings.ToLower(ko.MustString("chain.gas_faucet_address")), "GasFaucet")
|
||||
cache.Store(strings.ToLower(ko.MustString("chain.user_index_address")), "UserIndex")
|
||||
for k, v := range registryMap {
|
||||
cache.Store(strings.ToLower(v.Hex()), k)
|
||||
}
|
||||
|
||||
if err := celoProvider.Client.CallCtx(
|
||||
ctx,
|
||||
eth.CallFunc(w3.MustNewFunc("entryCount()", "uint256"), registryMap[celoutils.TokenIndex]).Returns(&tokenIndexEntryCount),
|
||||
); err != nil {
|
||||
lo.Fatal("init: critical error creating address filter", "error", err)
|
||||
}
|
||||
|
||||
calls := make([]w3types.Caller, tokenIndexEntryCount.Int64())
|
||||
tokenAddresses := make([]common.Address, tokenIndexEntryCount.Int64())
|
||||
|
||||
entrySig := w3.MustNewFunc("entry(uint256 _idx)", "address")
|
||||
|
||||
// TODO: There is a 5MB limit to a RPC batch call size.
|
||||
// Test if 10k entries will raise an error (future proofed for a lot of years)
|
||||
for i := 0; i < int(tokenIndexEntryCount.Int64()); i++ {
|
||||
calls[i] = eth.CallFunc(entrySig, registryMap[celoutils.TokenIndex], new(big.Int).SetInt64(int64(i))).Returns(&tokenAddresses[i])
|
||||
}
|
||||
|
||||
if err := celoProvider.Client.CallCtx(
|
||||
ctx,
|
||||
calls...,
|
||||
); err != nil {
|
||||
lo.Fatal("init: critical error creating address filter", "error", err)
|
||||
}
|
||||
|
||||
for i, v := range tokenAddresses {
|
||||
cache.Store(strings.ToLower(v.Hex()), fmt.Sprintf("TOKEN_%d", i))
|
||||
}
|
||||
|
||||
return filter.NewAddressFilter(filter.AddressFilterOpts{
|
||||
Cache: cache,
|
||||
Logg: lo,
|
||||
SystemAddress: systemAddress,
|
||||
Cache: cache,
|
||||
Logg: lo,
|
||||
})
|
||||
}
|
||||
|
||||
@ -41,9 +79,8 @@ func initTransferFilter(pub *pub.Pub) filter.Filter {
|
||||
|
||||
func initGasGiftFilter(pub *pub.Pub) filter.Filter {
|
||||
return filter.NewGasFilter(filter.GasFilterOpts{
|
||||
Pub: pub,
|
||||
Logg: lo,
|
||||
SystemAddress: systemAddress,
|
||||
Pub: pub,
|
||||
Logg: lo,
|
||||
})
|
||||
}
|
||||
|
||||
@ -53,3 +90,11 @@ func initRegisterFilter(pub *pub.Pub) filter.Filter {
|
||||
Logg: lo,
|
||||
})
|
||||
}
|
||||
|
||||
func initTokenIndexFilter(cache *sync.Map, pub *pub.Pub) filter.Filter {
|
||||
return filter.NewTokenIndexFilter(filter.TokenIndexFilterOpts{
|
||||
Cache: cache,
|
||||
Pub: pub,
|
||||
Logg: lo,
|
||||
})
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/alitto/pond"
|
||||
"github.com/grassrootseconomics/celoutils"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/pool"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/store"
|
||||
@ -128,3 +129,22 @@ func initPub(natsConn *nats.Conn, jsCtx nats.JetStreamContext) *pub.Pub {
|
||||
|
||||
return pub
|
||||
}
|
||||
|
||||
func initCeloProvider() *celoutils.Provider {
|
||||
providerOpts := celoutils.ProviderOpts{
|
||||
RpcEndpoint: ko.MustString("chain.rpc_endpoint"),
|
||||
}
|
||||
|
||||
if ko.Bool("chain.testnet") {
|
||||
providerOpts.ChainId = celoutils.TestnetChainId
|
||||
} else {
|
||||
providerOpts.ChainId = celoutils.MainnetChainId
|
||||
}
|
||||
|
||||
provider, err := celoutils.NewProvider(providerOpts)
|
||||
if err != nil {
|
||||
lo.Fatal("init: critical error loading chain provider", "error", err)
|
||||
}
|
||||
|
||||
return provider
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
internalServiceContainer struct {
|
||||
internalServicesContainer struct {
|
||||
apiService *echo.Echo
|
||||
pub *pub.Pub
|
||||
}
|
||||
@ -55,19 +55,23 @@ func main() {
|
||||
natsConn, jsCtx := initJetStream()
|
||||
jsPub := initPub(natsConn, jsCtx)
|
||||
|
||||
celoProvider := initCeloProvider()
|
||||
cache := &sync.Map{}
|
||||
|
||||
pipeline := pipeline.NewPipeline(pipeline.PipelineOpts{
|
||||
BlockFetcher: graphqlFetcher,
|
||||
Filters: []filter.Filter{
|
||||
initAddressFilter(),
|
||||
initAddressFilter(celoProvider, cache),
|
||||
initGasGiftFilter(jsPub),
|
||||
initTransferFilter(jsPub),
|
||||
initRegisterFilter(jsPub),
|
||||
initTokenIndexFilter(cache, jsPub),
|
||||
},
|
||||
Logg: lo,
|
||||
Store: pgStore,
|
||||
})
|
||||
|
||||
internalServices := &internalServiceContainer{
|
||||
internalServices := &internalServicesContainer{
|
||||
pub: jsPub,
|
||||
}
|
||||
syncerStats := &syncer.Stats{}
|
||||
|
@ -17,7 +17,7 @@ func createSigChannel() (chan os.Signal, func()) {
|
||||
}
|
||||
}
|
||||
|
||||
func startGracefulShutdown(ctx context.Context, internalServices *internalServiceContainer) {
|
||||
func startGracefulShutdown(ctx context.Context, internalServices *internalServicesContainer) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
13
config.toml
13
config.toml
@ -5,16 +5,15 @@ go_process = true
|
||||
# API server
|
||||
[service]
|
||||
# Host and port
|
||||
address = ":5000"
|
||||
address = ":5001"
|
||||
|
||||
# Geth API endpoints
|
||||
[chain]
|
||||
graphql_endpoint = ""
|
||||
ws_endpoint = ""
|
||||
system_address = ""
|
||||
token_index_address = ""
|
||||
gas_faucet_address = ""
|
||||
user_index_address = ""
|
||||
graphql_endpoint = ""
|
||||
ws_endpoint = ""
|
||||
rpc_endpoint = ""
|
||||
testnet = true
|
||||
registry_address = ""
|
||||
|
||||
# Syncer configs
|
||||
[syncer]
|
||||
|
2
go.mod
2
go.mod
@ -6,7 +6,7 @@ require (
|
||||
github.com/VictoriaMetrics/metrics v1.23.1
|
||||
github.com/alitto/pond v1.8.3
|
||||
github.com/celo-org/celo-blockchain v1.7.2
|
||||
github.com/grassrootseconomics/celoutils v1.1.1
|
||||
github.com/grassrootseconomics/celoutils v1.2.1
|
||||
github.com/grassrootseconomics/w3-celo-patch v0.2.0
|
||||
github.com/jackc/pgx/v5 v5.3.1
|
||||
github.com/jackc/tern/v2 v2.0.1
|
||||
|
2
go.sum
2
go.sum
@ -269,6 +269,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
|
||||
github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
|
||||
github.com/grassrootseconomics/celoutils v1.1.1 h1:REsndvfBkPN8UKOoQFNEGm/sCwKtTm+woYtgMl3bfZ0=
|
||||
github.com/grassrootseconomics/celoutils v1.1.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q=
|
||||
github.com/grassrootseconomics/celoutils v1.2.1 h1:ndM4h7Df0d57m2kdRXRStrnunqOL61wQ51rnOanX1KI=
|
||||
github.com/grassrootseconomics/celoutils v1.2.1/go.mod h1:Uo5YRy6AGLAHDZj9jaOI+AWoQ1H3L0v79728pPMkm9Q=
|
||||
github.com/grassrootseconomics/w3-celo-patch v0.2.0 h1:YqibbPzX0tQKmxU1nUGzThPKk/fiYeYZY6Aif3eyu8U=
|
||||
github.com/grassrootseconomics/w3-celo-patch v0.2.0/go.mod h1:WhBXNzNIvHmS6B2hAeShs56oa9Azb4jQSrOMKuMdBWw=
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||
|
@ -10,31 +10,24 @@ import (
|
||||
|
||||
type (
|
||||
AddressFilterOpts struct {
|
||||
Cache *sync.Map
|
||||
Logg logf.Logger
|
||||
SystemAddress string
|
||||
Cache *sync.Map
|
||||
Logg logf.Logger
|
||||
}
|
||||
|
||||
AddressFilter struct {
|
||||
cache *sync.Map
|
||||
logg logf.Logger
|
||||
systemAddress string
|
||||
cache *sync.Map
|
||||
logg logf.Logger
|
||||
}
|
||||
)
|
||||
|
||||
func NewAddressFilter(o AddressFilterOpts) Filter {
|
||||
return &AddressFilter{
|
||||
cache: o.Cache,
|
||||
logg: o.Logg,
|
||||
systemAddress: o.SystemAddress,
|
||||
cache: o.Cache,
|
||||
logg: o.Logg,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
|
||||
if transaction.From.Address == f.systemAddress {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if _, found := f.cache.Load(transaction.To.Address); found {
|
||||
return true, nil
|
||||
}
|
||||
|
@ -3,63 +3,69 @@ package filter
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/celo-org/celo-blockchain/common/hexutil"
|
||||
"github.com/celo-org/celo-blockchain/common"
|
||||
"github.com/grassrootseconomics/celoutils"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
"github.com/grassrootseconomics/w3-celo-patch"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
const (
|
||||
gasFilterEventSubject = "CHAIN.gas"
|
||||
gasEventSubject = "CHAIN.gas"
|
||||
)
|
||||
|
||||
var (
|
||||
giveToSig = w3.MustNewFunc("giveTo(address)", "uint256")
|
||||
)
|
||||
|
||||
type (
|
||||
GasFilterOpts struct {
|
||||
Logg logf.Logger
|
||||
Pub *pub.Pub
|
||||
SystemAddress string
|
||||
Logg logf.Logger
|
||||
Pub *pub.Pub
|
||||
}
|
||||
|
||||
GasFilter struct {
|
||||
logg logf.Logger
|
||||
pub *pub.Pub
|
||||
systemAddress string
|
||||
logg logf.Logger
|
||||
pub *pub.Pub
|
||||
}
|
||||
)
|
||||
|
||||
func NewGasFilter(o GasFilterOpts) Filter {
|
||||
return &GasFilter{
|
||||
logg: o.Logg,
|
||||
pub: o.Pub,
|
||||
systemAddress: o.SystemAddress,
|
||||
logg: o.Logg,
|
||||
pub: o.Pub,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
|
||||
transferValue, err := hexutil.DecodeUint64(transaction.Value)
|
||||
if err != nil {
|
||||
return false, err
|
||||
if len(transaction.InputData) < 10 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// TODO: This is a temporary shortcut to gift gas. Switch to gas faucet contract.
|
||||
if transaction.From.Address == f.systemAddress && transferValue > 0 {
|
||||
transferEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
To: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
Value: transferValue,
|
||||
if transaction.InputData[:10] == "0x63e4bff4" {
|
||||
var address common.Address
|
||||
|
||||
if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
giveToEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
To: address.Hex(),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
}
|
||||
|
||||
if transaction.Status == 1 {
|
||||
transferEvent.Success = true
|
||||
giveToEvent.Success = true
|
||||
}
|
||||
|
||||
if err := f.pub.Publish(
|
||||
gasFilterEventSubject,
|
||||
gasEventSubject,
|
||||
transaction.Hash,
|
||||
transferEvent,
|
||||
giveToEvent,
|
||||
); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -16,44 +16,44 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
addSig = w3.MustNewFunc("add(address)", "bool")
|
||||
registerSig = w3.MustNewFunc("register(address)", "")
|
||||
)
|
||||
|
||||
type (
|
||||
RegisterFilterOpts struct {
|
||||
Logg logf.Logger
|
||||
Pub *pub.Pub
|
||||
Logg logf.Logger
|
||||
Pub *pub.Pub
|
||||
}
|
||||
|
||||
RegisterFilter struct {
|
||||
logg logf.Logger
|
||||
pub *pub.Pub
|
||||
logg logf.Logger
|
||||
pub *pub.Pub
|
||||
}
|
||||
)
|
||||
|
||||
func NewRegisterFilter(o RegisterFilterOpts) Filter {
|
||||
return &RegisterFilter{
|
||||
logg: o.Logg,
|
||||
pub: o.Pub,
|
||||
logg: o.Logg,
|
||||
pub: o.Pub,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *RegisterFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
|
||||
if len(transaction.InputData) < 10 {
|
||||
if len(transaction.InputData) < 10 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if transaction.InputData[:10] == "0x0a3b0a4f" {
|
||||
if transaction.InputData[:10] == "0x4420e486" {
|
||||
var address common.Address
|
||||
|
||||
if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
|
||||
if err := registerSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
addEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
ContractAddress: transaction.To.Address,
|
||||
To: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
To: address.Hex(),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
}
|
||||
|
81
internal/filter/token_index_filter.go
Normal file
81
internal/filter/token_index_filter.go
Normal file
@ -0,0 +1,81 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/celo-org/celo-blockchain/common"
|
||||
"github.com/grassrootseconomics/celoutils"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/pub"
|
||||
"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
|
||||
"github.com/grassrootseconomics/w3-celo-patch"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
const (
|
||||
tokenIndexFilterEventSubject = "CHAIN.tokenAdded"
|
||||
)
|
||||
|
||||
var (
|
||||
addSig = w3.MustNewFunc("add(address)", "bool")
|
||||
)
|
||||
|
||||
type (
|
||||
TokenIndexFilterOpts struct {
|
||||
Cache *sync.Map
|
||||
Logg logf.Logger
|
||||
Pub *pub.Pub
|
||||
}
|
||||
|
||||
TokenIndexFilter struct {
|
||||
pub *pub.Pub
|
||||
cache *sync.Map
|
||||
logg logf.Logger
|
||||
}
|
||||
)
|
||||
|
||||
func NewTokenIndexFilter(o TokenIndexFilterOpts) Filter {
|
||||
return &TokenIndexFilter{
|
||||
cache: o.Cache,
|
||||
logg: o.Logg,
|
||||
pub: o.Pub,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *TokenIndexFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
|
||||
if len(transaction.InputData) < 10 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if transaction.InputData[:10] == "0x0a3b0a4f" {
|
||||
var address common.Address
|
||||
|
||||
if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
f.cache.Store(strings.ToLower(address.Hex()), transaction.Hash)
|
||||
|
||||
addEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
To: address.Hex(),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
}
|
||||
|
||||
if transaction.Status == 1 {
|
||||
addEvent.Success = true
|
||||
}
|
||||
|
||||
if err := f.pub.Publish(
|
||||
tokenIndexFilterEventSubject,
|
||||
transaction.Hash,
|
||||
addEvent,
|
||||
); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
@ -57,13 +57,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti
|
||||
return false, err
|
||||
}
|
||||
|
||||
f.logg.Debug("transfer_filter: new reg", "transfer", to)
|
||||
|
||||
transferEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
From: celoutils.ChecksumAddress(transaction.From.Address),
|
||||
To: to.Hex(),
|
||||
ContractAddress: transaction.To.Address,
|
||||
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
Value: value.Uint64(),
|
||||
@ -93,13 +91,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti
|
||||
return false, err
|
||||
}
|
||||
|
||||
f.logg.Debug("transfer_filter: new reg", "transferFrom", to)
|
||||
|
||||
transferEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
From: from.Hex(),
|
||||
To: to.Hex(),
|
||||
ContractAddress: transaction.To.Address,
|
||||
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
Value: value.Uint64(),
|
||||
@ -128,13 +124,11 @@ func (f *TransferFilter) Execute(_ context.Context, transaction *fetch.Transacti
|
||||
return false, err
|
||||
}
|
||||
|
||||
f.logg.Debug("transfer_filter: new reg", "mintTo", to)
|
||||
|
||||
transferEvent := &pub.MinimalTxInfo{
|
||||
Block: transaction.Block.Number,
|
||||
From: celoutils.ChecksumAddress(transaction.From.Address),
|
||||
To: to.Hex(),
|
||||
ContractAddress: transaction.To.Address,
|
||||
ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
|
||||
TxHash: transaction.Hash,
|
||||
TxIndex: transaction.Index,
|
||||
Value: value.Uint64(),
|
||||
|
@ -7,12 +7,14 @@ import (
|
||||
"github.com/alitto/pond"
|
||||
"github.com/celo-org/celo-blockchain/core/types"
|
||||
"github.com/celo-org/celo-blockchain/ethclient"
|
||||
"github.com/celo-org/celo-blockchain/event"
|
||||
"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
|
||||
"github.com/zerodha/logf"
|
||||
)
|
||||
|
||||
const (
|
||||
jobTimeout = 5 * time.Second
|
||||
jobTimeout = 5 * time.Second
|
||||
resubscribeBackoff = 2 * time.Second
|
||||
)
|
||||
|
||||
type (
|
||||
@ -53,10 +55,13 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) {
|
||||
func (hs *HeadSyncer) Start(ctx context.Context) error {
|
||||
headerReceiver := make(chan *types.Header, 1)
|
||||
|
||||
sub, err := hs.ethClient.SubscribeNewHead(context.Background(), headerReceiver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sub := event.ResubscribeErr(resubscribeBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
|
||||
if err != nil {
|
||||
hs.logg.Error("head syncer: resubscribe error", "error", err)
|
||||
}
|
||||
|
||||
return hs.ethClient.SubscribeNewHead(ctx, headerReceiver)
|
||||
})
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
for {
|
||||
@ -64,8 +69,6 @@ func (hs *HeadSyncer) Start(ctx context.Context) error {
|
||||
case <-ctx.Done():
|
||||
hs.logg.Info("head syncer: shutdown signal received")
|
||||
return nil
|
||||
case err := <-sub.Err():
|
||||
return err
|
||||
case header := <-headerReceiver:
|
||||
blockNumber := header.Number.Uint64()
|
||||
hs.logg.Debug("head syncer: received new block", "block", blockNumber)
|
||||
@ -75,7 +78,7 @@ func (hs *HeadSyncer) Start(ctx context.Context) error {
|
||||
defer cancel()
|
||||
|
||||
if err := hs.pipeline.Run(ctx, blockNumber); err != nil {
|
||||
hs.logg.Error("head syncer: piepline run error", "error", err)
|
||||
hs.logg.Error("head syncer: pipeline run error", "error", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user