mirror of
https://github.com/grassrootseconomics/eth-tracker.git
synced 2026-05-17 02:15:19 +02:00
release: v1.0.0
This commit is contained in:
80
internal/syncer/realtime.go
Normal file
80
internal/syncer/realtime.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package syncer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/celo-org/celo-blockchain"
|
||||
"github.com/celo-org/celo-blockchain/core/types"
|
||||
"github.com/celo-org/celo-blockchain/event"
|
||||
)
|
||||
|
||||
type BlockQueueFn func(uint64) error
|
||||
|
||||
const resubscribeInterval = 2 * time.Second
|
||||
|
||||
func (s *Syncer) Stop() {
|
||||
if s.realtimeSub != nil {
|
||||
s.realtimeSub.Unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) Start() {
|
||||
s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn())
|
||||
}
|
||||
|
||||
func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) {
|
||||
newHeadersReceiver := make(chan *types.Header, 1)
|
||||
sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver)
|
||||
s.logg.Info("realtime syncer connected to ws endpoint")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return event.NewSubscription(func(quit <-chan struct{}) error {
|
||||
eventsCtx, eventsCancel := context.WithCancel(context.Background())
|
||||
defer eventsCancel()
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-quit:
|
||||
s.logg.Info("realtime syncer stopping")
|
||||
eventsCancel()
|
||||
case <-eventsCtx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case header := <-newHeadersReceiver:
|
||||
if err := fn(header.Number.Uint64()); err != nil {
|
||||
s.logg.Error("realtime block queuer error", "error", err)
|
||||
}
|
||||
case <-eventsCtx.Done():
|
||||
s.logg.Info("realtime syncer shutting down")
|
||||
return nil
|
||||
case err := <-sub.Err():
|
||||
return err
|
||||
}
|
||||
}
|
||||
}), nil
|
||||
}
|
||||
|
||||
func (s *Syncer) queueRealtimeBlock(blockNumber uint64) error {
|
||||
s.queue.Push(blockNumber)
|
||||
if err := s.db.SetUpperBound(blockNumber); err != nil {
|
||||
return err
|
||||
}
|
||||
s.stats.SetLatestBlock(blockNumber)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Syncer) resubscribeFn() event.ResubscribeErrFunc {
|
||||
return func(ctx context.Context, err error) (event.Subscription, error) {
|
||||
if err != nil {
|
||||
s.logg.Error("resubscribing after failed subscription", "error", err)
|
||||
}
|
||||
return s.receiveRealtimeBlocks(ctx, s.queueRealtimeBlock)
|
||||
}
|
||||
}
|
||||
77
internal/syncer/syncer.go
Normal file
77
internal/syncer/syncer.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package syncer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/celo-org/celo-blockchain"
|
||||
"github.com/celo-org/celo-blockchain/ethclient"
|
||||
"github.com/grassrootseconomics/celo-tracker/internal/chain"
|
||||
"github.com/grassrootseconomics/celo-tracker/internal/db"
|
||||
"github.com/grassrootseconomics/celo-tracker/internal/queue"
|
||||
"github.com/grassrootseconomics/celo-tracker/internal/stats"
|
||||
)
|
||||
|
||||
type (
|
||||
SyncerOpts struct {
|
||||
DB db.DB
|
||||
Chain chain.Chain
|
||||
Logg *slog.Logger
|
||||
Queue *queue.Queue
|
||||
Stats *stats.Stats
|
||||
StartBlock int64
|
||||
WebSocketEndpoint string
|
||||
}
|
||||
|
||||
Syncer struct {
|
||||
db db.DB
|
||||
ethClient *ethclient.Client
|
||||
logg *slog.Logger
|
||||
realtimeSub celo.Subscription
|
||||
stats *stats.Stats
|
||||
queue *queue.Queue
|
||||
stopCh chan struct{}
|
||||
}
|
||||
)
|
||||
|
||||
func New(o SyncerOpts) (*Syncer, error) {
|
||||
latestBlock, err := o.Chain.GetLatestBlock(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lowerBound, err := o.DB.GetLowerBound()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if lowerBound == 0 {
|
||||
if o.StartBlock > 0 {
|
||||
if err := o.DB.SetLowerBound(uint64(o.StartBlock)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if err := o.DB.SetLowerBound(latestBlock); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := o.DB.SetUpperBound(latestBlock); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
o.Stats.SetLatestBlock(latestBlock)
|
||||
|
||||
ethClient, err := ethclient.Dial(o.WebSocketEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Syncer{
|
||||
db: o.DB,
|
||||
ethClient: ethClient,
|
||||
logg: o.Logg,
|
||||
stats: o.Stats,
|
||||
queue: o.Queue,
|
||||
stopCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user