mirror of
				https://github.com/grassrootseconomics/cic-chain-events.git
				synced 2025-11-04 01:21:19 +01:00 
			
		
		
		
	pkg: change namespace
* remove unecessary filters
This commit is contained in:
		
							parent
							
								
									a323a93441
								
							
						
					
					
						commit
						c7300aada3
					
				
							
								
								
									
										4
									
								
								.github/workflows/release.yaml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/workflows/release.yaml
									
									
									
									
										vendored
									
									
								
							@ -53,8 +53,8 @@ jobs:
 | 
			
		||||
        cache-from: type=local,src=/tmp/.buildx-cache
 | 
			
		||||
        cache-to: type=local,dest=/tmp/.buildx-cache
 | 
			
		||||
        tags: |
 | 
			
		||||
          ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest
 | 
			
		||||
          ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:${{ env.RELEASE_TAG }}
 | 
			
		||||
          ghcr.io/inethi/inethi-cic-chain-events/cic-chain-events:latest
 | 
			
		||||
          ghcr.io/inethi/inethi-cic-chain-events/cic-chain-events:${{ env.RELEASE_TAG }}
 | 
			
		||||
 | 
			
		||||
  goreleaser:
 | 
			
		||||
    runs-on: ubuntu-latest
 | 
			
		||||
 | 
			
		||||
@ -1,8 +1,8 @@
 | 
			
		||||
# cic-chain-events
 | 
			
		||||
 | 
			
		||||

 | 
			
		||||

 | 
			
		||||
[](https://goreportcard.com/report/github.com/grassrootseconomics/cic-chain-events)
 | 
			
		||||

 | 
			
		||||

 | 
			
		||||
[](https://goreportcard.com/report/github.com/inethi/inethi-cic-chain-events)
 | 
			
		||||
 | 
			
		||||
> CIC Chain Events
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -2,7 +2,7 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/VictoriaMetrics/metrics"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/echopprof"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/pkg/echopprof"
 | 
			
		||||
	"github.com/labstack/echo/v4"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,71 +1,17 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math/big"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch/module/eth"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch/w3types"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pub"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func initAddressFilter(celoProvider *celoutils.Provider, cache *sync.Map) filter.Filter {
 | 
			
		||||
	var (
 | 
			
		||||
		tokenIndexEntryCount big.Int
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	registryMap, err := celoProvider.RegistryMap(ctx, celoutils.HexToAddress(ko.MustString("chain.registry_address")))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lo.Fatal("init: critical error creating address filter", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for k, v := range registryMap {
 | 
			
		||||
		cache.Store(strings.ToLower(v.Hex()), k)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := celoProvider.Client.CallCtx(
 | 
			
		||||
		ctx,
 | 
			
		||||
		eth.CallFunc(w3.MustNewFunc("entryCount()", "uint256"), registryMap[celoutils.TokenIndex]).Returns(&tokenIndexEntryCount),
 | 
			
		||||
	); err != nil {
 | 
			
		||||
		lo.Fatal("init: critical error creating address filter", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	calls := make([]w3types.Caller, tokenIndexEntryCount.Int64())
 | 
			
		||||
	tokenAddresses := make([]common.Address, tokenIndexEntryCount.Int64())
 | 
			
		||||
 | 
			
		||||
	entrySig := w3.MustNewFunc("entry(uint256 _idx)", "address")
 | 
			
		||||
 | 
			
		||||
	// TODO: There is a 5MB limit to a RPC batch call size.
 | 
			
		||||
	// Test if 10k entries will raise an error (future proofed for a lot of years)
 | 
			
		||||
	for i := 0; i < int(tokenIndexEntryCount.Int64()); i++ {
 | 
			
		||||
		calls[i] = eth.CallFunc(entrySig, registryMap[celoutils.TokenIndex], new(big.Int).SetInt64(int64(i))).Returns(&tokenAddresses[i])
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := celoProvider.Client.CallCtx(
 | 
			
		||||
		ctx,
 | 
			
		||||
		calls...,
 | 
			
		||||
	); err != nil {
 | 
			
		||||
		lo.Fatal("init: critical error creating address filter", "error", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i, v := range tokenAddresses {
 | 
			
		||||
		cache.Store(strings.ToLower(v.Hex()), fmt.Sprintf("TOKEN_%d", i))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return filter.NewAddressFilter(filter.AddressFilterOpts{
 | 
			
		||||
		Cache: cache,
 | 
			
		||||
		Logg:  lo,
 | 
			
		||||
		Logg: lo,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -76,32 +22,3 @@ func initTransferFilter(pub *pub.Pub) filter.Filter {
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initGasGiftFilter(pub *pub.Pub) filter.Filter {
 | 
			
		||||
	return filter.NewGasFilter(filter.GasFilterOpts{
 | 
			
		||||
		Pub:  pub,
 | 
			
		||||
		Logg: lo,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initRegisterFilter(pub *pub.Pub) filter.Filter {
 | 
			
		||||
	return filter.NewRegisterFilter(filter.RegisterFilterOpts{
 | 
			
		||||
		Pub:  pub,
 | 
			
		||||
		Logg: lo,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initApproveFilter(pub *pub.Pub) filter.Filter {
 | 
			
		||||
	return filter.NewApproveFilter(filter.ApproveFilterOpts{
 | 
			
		||||
		Pub:  pub,
 | 
			
		||||
		Logg: lo,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initTokenIndexFilter(cache *sync.Map, pub *pub.Pub) filter.Filter {
 | 
			
		||||
	return filter.NewTokenIndexFilter(filter.TokenIndexFilterOpts{
 | 
			
		||||
		Cache: cache,
 | 
			
		||||
		Pub:   pub,
 | 
			
		||||
		Logg:  lo,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -7,10 +7,10 @@ import (
 | 
			
		||||
 | 
			
		||||
	"github.com/alitto/pond"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pool"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pool"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/jackc/pgx/v5"
 | 
			
		||||
	"github.com/knadh/goyesql/v2"
 | 
			
		||||
	"github.com/knadh/koanf/parsers/toml"
 | 
			
		||||
 | 
			
		||||
@ -7,10 +7,10 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/syncer"
 | 
			
		||||
	"github.com/knadh/koanf/v2"
 | 
			
		||||
	"github.com/labstack/echo/v4"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
@ -62,11 +62,7 @@ func main() {
 | 
			
		||||
		BlockFetcher: graphqlFetcher,
 | 
			
		||||
		Filters: []filter.Filter{
 | 
			
		||||
			initAddressFilter(celoProvider, cache),
 | 
			
		||||
			initGasGiftFilter(jsPub),
 | 
			
		||||
			initTransferFilter(jsPub),
 | 
			
		||||
			initRegisterFilter(jsPub),
 | 
			
		||||
			initApproveFilter(jsPub),
 | 
			
		||||
			initTokenIndexFilter(cache, jsPub),
 | 
			
		||||
		},
 | 
			
		||||
		Logg:  lo,
 | 
			
		||||
		Store: pgStore,
 | 
			
		||||
 | 
			
		||||
@ -12,8 +12,7 @@ address = ":5001"
 | 
			
		||||
graphql_endpoint = ""
 | 
			
		||||
ws_endpoint      = ""
 | 
			
		||||
rpc_endpoint     = ""
 | 
			
		||||
testnet          = true
 | 
			
		||||
registry_address = ""
 | 
			
		||||
testnet          = false
 | 
			
		||||
 | 
			
		||||
# Syncer configs
 | 
			
		||||
[syncer]
 | 
			
		||||
 | 
			
		||||
@ -22,7 +22,7 @@ The base config is described in `config.toml`. Values can be overriden with env
 | 
			
		||||
 | 
			
		||||
**Compiling the binary**
 | 
			
		||||
 | 
			
		||||
Run `make build` or download pre-compiled binaries from the [releases](https://github.com/grassrootseconomics/cic-chain-events/releases) page.
 | 
			
		||||
Run `make build` or download pre-compiled binaries from the [releases](https://github.com/inethi/inethi-cic-chain-events/releases) page.
 | 
			
		||||
 | 
			
		||||
Then start the service with `./cic-chain-events`
 | 
			
		||||
 | 
			
		||||
@ -36,7 +36,7 @@ Optional flags:
 | 
			
		||||
 | 
			
		||||
To pull the pre-built docker image:
 | 
			
		||||
 | 
			
		||||
`docker pull ghcr.io/grassrootseconomics/cic-chain-events/cic-chain-events:latest`
 | 
			
		||||
`docker pull ghcr.io/inethi/inethi-cic-chain-events/cic-chain-events:latest`
 | 
			
		||||
 | 
			
		||||
Or to build it:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							@ -1,4 +1,4 @@
 | 
			
		||||
module github.com/grassrootseconomics/cic-chain-events
 | 
			
		||||
module github.com/inethi/inethi-cic-chain-events
 | 
			
		||||
 | 
			
		||||
go 1.20
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,7 @@ import (
 | 
			
		||||
	"net/http"
 | 
			
		||||
 | 
			
		||||
	"github.com/alitto/pond"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/syncer"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/syncer"
 | 
			
		||||
	"github.com/labstack/echo/v4"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -2,33 +2,33 @@ package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	KRNVoucherAddress = "0x8bab657c88eb3c724486d113e650d2c659aa23d2"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	AddressFilterOpts struct {
 | 
			
		||||
		Cache *sync.Map
 | 
			
		||||
		Logg  logf.Logger
 | 
			
		||||
		Logg logf.Logger
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	AddressFilter struct {
 | 
			
		||||
		cache *sync.Map
 | 
			
		||||
		logg  logf.Logger
 | 
			
		||||
		logg logf.Logger
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewAddressFilter(o AddressFilterOpts) Filter {
 | 
			
		||||
	return &AddressFilter{
 | 
			
		||||
		cache: o.Cache,
 | 
			
		||||
		logg:  o.Logg,
 | 
			
		||||
		logg: o.Logg,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *AddressFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
	if _, found := f.cache.Load(transaction.To.Address); found {
 | 
			
		||||
	if transaction.To.Address == KRNVoucherAddress {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,69 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/stretchr/testify/suite"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AddressFilterSuite struct {
 | 
			
		||||
	suite.Suite
 | 
			
		||||
	filter Filter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *AddressFilterSuite) SetupSuite() {
 | 
			
		||||
	addressCache := &sync.Map{}
 | 
			
		||||
 | 
			
		||||
	addressCache.Store("0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f", "")
 | 
			
		||||
 | 
			
		||||
	s.filter = NewAddressFilter(AddressFilterOpts{
 | 
			
		||||
		Cache: addressCache,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *AddressFilterSuite) TestAddresses() {
 | 
			
		||||
	type testCase struct {
 | 
			
		||||
		transactionData fetch.Transaction
 | 
			
		||||
		want            bool
 | 
			
		||||
		wantErr         bool
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Generated with eth-encode
 | 
			
		||||
	tests := []testCase{
 | 
			
		||||
		{
 | 
			
		||||
			transactionData: fetch.Transaction{
 | 
			
		||||
				To: struct {
 | 
			
		||||
					Address string "json:\"address\""
 | 
			
		||||
				}{
 | 
			
		||||
					Address: "0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9f",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			want:    true,
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			transactionData: fetch.Transaction{
 | 
			
		||||
				To: struct {
 | 
			
		||||
					Address string "json:\"address\""
 | 
			
		||||
				}{
 | 
			
		||||
					Address: "0x6914ba1c49d3c3f32a9e65a0661d7656cb292e9x",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			want:    false,
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		next, err := s.filter.Execute(context.Background(), &test.transactionData)
 | 
			
		||||
		s.NoError(err)
 | 
			
		||||
		s.Equal(test.want, next)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAddressFilterSuite(t *testing.T) {
 | 
			
		||||
	suite.Run(t, new(AddressFilterSuite))
 | 
			
		||||
}
 | 
			
		||||
@ -1,85 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"math/big"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	ApproveFilterOpts struct {
 | 
			
		||||
		Logg logf.Logger
 | 
			
		||||
		Pub  *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ApproveFilter struct {
 | 
			
		||||
		logg logf.Logger
 | 
			
		||||
		pub  *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	approveEventSubject = "CHAIN.approve"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	approveSig = w3.MustNewFunc("approve(address, uint256)", "bool")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewApproveFilter(o ApproveFilterOpts) Filter {
 | 
			
		||||
	return &ApproveFilter{
 | 
			
		||||
		logg: o.Logg,
 | 
			
		||||
		pub:  o.Pub,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *ApproveFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
	if len(transaction.InputData) < 10 {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if transaction.InputData[:10] == "0x095ea7b3" {
 | 
			
		||||
		var (
 | 
			
		||||
			address common.Address
 | 
			
		||||
			value   big.Int
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		if err := approveSig.DecodeArgs(w3.B(transaction.InputData), &address, &value); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		approveEvent := &pub.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
 | 
			
		||||
			Timestamp:       hexutil.MustDecodeUint64(transaction.Block.Timestamp),
 | 
			
		||||
			From:            celoutils.ChecksumAddress(transaction.From.Address),
 | 
			
		||||
			To:              address.Hex(),
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			TXType:          "approve",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			approveEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.pub.Publish(
 | 
			
		||||
			approveEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			approveEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
@ -3,7 +3,7 @@ package filter
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/pkg/fetch"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Filter defines a read only filter which must return next as true/false or an error
 | 
			
		||||
 | 
			
		||||
@ -1,80 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	gasEventSubject = "CHAIN.gas"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	giveToSig = w3.MustNewFunc("giveTo(address)", "uint256")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	GasFilterOpts struct {
 | 
			
		||||
		Logg logf.Logger
 | 
			
		||||
		Pub  *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	GasFilter struct {
 | 
			
		||||
		logg logf.Logger
 | 
			
		||||
		pub  *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewGasFilter(o GasFilterOpts) Filter {
 | 
			
		||||
	return &GasFilter{
 | 
			
		||||
		logg: o.Logg,
 | 
			
		||||
		pub:  o.Pub,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *GasFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
	if len(transaction.InputData) < 10 {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if transaction.InputData[:10] == "0x63e4bff4" {
 | 
			
		||||
		var address common.Address
 | 
			
		||||
 | 
			
		||||
		if err := giveToSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		giveToEvent := &pub.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
 | 
			
		||||
			Timestamp:       hexutil.MustDecodeUint64(transaction.Block.Timestamp),
 | 
			
		||||
			To:              address.Hex(),
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			TXType:          "gas",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			giveToEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.pub.Publish(
 | 
			
		||||
			gasEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			giveToEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
@ -1,80 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	registerEventSubject = "CHAIN.register"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	registerSig = w3.MustNewFunc("register(address)", "")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	RegisterFilterOpts struct {
 | 
			
		||||
		Logg logf.Logger
 | 
			
		||||
		Pub  *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	RegisterFilter struct {
 | 
			
		||||
		logg logf.Logger
 | 
			
		||||
		pub  *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewRegisterFilter(o RegisterFilterOpts) Filter {
 | 
			
		||||
	return &RegisterFilter{
 | 
			
		||||
		logg: o.Logg,
 | 
			
		||||
		pub:  o.Pub,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *RegisterFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
	if len(transaction.InputData) < 10 {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if transaction.InputData[:10] == "0x4420e486" {
 | 
			
		||||
		var address common.Address
 | 
			
		||||
 | 
			
		||||
		if err := registerSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		addEvent := &pub.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
 | 
			
		||||
			Timestamp:       hexutil.MustDecodeUint64(transaction.Block.Timestamp),
 | 
			
		||||
			To:              address.Hex(),
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			TXType:          "register",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			addEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.pub.Publish(
 | 
			
		||||
			registerEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			addEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
@ -1,84 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	tokenIndexFilterEventSubject = "CHAIN.tokenAdded"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	addSig = w3.MustNewFunc("add(address)", "bool")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type (
 | 
			
		||||
	TokenIndexFilterOpts struct {
 | 
			
		||||
		Cache *sync.Map
 | 
			
		||||
		Logg  logf.Logger
 | 
			
		||||
		Pub   *pub.Pub
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	TokenIndexFilter struct {
 | 
			
		||||
		pub   *pub.Pub
 | 
			
		||||
		cache *sync.Map
 | 
			
		||||
		logg  logf.Logger
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewTokenIndexFilter(o TokenIndexFilterOpts) Filter {
 | 
			
		||||
	return &TokenIndexFilter{
 | 
			
		||||
		cache: o.Cache,
 | 
			
		||||
		logg:  o.Logg,
 | 
			
		||||
		pub:   o.Pub,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *TokenIndexFilter) Execute(_ context.Context, transaction *fetch.Transaction) (bool, error) {
 | 
			
		||||
	if len(transaction.InputData) < 10 {
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if transaction.InputData[:10] == "0x0a3b0a4f" {
 | 
			
		||||
		var address common.Address
 | 
			
		||||
 | 
			
		||||
		if err := addSig.DecodeArgs(w3.B(transaction.InputData), &address); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		f.cache.Store(strings.ToLower(address.Hex()), transaction.Hash)
 | 
			
		||||
 | 
			
		||||
		addEvent := &pub.MinimalTxInfo{
 | 
			
		||||
			Block:           transaction.Block.Number,
 | 
			
		||||
			ContractAddress: celoutils.ChecksumAddress(transaction.To.Address),
 | 
			
		||||
			Timestamp:       hexutil.MustDecodeUint64(transaction.Block.Timestamp),
 | 
			
		||||
			To:              address.Hex(),
 | 
			
		||||
			TxHash:          transaction.Hash,
 | 
			
		||||
			TxIndex:         transaction.Index,
 | 
			
		||||
			TXType:          "tokenAdd",
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if transaction.Status == 1 {
 | 
			
		||||
			addEvent.Success = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := f.pub.Publish(
 | 
			
		||||
			tokenIndexFilterEventSubject,
 | 
			
		||||
			transaction.Hash,
 | 
			
		||||
			addEvent,
 | 
			
		||||
		); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
@ -7,9 +7,9 @@ import (
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/common/hexutil"
 | 
			
		||||
	"github.com/grassrootseconomics/celoutils"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/grassrootseconomics/w3-celo-patch"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pub"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,77 +0,0 @@
 | 
			
		||||
package filter
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"testing"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/stretchr/testify/suite"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type TransferFilterSuite struct {
 | 
			
		||||
	suite.Suite
 | 
			
		||||
	filter Filter
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *TransferFilterSuite) SetupSuite() {
 | 
			
		||||
	logg := logf.New(
 | 
			
		||||
		logf.Opts{
 | 
			
		||||
			Level: logf.DebugLevel,
 | 
			
		||||
		},
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	s.filter = NewTransferFilter(TransferFilterOpts{
 | 
			
		||||
		Logg: logg,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *TransferFilterSuite) TestTranfserInputs() {
 | 
			
		||||
	type testCase struct {
 | 
			
		||||
		transactionData fetch.Transaction
 | 
			
		||||
		want            bool
 | 
			
		||||
		wantErr         bool
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Generated with eth-encode
 | 
			
		||||
	tests := []testCase{
 | 
			
		||||
		{
 | 
			
		||||
			transactionData: fetch.Transaction{
 | 
			
		||||
				InputData: "0xa9059cbb000000000000000000000000000000000000000000000000000000000000dEaD00000000000000000000000000000000000000000000000000000000000003e8",
 | 
			
		||||
			},
 | 
			
		||||
			want:    true,
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			transactionData: fetch.Transaction{
 | 
			
		||||
				InputData: "0x23b872dd000000000000000000000000000000000000000000000000000000000000dEaD000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000003e8",
 | 
			
		||||
			},
 | 
			
		||||
			want:    true,
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			transactionData: fetch.Transaction{
 | 
			
		||||
				InputData: "0x449a52f8000000000000000000000000000000000000000000000000000000000000dEaD00000000000000000000000000000000000000000000000000000000000003e8",
 | 
			
		||||
			},
 | 
			
		||||
			want:    true,
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			transactionData: fetch.Transaction{
 | 
			
		||||
				InputData: "0x8d72ec9d000000000000000000000000000000000000000000000000000000000000dEaD00000000000000000000000000000000000000000000000000000000000003e8",
 | 
			
		||||
			},
 | 
			
		||||
			want:    false,
 | 
			
		||||
			wantErr: false,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, test := range tests {
 | 
			
		||||
		next, err := s.filter.Execute(context.Background(), &test.transactionData)
 | 
			
		||||
		s.NoError(err)
 | 
			
		||||
		s.Equal(test.want, next)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestTransferFilterSuite(t *testing.T) {
 | 
			
		||||
	suite.Run(t, new(TransferFilterSuite))
 | 
			
		||||
}
 | 
			
		||||
@ -3,9 +3,9 @@ package pipeline
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/filter"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/pkg/fetch"
 | 
			
		||||
	"github.com/jackc/pgx/v5"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@ -8,12 +8,12 @@ import (
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/core/types"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/ethclient"
 | 
			
		||||
	"github.com/celo-org/celo-blockchain/event"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	jobTimeout         = 5 * time.Second
 | 
			
		||||
	jobTimeout         = 15 * time.Second
 | 
			
		||||
	resubscribeBackoff = 2 * time.Second
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -5,8 +5,8 @@ import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/alitto/pond"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/grassrootseconomics/cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/pipeline"
 | 
			
		||||
	"github.com/inethi/inethi-cic-chain-events/internal/store"
 | 
			
		||||
	"github.com/jackc/pgx/v5"
 | 
			
		||||
	"github.com/zerodha/logf"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user