Enable CLI driver of async session #49

Merged
lash merged 14 commits from lash/async-driver into master 2024-09-14 16:52:07 +02:00
4 changed files with 165 additions and 67 deletions
Showing only changes of commit dde9f552a6 - Show all commits

View File

@ -112,6 +112,7 @@ func getResource(resourceDir string, ctx context.Context) (resource.Resource, er
return rfs, nil return rfs, nil
} }
func main() { func main() {
var dbDir string var dbDir string
var resourceDir string var resourceDir string

View File

@ -0,0 +1,47 @@
package handlers
import (
"context"
"errors"
"io"
"git.defalsify.org/vise.git/engine"
"git.defalsify.org/vise.git/resource"
"git.defalsify.org/vise.git/persist"
"git.grassecon.net/urdt/ussd/internal/storage"
)
var (
ErrInvalidRequest = errors.New("invalid request for context")
ErrSessionMissing = errors.New("missing session")
ErrInvalidInput = errors.New("invalid input")
ErrStorage = errors.New("storage retrieval fail")
ErrEngineType = errors.New("incompatible engine")
ErrEngineInit = errors.New("engine init fail")
ErrEngineExec = errors.New("engine exec fail")
)
type RequestSession struct {
Ctx context.Context
Config engine.Config
Engine engine.Engine
Input []byte
Storage storage.Storage
Writer io.Writer
}
type engineMaker func(cfg engine.Config, rs resource.Resource, pr *persist.Persister) engine.Engine
type RequestParser interface {
GetSessionId(rq any) (string, error)
GetInput(rq any) ([]byte, error)
}
type RequestHandler interface {
GetEngine(cfg engine.Config, rs resource.Resource, pe *persist.Persister) engine.Engine
Process(rs RequestSession) (RequestSession, error)
Output(rs RequestSession) (RequestSession, error)
Reset(rs RequestSession) (RequestSession, error)
ShutDown()
}

View File

@ -1,9 +1,9 @@
package http package http
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strconv"
"git.defalsify.org/vise.git/db" "git.defalsify.org/vise.git/db"
"git.defalsify.org/vise.git/engine" "git.defalsify.org/vise.git/engine"
@ -11,32 +11,38 @@ import (
"git.defalsify.org/vise.git/persist" "git.defalsify.org/vise.git/persist"
"git.defalsify.org/vise.git/resource" "git.defalsify.org/vise.git/resource"
"git.grassecon.net/urdt/ussd/internal/handlers"
"git.grassecon.net/urdt/ussd/internal/handlers/ussd" "git.grassecon.net/urdt/ussd/internal/handlers/ussd"
"git.grassecon.net/urdt/ussd/internal/storage"
) )
var ( var (
logg = logging.NewVanilla().WithDomain("httpserver") logg = logging.NewVanilla().WithDomain("httpserver")
) )
type RequestParser interface {
GetSessionId(rq *http.Request) (string, error)
GetInput(rq *http.Request) ([]byte, error)
}
type DefaultRequestParser struct { type DefaultRequestParser struct {
} }
func(rp *DefaultRequestParser) GetSessionId(rq *http.Request) (string, error) {
v := rq.Header.Get("X-Vise-Session") func(rp *DefaultRequestParser) GetSessionId(rq any) (string, error) {
rqv, ok := rq.(*http.Request)
if !ok {
return "", handlers.ErrInvalidRequest
}
v := rqv.Header.Get("X-Vise-Session")
if v == "" { if v == "" {
return "", fmt.Errorf("no session found") return "", handlers.ErrSessionMissing
} }
return v, nil return v, nil
} }
func(rp *DefaultRequestParser) GetInput(rq *http.Request) ([]byte, error) { func(rp *DefaultRequestParser) GetInput(rq any) ([]byte, error) {
defer rq.Body.Close() rqv, ok := rq.(*http.Request)
v, err := ioutil.ReadAll(rq.Body) if !ok {
return nil, handlers.ErrInvalidRequest
}
defer rqv.Body.Close()
v, err := ioutil.ReadAll(rqv.Body)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -45,33 +51,30 @@ func(rp *DefaultRequestParser) GetInput(rq *http.Request) ([]byte, error) {
type SessionHandler struct { type SessionHandler struct {
cfgTemplate engine.Config cfgTemplate engine.Config
rp RequestParser rp handlers.RequestParser
rs resource.Resource rs resource.Resource
//first resource.EntryFunc
hn *ussd.Handlers hn *ussd.Handlers
provider StorageProvider provider storage.StorageProvider
} }
//func NewSessionHandler(cfg engine.Config, rs resource.Resource, stateDb db.Db, userdataDb db.Db, rp RequestParser, first resource.EntryFunc) *SessionHandler { func NewSessionHandler(cfg engine.Config, rs resource.Resource, stateDb db.Db, userdataDb db.Db, rp handlers.RequestParser, hn *ussd.Handlers) *SessionHandler {
func NewSessionHandler(cfg engine.Config, rs resource.Resource, stateDb db.Db, userdataDb db.Db, rp RequestParser, hn *ussd.Handlers) *SessionHandler {
return &SessionHandler{ return &SessionHandler{
cfgTemplate: cfg, cfgTemplate: cfg,
rs: rs, rs: rs,
//first: first,
hn: hn, hn: hn,
rp: rp, rp: rp,
provider: NewSimpleStorageProvider(stateDb, userdataDb), provider: storage.NewSimpleStorageProvider(stateDb, userdataDb),
} }
} }
func(f *SessionHandler) writeError(w http.ResponseWriter, code int, msg string, err error) { func(f *SessionHandler) writeError(w http.ResponseWriter, code int, err error) {
w.Header().Set("X-Vise", msg + ": " + err.Error()) s := err.Error()
w.Header().Set("Content-Length", "0") w.Header().Set("Content-Length", strconv.Itoa(len(s)))
w.WriteHeader(code) w.WriteHeader(code)
_, err = w.Write([]byte{}) _, err = w.Write([]byte{})
if err != nil { if err != nil {
logg.Errorf("error writing error!!", "err", err, "olderr", s)
w.WriteHeader(500) w.WriteHeader(500)
w.Header().Set("X-Vise", err.Error())
} }
return return
} }
@ -83,68 +86,115 @@ func(f* SessionHandler) Shutdown() {
} }
} }
func(f *SessionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { func(f *SessionHandler) GetEngine(cfg engine.Config, rs resource.Resource, pr *persist.Persister) engine.Engine {
en := engine.NewEngine(cfg, rs)
en = en.WithPersister(pr)
return en
}
func(f *SessionHandler) Process(rqs handlers.RequestSession) (handlers.RequestSession, error) {
var r bool var r bool
sessionId, err := f.rp.GetSessionId(req) var err error
if err != nil { var ok bool
f.writeError(w, 400, "Session missing", err)
return logg.InfoCtxf(rqs.Ctx, "new request", rqs)
}
input, err := f.rp.GetInput(req)
if err != nil {
f.writeError(w, 400, "Input read fail", err)
return
}
ctx := req.Context()
cfg := f.cfgTemplate
cfg.SessionId = sessionId
logg.InfoCtxf(ctx, "new request", "session", cfg.SessionId, "input", input) rqs.Storage, err = f.provider.Get(rqs.Config.SessionId)
storage, err := f.provider.Get(cfg.SessionId)
if err != nil { if err != nil {
f.writeError(w, 500, "Storage retrieval fail", err) logg.ErrorCtxf(rqs.Ctx, "", "storage error", "err", err)
return return rqs, handlers.ErrStorage
}
f.hn = f.hn.WithPersister(rqs.Storage.Persister)
eni := f.GetEngine(rqs.Config, f.rs, rqs.Storage.Persister)
en, ok := eni.(*engine.DefaultEngine)
if !ok {
return rqs, handlers.ErrEngineType
} }
f.hn = f.hn.WithPersister(storage.Persister)
defer f.provider.Put(cfg.SessionId, storage)
en := getEngine(cfg, f.rs, storage.Persister)
en = en.WithFirst(f.hn.Init) en = en.WithFirst(f.hn.Init)
if cfg.EngineDebug { if rqs.Config.EngineDebug {
en = en.WithDebug(nil) en = en.WithDebug(nil)
} }
rqs.Engine = en
r, err = en.Init(ctx) r, err = rqs.Engine.Init(rqs.Ctx)
if err != nil { if err != nil {
f.writeError(w, 500, "Engine init fail", err) return rqs, err
}
if r && len(rqs.Input) > 0 {
r, err = rqs.Engine.Exec(rqs.Ctx, rqs.Input)
}
if err != nil {
return rqs, err
}
_ = r
return rqs, nil
}
func(f *SessionHandler) Output(rqs handlers.RequestSession) error {
var err error
_, err = rqs.Engine.WriteResult(rqs.Ctx, rqs.Writer)
return err
}
func(f *SessionHandler) Reset(rqs handlers.RequestSession) error {
defer f.provider.Put(rqs.Config.SessionId, rqs.Storage)
return rqs.Engine.Finish()
}
func(f *SessionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
var code int
var err error
rqs := handlers.RequestSession{
Ctx: req.Context(),
Writer: w,
}
cfg := f.cfgTemplate
cfg.SessionId, err = f.rp.GetSessionId(req)
if err != nil {
logg.ErrorCtxf(rqs.Ctx, "", "header processing error", err)
f.writeError(w, 400, err)
}
rqs.Config = cfg
rqs.Input, err = f.rp.GetInput(req)
if err != nil {
logg.ErrorCtxf(rqs.Ctx, "", "header processing error", err)
f.writeError(w, 400, err)
return return
} }
if r && len(input) > 0 {
r, err = en.Exec(ctx, input) rqs, err = f.Process(rqs)
switch err {
case handlers.ErrStorage:
code = 500
case handlers.ErrEngineInit:
code = 500
case handlers.ErrEngineExec:
code = 500
default:
code = 200
} }
if err != nil {
f.writeError(w, 500, "Engine exec fail", err) if code != 200 {
f.writeError(w, 500, err)
return return
} }
w.WriteHeader(200) w.WriteHeader(200)
w.Header().Set("Content-Type", "text/plain") w.Header().Set("Content-Type", "text/plain")
_, err = en.WriteResult(ctx, w) err = f.Output(rqs)
if err != nil { if err != nil {
f.writeError(w, 500, "Write result fail", err) f.writeError(w, 500, err)
return
}
err = en.Finish()
if err != nil {
f.writeError(w, 500, "Engine finish fail", err)
return return
} }
_ = r err = f.Reset(rqs)
} if err != nil {
f.writeError(w, 500, err)
func getEngine(cfg engine.Config, rs resource.Resource, pr *persist.Persister) *engine.DefaultEngine { return
en := engine.NewEngine(cfg, rs) }
en = en.WithPersister(pr)
return en
} }

View File

@ -1,4 +1,4 @@
package http package storage
import ( import (
"git.defalsify.org/vise.git/db" "git.defalsify.org/vise.git/db"