diff --git a/README.md b/README.md index 014b302..98fdac9 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,19 @@ ![GitHub release (latest by date)](https://img.shields.io/github/v/release/grassrootseconomics/cic-chain-events) ![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/grassrootseconomics/cic-chain-events/build.yaml) +[![Go Report Card](https://goreportcard.com/badge/github.com/grassrootseconomics/cic-chain-events)](https://goreportcard.com/report/github.com/grassrootseconomics/cic-chain-events) > CIC Chain Events +Filters live (and past) transactions on Celo and emits relevant events to a sink for further processing/indexing. + ## Documentation +- [Config and usage](docs/usage.md) +- [Functionality](docs/functionality.md) +- [Writing filters](docs/filters.md) - [API](docs/api.md) + +## License + +[AGPL-3.0](LICENSE) diff --git a/config.toml b/config.toml index 37deb26..72ef033 100644 --- a/config.toml +++ b/config.toml @@ -1,9 +1,10 @@ -# Exposes Prometheus metrics [metrics] +# Exposes Prometheus metrics go_process = true # API server [api] +# Host and port address = ":8080" # Geth API endpoints diff --git a/docs/filters.md b/docs/filters.md new file mode 100644 index 0000000..1a4512b --- /dev/null +++ b/docs/filters.md @@ -0,0 +1,11 @@ +## Writing filters + +Filters must conform to the interface: + +```go +type Filter interface { + Execute(ctx context.Context, inputTransaction fetch.Transaction) (next bool, err error) +} +``` + +See examples in the `internal/filter` folder. diff --git a/docs/functionality.md b/docs/functionality.md new file mode 100644 index 0000000..30e6c8e --- /dev/null +++ b/docs/functionality.md @@ -0,0 +1,30 @@ +## Functionality + +### Head syncer + +Opens a websocket connection and processes live transactions. + +### Janitor + +Periodically checks for historical blocks and any blocks missed by the head syncer, and queues them for processing. 
A gap range is processed twice to guarantee there is no missing block. + +### Pipeline + +Fetches a block and executes the filters in serial order for every transaction in the block before finally committing the block to the store. + +### Filter + +Processes a transaction and passes it on to the next filter or terminates the pipeline for that transaction if it is irrelevant. + +### Store schema + +- The `blocks` table keeps track of processed blocks. +- The `syncer_meta` table keeps track of the lower_bound cursor. Below the lower_bound cursor, all blocks are guaranteed to have been processed, hence it is safe to trim the `blocks` table below that pointer. + +### GraphQL + +- Fetches a block (and some of its header details), transactions and transaction receipts embedded within the transaction object in a single call. + +## Caveats + +- Blocks are not guaranteed to be processed in order, however a low concurrency setting would somewhat give an "in-order" behaviour (not to be relied upon in any case). diff --git a/docs/usage.md b/docs/usage.md new file mode 100644 index 0000000..f5d01cb --- /dev/null +++ b/docs/usage.md @@ -0,0 +1,36 @@ +## Requirements + +- Celo (geth) node with GraphQL enabled +- Postgres 14+ + +## Running + +### 1. Run migrations + +Run the migrations inside the `migrations` folder. + +### 2. Update the config + +The base config is described in `config.toml`. Values can be overridden with env variables e.g. to disable metrics, set `METRICS_GO_PROCESS=false`. + +### 3. 
Start the service: + +**Compiling**: + +- Requires CGO_ENABLED=1 +- Prebuilt binaries (for amd64 only) available on the releases page + +**Docker**: + +- `docker pull ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest` + +After compiling or within a Docker container: + +`$ ./cic-chain-events` + +Optional flags: + +- `-config` - `config.toml` file path +- `-debug` - Enable/disable debug level logs +- `-queries` - `queries.sql` file path + diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index a1dbd70..da87149 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -39,7 +39,7 @@ func NewPipeline(o PipelineOpts) *Pipeline { // 3. Commits the block to store as successfully processed // // Note: -// - Blocks are processed atomically, a failure inbetween will process the block from the start +// - Blocks are processed atomically, a failure in-between will process the block from the start // - Therefore, any side effect/event sink in the filter should support dedup func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error { md.logg.Debug("pipeline: processing block", "block", blockNumber) @@ -63,7 +63,7 @@ func (md *Pipeline) Run(ctx context.Context, blockNumber uint64) error { if err := md.store.CommitBlock(ctx, blockNumber); err != nil { return err } - md.logg.Debug("pipeline: commited block", "block", blockNumber) + md.logg.Debug("pipeline: committed block", "block", blockNumber) return nil } diff --git a/internal/syncer/head.go b/internal/syncer/head.go index 6c86be9..dad70ae 100644 --- a/internal/syncer/head.go +++ b/internal/syncer/head.go @@ -41,7 +41,7 @@ func NewHeadSyncer(o HeadSyncerOpts) (*HeadSyncer, error) { }, nil } -// Start creates a websocket subscription and actively receives new blocks untill stopped +// Start creates a websocket subscription and actively receives new blocks until stopped // or a critical error occurs. 
func (hs *HeadSyncer) Start(ctx context.Context) error { headerReceiver := make(chan *types.Header, 1) diff --git a/migrations/001_init.sql b/migrations/001_init.sql index cd2ecd4..536f6f7 100644 --- a/migrations/001_init.sql +++ b/migrations/001_init.sql @@ -5,8 +5,3 @@ CREATE TABLE IF NOT EXISTS blocks ( CREATE TABLE syncer_meta ( lower_bound INT ); - ----- create above / drop below ---- - -DROP TABLE syncer_meta; -DROP TABLE blocks;