diff --git a/cmd/tracker/init.go b/cmd/tracker/init.go index 78b61a5..c08ec24 100644 --- a/cmd/tracker/init.go +++ b/cmd/tracker/init.go @@ -14,7 +14,6 @@ import ( func initLogger() *slog.Logger { loggOpts := logg.LoggOpts{ - Component: "tracker", FormatType: logg.Logfmt, LogLevel: slog.LevelInfo, } diff --git a/cmd/tracker/main.go b/cmd/tracker/main.go index 7a66ba9..e573b4d 100644 --- a/cmd/tracker/main.go +++ b/cmd/tracker/main.go @@ -21,7 +21,7 @@ import ( "github.com/knadh/koanf/v2" ) -const defaultGracefulShutdownPeriod = time.Second * 10 +const defaultGracefulShutdownPeriod = time.Second * 15 var ( build = "dev" @@ -43,6 +43,16 @@ func init() { } func main() { + // mux := http.NewServeMux() + // statsviz.Register(mux) + + // go func() { + // lo.Info("metrics", "host:port", http.ListenAndServe("localhost:8080", mux)) + // }() + // go func() { + // lo.Info("profiler", "host:port", http.ListenAndServe("localhost:6060", nil)) + // }() + var ( batchQueue deque.Deque[uint64] blocksQueue deque.Deque[types.Block] @@ -94,10 +104,10 @@ func main() { lo.Error("could not initialize chain syncer", "error", err) os.Exit(1) } - if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil { - lo.Error("could not bootstrap historical syncer", "error", err) - os.Exit(1) - } + // if err := chainSyncer.BootstrapHistoricalSyncer(); err != nil { + // lo.Error("could not bootstrap historical syncer", "error", err) + // os.Exit(1) + // } blockProcessor := processor.NewProcessor(processor.ProcessorOpts{ Chain: chain, @@ -107,35 +117,38 @@ func main() { DB: db, }) + // wg.Add(1) + // go func() { + // defer wg.Done() + // chainSyncer.StartHistoricalSyncer(ctx) + // }() + wg.Add(1) go func() { defer wg.Done() - chainSyncer.StartHistoricalSyncer(ctx) + chainSyncer.StartRealtime() }() wg.Add(1) go func() { defer wg.Done() - chainSyncer.StartRealtimeSyncer(ctx) - }() - - wg.Add(1) - go func() { - defer wg.Done() - blockProcessor.Start(ctx) + blockProcessor.Start() }() <-ctx.Done() lo.Info("shutdown signal received") shutdownCtx, cancel := context.WithTimeout(context.Background(), defaultGracefulShutdownPeriod) + wg.Add(1) + go func() { + defer wg.Done() + chainSyncer.StopRealtime() + }() + wg.Add(1) go func() { defer wg.Done() blockProcessor.Stop() - if err := db.Close(); err != nil { - lo.Error("error closing db", "error", err) - } }() go func() { diff --git a/config.toml b/config.toml index 265db5b..7e77acf 100644 --- a/config.toml +++ b/config.toml @@ -8,9 +8,9 @@ go_process = true address = ":5001" [chain] -start_block = 24873500 +start_block = 24905000 graphql_endpoint = "" ws_endpoint = "wss://ws.celo.grassecon.net" -rpc_endpoint = "https://celo.grassecon.net" +rpc_endpoint = "https://rpc.ankr.com/celo/bae2b7745f52c50974d7ecb1a7c23dc05d9ab5b68caf498a7c73f09a3e8bc04a" testnet = false registry_address = "" diff --git a/go.mod b/go.mod index 4d62002..117ff27 100644 --- a/go.mod +++ b/go.mod @@ -4,16 +4,19 @@ go 1.22.1 require ( github.com/alitto/pond v1.8.3 + github.com/arl/statsviz v0.6.0 github.com/bits-and-blooms/bitset v1.13.0 github.com/celo-org/celo-blockchain v1.8.0 + github.com/dgraph-io/badger/v4 v4.2.0 github.com/ef-ds/deque/v2 v2.0.2 - github.com/grassrootseconomics/celoutils/v2 v2.4.1 + github.com/grassrootseconomics/celoutils/v2 v2.4.2 github.com/grassrootseconomics/w3-celo v0.16.0 github.com/kamikazechaser/common v0.2.0 github.com/knadh/koanf/parsers/toml v0.1.0 github.com/knadh/koanf/providers/env v0.1.0 github.com/knadh/koanf/providers/file v0.1.0 github.com/knadh/koanf/v2 v2.1.0 + go.etcd.io/bbolt v1.3.9 ) require ( @@ -31,7 +34,6 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect - github.com/dgraph-io/badger/v4 v4.2.0 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect @@ -70,7 +72,7 @@ require ( go.opencensus.io v0.22.5 // indirect golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sync v0.3.0 // indirect + golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/go.sum b/go.sum index 5fb60d7..a4bccec 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= +github.com/arl/statsviz v0.6.0 h1:jbW1QJkEYQkufd//4NDYRSNBpwJNrdzPahF7ZmoGdyE= +github.com/arl/statsviz v0.6.0/go.mod h1:0toboo+YGSUXDaS4g1D5TVS4dXs7S7YYT5J/qnW2h8s= github.com/aws/aws-sdk-go-v2 v1.2.0/go.mod h1:zEQs02YRBw1DjK0PoJv3ygDYOFTre1ejlJWl8FwAuQo= github.com/aws/aws-sdk-go-v2/config v1.1.1/go.mod h1:0XsVy9lBI/BCXm+2Tuvt39YmdHwS5unDQmxZOYe8F5Y= github.com/aws/aws-sdk-go-v2/credentials v1.1.1/go.mod h1:mM2iIjwl7LULWtS6JCACyInboHirisUUdkBPoTHMOUo= @@ -144,6 +146,7 @@ github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWa github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= @@ -258,6 +261,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/grassrootseconomics/celoutils/v2 v2.4.1 h1:8S4+TfXVevxu3+tBIyzGM4z6iT3Rw3fZHoS93rGFCVo= github.com/grassrootseconomics/celoutils/v2 v2.4.1/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg= +github.com/grassrootseconomics/celoutils/v2 v2.4.2 h1:EAXLMLJhv9ukAlM2me8A+jHInxXeSSOmEWKY9zHPONQ= +github.com/grassrootseconomics/celoutils/v2 v2.4.2/go.mod h1:DB9sh7lY9zw0/cyCu8uYunAe+IDM8/104l+KEhkJnqg= github.com/grassrootseconomics/w3-celo v0.16.0 h1:AKPd+LGqR4YgkLw44V4Jgq/+prhJfTnaWzFOdS8JRgg= github.com/grassrootseconomics/w3-celo v0.16.0/go.mod h1:SVduFQshhMPMIRFKix6JwOZmv5a/e0NTObVeB4lXrH4= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= @@ -484,6 +489,8 @@ github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -575,8 +582,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -666,6 +673,8 @@ golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/chain/revert.go b/internal/chain/revert.go new file mode 100644 index 0000000..4598043 --- /dev/null +++ b/internal/chain/revert.go @@ -0,0 +1,55 @@ +package chain + +import ( + "context" + "math/big" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/w3-celo" + "github.com/grassrootseconomics/w3-celo/module/eth" +) + +func (c *Chain) GetRevertReason(ctx context.Context, txHash common.Hash, blockNumber *big.Int) (string, error) { + return c.provider.SimulateRevertedTx(ctx, txHash, blockNumber) +} + +func (c *Chain) TestDecodeTransfer(ctx context.Context, logs []*types.Log) { + signature := "Transfer(address indexed _from, address indexed _to, uint256 _value)" + + eventTransfer := w3.MustNewEvent(signature) + + for _, log := range logs { + if log.Topics[0] == w3.H("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") { + var ( + from common.Address + to common.Address + value big.Int + + tokenSymbol string + tokenDecimals big.Int + ) + + if err := c.provider.Client.CallCtx( + ctx, + eth.CallFunc(log.Address, w3.MustNewFunc("symbol()", "string")).Returns(&tokenSymbol), + eth.CallFunc(log.Address, w3.MustNewFunc("decimals()", "uint256")).Returns(&tokenDecimals), + ); err != nil { + c.logg.Error("token details fetcher", "error", err) + } + + if err := eventTransfer.DecodeArgs(log, &from, &to, &value); err != nil { + c.logg.Error("event decoder", "error", err) + } + + c.logg.Info("transfer event", + "hash", log.TxHash, + "token", tokenSymbol, + "from", from, + "to", to, + "value", value.Uint64(), + ) + } + } + +} diff --git a/internal/processor/block.go b/internal/processor/block.go new file mode 100644 index 0000000..85d1706 --- /dev/null +++ b/internal/processor/block.go @@ -0,0 +1,42 @@ +package processor + +import ( + "context" + + "github.com/celo-org/celo-blockchain/common" + "github.com/celo-org/celo-blockchain/core/types" +) + +func (p *Processor) processBlock(ctx context.Context, block types.Block) error { + blockNumber := block.NumberU64() + + txs, err := p.chain.GetTransactions(ctx, block) + p.logg.Debug("successfully fetched transactions", "txs", len(txs)) + if err != nil { + return err + } + + receiptsResp, err := p.chain.GetReceipts(ctx, block) + p.logg.Debug("successfully fetched receipts", "receipts", len(txs)) + if err != nil { + return err + } + + for i, receipt := range receiptsResp { + if receipt.Status > 0 { + // test transfers + p.chain.TestDecodeTransfer(ctx, receipt.Logs) + } else { + revertReason, _ := p.chain.GetRevertReason(ctx, receipt.TxHash, receipt.BlockNumber) + p.logg.Debug("tx reverted", "hash", receipt.TxHash, "revert_reason", revertReason, "input_data", common.Bytes2Hex(txs[i].Data())) + + } + } + + if err := p.db.SetValue(blockNumber); err != nil { + return err + } + p.logg.Debug("successfully processed block", "block", blockNumber) + + return nil +} diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 9a77e0b..68211f2 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -30,6 +30,7 @@ type ( logg *slog.Logger stats *stats.Stats db *db.DB + quit chan struct{} } ) @@ -45,52 +46,38 @@ func NewProcessor(o ProcessorOpts) *Processor { logg: o.Logg, stats: o.Stats, db: o.DB, + quit: make(chan struct{}), } } func (p *Processor) Start() { for { - if p.blocksQueue.Len() > 0 { - v, _ := p.blocksQueue.PopFront() - p.pool.Submit(func() { - if err := p.processBlock(v); err != nil { - p.logg.Info("block processor error", "block", v.NumberU64(), "error", err) - } - }) - } else { - time.Sleep(emptyQueueIdleTime) + select { + case <-p.quit: + p.logg.Info("processor stopped, draining workerpool queue") + p.pool.StopAndWait() + if err := p.db.Close(); err != nil { + p.logg.Info("error closing db", "error", err) + } + return + default: + if p.blocksQueue.Len() > 0 { + v, _ := p.blocksQueue.PopFront() + p.pool.Submit(func() { + p.logg.Info("processing", "block", v.Number()) + if err := p.processBlock(context.Background(), v); err != nil { + p.logg.Info("block processor error", "block", v.NumberU64(), "error", err) + } + }) + } else { + time.Sleep(emptyQueueIdleTime) + p.logg.Debug("queue empty slept for 1 second") + } } } } func (p *Processor) Stop() { - p.pool.StopAndWait() -} - -func (p *Processor) processBlock(block types.Block) error { - ctx := context.Background() - blockNumber := block.NumberU64() - - _, err := p.chain.GetTransactions(ctx, block) - if err != nil { - return err - } - - receiptsResp, err := p.chain.GetReceipts(ctx, block) - if err != nil { - return err - } - - for _, receipt := range receiptsResp { - if receipt.Status < 1 { - // - } - } - - if err := p.db.SetValue(blockNumber); err != nil { - return err - } - p.logg.Debug("successfully processed block", "block", blockNumber) - - return nil + p.logg.Info("signaling processor shutdown") + p.quit <- struct{}{} } diff --git a/internal/syncer/realtime.go b/internal/syncer/realtime.go index 39bf723..8c72ddf 100644 --- a/internal/syncer/realtime.go +++ b/internal/syncer/realtime.go @@ -2,6 +2,7 @@ package syncer import ( "context" + "errors" "fmt" "time" @@ -15,48 +16,23 @@ type ( ) const ( - resubscribeInterval = 5 * time.Second + resubscribeInterval = 15 * time.Second ) -// func (s *Syncer) StartRealtimeSyncer(ctx context.Context) error { -// newHeadersReceiver := make(chan *types.Header, 1) - -// sub := event.ResubscribeErr(resubscribeInterval, func(ctx context.Context, err error) (event.Subscription, error) { -// if err != nil { -// s.logg.Error("realtime syncer resubscribe error", "error", err) -// } -// return s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) -// }) -// defer sub.Unsubscribe() - -// for { -// select { -// case <-ctx.Done(): -// s.logg.Info("realtime syncer shutting down") -// return nil -// case header := <-newHeadersReceiver: -// blockNumber := header.Number.Uint64() -// block, err := s.chain.GetBlock(context.Background(), blockNumber) -// if err != nil { -// s.logg.Error("realtime block fetcher error", "block", blockNumber, "error", err) -// } -// s.blocksQueue.PushFront(block) -// } -// } -// } - func (s *Syncer) StartRealtime() { s.realtimeSub = event.ResubscribeErr(resubscribeInterval, s.resubscribeFn()) } func (s *Syncer) StopRealtime() { - s.realtimeSub.Unsubscribe() - s.realtimeSub = nil + if s.realtimeSub != nil { + s.realtimeSub.Unsubscribe() + } } func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (celo.Subscription, error) { newHeadersReceiver := make(chan *types.Header, 10) sub, err := s.ethClient.SubscribeNewHead(ctx, newHeadersReceiver) + s.logg.Info("realtime syncer connected to ws endpoint") if err != nil { return nil, err } @@ -69,6 +45,7 @@ func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (ce go func() { select { case <-quit: + s.logg.Info("realtime syncer stopping") eventsCancel() case <-eventsCtx.Done(): return @@ -80,9 +57,12 @@ func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (ce case header := <-newHeadersReceiver: s.logg.Debug("received block", "block", header.Number.Uint64()) if err := fn(eventsCtx, header.Number.Uint64()); err != nil { - s.logg.Error("realtime block queuer error", "error", err) + if !errors.Is(err, context.Canceled) { + s.logg.Error("realtime block queuer error", "error", err) + } } case <-eventsCtx.Done(): + s.logg.Info("realtime syncer shutting down") return nil case err := <-sub.Err(): return err @@ -94,7 +74,9 @@ func (s *Syncer) receiveRealtimeBlocks(ctx context.Context, fn BlockQueueFn) (ce func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) error { block, err := s.chain.GetBlock(ctx, blockNumber) if err != nil { - return fmt.Errorf("block %d error: %v", blockNumber, err) + if !errors.Is(err, context.Canceled) { + return fmt.Errorf("block %d error: %v", blockNumber, err) + } } s.blocksQueue.PushFront(block) s.logg.Debug("queued block", "block", blockNumber) @@ -104,7 +86,7 @@ func (s *Syncer) queueRealtimeBlock(ctx context.Context, blockNumber uint64) err func (s *Syncer) resubscribeFn() event.ResubscribeErrFunc { return func(ctx context.Context, err error) (event.Subscription, error) { if err != nil { - s.logg.Error("resubscribing after failed suibscription", "error", err) + s.logg.Error("resubscribing after failed subscription", "error", err) } return s.receiveRealtimeBlocks(ctx, s.queueRealtimeBlock) } diff --git a/test b/test new file mode 100755 index 0000000..77a40b1 Binary files /dev/null and b/test differ