diff --git a/request.go b/request.go index ff20fbc..2f08673 100644 --- a/request.go +++ b/request.go @@ -140,6 +140,9 @@ func (m *RequestManager) newStream( isQuery bool, opts ...RequestOption, ) (string, <-chan ReqEvent, <-chan ReqClosed, error) { + m.mu.Lock() + defer m.mu.Unlock() + var o requestOptions for _, opt := range opts { opt(&o) @@ -147,6 +150,9 @@ func (m *RequestManager) newStream( var id string if o.id != "" { + if _, exists := m.reqs[o.id]; exists { + return "", nil, nil, fmt.Errorf("Stream: id %q already in use", o.id) + } id = o.id } else { label := o.label @@ -158,20 +164,11 @@ func (m *RequestManager) newStream( buffer := make(chan ReqEvent, 64) closed := make(chan ReqClosed, 1) - var events chan ReqEvent - if isQuery { - // Query reads directly from buffer. bufferedPipe drains asynchronously, - // so EOSE can close the pipe's output before all buffered EVENTs have - // passed through — causing Query's collect loop to exit short. Reading - // from the buffered channel directly avoids that race. - events = buffer - } else { - events = make(chan ReqEvent) - go func() { - bufferedPipe(buffer, events) - close(events) - }() - } + events := make(chan ReqEvent) + go func() { + bufferedPipe(buffer, events) + close(events) + }() req := &request{ id: id, @@ -182,22 +179,10 @@ func (m *RequestManager) newStream( isQuery: isQuery, } - m.mu.Lock() - if _, exists := m.reqs[id]; exists { - m.mu.Unlock() - close(buffer) - if !isQuery { - // drain bufferedPipe goroutine - for range events { - } - } - return "", nil, nil, fmt.Errorf("Stream: id %q already in use", id) - } m.reqs[id] = req if m.envoy.IsConnected() { m.activate(req) } - m.mu.Unlock() return id, events, closed, nil } @@ -356,14 +341,8 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { } } -// routeEvent, routeEOSE, and routeClosed use blocking sends into req.buffer. -// This preserves every event without dropping: req.buffer feeds either an -// unbounded bufferedPipe (streams) that absorbs a slow external reader, or -// is read directly by Query's collect loop in the caller's goroutine. -// --- -// routeEvent, routeEOSE, and routeClosed are always called sequentially from -// the same routeInbox goroutine via dispatchInbox. This makes it safe for -// routeEOSE to close req.buffer: no concurrent routeEvent send can race it. +// routeEvent, routeEOSE, and routeClosed use blocking sends into req.buffer, +// which reads eagerly into a slice buffer and cannot block the router. func (m *RequestManager) routeEvent(msg InboxMessage) { subID, event, err := envelope.FindSubscriptionEvent(msg.Data) if err != nil { @@ -382,6 +361,9 @@ func (m *RequestManager) routeEvent(msg InboxMessage) { } } +// routeEvent, routeEOSE, and routeClosed are always called sequentially from +// the same routeInbox goroutine via dispatchInbox. This makes it safe for +// routeEOSE to close req.buffer: no concurrent routeEvent send can race it. func (m *RequestManager) routeEOSE(msg InboxMessage) { subID, err := envelope.FindEOSE(msg.Data) if err != nil {