compressed session into manager

This commit is contained in:
Jay
2026-05-17 19:34:41 -04:00
parent c2503922fc
commit d8a5a7a58c
3 changed files with 91 additions and 542 deletions
-53
View File
@@ -4,9 +4,7 @@ import (
"context"
"git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
)
@@ -92,57 +90,6 @@ func (p *mockPool) receive(data []byte) {
}
}
// Mock request session harness
type mockSessionHarness struct {
ctx context.Context
id string
filters [][]byte
req []byte
inbox chan sessionMessage
events chan ReqEvent
closed chan ReqClosed
closedOnce *sync.Once
done chan struct{}
sent chan []byte
send func([]byte) error
preterminate func()
terminatedWith chan terminateReason
terminate func(terminateReason)
}
func newMockSessionHarness() *mockSessionHarness {
ctx := component.MustNew(context.Background(), "prism", "test")
filters := [][]byte{[]byte(`{}`)}
id := "TESTREQ"
sent := make(chan []byte, 2)
send := func(data []byte) error {
sent <- data
return nil
}
inbox := make(chan sessionMessage, 16)
preterminate := func() { close(inbox) }
terminatedWith := make(chan terminateReason, 1)
terminate := func(r terminateReason) { terminatedWith <- r }
return &mockSessionHarness{
ctx: ctx,
id: id,
filters: filters,
req: envelope.EncloseReq(id, filters),
inbox: inbox,
events: make(chan ReqEvent, 16),
closed: make(chan ReqClosed, 16),
closedOnce: &sync.Once{},
done: make(chan struct{}),
sent: sent,
send: send,
preterminate: preterminate,
terminatedWith: terminatedWith,
terminate: terminate,
}
}
// MockEnvoy
func newMockEnvoy(t *testing.T) (*mockPool, *Envoy) {
+75 -233
View File
@@ -31,9 +31,6 @@ type ReqClosed struct {
type RequestManager struct {
reqs map[string]*request
sessions map[string]*session
inboxSubs map[string]chan<- sessionMessage
done chan struct{}
sessionWg sync.WaitGroup
envoy *Envoy
events <-chan OutboundPoolEvent
@@ -54,46 +51,15 @@ type request struct {
events chan ReqEvent
closed chan ReqClosed
deregisterOnce sync.Once
closedOnce sync.Once
}
type session struct {
id string
req []byte
inbox <-chan sessionMessage
forwardEvent chan<- ReqEvent
forwardClosed chan<- ReqClosed
closedOnce *sync.Once
done chan struct{}
send func([]byte) error
preterminate func()
terminate func(terminateReason)
closeOnEOSE bool
ctx context.Context
cancel context.CancelFunc
logger *slog.Logger
isQuery bool
request *request
}
type sessionMessage struct {
label string
peerID string
receivedAt time.Time
data []byte
}
type terminateReason int
const (
termSendFailed terminateReason = iota
termClosedOnEOSE
termReceivedClosed
termDone
termCancelled
)
// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------
@@ -120,7 +86,6 @@ func NewRequestManager(e *Envoy) *RequestManager {
m := &RequestManager{
reqs: make(map[string]*request),
sessions: make(map[string]*session),
inboxSubs: make(map[string]chan<- sessionMessage),
envoy: e,
events: e.SubscribeEvents(),
@@ -136,8 +101,8 @@ func NewRequestManager(e *Envoy) *RequestManager {
m.logger = slog.New(h).With(slog.Any("component", comp))
}
// start event handler
m.wg.Add(1)
m.wg.Add(2)
go m.handleEvents()
go m.routeInbox()
return m
@@ -194,7 +159,6 @@ func (m *RequestManager) Query(
m.mu.Lock()
m.reqs[id] = req
m.spawnSession(req, true)
m.mu.Unlock()
@@ -229,8 +193,9 @@ func (m *RequestManager) Cancel(id string) error {
return fmt.Errorf("Cancel: unknown id %q", id)
}
if sess, ok := m.sessions[id]; ok {
sess.Close()
if _, ok := m.sessions[id]; ok {
go m.envoy.Send(envelope.EncloseClose(id))
delete(m.sessions, id)
}
req.deregisterOnce.Do(func() {
@@ -246,82 +211,79 @@ func (m *RequestManager) Close() {
m.cancel()
m.wg.Wait()
m.mu.RLock()
sessions := make(map[string]*session)
for id, s := range m.sessions {
sessions[id] = s
}
m.mu.RUnlock()
for _, sess := range sessions {
sess.Close()
}
m.sessionWg.Wait()
m.mu.Lock()
defer m.mu.Unlock()
for id, req := range m.reqs {
if _, ok := m.sessions[id]; ok {
go m.envoy.Send(envelope.EncloseClose(id))
}
req.deregisterOnce.Do(func() {
close(req.buffer)
close(req.closed)
})
delete(m.reqs, id)
}
m.mu.Unlock()
for id := range m.sessions {
delete(m.sessions, id)
}
}
func (m *RequestManager) spawnSession(req *request, query bool) {
sessionInbox := make(chan sessionMessage, 64)
m.inboxSubs[req.id] = sessionInbox
var once sync.Once
preterminate := func() {
m.mu.Lock()
delete(m.inboxSubs, req.id)
m.mu.Unlock()
sessionInbox <- sessionMessage{label: "EOF"}
sess := &session{
id: req.id,
req: envelope.EncloseReq(req.id, req.filters),
isQuery: query,
request: req,
}
m.sessions[req.id] = sess
go m.envoy.Send(sess.req)
}
terminate := func(r terminateReason) {
once.Do(func() {
m.mu.Lock()
delete(m.sessions, req.id)
m.mu.Unlock()
m.sessionWg.Done()
if r == termReceivedClosed || r == termClosedOnEOSE {
func (m *RequestManager) deregister(req *request) {
req.deregisterOnce.Do(func() {
close(req.buffer)
close(req.closed)
})
m.mu.Lock()
delete(m.reqs, req.id)
m.mu.Unlock()
}
})
}
req_env := envelope.EncloseReq(req.id, req.filters)
sess := newSession(
m.ctx, req.id, req_env, sessionInbox, req.buffer, req.closed, &req.closedOnce,
m.done, m.envoy.Send, preterminate, terminate, query, m.handler,
)
m.sessions[req.id] = sess
m.sessionWg.Add(1)
go sess.run()
delete(m.sessions, req.id)
}
func (m *RequestManager) start() {
// start all request sessions
m.mu.Lock()
defer m.mu.Unlock()
for _, req := range m.reqs {
m.spawnSession(req, false)
}
}
func (m *RequestManager) stop() {
// stop all running sessions
m.mu.Lock()
defer m.mu.Unlock()
for id := range m.sessions {
delete(m.sessions, id)
}
}
func (m *RequestManager) handleEvents() {
defer m.wg.Done()
// start/stop sessions on connect/disconnect
for {
select {
case <-m.ctx.Done():
return
case ev, ok := <-m.events:
if !ok {
return
}
switch ev.Kind {
case EventConnected:
m.start()
case EventDisconnected:
m.stop()
}
}
}
}
func (m *RequestManager) routeInbox() {
@@ -353,171 +315,51 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
return
}
m.mu.RLock()
sub, ok := m.inboxSubs[subID]
req, ok := m.reqs[subID]
m.mu.RUnlock()
if !ok {
return
}
sub <- sessionMessage{
label: "EVENT",
peerID: msg.ID,
receivedAt: msg.ReceivedAt,
data: event,
req.buffer <- ReqEvent{
PeerID: msg.ID,
ReceivedAt: msg.ReceivedAt,
Data: event,
}
case "EOSE":
subID, err := envelope.FindEOSE(msg.Data)
if err != nil {
return
}
m.mu.RLock()
sub, ok := m.inboxSubs[subID]
m.mu.RUnlock()
m.mu.Lock()
sess, ok := m.sessions[subID]
if !ok {
m.mu.Unlock()
return
}
sub <- sessionMessage{
label: "EOSE",
peerID: msg.ID,
receivedAt: msg.ReceivedAt,
if sess.isQuery {
m.deregister(sess.request)
go m.envoy.Send(envelope.EncloseClose(subID))
}
m.mu.Unlock()
case "CLOSED":
subID, message, err := envelope.FindClosed(msg.Data)
if err != nil {
return
}
m.mu.RLock()
sub, ok := m.inboxSubs[subID]
m.mu.RUnlock()
m.mu.Lock()
req, ok := m.reqs[subID]
if !ok {
m.mu.Unlock()
return
}
sub <- sessionMessage{
label: "CLOSED",
peerID: msg.ID,
receivedAt: msg.ReceivedAt,
data: []byte(message),
req.closed <- ReqClosed{
PeerID: msg.ID,
ReceivedAt: msg.ReceivedAt,
Data: message,
}
m.deregister(req)
m.mu.Unlock()
}
}
}
// ----------------------------------------------------------------------------
// Session
// ----------------------------------------------------------------------------
func newSession(
ctx context.Context,
id string,
req []byte,
inbox <-chan sessionMessage,
forwardEvent chan<- ReqEvent,
forwardClosed chan<- ReqClosed,
closedOnce *sync.Once,
done chan struct{},
send func(data []byte) error,
preterminate func(),
terminate func(terminateReason),
isQuery bool,
handler slog.Handler,
) *session {
ctx, cancel := context.WithCancel(component.MustExtend(ctx, "session"))
s := &session{
id: id,
req: req,
inbox: inbox,
forwardEvent: forwardEvent,
forwardClosed: forwardClosed,
closedOnce: closedOnce,
done: done,
send: send,
preterminate: preterminate,
terminate: terminate,
closeOnEOSE: isQuery,
ctx: ctx,
cancel: cancel,
}
// create logger if handler is supplied
return s
}
func (s *session) run() {
// send initial REQ; goroutine allows done/ctx cancellation to abort the wait
sent := make(chan error, 1)
go func() { sent <- s.send(s.req) }()
drain := func() {
for msg := range s.inbox {
if msg.label == "EOF" {
return
}
switch msg.label {
case "EVENT":
s.forwardEvent <- ReqEvent{
PeerID: msg.peerID,
ReceivedAt: msg.receivedAt,
Data: msg.data,
}
case "EOSE":
case "CLOSED":
s.closedOnce.Do(func() {
s.forwardClosed <- ReqClosed{
PeerID: msg.peerID,
ReceivedAt: msg.receivedAt,
Data: string(msg.data),
}
})
}
}
}
exit := func(tr terminateReason) {
s.preterminate()
drain()
s.terminate(tr)
}
for {
select {
case err := <-sent:
if err != nil {
exit(termSendFailed)
return
}
case <-s.done:
exit(termDone)
return
case <-s.ctx.Done():
s.send(envelope.EncloseClose(s.id))
exit(termCancelled)
return
case msg := <-s.inbox:
switch msg.label {
case "EVENT":
s.forwardEvent <- ReqEvent{
PeerID: msg.peerID,
ReceivedAt: msg.receivedAt,
Data: msg.data,
}
case "EOSE":
if s.closeOnEOSE {
s.send(envelope.EncloseClose(s.id))
exit(termClosedOnEOSE)
return
}
case "CLOSED":
s.closedOnce.Do(func() {
s.forwardClosed <- ReqClosed{
PeerID: msg.peerID,
ReceivedAt: msg.receivedAt,
Data: string(msg.data),
}
})
exit(termReceivedClosed)
return
}
}
}
}
func (s *session) Close() {
s.cancel()
}
+6 -246
View File
@@ -1,212 +1,12 @@
package prism
import (
"fmt"
"git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
// Session tests exercise the session struct in isolation.
// The session is constructed directly with mock channels and callbacks.
// These tests do not go through RequestManager.
func TestRequestManager_Session(t *testing.T) {
t.Run("sends req on start", func(t *testing.T) {
h := newMockSessionHarness()
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, h.send, h.preterminate, h.terminate, false, nil)
go s.run()
var got []byte
Eventually(t, func() bool {
select {
case got = <-h.sent:
return true
default:
return false
}
}, "expected send")
assert.Equal(t, []byte(h.req), got)
})
t.Run("terminates on failed req send", func(t *testing.T) {
h := newMockSessionHarness()
send := func([]byte) error { return fmt.Errorf("send failed") }
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, send, h.preterminate, h.terminate, false, nil)
go s.run()
Eventually(t, func() bool {
select {
case r := <-h.terminatedWith:
return r == termSendFailed
default:
return false
}
}, "expected termSendFailed")
})
t.Run("ignores eose if stream", func(t *testing.T) {
h := newMockSessionHarness()
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, h.send, h.preterminate, h.terminate, false, nil)
go s.run()
// wait for initial REQ send before proceeding
Eventually(t, func() bool {
select {
case <-h.sent:
return true
default:
return false
}
}, "expected initial send")
h.inbox <- sessionMessage{label: "EOSE"}
Never(t, func() bool {
select {
case <-h.terminatedWith:
return true
default:
return false
}
}, "terminate should not be called on eose for stream")
})
t.Run("sends close on eose if query", func(t *testing.T) {
h := newMockSessionHarness()
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, h.send, h.preterminate, h.terminate, true, nil)
go s.run()
// drain initial REQ send
Eventually(t, func() bool {
select {
case <-h.sent:
return true
default:
return false
}
}, "expected initial REQ send")
h.inbox <- sessionMessage{label: "EOSE"}
var got []byte
Eventually(t, func() bool {
select {
case got = <-h.sent:
return true
default:
return false
}
}, "expected CLOSE send")
assert.Equal(t, []byte(envelope.EncloseClose(h.id)), got)
Eventually(t, func() bool {
select {
case r := <-h.terminatedWith:
return r == termClosedOnEOSE
default:
return false
}
}, "expected termCloseSent")
})
t.Run("terminates on done close", func(t *testing.T) {
h := newMockSessionHarness()
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, h.send, h.preterminate, h.terminate, false, nil)
go s.run()
// wait for initial req
Eventually(t, func() bool {
select {
case <-h.sent:
return true
default:
return false
}
}, "expected initial send")
// close with done
close(h.done)
Eventually(t, func() bool {
select {
case r := <-h.terminatedWith:
return r == termDone
default:
return false
}
}, "expected termDone after done closed")
})
t.Run("terminates on context cancel", func(t *testing.T) {
h := newMockSessionHarness()
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, h.send, h.preterminate, h.terminate, false, nil)
go s.run()
Eventually(t, func() bool {
select {
case <-h.sent:
return true
default:
return false
}
}, "expected initial send")
s.Close()
Eventually(t, func() bool {
select {
case r := <-h.terminatedWith:
return r == termCancelled
default:
return false
}
}, "expected termCancelled after context cancel")
})
t.Run("terminates on closed signal", func(t *testing.T) {
h := newMockSessionHarness()
s := newSession(
h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce,
h.done, h.send, h.preterminate, h.terminate, false, nil)
go s.run()
Eventually(t, func() bool {
select {
case <-h.sent:
return true
default:
return false
}
}, "expected initial send")
h.inbox <- sessionMessage{label: "CLOSED"}
Eventually(t, func() bool {
select {
case r := <-h.terminatedWith:
return r == termReceivedClosed
default:
return false
}
}, "expected termReceivedClosed")
})
}
func TestRequestManager_Stream(t *testing.T) {
t.Run("spawns session and sends req when connected", func(t *testing.T) {
p, envoy := newMockEnvoy(t)
@@ -329,7 +129,7 @@ func TestRequestManager_Stream(t *testing.T) {
_, ok := m.sessions[id]
m.mu.RUnlock()
return !ok
}, "session should not terminate after eose")
}, "session should not be removed after eose")
Never(t, func() bool {
select {
@@ -410,40 +210,6 @@ func TestRequestManager_Stream(t *testing.T) {
m.mu.RUnlock()
assert.False(t, ok, "registration should be removed from reqs")
})
t.Run("duplicate closed does not panic", func(t *testing.T) {
p, envoy := newMockEnvoy(t)
p.connect()
Eventually(t, envoy.IsConnected, "envoy should be connected")
m := NewRequestManager(envoy)
t.Cleanup(func() { m.Close() })
filters := [][]byte{[]byte(`{}`)}
id, _, _ := m.Stream(filters)
// drain the REQ send
Eventually(t, func() bool {
select {
case <-p.sent:
return true
default:
return false
}
}, "expected REQ send")
// inject both before the router can process either
p.receive(envelope.EncloseClosed(id, "error: first"))
p.receive(envelope.EncloseClosed(id, "error: second"))
// if the router panics, the test will fail with a goroutine crash;
// assert the session eventually terminates cleanly as a liveness check
Eventually(t, func() bool {
m.mu.RLock()
_, ok := m.sessions[id]
m.mu.RUnlock()
return !ok
}, "session should terminate after closed")
})
}
func TestRequestManager_Cancel(t *testing.T) {
@@ -481,17 +247,12 @@ func TestRequestManager_Cancel(t *testing.T) {
}, "expected CLOSE send")
assert.Equal(t, []byte(envelope.EncloseClose(id)), got)
Eventually(t, func() bool {
m.mu.RLock()
_, ok := m.sessions[id]
_, sessOk := m.sessions[id]
_, reqOk := m.reqs[id]
m.mu.RUnlock()
return !ok
}, "session should be removed")
m.mu.RLock()
_, ok := m.reqs[id]
m.mu.RUnlock()
assert.False(t, ok, "registration should be removed from reqs")
assert.False(t, sessOk, "session should be removed")
assert.False(t, reqOk, "registration should be removed from reqs")
Eventually(t, func() bool {
select {
@@ -573,7 +334,7 @@ func TestRequestManager_Query(t *testing.T) {
assert.Len(t, events, 3)
assert.Nil(t, closed)
// CLOSE envelope should have been sent by the session
// CLOSE envelope should have been sent after EOSE
var closeEnv []byte
select {
case closeEnv = <-p.sent:
@@ -666,7 +427,6 @@ func _TestRequestManager_Reconnect(t *testing.T) {
// connect, open two streams
// send a disconnect event into the mock events channel
// assert both sessions are removed from sessions map
// assert sessionWg reaches zero
})
t.Run("registrations survive disconnect", func(t *testing.T) {