From 715dfa17b0bd8e7e63c2498ac84c2dfde1e0d557 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 21:53:37 -0400 Subject: [PATCH] refactor: extract route funcs, unify newStream, fix query timeout leak --- request.go | 171 +++++++++++++++++++++++++++++------------------------ 1 file changed, 95 insertions(+), 76 deletions(-) diff --git a/request.go b/request.go index 3f9171c..6fab8c9 100644 --- a/request.go +++ b/request.go @@ -54,7 +54,8 @@ type request struct { events chan ReqEvent closed chan ReqClosed - once sync.Once + deregisterOnce sync.Once + closedOnce sync.Once } // ---------------------------------------------------------------------------- @@ -106,29 +107,47 @@ func NewRequestManager(e *Envoy) *RequestManager { func (m *RequestManager) Stream( filters [][]byte, +) (string, <-chan ReqEvent, <-chan ReqClosed) { + id, events, closed := m.newStream(filters, false) + return id, events, closed +} + +func (m *RequestManager) newStream( + filters [][]byte, + isQuery bool, ) (string, <-chan ReqEvent, <-chan ReqClosed) { id := generateID() buffer := make(chan ReqEvent, 64) - events := make(chan ReqEvent) closed := make(chan ReqClosed, 1) + var events chan ReqEvent + if isQuery { + // Query reads directly from buffer. bufferedPipe drains asynchronously, + // so EOSE can close the pipe's output before all buffered EVENTs have + // passed through — causing Query's collect loop to exit short. Reading + // from the buffered channel directly avoids that race. + events = buffer + } else { + events = make(chan ReqEvent) + go func() { + bufferedPipe(buffer, events) + close(events) + }() + } + req := &request{ id: id, filters: filters, buffer: buffer, events: events, closed: closed, - isQuery: false, + isQuery: isQuery, } m.mu.Lock() m.reqs[id] = req - go func() { - bufferedPipe(buffer, events) - close(events) - }() if m.envoy.IsConnected() { - m.activateLock(req) + m.activate(req) } m.mu.Unlock() @@ -138,27 +157,12 @@ func (m *RequestManager) Stream( func (m *RequestManager) Query( filters [][]byte, timeout time.Duration, -) (events []ReqEvent, closed *ReqClosed) { +) ([]ReqEvent, *ReqClosed) { if !m.envoy.IsConnected() { return nil, nil } - id := generateID() - eventsCh := make(chan ReqEvent) - closedCh := make(chan ReqClosed, 1) - - req := &request{ - id: id, - filters: filters, - buffer: eventsCh, - closed: closedCh, - isQuery: true, - } - - m.mu.Lock() - m.reqs[id] = req - m.activateLock(req) - m.mu.Unlock() + id, eventsCh, closedCh := m.newStream(filters, true) ctx, cancel := context.WithTimeout(m.ctx, timeout) defer cancel() @@ -177,6 +181,7 @@ func (m *RequestManager) Query( } return result, &cl case <-ctx.Done(): + m.Cancel(id) return result, nil } } @@ -196,7 +201,7 @@ func (m *RequestManager) Cancel(id string) error { req.active = false } - req.once.Do(func() { + req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) @@ -216,7 +221,7 @@ func (m *RequestManager) Close() { if req.active { go m.envoy.Send(envelope.EncloseClose(id)) } - req.once.Do(func() { + req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) @@ -224,14 +229,14 @@ func (m *RequestManager) Close() { } } -func (m *RequestManager) activateLock(req *request) { +func (m *RequestManager) activate(req *request) { req.active = true go m.envoy.Send(envelope.EncloseReq(req.id, req.filters)) } -func (m *RequestManager) removeLock(req *request) { +func (m *RequestManager) deregister(req *request) { req.active = false - req.once.Do(func() { + req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) @@ -254,7 +259,7 @@ func (m *RequestManager) handleEvents() { case EventConnected: for _, req := range m.reqs { if !req.isQuery { - m.activateLock(req) + m.activate(req) } } case EventDisconnected: @@ -291,56 +296,70 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { switch string(label) { case "EVENT": - subID, event, err := envelope.FindSubscriptionEvent(msg.Data) - if err != nil { - return - } - m.mu.RLock() - req, ok := m.reqs[subID] - m.mu.RUnlock() - if !ok { - return - } - req.buffer <- ReqEvent{ - PeerID: msg.ID, - ReceivedAt: msg.ReceivedAt, - Data: event, - } - + m.routeEvent(msg) case "EOSE": - subID, err := envelope.FindEOSE(msg.Data) - if err != nil { - return - } - m.mu.Lock() - req, ok := m.reqs[subID] - if !ok { - m.mu.Unlock() - return - } - if req.active && req.isQuery { - m.removeLock(req) - go m.envoy.Send(envelope.EncloseClose(subID)) - } - m.mu.Unlock() - + m.routeEOSE(msg) case "CLOSED": - subID, message, err := envelope.FindClosed(msg.Data) - if err != nil { - return - } - m.mu.Lock() - req, ok := m.reqs[subID] - if !ok { - m.mu.Unlock() - return - } + m.routeClosed(msg) + } +} + +// routeEvent, routeEOSE, and routeClosed use blocking sends into req.buffer. +// This preserves every event without dropping: req.buffer feeds either an +// unbounded bufferedPipe (streams) that absorbs a slow external reader, or +// is read directly by Query's collect loop in the caller's goroutine. +func (m *RequestManager) routeEvent(msg InboxMessage) { + subID, event, err := envelope.FindSubscriptionEvent(msg.Data) + if err != nil { + return + } + m.mu.RLock() + req, ok := m.reqs[subID] + m.mu.RUnlock() + if !ok { + return + } + req.buffer <- ReqEvent{ + PeerID: msg.ID, + ReceivedAt: msg.ReceivedAt, + Data: event, + } +} + +func (m *RequestManager) routeEOSE(msg InboxMessage) { + subID, err := envelope.FindEOSE(msg.Data) + if err != nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + req, ok := m.reqs[subID] + if !ok { + return + } + if req.active && req.isQuery { + m.deregister(req) + go m.envoy.Send(envelope.EncloseClose(subID)) + } +} + +func (m *RequestManager) routeClosed(msg InboxMessage) { + subID, message, err := envelope.FindClosed(msg.Data) + if err != nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + req, ok := m.reqs[subID] + if !ok { + return + } + req.closedOnce.Do(func() { req.closed <- ReqClosed{ PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message, } - m.removeLock(req) - m.mu.Unlock() - } + }) + m.deregister(req) }