From 2a4b8ee5db4425f68bd1b0f1484a5cf4181ff602 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 11:18:37 -0400 Subject: [PATCH] Created skeleton for request manager --- .gitignore | 1 + embassy_test.go | 84 +--- helpers_test.go | 91 ++--- request.go | 700 ++++++++------------------------- request_test.go | 1005 +++++++++-------------------------------------- 5 files changed, 395 insertions(+), 1486 deletions(-) diff --git a/.gitignore b/.gitignore index f3d4377..d629cad 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ draft +vibe diff --git a/embassy_test.go b/embassy_test.go index 714906f..463e712 100644 --- a/embassy_test.go +++ b/embassy_test.go @@ -1,7 +1,6 @@ package prism import ( - "context" "git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" @@ -10,63 +9,15 @@ import ( "time" ) -func TestEmbassy_TEMPLATE(t *testing.T) { - ctx := context.Background() - url := "wss://test" - - connect := func(url string) error { return nil } - remove := func(url string) error { return nil } - sent := false - _ = sent - send := func(url string, data []byte) error { - sent = true - return nil - } - events := make(chan honeybee.OutboundPoolEvent) - inbox := make(chan honeybee.InboxMessage) - pool := EmbassyPlugin{ - Connect: connect, - Remove: remove, - Send: send, - Events: events, - Inbox: inbox, - } - - embassy := NewEmbassy(ctx, pool, nil) - embassy.Dispatch(url) - envoy := embassy.Call(url) - assert.NotNil(t, envoy) -} - func TestEmbassy_Dispatch(t *testing.T) { - ctx := context.Background() - url := "wss://test" + p := newMockPool(t) - connectCalled := make(chan struct{}) - removeCalled := make(chan struct{}) - connect := func(url string) error { close(connectCalled); return nil } - remove := func(url string) error { close(removeCalled); return nil } - sent := false - send := func(url string, data []byte) error { - sent = true - return nil - } - events := make(chan honeybee.OutboundPoolEvent) - inbox := make(chan honeybee.InboxMessage) - pool := EmbassyPlugin{ - Connect: connect, - Remove: remove, - Send: send, - Events: events, - Inbox: inbox, - } - - embassy := NewEmbassy(ctx, pool, nil) - embassy.Dispatch(url) - envoy := embassy.Call(url) + embassy := NewEmbassy(p.ctx, p.plugin, nil) + embassy.Dispatch(p.url) + envoy := embassy.Call(p.url) assert.NotNil(t, envoy) - _, ok := <-connectCalled + _, ok := <-p.added assert.False(t, ok) eventSub := envoy.SubscribeEvents() @@ -89,16 +40,16 @@ func TestEmbassy_Dispatch(t *testing.T) { close(inboxDone) }() - events <- honeybee.OutboundPoolEvent{ - ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()} - events <- honeybee.OutboundPoolEvent{ + p.events <- honeybee.OutboundPoolEvent{ + ID: p.url, Kind: honeybee.OutboundEventConnected, At: time.Now()} + p.events <- honeybee.OutboundPoolEvent{ ID: "wss://other", Kind: honeybee.OutboundEventConnected, At: time.Now()} - inbox <- honeybee.InboxMessage{ - ID: url, + p.inbox <- honeybee.InboxMessage{ + ID: p.url, Data: envelope.EncloseEvent([]byte("{}")), ReceivedAt: time.Now(), } - inbox <- honeybee.InboxMessage{ + p.inbox <- honeybee.InboxMessage{ ID: "wss://other", Data: envelope.EncloseEvent([]byte("{}")), ReceivedAt: time.Now(), @@ -116,11 +67,18 @@ func TestEmbassy_Dispatch(t *testing.T) { "should have only gotten one inbox message") envoy.Send([]byte("hello")) - assert.True(t, sent) + Eventually(t, func() bool { + select { + default: + return false + case msg := <-p.sent: + return string(msg) == "hello" + } + }, "should have sent message") envoy.Dismiss() - _, ok = <-removeCalled + _, ok = <-p.removed assert.False(t, ok) _, ok = <-eventDone @@ -130,6 +88,6 @@ func TestEmbassy_Dispatch(t *testing.T) { assert.False(t, ok) // envoy no longer in embassy - envoy = embassy.Call(url) + envoy = embassy.Call(p.url) assert.Nil(t, envoy) } diff --git a/helpers_test.go b/helpers_test.go index ed3caf2..768118d 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -4,7 +4,6 @@ import ( "context" "git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-mana-component" - "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" "testing" "time" @@ -26,95 +25,63 @@ func Never(t *testing.T, condition func() bool, msg string) { assert.Never(t, condition, NegativeTestTimeout, TestTick, msg) } -func mustEncloseReq(id string, filters [][]byte) []byte { - return []byte(envelope.EncloseReq(id, filters)) -} - -// managerHarness wires up a real Envoy and RequestManager backed by -// controllable channels. Callers drive the envoy state and inbox by writing -// to the exported channels. -type managerHarness struct { - envoy *Envoy - manager *RequestManager +type mockPool struct { + plugin EmbassyPlugin + ctx context.Context + url string + added chan struct{} + removed chan struct{} events chan honeybee.OutboundPoolEvent inbox chan honeybee.InboxMessage sent chan []byte } -func newManagerHarness(t *testing.T) *managerHarness { +func newMockPool(t *testing.T) *mockPool { t.Helper() ctx := component.MustNew(context.Background(), "prism", "test") url := "wss://test" - events := make(chan honeybee.OutboundPoolEvent, 4) + added := make(chan struct{}) + removed := make(chan struct{}) + events := make(chan honeybee.OutboundPoolEvent, 16) inbox := make(chan honeybee.InboxMessage, 16) sent := make(chan []byte, 16) - pool := EmbassyPlugin{ - Connect: func(string) error { return nil }, - Remove: func(string) error { return nil }, + plugin := EmbassyPlugin{ + Connect: func(url string) error { close(added); return nil }, + Remove: func(url string) error { close(removed); return nil }, Send: func(_ string, data []byte) error { sent <- data; return nil }, Events: events, Inbox: inbox, } - embassy := NewEmbassy(ctx, pool, nil) - embassy.Dispatch(url) - envoy := embassy.Call(url) - - manager := NewRequestManager(envoy) - - return &managerHarness{ - envoy: envoy, - manager: manager, + return &mockPool{ + plugin: plugin, + ctx: ctx, + url: url, + added: added, + removed: removed, events: events, inbox: inbox, sent: sent, } } -// connect simulates the envoy becoming connected. -func (h *managerHarness) connect() { - h.events <- honeybee.OutboundPoolEvent{ - ID: "wss://test", - Kind: honeybee.OutboundEventConnected, - At: time.Now(), - } +func (p *mockPool) connect() { + p.events <- honeybee.OutboundPoolEvent{ + ID: p.url, Kind: honeybee.OutboundEventConnected, At: time.Now()} } -// disconnect simulates the envoy disconnecting. -func (h *managerHarness) disconnect() { - h.events <- honeybee.OutboundPoolEvent{ - ID: "wss://test", - Kind: honeybee.OutboundEventDisconnected, - At: time.Now(), - } +func (p *mockPool) disconnect() { + p.events <- honeybee.OutboundPoolEvent{ + ID: p.url, Kind: honeybee.OutboundEventDisconnected, At: time.Now()} } -// sendEvent delivers an EVENT envelope for the given subID to the inbox. -func (h *managerHarness) sendEvent(subID string, eventData []byte) { - h.inbox <- honeybee.InboxMessage{ - ID: "wss://test", - Data: envelope.EncloseSubscriptionEvent(subID, eventData), - ReceivedAt: time.Now(), - } -} - -// sendEOSE delivers an EOSE envelope for the given subID to the inbox. -func (h *managerHarness) sendEOSE(subID string) { - h.inbox <- honeybee.InboxMessage{ - ID: "wss://test", - Data: envelope.EncloseEOSE(subID), - ReceivedAt: time.Now(), - } -} - -// sendClosed delivers a CLOSED envelope for the given subID to the inbox. -func (h *managerHarness) sendClosed(subID, message string) { - h.inbox <- honeybee.InboxMessage{ - ID: "wss://test", - Data: envelope.EncloseClosed(subID, message), +func (p *mockPool) receive(data []byte) { + p.inbox <- honeybee.InboxMessage{ + ID: p.url, + Data: data, ReceivedAt: time.Now(), } } diff --git a/request.go b/request.go index 9784001..6a12bb2 100644 --- a/request.go +++ b/request.go @@ -13,60 +13,7 @@ import ( ) // ---------------------------------------------------------------------------- -// Parsed inbox message types -// ---------------------------------------------------------------------------- - -type inboxEvent struct { - subID string - data []byte - receivedAt time.Time -} - -type inboxEOSE struct { - subID string - receivedAt time.Time -} - -type inboxClosed struct { - subID string - message string - receivedAt time.Time -} - -// ---------------------------------------------------------------------------- -// Session inbox (per-session typed channels) -// ---------------------------------------------------------------------------- - -type sessionInbox struct { - events chan inboxEvent - eose chan inboxEOSE - closed chan inboxClosed -} - -const sessionInboxBuffer = 64 - -func newSessionInbox() *sessionInbox { - return &sessionInbox{ - events: make(chan inboxEvent, sessionInboxBuffer), - eose: make(chan inboxEOSE, 1), - closed: make(chan inboxClosed, 1), - } -} - -// ---------------------------------------------------------------------------- -// Registration (durable subscription identity) -// ---------------------------------------------------------------------------- - -type registration struct { - filters [][]byte - eventsIn chan ReqEvent - eventsOut <-chan ReqEvent - closed chan ReqClosed - deregister sync.Once -} - -// ---------------------------------------------------------------------------- -// Output types +// Types // ---------------------------------------------------------------------------- type ReqEvent struct { @@ -81,234 +28,12 @@ type ReqClosed struct { Data string } -// ---------------------------------------------------------------------------- -// Session options -// ---------------------------------------------------------------------------- - -type sessionOptions struct { - eoseClose bool - deregister func() - inbox *sessionInbox - forwardEvents chan<- ReqEvent - forwardClosed chan<- ReqClosed -} - -type SessionOption func(*sessionOptions) - -func withEOSEClose() SessionOption { - return func(o *sessionOptions) { - o.eoseClose = true - } -} - -func withDeregister(fn func()) SessionOption { - return func(o *sessionOptions) { - o.deregister = fn - } -} - -func withSessionInbox(si *sessionInbox) SessionOption { - return func(o *sessionOptions) { - o.inbox = si - } -} - -func withForwardEvents(ch chan<- ReqEvent) SessionOption { - return func(o *sessionOptions) { - o.forwardEvents = ch - } -} - -func withForwardClosed(ch chan<- ReqClosed) SessionOption { - return func(o *sessionOptions) { - o.forwardClosed = ch - } -} - -// ---------------------------------------------------------------------------- -// Session -// ---------------------------------------------------------------------------- - -type session struct { - id string - req []byte - send func([]byte) error - done <-chan struct{} - terminate func() - deregister func() - eoseClose bool - inbox *sessionInbox - forwardEvents chan<- ReqEvent - forwardClosed chan<- ReqClosed - - ctx context.Context - cancel context.CancelFunc - once sync.Once -} - -func newSession( - id string, - req []byte, - send func([]byte) error, - done <-chan struct{}, - terminate func(), - opts ...SessionOption, -) *session { - o := &sessionOptions{ - deregister: func() {}, - } - for _, opt := range opts { - opt(o) - } - - ctx, cancel := context.WithCancel(context.Background()) - - return &session{ - id: id, - req: req, - send: send, - done: done, - terminate: terminate, - deregister: o.deregister, - eoseClose: o.eoseClose, - inbox: o.inbox, - forwardEvents: o.forwardEvents, - forwardClosed: o.forwardClosed, - ctx: ctx, - cancel: cancel, - } -} - -func (s *session) run() { - defer s.exit() - - // Send step: launch send in goroutine, wait for result or done. - sent := make(chan error, 1) - go func() { sent <- s.send(s.req) }() - - select { - case <-s.done: - return - case <-s.ctx.Done(): - return - case err := <-sent: - if err != nil { - return - } - } - - if s.inbox == nil { - return - } - - // Message loop. - for { - select { - case <-s.done: - return - case <-s.ctx.Done(): - s.send(envelope.EncloseClose(s.id)) //nolint:errcheck - return - case ev, ok := <-s.inbox.events: - if !ok { - return - } - if s.forwardEvents != nil { - select { - case <-s.done: - return - case <-s.ctx.Done(): - return - case s.forwardEvents <- ReqEvent{ReceivedAt: ev.receivedAt, Data: ev.data}: - } - } - case _, ok := <-s.inbox.eose: - if !ok { - return - } - if s.eoseClose { - // Drain buffered events before closing. - for { - select { - case ev, ok := <-s.inbox.events: - if !ok { - s.send(envelope.EncloseClose(s.id)) //nolint:errcheck - return - } - if s.forwardEvents != nil { - select { - case <-s.done: - return - case <-s.ctx.Done(): - return - case s.forwardEvents <- ReqEvent{ReceivedAt: ev.receivedAt, Data: ev.data}: - } - } - default: - s.send(envelope.EncloseClose(s.id)) //nolint:errcheck - return - } - } - } - case cl, ok := <-s.inbox.closed: - if !ok { - return - } - if s.forwardClosed != nil { - select { - case <-s.done: - case <-s.ctx.Done(): - case s.forwardClosed <- ReqClosed{ReceivedAt: cl.receivedAt, Data: cl.message}: - } - } - s.doDeregister() - return - } - } -} - -func (s *session) exit() { - s.once.Do(func() { - s.terminate() - }) -} - -func (s *session) doDeregister() { - s.once.Do(func() { - s.terminate() - }) - s.deregister() -} - -func (s *session) Close() { - s.cancel() -} - -// ---------------------------------------------------------------------------- -// Helpers -// ---------------------------------------------------------------------------- - -var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) - -func generateID() string { - b := make([]byte, 5) - _, err := rand.Read(b) - if err != nil { - panic(fmt.Sprintf("generateID: %v", err)) - } - return encoder.EncodeToString(b) -} - -// ---------------------------------------------------------------------------- -// Request Manager -// ---------------------------------------------------------------------------- - type RequestManager struct { - regs map[string]*registration + reqs map[string]*request sessions map[string]*session - inboxSubs map[string]*sessionInbox + inboxSubs map[string]*sessionSub done chan struct{} - reqWg sync.WaitGroup + sessionWg sync.WaitGroup envoy *Envoy events <-chan OutboundPoolEvent @@ -322,142 +47,114 @@ type RequestManager struct { logger *slog.Logger } -func NewRequestManager(envoy *Envoy) *RequestManager { - ctx, cancel := context.WithCancel( - component.MustExtend(envoy.Context(), "request_manager")) +type request struct { + id string + filters [][]byte + buffer chan ReqEvent + events chan ReqEvent + closed chan ReqClosed + once sync.Once +} - m := &RequestManager{ - regs: make(map[string]*registration), - sessions: make(map[string]*session), - inboxSubs: make(map[string]*sessionInbox), - envoy: envoy, - events: envoy.SubscribeEvents(), - inbox: envoy.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), - ctx: ctx, - cancel: cancel, +type session struct { + id string + req []byte + + eose <-chan struct{} + closed <-chan struct{} + + done chan struct{} + send func([]byte) error + terminate func(terminateReason) + closeOnEOSE bool + + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger +} + +type sessionSub struct { + eose chan<- struct{} + closed chan<- struct{} +} + +type terminateReason int + +const ( + termSendFailed terminateReason = iota + termCloseSent + termReceivedClosed + termExternal +) + +// ---------------------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------------------- + +var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) + +func generateID() string { + b := make([]byte, 5) + if _, err := rand.Read(b); err != nil { + panic(fmt.Sprintf("generateID: %v", err)) } - if h := envoy.Handler(); h != nil { + return encoder.EncodeToString(b) +} + +// ---------------------------------------------------------------------------- +// Request Manager +// ---------------------------------------------------------------------------- + +func NewRequestManager(e *Envoy) *RequestManager { + ctx, cancel := context.WithCancel( + component.MustExtend(e.Context(), "request_manager")) + + m := &RequestManager{ + reqs: make(map[string]*request), + sessions: make(map[string]*session), + inboxSubs: make(map[string]*sessionSub), + + envoy: e, + events: e.SubscribeEvents(), + inbox: e.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), + + ctx: ctx, + cancel: cancel, + } + + if h := e.Handler(); h != nil { comp := component.FromContext(ctx) m.handler = h m.logger = slog.New(h).With(slog.Any("component", comp)) } - m.wg.Add(2) - go m.handleEvents() - go m.routeInbox() + // start event handler + // start inbox router return m } -func (m *RequestManager) Stream(filters [][]byte) (string, <-chan ReqEvent, <-chan ReqClosed) { - id := generateID() - - evIn := make(chan ReqEvent) - evOut := make(chan ReqEvent) - cl := make(chan ReqClosed, 1) - - reg := ®istration{ - filters: filters, - eventsIn: evIn, - closed: cl, - } - - go bufferedPipe(evIn, evOut) - reg.eventsOut = evOut - - m.mu.Lock() - m.regs[id] = reg - if m.envoy.IsConnected() { - m.spawnSessionLock(id, reg) - } - m.mu.Unlock() - - return id, reg.eventsOut, reg.closed +func (m *RequestManager) Stream( + filters [][]byte, +) (string, <-chan ReqEvent, <-chan ReqClosed) { + // generate id + // create channels + // register request + // spawn session if connected + return "", nil, nil } -func (m *RequestManager) Query(filters [][]byte, timeout time.Duration) ([]ReqEvent, *ReqClosed) { - if !m.envoy.IsConnected() { - return nil, nil - } - - ctx, cancel := context.WithTimeout(m.ctx, timeout) - defer cancel() - - id := generateID() - si := newSessionInbox() - - // Buffered collection channels so the session can forward without blocking. - evCh := make(chan ReqEvent, sessionInboxBuffer) - clCh := make(chan ReqClosed, 1) - sessionDone := make(chan struct{}) - - m.mu.Lock() - m.inboxSubs[id] = si - m.mu.Unlock() - - terminate := func() { - m.mu.Lock() - delete(m.inboxSubs, id) - m.mu.Unlock() - m.reqWg.Done() - close(sessionDone) - } - - m.reqWg.Add(1) - s := newSession( - id, - envelope.EncloseReq(id, filters), - m.envoy.Send, - m.done, - terminate, - withEOSEClose(), - withSessionInbox(si), - withForwardEvents(evCh), - withForwardClosed(clCh), - ) - go s.run() - - var events []ReqEvent - var closed *ReqClosed - - // Wait for the session to finish, or timeout. - select { - case <-ctx.Done(): - s.Close() - <-sessionDone - case <-sessionDone: - } - - // Drain whatever the session forwarded. - for { - select { - case ev := <-evCh: - events = append(events, ev) - default: - goto drained - } - } -drained: - select { - case cl := <-clCh: - closed = &cl - default: - } - - return events, closed -} - -func (m *RequestManager) Cancel(id string) error { - m.mu.Lock() - defer m.mu.Unlock() - - s, ok := m.sessions[id] - if !ok { - return fmt.Errorf("session not found: %s", id) - } - s.Close() - return nil +func (m *RequestManager) Query( + filters [][]byte, + timeout time.Duration, +) (events []ReqEvent, closed *ReqClosed) { + // return if disconnected + // generate id + // create channels + // spawn session + // collect events + return } func (m *RequestManager) Close() { @@ -465,163 +162,74 @@ func (m *RequestManager) Close() { m.wg.Wait() } -func (m *RequestManager) spawnSessionLock(id string, reg *registration) { - si := newSessionInbox() - m.inboxSubs[id] = si - - terminate := func() { - m.mu.Lock() - defer m.mu.Unlock() - delete(m.inboxSubs, id) - delete(m.sessions, id) - m.reqWg.Done() - } - - deregister := func() { - m.mu.Lock() - defer m.mu.Unlock() - reg.deregister.Do(func() { - delete(m.regs, id) - close(reg.eventsIn) - reg.closed <- ReqClosed{} - close(reg.closed) - }) - } - - m.reqWg.Add(1) - s := newSession( - id, - envelope.EncloseReq(id, reg.filters), - m.envoy.Send, - m.done, - terminate, - withDeregister(deregister), - withSessionInbox(si), - withForwardEvents(reg.eventsIn), - withForwardClosed(reg.closed), - ) - m.sessions[id] = s - go s.run() -} - func (m *RequestManager) start() { - m.mu.Lock() - defer m.mu.Unlock() - - m.done = make(chan struct{}) - for id, reg := range m.regs { - if _, active := m.sessions[id]; !active { - m.spawnSessionLock(id, reg) - } - } + // start all request sessions } func (m *RequestManager) stop() { - m.mu.Lock() - done := m.done - m.mu.Unlock() - - if done != nil { - close(done) - } - m.reqWg.Wait() - - m.mu.Lock() - m.sessions = make(map[string]*session) - m.inboxSubs = make(map[string]*sessionInbox) - m.mu.Unlock() + // stop all running sessions } func (m *RequestManager) handleEvents() { defer m.wg.Done() - for { - select { - case <-m.ctx.Done(): - return - case ev, ok := <-m.events: - if !ok { - return - } - switch ev.Kind { - case EventConnected: - m.start() - case EventDisconnected: - m.stop() - } - } - } + // start/stop sessions on connect/disconnect } func (m *RequestManager) routeInbox() { defer m.wg.Done() - for { - select { - case <-m.ctx.Done(): - return - case msg, ok := <-m.inbox: - if !ok { - return - } - - label, err := envelope.GetLabel(msg.Data) - if err != nil { - continue - } - - switch string(label) { - case "EVENT": - subID, data, err := envelope.FindSubscriptionEvent(msg.Data) - if err != nil { - continue - } - m.mu.RLock() - si, ok := m.inboxSubs[subID] - m.mu.RUnlock() - if !ok { - continue - } - select { - case <-m.ctx.Done(): - return - case si.events <- inboxEvent{subID: subID, data: data, receivedAt: msg.ReceivedAt}: - } - - case "EOSE": - subID, err := envelope.FindEOSE(msg.Data) - if err != nil { - continue - } - m.mu.RLock() - si, ok := m.inboxSubs[subID] - m.mu.RUnlock() - if !ok { - continue - } - select { - case <-m.ctx.Done(): - return - case si.eose <- inboxEOSE{subID: subID, receivedAt: msg.ReceivedAt}: - } - - case "CLOSED": - subID, message, err := envelope.FindClosed(msg.Data) - if err != nil { - continue - } - m.mu.RLock() - si, ok := m.inboxSubs[subID] - m.mu.RUnlock() - if !ok { - continue - } - select { - case <-m.ctx.Done(): - return - case si.closed <- inboxClosed{subID: subID, message: message, receivedAt: msg.ReceivedAt}: - } - } - } - } + // unpack/route inbox message + // events forward directly to request event buffer + // eose goes to session + // closed goes both to session and request + // uses read lock for map lookups +} + +// ---------------------------------------------------------------------------- +// Session +// ---------------------------------------------------------------------------- + +func newSession( + ctx context.Context, + id string, + req []byte, + eose <-chan struct{}, + closed <-chan struct{}, + done chan struct{}, + send func(data []byte) error, + terminate func(terminateReason), + isQuery bool, + handler slog.Handler, +) *session { + ctx, cancel := context.WithCancel(component.MustExtend(ctx, "session")) + s := &session{ + id: id, + req: req, + eose: eose, + closed: closed, + done: done, + send: send, + terminate: terminate, + closeOnEOSE: isQuery, + ctx: ctx, + cancel: cancel, + } + // create logger if handler is supplied + // run main loop + return s +} + +func (s *session) run() { + var tr terminateReason + defer s.terminate(tr) + + // send inital req + + // run main loop + // switch on done, context, eose, and closed -- terminal paths +} + +func (s *session) Close() { + s.cancel() } diff --git a/request_test.go b/request_test.go index 6816f10..e833191 100644 --- a/request_test.go +++ b/request_test.go @@ -1,854 +1,229 @@ package prism import ( - "fmt" - "sync" "testing" - "time" - - "git.wisehodl.dev/jay/go-honeybee" - "git.wisehodl.dev/jay/go-roots-ws" - "github.com/stretchr/testify/assert" ) -// ---------------------------------------------------------------------------- -// Session -// ---------------------------------------------------------------------------- +// Session tests exercise the session struct in isolation. +// The session is constructed directly with mock channels and callbacks. +// These tests do not go through RequestManager. +func TestRequestManager_Session(t *testing.T) { + t.Run("sends req on start", func(t *testing.T) { + // construct a session with a mock send func + // run the session + // assert the mock send was called with the exact req bytes + }) -func TestSession_SendFailure_Terminates(t *testing.T) { - sendFn := func(data []byte) error { - return fmt.Errorf("write error") - } + t.Run("terminates on failed req send", func(t *testing.T) { + // construct a session with a send func that returns an error + // run the session + // assert terminate was called with termSendFailed + }) - terminateCalled := make(chan struct{}) - terminate := func() { close(terminateCalled) } + t.Run("ignores eose if stream", func(t *testing.T) { + // construct a session with closeOnEOSE = false + // send a value into the eose channel + // assert terminate is never called + // assert a subsequent event still causes the session to run normally + }) - done := make(chan struct{}) - s := newSession("TESTID01", []byte(`["REQ"]`), sendFn, done, terminate) - go s.run() + t.Run("sends close on eose if query", func(t *testing.T) { + // construct a session with closeOnEOSE = true + // send a value into the eose channel + // assert the mock send was called with a CLOSE envelope for the session id + // assert terminate was called with termCloseSent + }) - Eventually(t, func() bool { - select { - case <-terminateCalled: - return true - default: - return false - } - }, "terminate should be called on send failure") + t.Run("terminates on done close", func(t *testing.T) { + // construct a session with a done channel + // close the done channel + // assert terminate was called with termExternal + }) + + t.Run("terminates on context cancel", func(t *testing.T) { + // construct a session, hold its cancel func + // call cancel + // assert terminate was called with termExternal + // this covers the path that Cancel() exercises on the session + }) + + t.Run("terminates on closed signal", func(t *testing.T) { + // construct a session with a closed signal channel + // send a value into the closed channel + // assert terminate was called with termReceivedClosed + // the session does not forward the message; routing is the manager's job + }) } -// ---------------------------------------------------------------------------- -// Query -// ---------------------------------------------------------------------------- +func TestRequestManager_Stream(t *testing.T) { + t.Run("spawns session and sends req when connected", func(t *testing.T) { + // connect the envoy before calling Stream + // call Stream with filters + // assert the mock send was called with a REQ envelope + // assert the generated id appears in the REQ envelope + }) -func TestQuery_ReturnsEventsBeforeEOSE(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") + t.Run("registers but does not spawn session when disconnected", func(t *testing.T) { + // do not connect the envoy + // call Stream + // assert mock send is never called + // assert the returned channels are non-nil and open + }) - filters := [][]byte{[]byte(`{"kinds":[1]}`)} + t.Run("forwards events to caller", func(t *testing.T) { + // connect, call Stream, get events channel + // inject two EVENT envelopes for the correct sub id into mock inbox + // inject one EVENT envelope for an unrelated sub id + // assert exactly two events appear on the caller's events channel + }) - var events []ReqEvent - var queryDone = make(chan struct{}) - go func() { - events, _ = h.manager.Query(filters, TestTimeout) - close(queryDone) - }() + t.Run("ignores eose", func(t *testing.T) { + // connect, call Stream + // inject an EOSE envelope for the sub id + // assert the events channel does not close + // assert the closed channel receives nothing + // assert a subsequent EVENT is still forwarded + }) - // Intercept the outbound REQ to learn the sub-ID. - var req []byte - Eventually(t, func() bool { - select { - case req = <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") - - subID, _, err := envelope.FindReq(req) - assert.NoError(t, err) - - h.sendEvent(subID, []byte(`{"id":"aaa"}`)) - h.sendEvent(subID, []byte(`{"id":"bbb"}`)) - h.sendEOSE(subID) - - Eventually(t, func() bool { - select { - case <-queryDone: - return true - default: - return false - } - }, "query should return after EOSE") - - assert.Len(t, events, 2) + t.Run("closed deregisters and signals caller", func(t *testing.T) { + // connect, call Stream + // inject a CLOSED envelope with a reason string + // assert the closed channel yields a ReqClosed with the correct message + // assert the events channel eventually closes (buffer drained and deregistered) + // assert the registration is removed from reqs + }) } -func TestQuery_EOSE_SendsCLOSEAndReturns(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") +func TestRequestManager_Cancel(t *testing.T) { + t.Run("sends close, terminates session, deregisters", func(t *testing.T) { + // connect, call Stream, hold the id + // call Cancel(id) + // assert mock send was called with a CLOSE envelope for the id + // assert the session is removed from sessions + // assert the registration is removed from reqs + // assert the caller's events channel eventually closes + }) - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - - var events []ReqEvent - queryDone := make(chan struct{}) - go func() { - events, _ = h.manager.Query(filters, TestTimeout) - close(queryDone) - }() - - var req []byte - Eventually(t, func() bool { - select { - case req = <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") - - subID, _, err := envelope.FindReq(req) - assert.NoError(t, err) - - h.sendEOSE(subID) - - Eventually(t, func() bool { - select { - case <-queryDone: - return true - default: - return false - } - }, "query should return after EOSE") - - assert.Empty(t, events) - - // Verify a CLOSE was sent after EOSE. - Eventually(t, func() bool { - select { - case msg := <-h.sent: - closedID, err := envelope.FindClose(msg) - return err == nil && closedID == subID - default: - return false - } - }, "manager should send CLOSE after EOSE") + t.Run("returns error for unknown id", func(t *testing.T) { + // call Cancel with an id that was never registered + // assert an error is returned + }) } -func TestQuery_CLOSED_ReturnsMessageAndEmptyEvents(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") +func TestRequestManager_Query(t *testing.T) { + t.Run("returns events and nil closed on eose", func(t *testing.T) { + // connect the envoy + // in a goroutine: inject three EVENT envelopes then EOSE for the query sub id + // call Query (blocks until return) + // assert the returned slice contains exactly three events + // assert closed is nil + // assert mock send was called with a CLOSE envelope (closeOnEOSE behavior) + }) - filters := [][]byte{[]byte(`{"kinds":[1]}`)} + t.Run("returns empty events and closed on relay closed", func(t *testing.T) { + // connect the envoy + // in a goroutine: inject a CLOSED envelope before any EVENT + // call Query + // assert the returned slice is empty + // assert closed is non-nil and contains the relay's reason string + }) - var retEvents []ReqEvent - var retClosed *ReqClosed - queryDone := make(chan struct{}) - go func() { - retEvents, retClosed = h.manager.Query(filters, TestTimeout) - close(queryDone) - }() + t.Run("returns partial events on timeout", func(t *testing.T) { + // connect the envoy + // in a goroutine: inject two EVENTs then block (no EOSE, no CLOSED) + // call Query with a short timeout + // assert Query returns after the timeout + // assert the returned slice contains exactly two events + // assert closed is nil + }) - var req []byte - Eventually(t, func() bool { - select { - case req = <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") - - subID, _, err := envelope.FindReq(req) - assert.NoError(t, err) - - h.sendClosed(subID, "rate limited: too many subscriptions") - - Eventually(t, func() bool { - select { - case <-queryDone: - return true - default: - return false - } - }, "query should return after CLOSED") - - assert.Empty(t, retEvents) - assert.NotNil(t, retClosed) - assert.Equal(t, "rate limited: too many subscriptions", retClosed.Data) + t.Run("returns nil nil when disconnected", func(t *testing.T) { + // do not connect the envoy + // call Query + // assert it returns immediately with nil events and nil closed + }) } -func TestQuery_Timeout_ReturnsPartialEvents(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") +func TestRequestManager_Reconnect(t *testing.T) { + t.Run("sessions terminate on disconnect", func(t *testing.T) { + // connect, open two streams + // send a disconnect event into the mock events channel + // assert both sessions are removed from sessions map + // assert sessionWg reaches zero + }) - filters := [][]byte{[]byte(`{"kinds":[1]}`)} + t.Run("registrations survive disconnect", func(t *testing.T) { + // connect, open two streams, hold both events and closed channels + // send a disconnect event + // after sessions terminate, assert both registrations remain in reqs + // assert both events channels are still open + // assert both closed channels are still open + }) - var retEvents []ReqEvent - queryDone := make(chan struct{}) - go func() { - retEvents, _ = h.manager.Query(filters, 200*time.Millisecond) - close(queryDone) - }() + t.Run("sessions respawn and resend req on reconnect", func(t *testing.T) { + // connect, open two streams + // disconnect, wait for sessions to terminate + // reconnect (send connect event) + // assert mock send is called again for each sub id (two new REQ envelopes) + }) - var req []byte - Eventually(t, func() bool { - select { - case req = <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") - - subID, _, err := envelope.FindReq(req) - assert.NoError(t, err) - - h.sendEvent(subID, []byte(`{"id":"aaa"}`)) - // No EOSE — let the timeout fire. - - Eventually(t, func() bool { - select { - case <-queryDone: - return true - default: - return false - } - }, "query should return after timeout") - - assert.Len(t, retEvents, 1) + t.Run("events resume on same channel after reconnect", func(t *testing.T) { + // connect, open a stream, hold the events channel + // disconnect, reconnect + // inject an EVENT for the sub id + // assert the event appears on the original events channel + // the caller's reference to the channel is unaffected by the reconnect cycle + }) } -// ---------------------------------------------------------------------------- -// Disconnect / Reconnect -// ---------------------------------------------------------------------------- +func TestRequestManager_InboxRouting(t *testing.T) { + t.Run("routes event to correct request buffer", func(t *testing.T) { + // connect, open two streams (sub ids A and B) + // inject an EVENT addressed to sub id A + // assert A's events channel receives the message + // assert B's events channel receives nothing + }) -func TestStream_Disconnect_TerminatesSession(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") + t.Run("drops event for unknown sub id", func(t *testing.T) { + // connect, open a stream + // inject an EVENT with a sub id that has no registration + // assert no panic, no deadlock, test completes cleanly + }) - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - _, _, _ = h.manager.Stream(filters) + t.Run("drops unparseable envelope", func(t *testing.T) { + // connect, open a stream + // inject raw bytes that are not a valid envelope + // assert no panic, no deadlock, test completes cleanly + }) - // Wait for the REQ to be sent so we know the session is running. - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") + t.Run("routes eose to correct session", func(t *testing.T) { + // connect, open two streams (sub ids A and B), both with closeOnEOSE = false + // inject EOSE for sub id A + // assert A's session receives the signal (verify via a side effect, e.g. a counter) + // assert B's session does not receive the signal + }) - h.disconnect() - - // After disconnect, reqWg should reach zero, meaning the session exited. - wgDone := make(chan struct{}) - go func() { - h.manager.reqWg.Wait() - close(wgDone) - }() - - Eventually(t, func() bool { - select { - case <-wgDone: - return true - default: - return false - } - }, "reqWg should reach zero after disconnect") + t.Run("routes closed to session and request", func(t *testing.T) { + // connect, open a stream + // inject a CLOSED envelope with a reason string + // assert the session receives the closed signal and terminates + // assert request.closed yields a ReqClosed with the correct message + // both must receive the message: the session reacts, the caller is informed + }) } -func TestStream_Disconnect_PreservesRegistration(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") +func TestRequestManager_Close(t *testing.T) { + t.Run("terminates all sessions without deadlock", func(t *testing.T) { + // connect, open three streams + // call manager.Close() + // assert Close returns (does not deadlock) + // assert all sessions are terminated (sessions map empty) + }) - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - id, _, _ := h.manager.Stream(filters) - - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") - - h.disconnect() - - wgDone := make(chan struct{}) - go func() { - h.manager.reqWg.Wait() - close(wgDone) - }() - Eventually(t, func() bool { - select { - case <-wgDone: - return true - default: - return false - } - }, "reqWg should clear") - - h.manager.mu.RLock() - _, regExists := h.manager.regs[id] - h.manager.mu.RUnlock() - - assert.True(t, regExists, "registration should survive disconnect") -} - -func TestStream_Reconnect_ResendsREQ(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - _, _, _ = h.manager.Stream(filters) - - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "first REQ should be sent") - - h.disconnect() - wgDone := make(chan struct{}) - go func() { h.manager.reqWg.Wait(); close(wgDone) }() - Eventually(t, func() bool { - select { - case <-wgDone: - return true - default: - return false - } - }, "reqWg should clear after disconnect") - - h.connect() - - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "REQ should be resent after reconnect") -} - -func TestStream_Reconnect_ResumesForwardingEvents(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - _, eventsCh, _ := h.manager.Stream(filters) - - var firstREQ []byte - Eventually(t, func() bool { - select { - case firstREQ = <-h.sent: - return true - default: - return false - } - }, "first REQ should be sent") - - firstSubID, _, err := envelope.FindReq(firstREQ) - assert.NoError(t, err) - - h.sendEvent(firstSubID, []byte(`{"id":"before"}`)) - Eventually(t, func() bool { - select { - case <-eventsCh: - return true - default: - return false - } - }, "event before disconnect should arrive") - - h.disconnect() - wgDone := make(chan struct{}) - go func() { h.manager.reqWg.Wait(); close(wgDone) }() - Eventually(t, func() bool { - select { - case <-wgDone: - return true - default: - return false - } - }, "reqWg should clear after disconnect") - - h.connect() - - var secondREQ []byte - Eventually(t, func() bool { - select { - case secondREQ = <-h.sent: - return true - default: - return false - } - }, "second REQ should be sent after reconnect") - - secondSubID, _, err := envelope.FindReq(secondREQ) - assert.NoError(t, err) - - h.sendEvent(secondSubID, []byte(`{"id":"after"}`)) - var gotAfter ReqEvent - Eventually(t, func() bool { - select { - case gotAfter = <-eventsCh: - return true - default: - return false - } - }, "event after reconnect should arrive on same channel") - assert.Equal(t, []byte(`{"id":"after"}`), gotAfter.Data) -} - -func TestQuery_Disconnect_ReturnsEmpty(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - - var retEvents []ReqEvent - queryDone := make(chan struct{}) - go func() { - retEvents, _ = h.manager.Query(filters, TestTimeout) - close(queryDone) - }() - - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "manager should send REQ") - - h.disconnect() - - Eventually(t, func() bool { - select { - case <-queryDone: - return true - default: - return false - } - }, "query should return after disconnect") - - assert.Empty(t, retEvents) -} - -// ---------------------------------------------------------------------------- -// Inbox routing -// ---------------------------------------------------------------------------- - -func TestRouteInbox_RoutesEventToCorrectSession(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - idA, evA, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) - idB, evB, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[2]}`)}) - - // Collect both outbound REQs (order is non-deterministic). - reqs := make(map[string]bool) - Eventually(t, func() bool { - for { - select { - case msg := <-h.sent: - subID, _, err := envelope.FindReq(msg) - if err == nil { - reqs[subID] = true - } - default: - return len(reqs) >= 2 - } - } - }, "both REQs should be sent") - - assert.Contains(t, reqs, idA) - assert.Contains(t, reqs, idB) - - // Send an event targeted at idA's subscription. - h.sendEvent(idA, []byte(`{"id":"for-a"}`)) - - var gotA ReqEvent - Eventually(t, func() bool { - select { - case gotA = <-evA: - return true - default: - return false - } - }, "event for idA should arrive on evA") - assert.Equal(t, []byte(`{"id":"for-a"}`), gotA.Data) - - Never(t, func() bool { - select { - case <-evB: - return true - default: - return false - } - }, "event for idA should not appear on evB") -} - -func TestRouteInbox_DropsUnknownSubID(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - // Send an EVENT for an ID no session knows about. - h.sendEvent("UNKNOWN1", []byte(`{"id":"ghost"}`)) - - // No panic, no block, manager continues operating. - _, evCh, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) - - var req []byte - Eventually(t, func() bool { - select { - case req = <-h.sent: - return true - default: - return false - } - }, "REQ should be sent") - - subID, _, err := envelope.FindReq(req) - assert.NoError(t, err) - - h.sendEvent(subID, []byte(`{"id":"real"}`)) - Eventually(t, func() bool { - select { - case <-evCh: - return true - default: - return false - } - }, "real event should still arrive after unknown drop") -} - -func TestRouteInbox_DropsUnparseable(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - // Inject a malformed message directly into the inbox. - h.inbox <- honeybee.InboxMessage{ - ID: "wss://test", - Data: []byte(`not valid json at all`), - ReceivedAt: time.Now(), - } - - // Manager should still be alive and routing correctly. - _, evCh, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) - - var req []byte - Eventually(t, func() bool { - select { - case req = <-h.sent: - return true - default: - return false - } - }, "REQ should be sent after malformed message") - - subID, _, err := envelope.FindReq(req) - assert.NoError(t, err) - - h.sendEvent(subID, []byte(`{"id":"ok"}`)) - Eventually(t, func() bool { - select { - case <-evCh: - return true - default: - return false - } - }, "event should arrive after malformed message was dropped") -} - -// ---------------------------------------------------------------------------- -// Manager lifecycle -// ---------------------------------------------------------------------------- - -func TestManager_StreamWhileDisconnected_SessionSpawnedOnConnect(t *testing.T) { - h := newManagerHarness(t) - // Do NOT connect yet. - - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - _, _, _ = h.manager.Stream(filters) - - // No REQ should be sent while disconnected. - Never(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "no REQ should be sent before connect") - - // Now connect — session should spawn and REQ should be sent. - h.connect() - Eventually(t, func() bool { - select { - case <-h.sent: - return true - default: - return false - } - }, "REQ should be sent after connect") -} - -func TestManager_QueryWhileDisconnected_ReturnsEmpty(t *testing.T) { - h := newManagerHarness(t) - // Do NOT connect. - - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - events, closed := h.manager.Query(filters, TestTimeout) - - assert.Empty(t, events) - assert.Nil(t, closed) -} - -func TestManager_Close_TerminatesAllSessions(t *testing.T) { - h := newManagerHarness(t) - h.connect() - Eventually(t, h.envoy.IsConnected, "envoy should be connected") - - h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) - h.manager.Stream([][]byte{[]byte(`{"kinds":[2]}`)}) - - // Wait for both REQs. - Eventually(t, func() bool { - for { - select { - case <-h.sent: - default: - return len(h.manager.sessions) >= 2 - } - } - }, "both sessions should be active") - - closed := make(chan struct{}) - go func() { - h.manager.Close() - close(closed) - }() - - Eventually(t, func() bool { - select { - case <-closed: - return true - default: - return false - } - }, "manager.Close() should return") -} - -// ---------------------------------------------------------------------------- -// Stream (unit-level session tests) -// ---------------------------------------------------------------------------- - -func TestStream_ForwardsEvents(t *testing.T) { - si := newSessionInbox() - done := make(chan struct{}) - terminate := func() {} - - eventsCh := make(chan ReqEvent, 4) - - s := newSession( - "TESTID01", - []byte(`["REQ"]`), - func([]byte) error { return nil }, - done, - terminate, - withSessionInbox(si), - withForwardEvents(eventsCh), - ) - go s.run() - - eventData := []byte(`{"id":"abc"}`) - si.events <- inboxEvent{ - subID: "TESTID01", - data: eventData, - receivedAt: time.Now(), - } - - Eventually(t, func() bool { return len(eventsCh) > 0 }, "event should be forwarded") - - got := <-eventsCh - assert.Equal(t, eventData, got.Data) - - close(done) -} - -func TestStream_IgnoresEOSE(t *testing.T) { - si := newSessionInbox() - done := make(chan struct{}) - terminateCalled := make(chan struct{}) - terminate := func() { close(terminateCalled) } - - s := newSession( - "TESTID01", - []byte(`["REQ"]`), - func([]byte) error { return nil }, - done, - terminate, - withSessionInbox(si), - ) - go s.run() - - si.eose <- inboxEOSE{subID: "TESTID01", receivedAt: time.Now()} - - Never(t, func() bool { - select { - case <-terminateCalled: - return true - default: - return false - } - }, "stream session should not terminate on EOSE") - - close(done) -} - -func TestStream_CLOSED_ForwardsMessageAndTerminates(t *testing.T) { - si := newSessionInbox() - done := make(chan struct{}) - terminateCalled := make(chan struct{}) - deregisterCalled := make(chan struct{}) - terminate := func() { close(terminateCalled) } - deregister := func() { close(deregisterCalled) } - - forwardedClosed := make(chan ReqClosed, 1) - - s := newSession( - "TESTID01", - []byte(`["REQ"]`), - func([]byte) error { return nil }, - done, - terminate, - withSessionInbox(si), - withDeregister(deregister), - withForwardClosed(forwardedClosed), - ) - go s.run() - - si.closed <- inboxClosed{ - subID: "TESTID01", - message: "rate limited: too many subscriptions", - receivedAt: time.Now(), - } - - Eventually(t, func() bool { - select { - case <-terminateCalled: - return true - default: - return false - } - }, "terminate should be called on CLOSED") - - Eventually(t, func() bool { - select { - case <-deregisterCalled: - return true - default: - return false - } - }, "deregister should be called on CLOSED") - - assert.Len(t, forwardedClosed, 1) - got := <-forwardedClosed - assert.Equal(t, "rate limited: too many subscriptions", got.Data) -} - -func TestStream_Cancel_SendsCLOSEAndTerminates(t *testing.T) { - si := newSessionInbox() - done := make(chan struct{}) - terminateCalled := make(chan struct{}) - terminate := func() { close(terminateCalled) } - - var sentMessages [][]byte - var sentMu sync.Mutex - sendFn := func(data []byte) error { - sentMu.Lock() - sentMessages = append(sentMessages, data) - sentMu.Unlock() - return nil - } - - id := "TESTID01" - s := newSession( - id, - []byte(`["REQ"]`), - sendFn, - done, - terminate, - withSessionInbox(si), - ) - go s.run() - - // Wait for REQ to be sent before cancelling. - Eventually(t, func() bool { - sentMu.Lock() - defer sentMu.Unlock() - return len(sentMessages) > 0 - }, "REQ should be sent before cancel") - - s.Close() - - Eventually(t, func() bool { - select { - case <-terminateCalled: - return true - default: - return false - } - }, "terminate should be called on cancel") - - sentMu.Lock() - defer sentMu.Unlock() - expectedCLOSE := []byte(envelope.EncloseClose(id)) - assert.Contains(t, sentMessages, expectedCLOSE) -} - -func TestSession_SendsREQOnStart(t *testing.T) { - sent := make(chan []byte, 1) - sendFn := func(data []byte) error { - sent <- data - return nil - } - - id := "TESTID01" - filters := [][]byte{[]byte(`{"kinds":[1]}`)} - req := mustEncloseReq(id, filters) - done := make(chan struct{}) - terminate := func() {} - - s := newSession(id, req, sendFn, done, terminate) - go s.run() - - Eventually(t, func() bool { return len(sent) > 0 }, "session should send REQ") - - got := <-sent - assert.Equal(t, req, got) - - close(done) + t.Run("does not deregister requests on close", func(t *testing.T) { + // connect, open two streams + // call manager.Close() + // assert registrations remain in reqs + // termExternal does not deregister; that is the caller's domain via Cancel + }) }