diff --git a/helpers_test.go b/helpers_test.go index d820c12..ed3caf2 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -1,6 +1,10 @@ package prism import ( + "context" + "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" "testing" "time" @@ -21,3 +25,96 @@ func Never(t *testing.T, condition func() bool, msg string) { t.Helper() assert.Never(t, condition, NegativeTestTimeout, TestTick, msg) } + +func mustEncloseReq(id string, filters [][]byte) []byte { + return []byte(envelope.EncloseReq(id, filters)) +} + +// managerHarness wires up a real Envoy and RequestManager backed by +// controllable channels. Callers drive the envoy state and inbox by writing +// to the exported channels. +type managerHarness struct { + envoy *Envoy + manager *RequestManager + events chan honeybee.OutboundPoolEvent + inbox chan honeybee.InboxMessage + sent chan []byte +} + +func newManagerHarness(t *testing.T) *managerHarness { + t.Helper() + + ctx := component.MustNew(context.Background(), "prism", "test") + url := "wss://test" + + events := make(chan honeybee.OutboundPoolEvent, 4) + inbox := make(chan honeybee.InboxMessage, 16) + sent := make(chan []byte, 16) + + pool := EmbassyPlugin{ + Connect: func(string) error { return nil }, + Remove: func(string) error { return nil }, + Send: func(_ string, data []byte) error { sent <- data; return nil }, + Events: events, + Inbox: inbox, + } + + embassy := NewEmbassy(ctx, pool, nil) + embassy.Dispatch(url) + envoy := embassy.Call(url) + + manager := NewRequestManager(envoy) + + return &managerHarness{ + envoy: envoy, + manager: manager, + events: events, + inbox: inbox, + sent: sent, + } +} + +// connect simulates the envoy becoming connected. +func (h *managerHarness) connect() { + h.events <- honeybee.OutboundPoolEvent{ + ID: "wss://test", + Kind: honeybee.OutboundEventConnected, + At: time.Now(), + } +} + +// disconnect simulates the envoy disconnecting. +func (h *managerHarness) disconnect() { + h.events <- honeybee.OutboundPoolEvent{ + ID: "wss://test", + Kind: honeybee.OutboundEventDisconnected, + At: time.Now(), + } +} + +// sendEvent delivers an EVENT envelope for the given subID to the inbox. +func (h *managerHarness) sendEvent(subID string, eventData []byte) { + h.inbox <- honeybee.InboxMessage{ + ID: "wss://test", + Data: envelope.EncloseSubscriptionEvent(subID, eventData), + ReceivedAt: time.Now(), + } +} + +// sendEOSE delivers an EOSE envelope for the given subID to the inbox. +func (h *managerHarness) sendEOSE(subID string) { + h.inbox <- honeybee.InboxMessage{ + ID: "wss://test", + Data: envelope.EncloseEOSE(subID), + ReceivedAt: time.Now(), + } +} + +// sendClosed delivers a CLOSED envelope for the given subID to the inbox. +func (h *managerHarness) sendClosed(subID, message string) { + h.inbox <- honeybee.InboxMessage{ + ID: "wss://test", + Data: envelope.EncloseClosed(subID, message), + ReceivedAt: time.Now(), + } +} diff --git a/request.go b/request.go index dedbe94..9784001 100644 --- a/request.go +++ b/request.go @@ -2,9 +2,9 @@ package prism import ( "context" + "crypto/rand" "encoding/base32" "fmt" - "git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-mana-component" "git.wisehodl.dev/jay/go-roots-ws" "log/slog" @@ -13,7 +13,60 @@ import ( ) // ---------------------------------------------------------------------------- -// Types +// Parsed inbox message types +// ---------------------------------------------------------------------------- + +type inboxEvent struct { + subID string + data []byte + receivedAt time.Time +} + +type inboxEOSE struct { + subID string + receivedAt time.Time +} + +type inboxClosed struct { + subID string + message string + receivedAt time.Time +} + +// ---------------------------------------------------------------------------- +// Session inbox (per-session typed channels) +// ---------------------------------------------------------------------------- + +type sessionInbox struct { + events chan inboxEvent + eose chan inboxEOSE + closed chan inboxClosed +} + +const sessionInboxBuffer = 64 + +func newSessionInbox() *sessionInbox { + return &sessionInbox{ + events: make(chan inboxEvent, sessionInboxBuffer), + eose: make(chan inboxEOSE, 1), + closed: make(chan inboxClosed, 1), + } +} + +// ---------------------------------------------------------------------------- +// Registration (durable subscription identity) +// ---------------------------------------------------------------------------- + +type registration struct { + filters [][]byte + eventsIn chan ReqEvent + eventsOut <-chan ReqEvent + closed chan ReqClosed + deregister sync.Once +} + +// ---------------------------------------------------------------------------- +// Output types // ---------------------------------------------------------------------------- type ReqEvent struct { @@ -28,9 +81,232 @@ type ReqClosed struct { Data string } +// ---------------------------------------------------------------------------- +// Session options +// ---------------------------------------------------------------------------- + +type sessionOptions struct { + eoseClose bool + deregister func() + inbox *sessionInbox + forwardEvents chan<- ReqEvent + forwardClosed chan<- ReqClosed +} + +type SessionOption func(*sessionOptions) + +func withEOSEClose() SessionOption { + return func(o *sessionOptions) { + o.eoseClose = true + } +} + +func withDeregister(fn func()) SessionOption { + return func(o *sessionOptions) { + o.deregister = fn + } +} + +func withSessionInbox(si *sessionInbox) SessionOption { + return func(o *sessionOptions) { + o.inbox = si + } +} + +func withForwardEvents(ch chan<- ReqEvent) SessionOption { + return func(o *sessionOptions) { + o.forwardEvents = ch + } +} + +func withForwardClosed(ch chan<- ReqClosed) SessionOption { + return func(o *sessionOptions) { + o.forwardClosed = ch + } +} + +// ---------------------------------------------------------------------------- +// Session +// ---------------------------------------------------------------------------- + +type session struct { + id string + req []byte + send func([]byte) error + done <-chan struct{} + terminate func() + deregister func() + eoseClose bool + inbox *sessionInbox + forwardEvents chan<- ReqEvent + forwardClosed chan<- ReqClosed + + ctx context.Context + cancel context.CancelFunc + once sync.Once +} + +func newSession( + id string, + req []byte, + send func([]byte) error, + done <-chan struct{}, + terminate func(), + opts ...SessionOption, +) *session { + o := &sessionOptions{ + deregister: func() {}, + } + for _, opt := range opts { + opt(o) + } + + ctx, cancel := context.WithCancel(context.Background()) + + return &session{ + id: id, + req: req, + send: send, + done: done, + terminate: terminate, + deregister: o.deregister, + eoseClose: o.eoseClose, + inbox: o.inbox, + forwardEvents: o.forwardEvents, + forwardClosed: o.forwardClosed, + ctx: ctx, + cancel: cancel, + } +} + +func (s *session) run() { + defer s.exit() + + // Send step: launch send in goroutine, wait for result or done. + sent := make(chan error, 1) + go func() { sent <- s.send(s.req) }() + + select { + case <-s.done: + return + case <-s.ctx.Done(): + return + case err := <-sent: + if err != nil { + return + } + } + + if s.inbox == nil { + return + } + + // Message loop. + for { + select { + case <-s.done: + return + case <-s.ctx.Done(): + s.send(envelope.EncloseClose(s.id)) //nolint:errcheck + return + case ev, ok := <-s.inbox.events: + if !ok { + return + } + if s.forwardEvents != nil { + select { + case <-s.done: + return + case <-s.ctx.Done(): + return + case s.forwardEvents <- ReqEvent{ReceivedAt: ev.receivedAt, Data: ev.data}: + } + } + case _, ok := <-s.inbox.eose: + if !ok { + return + } + if s.eoseClose { + // Drain buffered events before closing. + for { + select { + case ev, ok := <-s.inbox.events: + if !ok { + s.send(envelope.EncloseClose(s.id)) //nolint:errcheck + return + } + if s.forwardEvents != nil { + select { + case <-s.done: + return + case <-s.ctx.Done(): + return + case s.forwardEvents <- ReqEvent{ReceivedAt: ev.receivedAt, Data: ev.data}: + } + } + default: + s.send(envelope.EncloseClose(s.id)) //nolint:errcheck + return + } + } + } + case cl, ok := <-s.inbox.closed: + if !ok { + return + } + if s.forwardClosed != nil { + select { + case <-s.done: + case <-s.ctx.Done(): + case s.forwardClosed <- ReqClosed{ReceivedAt: cl.receivedAt, Data: cl.message}: + } + } + s.doDeregister() + return + } + } +} + +func (s *session) exit() { + s.once.Do(func() { + s.terminate() + }) +} + +func (s *session) doDeregister() { + s.once.Do(func() { + s.terminate() + }) + s.deregister() +} + +func (s *session) Close() { + s.cancel() +} + +// ---------------------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------------------- + +var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) + +func generateID() string { + b := make([]byte, 5) + _, err := rand.Read(b) + if err != nil { + panic(fmt.Sprintf("generateID: %v", err)) + } + return encoder.EncodeToString(b) +} + +// ---------------------------------------------------------------------------- +// Request Manager +// ---------------------------------------------------------------------------- + type RequestManager struct { - reqs map[string][][]byte - inboxSubs map[string]chan<- InboxMessage + regs map[string]*registration + sessions map[string]*session + inboxSubs map[string]*sessionInbox done chan struct{} reqWg sync.WaitGroup @@ -46,49 +322,19 @@ type RequestManager struct { logger *slog.Logger } -type request struct { - id string - req []byte - query bool - - inbox <-chan InboxMessage - stop chan struct{} - terminate func() - - events chan ReqEvent - closed chan ReqClosed - - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - logger *slog.Logger -} - -// ---------------------------------------------------------------------------- -// Helpers -// ---------------------------------------------------------------------------- - -var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) - -func generateID() string { - return "" -} - -// ---------------------------------------------------------------------------- -// Request Manager -// ---------------------------------------------------------------------------- - func NewRequestManager(envoy *Envoy) *RequestManager { ctx, cancel := context.WithCancel( component.MustExtend(envoy.Context(), "request_manager")) m := &RequestManager{ - reqs: make(map[string]*request), - envoy: envoy, - events: envoy.SubscribeEvents(), - inbox: envoy.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), - ctx: ctx, - cancel: cancel, + regs: make(map[string]*registration), + sessions: make(map[string]*session), + inboxSubs: make(map[string]*sessionInbox), + envoy: envoy, + events: envoy.SubscribeEvents(), + inbox: envoy.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), + ctx: ctx, + cancel: cancel, } if h := envoy.Handler(); h != nil { @@ -97,85 +343,120 @@ func NewRequestManager(envoy *Envoy) *RequestManager { m.logger = slog.New(h).With(slog.Any("component", comp)) } - m.wg.Add(1) + m.wg.Add(2) go m.handleEvents() + go m.routeInbox() return m } -func (m *RequestManager) Stream( - filters [][]byte, -) ( - reqID string, - events <-chan ReqEvent, - closed <-chan ReqClosed, -) { - ctx := component.MustExtend(m.ctx, "stream") +func (m *RequestManager) Stream(filters [][]byte) (string, <-chan ReqEvent, <-chan ReqClosed) { id := generateID() - terminate := func() { - m.mu.Lock() - defer m.mu.Unlock() - m.unsubscribeInboxLock(id) - delete(m.reqs, id) - m.reqWg.Done() + + evIn := make(chan ReqEvent) + evOut := make(chan ReqEvent) + cl := make(chan ReqClosed, 1) + + reg := ®istration{ + filters: filters, + eventsIn: evIn, + closed: cl, } + go bufferedPipe(evIn, evOut) + reg.eventsOut = evOut + m.mu.Lock() - defer m.mu.Unlock() - - m.reqWg.Add(1) - r := newStreamRequest(ctx, id, envelope.EncloseReq(id, filters), - m.subscribeInboxLock(id), m.done, terminate, m.handler) - - m.reqs[id] = r - - return id, r.Events(), r.Closed() -} - -func (m *RequestManager) Query( - filters [][]byte, - timeout time.Duration, -) ( - events []ReqEvent, - closed *ReqClosed, -) { - ctx, _ := context.WithTimeout(component.MustExtend(m.ctx, "query"), timeout) - - id := generateID() - terminate := func() { - m.mu.Lock() - defer m.mu.Unlock() - m.unsubscribeInboxLock(id) + m.regs[id] = reg + if m.envoy.IsConnected() { + m.spawnSessionLock(id, reg) } - - m.mu.Lock() - r := newQueryRequest(ctx, id, envelope.EncloseReq(id, filters), - m.subscribeInboxLock(id), m.done, terminate, m.handler) m.mu.Unlock() - for { - select { - case <-m.ctx.Done(): - return - case rEvent, ok := <-r.Events(): - if !ok { - return - } - events = append(events, rEvent) - case rClosed := <-r.Closed(): - closed = &rClosed - return - } + return id, reg.eventsOut, reg.closed +} + +func (m *RequestManager) Query(filters [][]byte, timeout time.Duration) ([]ReqEvent, *ReqClosed) { + if !m.envoy.IsConnected() { + return nil, nil } + ctx, cancel := context.WithTimeout(m.ctx, timeout) + defer cancel() + + id := generateID() + si := newSessionInbox() + + // Buffered collection channels so the session can forward without blocking. + evCh := make(chan ReqEvent, sessionInboxBuffer) + clCh := make(chan ReqClosed, 1) + sessionDone := make(chan struct{}) + + m.mu.Lock() + m.inboxSubs[id] = si + m.mu.Unlock() + + terminate := func() { + m.mu.Lock() + delete(m.inboxSubs, id) + m.mu.Unlock() + m.reqWg.Done() + close(sessionDone) + } + + m.reqWg.Add(1) + s := newSession( + id, + envelope.EncloseReq(id, filters), + m.envoy.Send, + m.done, + terminate, + withEOSEClose(), + withSessionInbox(si), + withForwardEvents(evCh), + withForwardClosed(clCh), + ) + go s.run() + + var events []ReqEvent + var closed *ReqClosed + + // Wait for the session to finish, or timeout. + select { + case <-ctx.Done(): + s.Close() + <-sessionDone + case <-sessionDone: + } + + // Drain whatever the session forwarded. + for { + select { + case ev := <-evCh: + events = append(events, ev) + default: + goto drained + } + } +drained: + select { + case cl := <-clCh: + closed = &cl + default: + } + + return events, closed } func (m *RequestManager) Cancel(id string) error { - req, ok := m.reqs[id] + m.mu.Lock() + defer m.mu.Unlock() + + s, ok := m.sessions[id] if !ok { - return fmt.Errorf("req not found: %s", id) + return fmt.Errorf("session not found: %s", id) } - req.Close() + s.Close() return nil } @@ -184,27 +465,71 @@ func (m *RequestManager) Close() { m.wg.Wait() } -func (m *RequestManager) start() { +func (m *RequestManager) spawnSessionLock(id string, reg *registration) { + si := newSessionInbox() + m.inboxSubs[id] = si + terminate := func() { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.inboxSubs, id) + delete(m.sessions, id) + m.reqWg.Done() + } + + deregister := func() { + m.mu.Lock() + defer m.mu.Unlock() + reg.deregister.Do(func() { + delete(m.regs, id) + close(reg.eventsIn) + reg.closed <- ReqClosed{} + close(reg.closed) + }) + } + + m.reqWg.Add(1) + s := newSession( + id, + envelope.EncloseReq(id, reg.filters), + m.envoy.Send, + m.done, + terminate, + withDeregister(deregister), + withSessionInbox(si), + withForwardEvents(reg.eventsIn), + withForwardClosed(reg.closed), + ) + m.sessions[id] = s + go s.run() +} + +func (m *RequestManager) start() { + m.mu.Lock() + defer m.mu.Unlock() + + m.done = make(chan struct{}) + for id, reg := range m.regs { + if _, active := m.sessions[id]; !active { + m.spawnSessionLock(id, reg) + } + } } func (m *RequestManager) stop() { + m.mu.Lock() + done := m.done + m.mu.Unlock() -} - -func (m *RequestManager) subscribeInboxLock(id string) <-chan InboxMessage { - ch := make(chan InboxMessage) - m.inboxSubs[id] = ch - return ch -} - -func (m *RequestManager) unsubscribeInboxLock(id string) { - ch, ok := m.inboxSubs[id] - if !ok { - return + if done != nil { + close(done) } - close(ch) - delete(m.inboxSubs, id) + m.reqWg.Wait() + + m.mu.Lock() + m.sessions = make(map[string]*session) + m.inboxSubs = make(map[string]*sessionInbox) + m.mu.Unlock() } func (m *RequestManager) handleEvents() { @@ -214,11 +539,13 @@ func (m *RequestManager) handleEvents() { select { case <-m.ctx.Done(): return - case ev := <-m.events: + case ev, ok := <-m.events: + if !ok { + return + } switch ev.Kind { case EventConnected: m.start() - case EventDisconnected: m.stop() } @@ -233,107 +560,68 @@ func (m *RequestManager) routeInbox() { select { case <-m.ctx.Done(): return - case ev, ok := <-m.inbox: + case msg, ok := <-m.inbox: if !ok { return } - url, err := honeybee.NormalizeURL(ev.ID) + label, err := envelope.GetLabel(msg.Data) if err != nil { continue } - m.mu.RLock() - sub, ok := m.inboxSubs[url] - m.mu.RUnlock() + switch string(label) { + case "EVENT": + subID, data, err := envelope.FindSubscriptionEvent(msg.Data) + if err != nil { + continue + } + m.mu.RLock() + si, ok := m.inboxSubs[subID] + m.mu.RUnlock() + if !ok { + continue + } + select { + case <-m.ctx.Done(): + return + case si.events <- inboxEvent{subID: subID, data: data, receivedAt: msg.ReceivedAt}: + } - if !ok { - continue - } + case "EOSE": + subID, err := envelope.FindEOSE(msg.Data) + if err != nil { + continue + } + m.mu.RLock() + si, ok := m.inboxSubs[subID] + m.mu.RUnlock() + if !ok { + continue + } + select { + case <-m.ctx.Done(): + return + case si.eose <- inboxEOSE{subID: subID, receivedAt: msg.ReceivedAt}: + } - select { - case <-m.ctx.Done(): - return - case sub <- ev: + case "CLOSED": + subID, message, err := envelope.FindClosed(msg.Data) + if err != nil { + continue + } + m.mu.RLock() + si, ok := m.inboxSubs[subID] + m.mu.RUnlock() + if !ok { + continue + } + select { + case <-m.ctx.Done(): + return + case si.closed <- inboxClosed{subID: subID, message: message, receivedAt: msg.ReceivedAt}: + } } } } } - -// ---------------------------------------------------------------------------- -// Request -// ---------------------------------------------------------------------------- - -func newStreamRequest( - ctx context.Context, - id string, - req []byte, - inbox <-chan InboxMessage, - stop chan struct{}, - terminate func(), - handler slog.Handler, -) *request { - ctx, cancel := context.WithCancel(component.MustExtend(ctx, "request")) - - r := &request{ - id: id, - req: req, - query: false, - inbox: inbox, - stop: stop, - terminate: terminate, - ctx: ctx, - cancel: cancel, - } - - if handler != nil { - comp := component.FromContext(ctx) - r.logger = slog.New(handler).With(slog.Any("component", comp)) - } - - return r -} - -func newQueryRequest( - ctx context.Context, - id string, - req []byte, - inbox <-chan InboxMessage, - stop chan struct{}, - terminate func(), - handler slog.Handler, -) *request { - ctx, cancel := context.WithCancel(component.MustExtend(ctx, "request")) - - r := &request{ - id: id, - req: req, - query: true, - inbox: inbox, - stop: stop, - terminate: terminate, - ctx: ctx, - cancel: cancel, - } - - if handler != nil { - comp := component.FromContext(ctx) - r.logger = slog.New(handler).With(slog.Any("component", comp)) - } - - return r -} - -func (r *request) Close() { - r.cancel() - r.wg.Wait() - r.terminate() -} - -func (r *request) Events() <-chan ReqEvent { - return r.events -} - -func (r *request) Closed() <-chan ReqClosed { - return r.closed -} diff --git a/request_test.go b/request_test.go new file mode 100644 index 0000000..6816f10 --- /dev/null +++ b/request_test.go @@ -0,0 +1,854 @@ +package prism + +import ( + "fmt" + "sync" + "testing" + "time" + + "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-roots-ws" + "github.com/stretchr/testify/assert" +) + +// ---------------------------------------------------------------------------- +// Session +// ---------------------------------------------------------------------------- + +func TestSession_SendFailure_Terminates(t *testing.T) { + sendFn := func(data []byte) error { + return fmt.Errorf("write error") + } + + terminateCalled := make(chan struct{}) + terminate := func() { close(terminateCalled) } + + done := make(chan struct{}) + s := newSession("TESTID01", []byte(`["REQ"]`), sendFn, done, terminate) + go s.run() + + Eventually(t, func() bool { + select { + case <-terminateCalled: + return true + default: + return false + } + }, "terminate should be called on send failure") +} + +// ---------------------------------------------------------------------------- +// Query +// ---------------------------------------------------------------------------- + +func TestQuery_ReturnsEventsBeforeEOSE(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + + var events []ReqEvent + var queryDone = make(chan struct{}) + go func() { + events, _ = h.manager.Query(filters, TestTimeout) + close(queryDone) + }() + + // Intercept the outbound REQ to learn the sub-ID. + var req []byte + Eventually(t, func() bool { + select { + case req = <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + subID, _, err := envelope.FindReq(req) + assert.NoError(t, err) + + h.sendEvent(subID, []byte(`{"id":"aaa"}`)) + h.sendEvent(subID, []byte(`{"id":"bbb"}`)) + h.sendEOSE(subID) + + Eventually(t, func() bool { + select { + case <-queryDone: + return true + default: + return false + } + }, "query should return after EOSE") + + assert.Len(t, events, 2) +} + +func TestQuery_EOSE_SendsCLOSEAndReturns(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + + var events []ReqEvent + queryDone := make(chan struct{}) + go func() { + events, _ = h.manager.Query(filters, TestTimeout) + close(queryDone) + }() + + var req []byte + Eventually(t, func() bool { + select { + case req = <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + subID, _, err := envelope.FindReq(req) + assert.NoError(t, err) + + h.sendEOSE(subID) + + Eventually(t, func() bool { + select { + case <-queryDone: + return true + default: + return false + } + }, "query should return after EOSE") + + assert.Empty(t, events) + + // Verify a CLOSE was sent after EOSE. + Eventually(t, func() bool { + select { + case msg := <-h.sent: + closedID, err := envelope.FindClose(msg) + return err == nil && closedID == subID + default: + return false + } + }, "manager should send CLOSE after EOSE") +} + +func TestQuery_CLOSED_ReturnsMessageAndEmptyEvents(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + + var retEvents []ReqEvent + var retClosed *ReqClosed + queryDone := make(chan struct{}) + go func() { + retEvents, retClosed = h.manager.Query(filters, TestTimeout) + close(queryDone) + }() + + var req []byte + Eventually(t, func() bool { + select { + case req = <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + subID, _, err := envelope.FindReq(req) + assert.NoError(t, err) + + h.sendClosed(subID, "rate limited: too many subscriptions") + + Eventually(t, func() bool { + select { + case <-queryDone: + return true + default: + return false + } + }, "query should return after CLOSED") + + assert.Empty(t, retEvents) + assert.NotNil(t, retClosed) + assert.Equal(t, "rate limited: too many subscriptions", retClosed.Data) +} + +func TestQuery_Timeout_ReturnsPartialEvents(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + + var retEvents []ReqEvent + queryDone := make(chan struct{}) + go func() { + retEvents, _ = h.manager.Query(filters, 200*time.Millisecond) + close(queryDone) + }() + + var req []byte + Eventually(t, func() bool { + select { + case req = <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + subID, _, err := envelope.FindReq(req) + assert.NoError(t, err) + + h.sendEvent(subID, []byte(`{"id":"aaa"}`)) + // No EOSE — let the timeout fire. + + Eventually(t, func() bool { + select { + case <-queryDone: + return true + default: + return false + } + }, "query should return after timeout") + + assert.Len(t, retEvents, 1) +} + +// ---------------------------------------------------------------------------- +// Disconnect / Reconnect +// ---------------------------------------------------------------------------- + +func TestStream_Disconnect_TerminatesSession(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + _, _, _ = h.manager.Stream(filters) + + // Wait for the REQ to be sent so we know the session is running. + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + h.disconnect() + + // After disconnect, reqWg should reach zero, meaning the session exited. + wgDone := make(chan struct{}) + go func() { + h.manager.reqWg.Wait() + close(wgDone) + }() + + Eventually(t, func() bool { + select { + case <-wgDone: + return true + default: + return false + } + }, "reqWg should reach zero after disconnect") +} + +func TestStream_Disconnect_PreservesRegistration(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + id, _, _ := h.manager.Stream(filters) + + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + h.disconnect() + + wgDone := make(chan struct{}) + go func() { + h.manager.reqWg.Wait() + close(wgDone) + }() + Eventually(t, func() bool { + select { + case <-wgDone: + return true + default: + return false + } + }, "reqWg should clear") + + h.manager.mu.RLock() + _, regExists := h.manager.regs[id] + h.manager.mu.RUnlock() + + assert.True(t, regExists, "registration should survive disconnect") +} + +func TestStream_Reconnect_ResendsREQ(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + _, _, _ = h.manager.Stream(filters) + + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "first REQ should be sent") + + h.disconnect() + wgDone := make(chan struct{}) + go func() { h.manager.reqWg.Wait(); close(wgDone) }() + Eventually(t, func() bool { + select { + case <-wgDone: + return true + default: + return false + } + }, "reqWg should clear after disconnect") + + h.connect() + + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "REQ should be resent after reconnect") +} + +func TestStream_Reconnect_ResumesForwardingEvents(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + _, eventsCh, _ := h.manager.Stream(filters) + + var firstREQ []byte + Eventually(t, func() bool { + select { + case firstREQ = <-h.sent: + return true + default: + return false + } + }, "first REQ should be sent") + + firstSubID, _, err := envelope.FindReq(firstREQ) + assert.NoError(t, err) + + h.sendEvent(firstSubID, []byte(`{"id":"before"}`)) + Eventually(t, func() bool { + select { + case <-eventsCh: + return true + default: + return false + } + }, "event before disconnect should arrive") + + h.disconnect() + wgDone := make(chan struct{}) + go func() { h.manager.reqWg.Wait(); close(wgDone) }() + Eventually(t, func() bool { + select { + case <-wgDone: + return true + default: + return false + } + }, "reqWg should clear after disconnect") + + h.connect() + + var secondREQ []byte + Eventually(t, func() bool { + select { + case secondREQ = <-h.sent: + return true + default: + return false + } + }, "second REQ should be sent after reconnect") + + secondSubID, _, err := envelope.FindReq(secondREQ) + assert.NoError(t, err) + + h.sendEvent(secondSubID, []byte(`{"id":"after"}`)) + var gotAfter ReqEvent + Eventually(t, func() bool { + select { + case gotAfter = <-eventsCh: + return true + default: + return false + } + }, "event after reconnect should arrive on same channel") + assert.Equal(t, []byte(`{"id":"after"}`), gotAfter.Data) +} + +func TestQuery_Disconnect_ReturnsEmpty(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + + var retEvents []ReqEvent + queryDone := make(chan struct{}) + go func() { + retEvents, _ = h.manager.Query(filters, TestTimeout) + close(queryDone) + }() + + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "manager should send REQ") + + h.disconnect() + + Eventually(t, func() bool { + select { + case <-queryDone: + return true + default: + return false + } + }, "query should return after disconnect") + + assert.Empty(t, retEvents) +} + +// ---------------------------------------------------------------------------- +// Inbox routing +// ---------------------------------------------------------------------------- + +func TestRouteInbox_RoutesEventToCorrectSession(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + idA, evA, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) + idB, evB, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[2]}`)}) + + // Collect both outbound REQs (order is non-deterministic). + reqs := make(map[string]bool) + Eventually(t, func() bool { + for { + select { + case msg := <-h.sent: + subID, _, err := envelope.FindReq(msg) + if err == nil { + reqs[subID] = true + } + default: + return len(reqs) >= 2 + } + } + }, "both REQs should be sent") + + assert.Contains(t, reqs, idA) + assert.Contains(t, reqs, idB) + + // Send an event targeted at idA's subscription. + h.sendEvent(idA, []byte(`{"id":"for-a"}`)) + + var gotA ReqEvent + Eventually(t, func() bool { + select { + case gotA = <-evA: + return true + default: + return false + } + }, "event for idA should arrive on evA") + assert.Equal(t, []byte(`{"id":"for-a"}`), gotA.Data) + + Never(t, func() bool { + select { + case <-evB: + return true + default: + return false + } + }, "event for idA should not appear on evB") +} + +func TestRouteInbox_DropsUnknownSubID(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + // Send an EVENT for an ID no session knows about. + h.sendEvent("UNKNOWN1", []byte(`{"id":"ghost"}`)) + + // No panic, no block, manager continues operating. + _, evCh, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) + + var req []byte + Eventually(t, func() bool { + select { + case req = <-h.sent: + return true + default: + return false + } + }, "REQ should be sent") + + subID, _, err := envelope.FindReq(req) + assert.NoError(t, err) + + h.sendEvent(subID, []byte(`{"id":"real"}`)) + Eventually(t, func() bool { + select { + case <-evCh: + return true + default: + return false + } + }, "real event should still arrive after unknown drop") +} + +func TestRouteInbox_DropsUnparseable(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + // Inject a malformed message directly into the inbox. + h.inbox <- honeybee.InboxMessage{ + ID: "wss://test", + Data: []byte(`not valid json at all`), + ReceivedAt: time.Now(), + } + + // Manager should still be alive and routing correctly. + _, evCh, _ := h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) + + var req []byte + Eventually(t, func() bool { + select { + case req = <-h.sent: + return true + default: + return false + } + }, "REQ should be sent after malformed message") + + subID, _, err := envelope.FindReq(req) + assert.NoError(t, err) + + h.sendEvent(subID, []byte(`{"id":"ok"}`)) + Eventually(t, func() bool { + select { + case <-evCh: + return true + default: + return false + } + }, "event should arrive after malformed message was dropped") +} + +// ---------------------------------------------------------------------------- +// Manager lifecycle +// ---------------------------------------------------------------------------- + +func TestManager_StreamWhileDisconnected_SessionSpawnedOnConnect(t *testing.T) { + h := newManagerHarness(t) + // Do NOT connect yet. + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + _, _, _ = h.manager.Stream(filters) + + // No REQ should be sent while disconnected. + Never(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "no REQ should be sent before connect") + + // Now connect — session should spawn and REQ should be sent. + h.connect() + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "REQ should be sent after connect") +} + +func TestManager_QueryWhileDisconnected_ReturnsEmpty(t *testing.T) { + h := newManagerHarness(t) + // Do NOT connect. + + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + events, closed := h.manager.Query(filters, TestTimeout) + + assert.Empty(t, events) + assert.Nil(t, closed) +} + +func TestManager_Close_TerminatesAllSessions(t *testing.T) { + h := newManagerHarness(t) + h.connect() + Eventually(t, h.envoy.IsConnected, "envoy should be connected") + + h.manager.Stream([][]byte{[]byte(`{"kinds":[1]}`)}) + h.manager.Stream([][]byte{[]byte(`{"kinds":[2]}`)}) + + // Wait for both REQs. + Eventually(t, func() bool { + for { + select { + case <-h.sent: + default: + return len(h.manager.sessions) >= 2 + } + } + }, "both sessions should be active") + + closed := make(chan struct{}) + go func() { + h.manager.Close() + close(closed) + }() + + Eventually(t, func() bool { + select { + case <-closed: + return true + default: + return false + } + }, "manager.Close() should return") +} + +// ---------------------------------------------------------------------------- +// Stream (unit-level session tests) +// ---------------------------------------------------------------------------- + +func TestStream_ForwardsEvents(t *testing.T) { + si := newSessionInbox() + done := make(chan struct{}) + terminate := func() {} + + eventsCh := make(chan ReqEvent, 4) + + s := newSession( + "TESTID01", + []byte(`["REQ"]`), + func([]byte) error { return nil }, + done, + terminate, + withSessionInbox(si), + withForwardEvents(eventsCh), + ) + go s.run() + + eventData := []byte(`{"id":"abc"}`) + si.events <- inboxEvent{ + subID: "TESTID01", + data: eventData, + receivedAt: time.Now(), + } + + Eventually(t, func() bool { return len(eventsCh) > 0 }, "event should be forwarded") + + got := <-eventsCh + assert.Equal(t, eventData, got.Data) + + close(done) +} + +func TestStream_IgnoresEOSE(t *testing.T) { + si := newSessionInbox() + done := make(chan struct{}) + terminateCalled := make(chan struct{}) + terminate := func() { close(terminateCalled) } + + s := newSession( + "TESTID01", + []byte(`["REQ"]`), + func([]byte) error { return nil }, + done, + terminate, + withSessionInbox(si), + ) + go s.run() + + si.eose <- inboxEOSE{subID: "TESTID01", receivedAt: time.Now()} + + Never(t, func() bool { + select { + case <-terminateCalled: + return true + default: + return false + } + }, "stream session should not terminate on EOSE") + + close(done) +} + +func TestStream_CLOSED_ForwardsMessageAndTerminates(t *testing.T) { + si := newSessionInbox() + done := make(chan struct{}) + terminateCalled := make(chan struct{}) + deregisterCalled := make(chan struct{}) + terminate := func() { close(terminateCalled) } + deregister := func() { close(deregisterCalled) } + + forwardedClosed := make(chan ReqClosed, 1) + + s := newSession( + "TESTID01", + []byte(`["REQ"]`), + func([]byte) error { return nil }, + done, + terminate, + withSessionInbox(si), + withDeregister(deregister), + withForwardClosed(forwardedClosed), + ) + go s.run() + + si.closed <- inboxClosed{ + subID: "TESTID01", + message: "rate limited: too many subscriptions", + receivedAt: time.Now(), + } + + Eventually(t, func() bool { + select { + case <-terminateCalled: + return true + default: + return false + } + }, "terminate should be called on CLOSED") + + Eventually(t, func() bool { + select { + case <-deregisterCalled: + return true + default: + return false + } + }, "deregister should be called on CLOSED") + + assert.Len(t, forwardedClosed, 1) + got := <-forwardedClosed + assert.Equal(t, "rate limited: too many subscriptions", got.Data) +} + +func TestStream_Cancel_SendsCLOSEAndTerminates(t *testing.T) { + si := newSessionInbox() + done := make(chan struct{}) + terminateCalled := make(chan struct{}) + terminate := func() { close(terminateCalled) } + + var sentMessages [][]byte + var sentMu sync.Mutex + sendFn := func(data []byte) error { + sentMu.Lock() + sentMessages = append(sentMessages, data) + sentMu.Unlock() + return nil + } + + id := "TESTID01" + s := newSession( + id, + []byte(`["REQ"]`), + sendFn, + done, + terminate, + withSessionInbox(si), + ) + go s.run() + + // Wait for REQ to be sent before cancelling. + Eventually(t, func() bool { + sentMu.Lock() + defer sentMu.Unlock() + return len(sentMessages) > 0 + }, "REQ should be sent before cancel") + + s.Close() + + Eventually(t, func() bool { + select { + case <-terminateCalled: + return true + default: + return false + } + }, "terminate should be called on cancel") + + sentMu.Lock() + defer sentMu.Unlock() + expectedCLOSE := []byte(envelope.EncloseClose(id)) + assert.Contains(t, sentMessages, expectedCLOSE) +} + +func TestSession_SendsREQOnStart(t *testing.T) { + sent := make(chan []byte, 1) + sendFn := func(data []byte) error { + sent <- data + return nil + } + + id := "TESTID01" + filters := [][]byte{[]byte(`{"kinds":[1]}`)} + req := mustEncloseReq(id, filters) + done := make(chan struct{}) + terminate := func() {} + + s := newSession(id, req, sendFn, done, terminate) + go s.run() + + Eventually(t, func() bool { return len(sent) > 0 }, "session should send REQ") + + got := <-sent + assert.Equal(t, req, got) + + close(done) +}