fix query event collection race
This commit is contained in:
+9
-1
@@ -360,6 +360,10 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
|
|||||||
// This preserves every event without dropping: req.buffer feeds either an
|
// This preserves every event without dropping: req.buffer feeds either an
|
||||||
// unbounded bufferedPipe (streams) that absorbs a slow external reader, or
|
// unbounded bufferedPipe (streams) that absorbs a slow external reader, or
|
||||||
// is read directly by Query's collect loop in the caller's goroutine.
|
// is read directly by Query's collect loop in the caller's goroutine.
|
||||||
|
// ---
|
||||||
|
// routeEvent, routeEOSE, and routeClosed are always called sequentially from
|
||||||
|
// the same routeInbox goroutine via dispatchInbox. This makes it safe for
|
||||||
|
// routeEOSE to close req.buffer: no concurrent routeEvent send can race it.
|
||||||
func (m *RequestManager) routeEvent(msg InboxMessage) {
|
func (m *RequestManager) routeEvent(msg InboxMessage) {
|
||||||
subID, event, err := envelope.FindSubscriptionEvent(msg.Data)
|
subID, event, err := envelope.FindSubscriptionEvent(msg.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -390,7 +394,11 @@ func (m *RequestManager) routeEOSE(msg InboxMessage) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if req.active && req.isQuery {
|
if req.active && req.isQuery {
|
||||||
m.deregister(req)
|
// manually cleanup query state
|
||||||
|
// specifically, do not close req.closed or events can be missed
|
||||||
|
req.active = false
|
||||||
|
close(req.buffer)
|
||||||
|
delete(m.reqs, req.id)
|
||||||
go m.envoy.Send(envelope.EncloseClose(subID))
|
go m.envoy.Send(envelope.EncloseClose(subID))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user