diff --git a/request.go b/request.go index 29a715c..ff20fbc 100644 --- a/request.go +++ b/request.go @@ -360,6 +360,10 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { // This preserves every event without dropping: req.buffer feeds either an // unbounded bufferedPipe (streams) that absorbs a slow external reader, or // is read directly by Query's collect loop in the caller's goroutine. +// --- +// routeEvent, routeEOSE, and routeClosed are always called sequentially from +// the same routeInbox goroutine via dispatchInbox. This makes it safe for +// routeEOSE to close req.buffer: no concurrent routeEvent send can race it. func (m *RequestManager) routeEvent(msg InboxMessage) { subID, event, err := envelope.FindSubscriptionEvent(msg.Data) if err != nil { @@ -390,7 +394,11 @@ func (m *RequestManager) routeEOSE(msg InboxMessage) { return } if req.active && req.isQuery { - m.deregister(req) + // manually cleanup query state + // specifically, do not close req.closed or events can be missed + req.active = false + close(req.buffer) + delete(m.reqs, req.id) go m.envoy.Send(envelope.EncloseClose(subID)) } }