Add documentation, remove unused files

This commit is contained in:
lash
2024-11-03 19:04:44 +00:00
parent 4491f0155f
commit e420231f10
11 changed files with 89 additions and 83 deletions

View File

@@ -19,6 +19,9 @@ var (
logg = logging.NewVanilla().WithDomain("term-nats")
)
// NatsSubscription encapsulates the jetstream session providing events.
//
// Extends Router.
type NatsSubscription struct {
event.Router
ctx context.Context
@@ -28,6 +31,7 @@ type NatsSubscription struct {
cctx jetstream.ConsumeContext
}
// NewNatsSubscription creates a new NatsSubscription with the given user store.
func NewNatsSubscription(store db.Db) *NatsSubscription {
return &NatsSubscription{
Router: event.Router{
@@ -38,19 +42,12 @@ func NewNatsSubscription(store db.Db) *NatsSubscription {
}
}
func toServerInfo(conn *nats.Conn) string {
return fmt.Sprintf("%s@%s (v%s)", conn.ConnectedServerName(), conn.ConnectedUrlRedacted(), conn.ConnectedServerVersion())
}
func disconnectHandler(conn *nats.Conn, err error) {
logg.Errorf("nats disconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "err", err)
}
func reconnectHandler(conn *nats.Conn) {
serverInfo := toServerInfo(conn)
logg.Errorf("nats reconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "server", serverInfo)
}
// Connect sets up the connection to the nats server and a consumer for the
// "Jetstream".
//
// Fails if connection fails or the "Jetstream" consumer cannot be set up.
//
// Once connected, it will attempt to reconnect if disconnected.
func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error {
var err error
@@ -84,6 +81,7 @@ func(n *NatsSubscription) Connect(ctx context.Context, connStr string) error {
return nil
}
// Close cleanly brings down the nats and jetstream connection.
func(n *NatsSubscription) Close() error {
n.cctx.Stop()
select {
@@ -93,13 +91,7 @@ func(n *NatsSubscription) Close() error {
return nil
}
func fail(m jetstream.Msg) {
err := m.Nak()
if err != nil {
logg.Errorf("nats nak fail", "err", err)
}
}
// jetstream message handler and acknowledger.
func(n *NatsSubscription) handleEvent(m jetstream.Msg) {
var ev geEvent.Event
@@ -123,3 +115,27 @@ func(n *NatsSubscription) handleEvent(m jetstream.Msg) {
}
logg.DebugCtxf(n.ctx, "handle msg complete")
}
// used if message should be retried.
func fail(m jetstream.Msg) {
err := m.Nak()
if err != nil {
logg.Errorf("nats nak fail", "err", err)
}
}
// server info string for debug.
func toServerInfo(conn *nats.Conn) string {
return fmt.Sprintf("%s@%s (v%s)", conn.ConnectedServerName(), conn.ConnectedUrlRedacted(), conn.ConnectedServerVersion())
}
// on nats disconnection.
func disconnectHandler(conn *nats.Conn, err error) {
logg.Errorf("nats disconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "err", err)
}
// on nats reconnection.
func reconnectHandler(conn *nats.Conn) {
serverInfo := toServerInfo(conn)
logg.Errorf("nats reconnected", "status", conn.Status(), "reconnects", conn.Stats().Reconnects, "server", serverInfo)
}