From 0aa1db902ea20424a655d9f86094bbd29009a4c4 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Wed, 17 Apr 2024 13:36:26 +0800 Subject: [PATCH] feat: add cache implementation --- go.mod | 1 + go.sum | 2 ++ internal/cache/cache.go | 38 +++++++++++++++++++++ internal/cache/map.go | 39 ++++++++++++++++++++++ internal/emitter/{stdout.go => console.go} | 10 ++++-- internal/emitter/emitter.go | 17 ++++++---- internal/handler/handler.go | 5 ++- internal/handler/transfer.go | 5 +-- internal/processor/block.go | 4 ++- 9 files changed, 107 insertions(+), 14 deletions(-) create mode 100644 internal/cache/cache.go create mode 100644 internal/cache/map.go rename internal/emitter/{stdout.go => console.go} (57%) diff --git a/go.mod b/go.mod index 117ff27..3cb4c44 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/tsdb v0.7.1 // indirect + github.com/puzpuzpuz/xsync/v3 v3.1.0 // indirect github.com/rivo/uniseg v0.4.2 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect diff --git a/go.sum b/go.sum index a4bccec..a5cf641 100644 --- a/go.sum +++ b/go.sum @@ -435,6 +435,8 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/puzpuzpuz/xsync/v3 v3.1.0 h1:EewKT7/LNac5SLiEblJeUu8z5eERHrmRLnMQL2d7qX4= +github.com/puzpuzpuz/xsync/v3 v3.1.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8= diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..6788572 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,38 @@ +package cache + +import ( + "log/slog" + + "github.com/grassrootseconomics/celo-tracker/internal/chain" +) + +type ( + Cache interface { + Purge() error + Exists(string) bool + Add(string) bool + Size() int + } + + CacheOpts struct { + Logg *slog.Logger + Chain *chain.Chain + CacheType string + } +) + +func New(o CacheOpts) Cache { + var ( + cache Cache + ) + + switch o.CacheType { + case "map": + cache = NewMapCache() + default: + cache = NewMapCache() + } + o.Logg.Debug("bootstrapping cache") + + return cache +} diff --git a/internal/cache/map.go b/internal/cache/map.go new file mode 100644 index 0000000..5613c13 --- /dev/null +++ b/internal/cache/map.go @@ -0,0 +1,39 @@ +package cache + +import ( + "log/slog" + + "github.com/puzpuzpuz/xsync/v3" +) + +type ( + MapCache struct { + mapCache *xsync.Map + logg *slog.Logger + } +) + +func NewMapCache() *MapCache { + return &MapCache{ + mapCache: xsync.NewMap(), + } +} + +func (c *MapCache) Purge() error { + c.mapCache.Clear() + return nil +} + +func (c *MapCache) Exists(key string) bool { + _, ok := c.mapCache.Load(key) + return ok +} + +func (c *MapCache) Add(key string) bool { + c.mapCache.Store(key, nil) + return true +} + +func (c *MapCache) Size() int { + return c.mapCache.Size() +} diff --git a/internal/emitter/stdout.go b/internal/emitter/console.go similarity index 57% rename from internal/emitter/stdout.go rename to internal/emitter/console.go index a4f4316..9666326 100644 --- a/internal/emitter/stdout.go +++ b/internal/emitter/console.go @@ -7,12 +7,18 @@ import ( ) type ( - LogEmitter struct { + ConsoleEmitter struct { logg *slog.Logger } ) -func (l *LogEmitter) Emit(_ context.Context, payload []byte) error { +func NewConsoleEmitter(logg *slog.Logger) *ConsoleEmitter { + return &ConsoleEmitter{ + logg: logg, + } +} + +func (l *ConsoleEmitter) Emit(_ context.Context, payload []byte) error { var event map[string]interface{} if err := json.Unmarshal(payload, &event); err != nil { diff --git a/internal/emitter/emitter.go b/internal/emitter/emitter.go index e53ba09..47786f4 100644 --- a/internal/emitter/emitter.go +++ b/internal/emitter/emitter.go @@ -1,15 +1,20 @@ package emitter import ( + "context" "log/slog" - - "github.com/grassrootseconomics/celo-tracker/internal/handler" ) -func New(logg *slog.Logger) handler.EmitterEmitFunc { - stdOutEmitter := &LogEmitter{ - logg: logg, +type ( + Emitter interface { + Emit(context.Context, []byte) error } - return stdOutEmitter.Emit + EmitterOpts struct { + Logg *slog.Logger + } +) + +func New(o EmitterOpts) Emitter { + return NewConsoleEmitter(o.Logg) } diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 02b8138..23c0fc5 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -4,14 +4,13 @@ import ( "context" "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/w3-celo" ) type ( - EmitterEmitFunc func(context.Context, []byte) error - Handler interface { - Handle(context.Context, *types.Log, EmitterEmitFunc) error + Handle(context.Context, *types.Log, emitter.Emitter) error } ) diff --git a/internal/handler/transfer.go b/internal/handler/transfer.go index 6cf72df..ffe571a 100644 --- a/internal/handler/transfer.go +++ b/internal/handler/transfer.go @@ -7,6 +7,7 @@ import ( "github.com/celo-org/celo-blockchain/common" "github.com/celo-org/celo-blockchain/core/types" + "github.com/grassrootseconomics/celo-tracker/internal/emitter" "github.com/grassrootseconomics/w3-celo" ) @@ -24,7 +25,7 @@ type ( } ) -func (h *TransferHandler) Handle(ctx context.Context, log *types.Log, emitFn EmitterEmitFunc) error { +func (h *TransferHandler) Handle(ctx context.Context, log *types.Log, emitter emitter.Emitter) error { if log.Topics[0] == h.topicHash { var ( from common.Address @@ -48,7 +49,7 @@ func (h *TransferHandler) Handle(ctx context.Context, log *types.Log, emitFn Emi return err } - return emitFn(ctx, jsonData) + return emitter.Emit(ctx, jsonData) } return nil diff --git a/internal/processor/block.go b/internal/processor/block.go index 8d6f561..d16061b 100644 --- a/internal/processor/block.go +++ b/internal/processor/block.go @@ -46,7 +46,9 @@ func (p *Processor) processBlock(ctx context.Context, block types.Block) error { } func (p *Processor) handleLogs(ctx context.Context, log *types.Log) error { - defaultEmitter := emitter.New(p.logg) + defaultEmitter := emitter.New(emitter.EmitterOpts{ + Logg: p.logg, + }) for _, handler := range p.handlers { if err := handler.Handle(ctx, log, defaultEmitter); err != nil {