diff --git a/cmd/main.go b/cmd/main.go index dead575..7deede9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,10 +9,15 @@ import ( "git.defalsify.org/vise.git/db/mem" + "git.grassecon.net/urdt/ussd/initializers" "git.grassecon.net/term/config" "git.grassecon.net/term/event/nats" ) +func init() { + initializers.LoadEnvVariables() +} + func main() { ctx := context.Background() db := mem.NewMemDb() diff --git a/common/hex.go b/common/hex.go deleted file mode 100644 index f5aa7ed..0000000 --- a/common/hex.go +++ /dev/null @@ -1,18 +0,0 @@ -package common - -import ( - "encoding/hex" -) - -func NormalizeHex(s string) (string, error) { - if len(s) >= 2 { - if s[:2] == "0x" { - s = s[2:] - } - } - r, err := hex.DecodeString(s) - if err != nil { - return "", err - } - return hex.EncodeToString(r), nil -} diff --git a/config/config.go b/config/config.go index 88f9f33..0ef516b 100644 --- a/config/config.go +++ b/config/config.go @@ -10,9 +10,6 @@ var ( JetstreamClientName string ) -func init() { - initializers.LoadEnvVariables() -} func LoadConfig() { urdtconfig.LoadConfig() diff --git a/event/custodial_registration.go b/event/custodial_registration.go index 0760158..ba5cca2 100644 --- a/event/custodial_registration.go +++ b/event/custodial_registration.go @@ -13,10 +13,12 @@ const ( evReg = "CUSTODIAL_REGISTRATION" ) +// fields used for handling custodial registration event. type eventCustodialRegistration struct { Account string } +// attempt to coerce event as custodial registration. func asCustodialRegistrationEvent(gev *geEvent.Event) (*eventCustodialRegistration, bool) { var ok bool var ev eventCustodialRegistration @@ -32,6 +34,7 @@ func asCustodialRegistrationEvent(gev *geEvent.Event) (*eventCustodialRegistrati return &ev, true } +// handle custodial registration. func handleCustodialRegistration(ctx context.Context, store *common.UserDataStore, ev *eventCustodialRegistration) error { identity, err := lookup.IdentityFromAddress(ctx, store, ev.Account) if err != nil { diff --git a/event/nats/nats.go b/event/nats/nats.go index af8f64a..223b8da 100644 --- a/event/nats/nats.go +++ b/event/nats/nats.go @@ -19,6 +19,9 @@ var ( logg = logging.NewVanilla().WithDomain("term-nats") ) +// NatsSubscription encapsulates the jetstream session providing events. +// +// Extends Router. type NatsSubscription struct { event.Router ctx context.Context @@ -28,6 +31,7 @@ type NatsSubscription struct { cctx jetstream.ConsumeContext } +// NewNatsSubscription creates a new NatsSubscription with the given user store. func NewNatsSubscription(store db.Db) *NatsSubscription { return &NatsSubscription{ Router: event.Router{ @@ -38,19 +42,12 @@ func NewNatsSubscription(store db.Db) *NatsSubscription { } } -func toServerInfo(conn *nats.Conn) string { - return fmt.Sprintf("%s@%s (v%s)", conn.ConnectedServerName(), conn.ConnectedUrlRedacted(), conn.ConnectedServerVersion()) -} - -func disconnectHandler(conn *nats.Conn, err error) { - logg.Errorf("nats disconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "err", err) -} - -func reconnectHandler(conn *nats.Conn) { - serverInfo := toServerInfo(conn) - logg.Errorf("nats reconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "server", serverInfo) -} - +// Connect sets up the connection to the nats server and a consumer for the +// "Jetstream". +// +// Fails if connection fails or the "Jetstream" consumer cannot be set up. +// +// Once connected, it will attempt to reconnect if disconnected. func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error { var err error @@ -84,6 +81,7 @@ func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error { return nil } +// Close cleanly brings down the nats and jetstream connection. func(n *NatsSubscription) Close() error { n.cctx.Stop() select { @@ -93,13 +91,7 @@ func(n *NatsSubscription) Close() error { return nil } -func fail(m jetstream.Msg) { - err := m.Nak() - if err != nil { - logg.Errorf("nats nak fail", "err", err) - } -} - +// jetstream message handler and acknowledger. func(n *NatsSubscription) handleEvent(m jetstream.Msg) { var ev geEvent.Event @@ -123,3 +115,27 @@ func(n *NatsSubscription) handleEvent(m jetstream.Msg) { } logg.DebugCtxf(n.ctx, "handle msg complete") } + +// used if message should be retried. +func fail(m jetstream.Msg) { + err := m.Nak() + if err != nil { + logg.Errorf("nats nak fail", "err", err) + } +} + +// server info string for debug. +func toServerInfo(conn *nats.Conn) string { + return fmt.Sprintf("%s@%s (v%s)", conn.ConnectedServerName(), conn.ConnectedUrlRedacted(), conn.ConnectedServerVersion()) +} + +// on nats disconnection. +func disconnectHandler(conn *nats.Conn, err error) { + logg.Errorf("nats disconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "err", err) +} + +// on nats reconnection. +func reconnectHandler(conn *nats.Conn) { + serverInfo := toServerInfo(conn) + logg.Errorf("nats reconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "server", serverInfo) +} diff --git a/event/route.go b/event/route.go index 54a1c34..2266388 100644 --- a/event/route.go +++ b/event/route.go @@ -10,19 +10,21 @@ import ( "git.grassecon.net/urdt/ussd/common" ) -// TODO: this vocabulary should be public in and provided by the eth-tracker repo -const ( - evGive = "FAUCET_GIVE" -) - var ( logg = logging.NewVanilla().WithDomain("term-event") ) +// Router is responsible for invoking handlers corresponding to events. type Router struct { + // User data store abstraction over application data. Store *common.UserDataStore } +// Route parses an event from the event stream, and resolves the handler +// corresponding to the event. +// +// An error will be returned if no handler can be found, or if the resolved +// handler fails to successfully execute. func(r *Router) Route(ctx context.Context, gev *geEvent.Event) error { logg.DebugCtxf(ctx, "have event", "ev", gev) evCC, ok := asCustodialRegistrationEvent(gev) diff --git a/event/sub.go b/event/sub.go deleted file mode 100644 index e754ac6..0000000 --- a/event/sub.go +++ /dev/null @@ -1,12 +0,0 @@ -package event - -import ( - "context" - "io" -) - -type Subscription interface { - io.Closer - Connect(ctx context.Context, connStr string) error - Next() error -} diff --git a/event/token.go b/event/token.go index 2113261..9b94054 100644 --- a/event/token.go +++ b/event/token.go @@ -16,14 +16,11 @@ import ( const ( evTokenTransfer = "TOKEN_TRANSFER" - // TODO: use export from urdt storage + // TODO: export from urdt storage package DATATYPE_USERSUB = 64 ) -func renderTx() { - -} - +// fields used for handling token transfer event. type eventTokenTransfer struct { From string To string @@ -32,10 +29,14 @@ type eventTokenTransfer struct { VoucherAddress string } +// formatter for transaction data +// +// TODO: current formatting is a placeholder. func formatTransaction(idx int, tx dataserviceapi.Last10TxResponse) string { return fmt.Sprintf("%d %s %s", idx, tx.DateBlock, tx.TxHash[:10]) } +// refresh and store transaction history. func updateTokenTransferList(ctx context.Context, store *common.UserDataStore, identity lookup.Identity) error { var r []string @@ -53,6 +54,9 @@ func updateTokenTransferList(ctx context.Context, store *common.UserDataStore, i return store.WriteEntry(ctx, identity.SessionId, common.DATA_TRANSACTIONS, []byte(s)) } +// refresh and store token list. +// +// TODO: when subprefixdb has been exported, can use function in ...urdt/ussd/common/ instead func updateTokenList(ctx context.Context, store *common.UserDataStore, identity lookup.Identity) error { holdings, err := lookup.Api.FetchVouchers(ctx, identity.ChecksumAddress) if err != nil { @@ -61,7 +65,6 @@ func updateTokenList(ctx context.Context, store *common.UserDataStore, identity metadata := common.ProcessVouchers(holdings) _ = metadata - // TODO: export subprefixdb and use that instead // TODO: make sure subprefixdb is thread safe when using gdbm // TODO: why is address session here unless explicitly set store.Db.SetSession(identity.SessionId) @@ -94,6 +97,7 @@ func updateTokenList(ctx context.Context, store *common.UserDataStore, identity return nil } +// set default token to given symbol. func updateDefaultToken(ctx context.Context, store *common.UserDataStore, identity lookup.Identity, activeSym string) error { pfxDb := common.StoreToPrefixDb(store, []byte("vouchers")) // TODO: the activeSym input should instead be newline separated list? @@ -101,14 +105,15 @@ func updateDefaultToken(ctx context.Context, store *common.UserDataStore, identi if err != nil { return err } - logg.TraceCtxf(ctx, "tokendaa", "d", tokenData) return common.UpdateVoucherData(ctx, store, identity.SessionId, tokenData) } +// waiter to check whether object is available on dependency endpoints. func updateWait(ctx context.Context) error { return nil } +// use api to resolve address to token symbol. func toSym(ctx context.Context, address string) ([]byte, error) { voucherData, err := lookup.Api.VoucherData(ctx, address) if err != nil { @@ -117,6 +122,7 @@ func toSym(ctx context.Context, address string) ([]byte, error) { return []byte(voucherData.TokenSymbol), nil } +// execute all func updateToken(ctx context.Context, store *common.UserDataStore, identity lookup.Identity, tokenAddress string) error { err := updateTokenList(ctx, store, identity) if err != nil { @@ -152,6 +158,7 @@ func updateToken(ctx context.Context, store *common.UserDataStore, identity look return nil } +// attempt to coerce event as token transfer event. func asTokenTransferEvent(gev *geEvent.Event) (*eventTokenTransfer, bool) { var err error var ok bool @@ -162,7 +169,7 @@ func asTokenTransferEvent(gev *geEvent.Event) (*eventTokenTransfer, bool) { } pl := gev.Payload - // assuming from and to are checksum addresses + // we are assuming from and to are checksum addresses ev.From, ok = pl["from"].(string) if !ok { return nil, false @@ -192,6 +199,9 @@ func asTokenTransferEvent(gev *geEvent.Event) (*eventTokenTransfer, bool) { return &ev, true } +// handle token transfer. +// +// if from and to are NOT the same, handle code will be executed once for each side of the transfer. func handleTokenTransfer(ctx context.Context, store *common.UserDataStore, ev *eventTokenTransfer) error { identity, err := lookup.IdentityFromAddress(ctx, store, ev.From) if err != nil { @@ -204,15 +214,18 @@ func handleTokenTransfer(ctx context.Context, store *common.UserDataStore, ev *e return err } } - identity, err = lookup.IdentityFromAddress(ctx, store, ev.To) - if err != nil { - if !db.IsNotFound(err) { - return err - } - } else { - err = updateToken(ctx, store, identity, ev.VoucherAddress) + + if strings.Compare(ev.To, ev.From) { + identity, err = lookup.IdentityFromAddress(ctx, store, ev.To) if err != nil { - return err + if !db.IsNotFound(err) { + return err + } + } else { + err = updateToken(ctx, store, identity, ev.VoucherAddress) + if err != nil { + return err + } } } diff --git a/handler/error.go b/handler/error.go deleted file mode 100644 index 66d96e5..0000000 --- a/handler/error.go +++ /dev/null @@ -1,9 +0,0 @@ -package handler - -import ( - "fmt" -) - -var ( - ErrInvalidPayload = fmt.Errorf("Invalid event payload") -) diff --git a/lookup/db.go b/lookup/db.go index 9ddf34f..3a1dc63 100644 --- a/lookup/db.go +++ b/lookup/db.go @@ -12,12 +12,19 @@ var ( logg = logging.NewVanilla().WithDomain("term-lookup") ) +// Identity contains all flavors of identifiers used across stream, api and +// client for a single agent. type Identity struct { NormalAddress string ChecksumAddress string SessionId string } +// IdentityFromAddress fully populates and Identity object from a given +// checksum address. +// +// It is the caller's responsibility to ensure that a valid checksum address +// is passed. func IdentityFromAddress(ctx context.Context, store *common.UserDataStore, address string) (Identity, error) { var err error var ident Identity @@ -34,6 +41,7 @@ func IdentityFromAddress(ctx context.Context, store *common.UserDataStore, addre return ident, nil } +// load matching session from address from db store. func getSessionIdByAddress(ctx context.Context, store *common.UserDataStore, address string) (string, error) { // TODO: replace with userdatastore when double sessionid issue fixed //r, err := store.ReadEntry(ctx, address, common.DATA_PUBLIC_KEY_REVERSE) diff --git a/lookup/token.go b/lookup/token.go index f4b26d4..0172fac 100644 --- a/lookup/token.go +++ b/lookup/token.go @@ -5,5 +5,6 @@ import ( ) var ( + // Api provides the api implementation for all external lookups. Api remote.AccountServiceInterface = &remote.AccountService{} )