diff --git a/helpers_test.go b/helpers_test.go index c1c4c8a..e2bdcb6 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -4,9 +4,7 @@ import ( "context" "git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-mana-component" - "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" - "sync" "testing" "time" ) @@ -92,57 +90,6 @@ func (p *mockPool) receive(data []byte) { } } -// Mock request session harness - -type mockSessionHarness struct { - ctx context.Context - id string - filters [][]byte - req []byte - inbox chan sessionMessage - events chan ReqEvent - closed chan ReqClosed - closedOnce *sync.Once - done chan struct{} - sent chan []byte - send func([]byte) error - preterminate func() - terminatedWith chan terminateReason - terminate func(terminateReason) -} - -func newMockSessionHarness() *mockSessionHarness { - ctx := component.MustNew(context.Background(), "prism", "test") - filters := [][]byte{[]byte(`{}`)} - id := "TESTREQ" - sent := make(chan []byte, 2) - send := func(data []byte) error { - sent <- data - return nil - } - inbox := make(chan sessionMessage, 16) - preterminate := func() { close(inbox) } - terminatedWith := make(chan terminateReason, 1) - terminate := func(r terminateReason) { terminatedWith <- r } - - return &mockSessionHarness{ - ctx: ctx, - id: id, - filters: filters, - req: envelope.EncloseReq(id, filters), - inbox: inbox, - events: make(chan ReqEvent, 16), - closed: make(chan ReqClosed, 16), - closedOnce: &sync.Once{}, - done: make(chan struct{}), - sent: sent, - send: send, - preterminate: preterminate, - terminatedWith: terminatedWith, - terminate: terminate, - } -} - // MockEnvoy func newMockEnvoy(t *testing.T) (*mockPool, *Envoy) { diff --git a/request.go b/request.go index 8f788c1..90310a7 100644 --- a/request.go +++ b/request.go @@ -29,11 +29,8 @@ type ReqClosed struct { } type RequestManager struct { - reqs map[string]*request - sessions map[string]*session - inboxSubs map[string]chan<- sessionMessage - done chan struct{} - sessionWg sync.WaitGroup + reqs map[string]*request + sessions map[string]*session envoy *Envoy events <-chan OutboundPoolEvent @@ -54,46 +51,15 @@ type request struct { events chan ReqEvent closed chan ReqClosed deregisterOnce sync.Once - closedOnce sync.Once } type session struct { - id string - req []byte - - inbox <-chan sessionMessage - forwardEvent chan<- ReqEvent - forwardClosed chan<- ReqClosed - closedOnce *sync.Once - - done chan struct{} - send func([]byte) error - preterminate func() - terminate func(terminateReason) - closeOnEOSE bool - - ctx context.Context - cancel context.CancelFunc - logger *slog.Logger + id string + req []byte + isQuery bool + request *request } -type sessionMessage struct { - label string - peerID string - receivedAt time.Time - data []byte -} - -type terminateReason int - -const ( - termSendFailed terminateReason = iota - termClosedOnEOSE - termReceivedClosed - termDone - termCancelled -) - // ---------------------------------------------------------------------------- // Helpers // ---------------------------------------------------------------------------- @@ -118,9 +84,8 @@ func NewRequestManager(e *Envoy) *RequestManager { component.MustExtend(e.Context(), "request_manager")) m := &RequestManager{ - reqs: make(map[string]*request), - sessions: make(map[string]*session), - inboxSubs: make(map[string]chan<- sessionMessage), + reqs: make(map[string]*request), + sessions: make(map[string]*session), envoy: e, events: e.SubscribeEvents(), @@ -136,8 +101,8 @@ func NewRequestManager(e *Envoy) *RequestManager { m.logger = slog.New(h).With(slog.Any("component", comp)) } - // start event handler - m.wg.Add(1) + m.wg.Add(2) + go m.handleEvents() go m.routeInbox() return m @@ -194,7 +159,6 @@ func (m *RequestManager) Query( m.mu.Lock() m.reqs[id] = req - m.spawnSession(req, true) m.mu.Unlock() @@ -229,8 +193,9 @@ func (m *RequestManager) Cancel(id string) error { return fmt.Errorf("Cancel: unknown id %q", id) } - if sess, ok := m.sessions[id]; ok { - sess.Close() + if _, ok := m.sessions[id]; ok { + go m.envoy.Send(envelope.EncloseClose(id)) + delete(m.sessions, id) } req.deregisterOnce.Do(func() { @@ -246,82 +211,79 @@ func (m *RequestManager) Close() { m.cancel() m.wg.Wait() - m.mu.RLock() - sessions := make(map[string]*session) - for id, s := range m.sessions { - sessions[id] = s - } - m.mu.RUnlock() - - for _, sess := range sessions { - sess.Close() - } - - m.sessionWg.Wait() - m.mu.Lock() + defer m.mu.Unlock() + for id, req := range m.reqs { + if _, ok := m.sessions[id]; ok { + go m.envoy.Send(envelope.EncloseClose(id)) + } req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) delete(m.reqs, id) } - m.mu.Unlock() + for id := range m.sessions { + delete(m.sessions, id) + } } func (m *RequestManager) spawnSession(req *request, query bool) { - sessionInbox := make(chan sessionMessage, 64) - m.inboxSubs[req.id] = sessionInbox - - var once sync.Once - preterminate := func() { - m.mu.Lock() - delete(m.inboxSubs, req.id) - m.mu.Unlock() - sessionInbox <- sessionMessage{label: "EOF"} + sess := &session{ + id: req.id, + req: envelope.EncloseReq(req.id, req.filters), + isQuery: query, + request: req, } - - terminate := func(r terminateReason) { - once.Do(func() { - m.mu.Lock() - delete(m.sessions, req.id) - m.mu.Unlock() - m.sessionWg.Done() - if r == termReceivedClosed || r == termClosedOnEOSE { - req.deregisterOnce.Do(func() { - close(req.buffer) - close(req.closed) - }) - m.mu.Lock() - delete(m.reqs, req.id) - m.mu.Unlock() - } - }) - } - - req_env := envelope.EncloseReq(req.id, req.filters) - sess := newSession( - m.ctx, req.id, req_env, sessionInbox, req.buffer, req.closed, &req.closedOnce, - m.done, m.envoy.Send, preterminate, terminate, query, m.handler, - ) m.sessions[req.id] = sess - m.sessionWg.Add(1) - go sess.run() + go m.envoy.Send(sess.req) +} + +func (m *RequestManager) deregister(req *request) { + req.deregisterOnce.Do(func() { + close(req.buffer) + close(req.closed) + }) + delete(m.reqs, req.id) + delete(m.sessions, req.id) } func (m *RequestManager) start() { - // start all request sessions + m.mu.Lock() + defer m.mu.Unlock() + for _, req := range m.reqs { + m.spawnSession(req, false) + } } func (m *RequestManager) stop() { - // stop all running sessions + m.mu.Lock() + defer m.mu.Unlock() + for id := range m.sessions { + delete(m.sessions, id) + } } func (m *RequestManager) handleEvents() { defer m.wg.Done() - // start/stop sessions on connect/disconnect + for { + select { + case <-m.ctx.Done(): + return + case ev, ok := <-m.events: + if !ok { + return + } + switch ev.Kind { + case EventConnected: + m.start() + case EventDisconnected: + m.stop() + } + } + } } func (m *RequestManager) routeInbox() { @@ -353,171 +315,51 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { return } m.mu.RLock() - sub, ok := m.inboxSubs[subID] + req, ok := m.reqs[subID] m.mu.RUnlock() if !ok { return } - sub <- sessionMessage{ - label: "EVENT", - peerID: msg.ID, - receivedAt: msg.ReceivedAt, - data: event, + req.buffer <- ReqEvent{ + PeerID: msg.ID, + ReceivedAt: msg.ReceivedAt, + Data: event, } + case "EOSE": subID, err := envelope.FindEOSE(msg.Data) if err != nil { return } - m.mu.RLock() - sub, ok := m.inboxSubs[subID] - m.mu.RUnlock() + m.mu.Lock() + sess, ok := m.sessions[subID] if !ok { + m.mu.Unlock() return } - sub <- sessionMessage{ - label: "EOSE", - peerID: msg.ID, - receivedAt: msg.ReceivedAt, + if sess.isQuery { + m.deregister(sess.request) + go m.envoy.Send(envelope.EncloseClose(subID)) } + m.mu.Unlock() + case "CLOSED": subID, message, err := envelope.FindClosed(msg.Data) if err != nil { return } - m.mu.RLock() - sub, ok := m.inboxSubs[subID] - m.mu.RUnlock() + m.mu.Lock() + req, ok := m.reqs[subID] if !ok { + m.mu.Unlock() return } - sub <- sessionMessage{ - label: "CLOSED", - peerID: msg.ID, - receivedAt: msg.ReceivedAt, - data: []byte(message), + req.closed <- ReqClosed{ + PeerID: msg.ID, + ReceivedAt: msg.ReceivedAt, + Data: message, } + m.deregister(req) + m.mu.Unlock() } } - -// ---------------------------------------------------------------------------- -// Session -// ---------------------------------------------------------------------------- - -func newSession( - ctx context.Context, - id string, - req []byte, - inbox <-chan sessionMessage, - forwardEvent chan<- ReqEvent, - forwardClosed chan<- ReqClosed, - closedOnce *sync.Once, - done chan struct{}, - send func(data []byte) error, - preterminate func(), - terminate func(terminateReason), - isQuery bool, - handler slog.Handler, -) *session { - ctx, cancel := context.WithCancel(component.MustExtend(ctx, "session")) - s := &session{ - id: id, - req: req, - inbox: inbox, - forwardEvent: forwardEvent, - forwardClosed: forwardClosed, - closedOnce: closedOnce, - done: done, - send: send, - preterminate: preterminate, - terminate: terminate, - closeOnEOSE: isQuery, - ctx: ctx, - cancel: cancel, - } - // create logger if handler is supplied - return s -} - -func (s *session) run() { - // send initial REQ; goroutine allows done/ctx cancellation to abort the wait - sent := make(chan error, 1) - go func() { sent <- s.send(s.req) }() - - drain := func() { - for msg := range s.inbox { - if msg.label == "EOF" { - return - } - switch msg.label { - case "EVENT": - s.forwardEvent <- ReqEvent{ - PeerID: msg.peerID, - ReceivedAt: msg.receivedAt, - Data: msg.data, - } - case "EOSE": - case "CLOSED": - s.closedOnce.Do(func() { - s.forwardClosed <- ReqClosed{ - PeerID: msg.peerID, - ReceivedAt: msg.receivedAt, - Data: string(msg.data), - } - }) - } - } - } - - exit := func(tr terminateReason) { - s.preterminate() - drain() - s.terminate(tr) - } - - for { - select { - case err := <-sent: - if err != nil { - exit(termSendFailed) - return - } - case <-s.done: - exit(termDone) - return - case <-s.ctx.Done(): - s.send(envelope.EncloseClose(s.id)) - exit(termCancelled) - return - case msg := <-s.inbox: - switch msg.label { - case "EVENT": - s.forwardEvent <- ReqEvent{ - PeerID: msg.peerID, - ReceivedAt: msg.receivedAt, - Data: msg.data, - } - case "EOSE": - if s.closeOnEOSE { - s.send(envelope.EncloseClose(s.id)) - exit(termClosedOnEOSE) - return - } - case "CLOSED": - s.closedOnce.Do(func() { - s.forwardClosed <- ReqClosed{ - PeerID: msg.peerID, - ReceivedAt: msg.receivedAt, - Data: string(msg.data), - } - }) - exit(termReceivedClosed) - return - } - } - } -} - -func (s *session) Close() { - s.cancel() -} diff --git a/request_test.go b/request_test.go index 0a162b7..79aa58c 100644 --- a/request_test.go +++ b/request_test.go @@ -1,212 +1,12 @@ package prism import ( - "fmt" "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" "testing" "time" ) -// Session tests exercise the session struct in isolation. -// The session is constructed directly with mock channels and callbacks. -// These tests do not go through RequestManager. -func TestRequestManager_Session(t *testing.T) { - t.Run("sends req on start", func(t *testing.T) { - h := newMockSessionHarness() - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, h.send, h.preterminate, h.terminate, false, nil) - go s.run() - - var got []byte - Eventually(t, func() bool { - select { - case got = <-h.sent: - return true - default: - return false - } - }, "expected send") - - assert.Equal(t, []byte(h.req), got) - }) - - t.Run("terminates on failed req send", func(t *testing.T) { - h := newMockSessionHarness() - send := func([]byte) error { return fmt.Errorf("send failed") } - - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, send, h.preterminate, h.terminate, false, nil) - go s.run() - - Eventually(t, func() bool { - select { - case r := <-h.terminatedWith: - return r == termSendFailed - default: - return false - } - }, "expected termSendFailed") - }) - - t.Run("ignores eose if stream", func(t *testing.T) { - h := newMockSessionHarness() - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, h.send, h.preterminate, h.terminate, false, nil) - go s.run() - - // wait for initial REQ send before proceeding - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "expected initial send") - - h.inbox <- sessionMessage{label: "EOSE"} - - Never(t, func() bool { - select { - case <-h.terminatedWith: - return true - default: - return false - } - }, "terminate should not be called on eose for stream") - }) - - t.Run("sends close on eose if query", func(t *testing.T) { - h := newMockSessionHarness() - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, h.send, h.preterminate, h.terminate, true, nil) - go s.run() - - // drain initial REQ send - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "expected initial REQ send") - - h.inbox <- sessionMessage{label: "EOSE"} - - var got []byte - Eventually(t, func() bool { - select { - case got = <-h.sent: - return true - default: - return false - } - }, "expected CLOSE send") - - assert.Equal(t, []byte(envelope.EncloseClose(h.id)), got) - - Eventually(t, func() bool { - select { - case r := <-h.terminatedWith: - return r == termClosedOnEOSE - default: - return false - } - }, "expected termCloseSent") - }) - - t.Run("terminates on done close", func(t *testing.T) { - h := newMockSessionHarness() - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, h.send, h.preterminate, h.terminate, false, nil) - go s.run() - - // wait for initial req - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "expected initial send") - - // close with done - close(h.done) - Eventually(t, func() bool { - select { - case r := <-h.terminatedWith: - return r == termDone - default: - return false - } - }, "expected termDone after done closed") - }) - - t.Run("terminates on context cancel", func(t *testing.T) { - h := newMockSessionHarness() - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, h.send, h.preterminate, h.terminate, false, nil) - go s.run() - - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "expected initial send") - - s.Close() - - Eventually(t, func() bool { - select { - case r := <-h.terminatedWith: - return r == termCancelled - default: - return false - } - }, "expected termCancelled after context cancel") - }) - - t.Run("terminates on closed signal", func(t *testing.T) { - h := newMockSessionHarness() - s := newSession( - h.ctx, h.id, h.req, h.inbox, h.events, h.closed, h.closedOnce, - h.done, h.send, h.preterminate, h.terminate, false, nil) - go s.run() - - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "expected initial send") - - h.inbox <- sessionMessage{label: "CLOSED"} - - Eventually(t, func() bool { - select { - case r := <-h.terminatedWith: - return r == termReceivedClosed - default: - return false - } - }, "expected termReceivedClosed") - }) -} - func TestRequestManager_Stream(t *testing.T) { t.Run("spawns session and sends req when connected", func(t *testing.T) { p, envoy := newMockEnvoy(t) @@ -329,7 +129,7 @@ func TestRequestManager_Stream(t *testing.T) { _, ok := m.sessions[id] m.mu.RUnlock() return !ok - }, "session should not terminate after eose") + }, "session should not be removed after eose") Never(t, func() bool { select { @@ -410,40 +210,6 @@ func TestRequestManager_Stream(t *testing.T) { m.mu.RUnlock() assert.False(t, ok, "registration should be removed from reqs") }) - - t.Run("duplicate closed does not panic", func(t *testing.T) { - p, envoy := newMockEnvoy(t) - p.connect() - Eventually(t, envoy.IsConnected, "envoy should be connected") - - m := NewRequestManager(envoy) - t.Cleanup(func() { m.Close() }) - filters := [][]byte{[]byte(`{}`)} - id, _, _ := m.Stream(filters) - - // drain the REQ send - Eventually(t, func() bool { - select { - case <-p.sent: - return true - default: - return false - } - }, "expected REQ send") - - // inject both before the router can process either - p.receive(envelope.EncloseClosed(id, "error: first")) - p.receive(envelope.EncloseClosed(id, "error: second")) - - // if the router panics, the test will fail with a goroutine crash; - // assert the session eventually terminates cleanly as a liveness check - Eventually(t, func() bool { - m.mu.RLock() - _, ok := m.sessions[id] - m.mu.RUnlock() - return !ok - }, "session should terminate after closed") - }) } func TestRequestManager_Cancel(t *testing.T) { @@ -481,17 +247,12 @@ func TestRequestManager_Cancel(t *testing.T) { }, "expected CLOSE send") assert.Equal(t, []byte(envelope.EncloseClose(id)), got) - Eventually(t, func() bool { - m.mu.RLock() - _, ok := m.sessions[id] - m.mu.RUnlock() - return !ok - }, "session should be removed") - m.mu.RLock() - _, ok := m.reqs[id] + _, sessOk := m.sessions[id] + _, reqOk := m.reqs[id] m.mu.RUnlock() - assert.False(t, ok, "registration should be removed from reqs") + assert.False(t, sessOk, "session should be removed") + assert.False(t, reqOk, "registration should be removed from reqs") Eventually(t, func() bool { select { @@ -573,7 +334,7 @@ func TestRequestManager_Query(t *testing.T) { assert.Len(t, events, 3) assert.Nil(t, closed) - // CLOSE envelope should have been sent by the session + // CLOSE envelope should have been sent after EOSE var closeEnv []byte select { case closeEnv = <-p.sent: @@ -666,7 +427,6 @@ func _TestRequestManager_Reconnect(t *testing.T) { // connect, open two streams // send a disconnect event into the mock events channel // assert both sessions are removed from sessions map - // assert sessionWg reaches zero }) t.Run("registrations survive disconnect", func(t *testing.T) {