refactor: extract route funcs, unify newStream, fix query timeout leak

This commit is contained in:
Jay
2026-05-17 21:53:37 -04:00
parent a05e20ec6a
commit 715dfa17b0
+59 -40
View File
@@ -54,7 +54,8 @@ type request struct {
events chan ReqEvent events chan ReqEvent
closed chan ReqClosed closed chan ReqClosed
once sync.Once deregisterOnce sync.Once
closedOnce sync.Once
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@@ -106,29 +107,47 @@ func NewRequestManager(e *Envoy) *RequestManager {
func (m *RequestManager) Stream( func (m *RequestManager) Stream(
filters [][]byte, filters [][]byte,
) (string, <-chan ReqEvent, <-chan ReqClosed) {
id, events, closed := m.newStream(filters, false)
return id, events, closed
}
func (m *RequestManager) newStream(
filters [][]byte,
isQuery bool,
) (string, <-chan ReqEvent, <-chan ReqClosed) { ) (string, <-chan ReqEvent, <-chan ReqClosed) {
id := generateID() id := generateID()
buffer := make(chan ReqEvent, 64) buffer := make(chan ReqEvent, 64)
events := make(chan ReqEvent)
closed := make(chan ReqClosed, 1) closed := make(chan ReqClosed, 1)
var events chan ReqEvent
if isQuery {
// Query reads directly from buffer. bufferedPipe drains asynchronously,
// so EOSE can close the pipe's output before all buffered EVENTs have
// passed through — causing Query's collect loop to exit short. Reading
// from the buffered channel directly avoids that race.
events = buffer
} else {
events = make(chan ReqEvent)
go func() {
bufferedPipe(buffer, events)
close(events)
}()
}
req := &request{ req := &request{
id: id, id: id,
filters: filters, filters: filters,
buffer: buffer, buffer: buffer,
events: events, events: events,
closed: closed, closed: closed,
isQuery: false, isQuery: isQuery,
} }
m.mu.Lock() m.mu.Lock()
m.reqs[id] = req m.reqs[id] = req
go func() {
bufferedPipe(buffer, events)
close(events)
}()
if m.envoy.IsConnected() { if m.envoy.IsConnected() {
m.activateLock(req) m.activate(req)
} }
m.mu.Unlock() m.mu.Unlock()
@@ -138,27 +157,12 @@ func (m *RequestManager) Stream(
func (m *RequestManager) Query( func (m *RequestManager) Query(
filters [][]byte, filters [][]byte,
timeout time.Duration, timeout time.Duration,
) (events []ReqEvent, closed *ReqClosed) { ) ([]ReqEvent, *ReqClosed) {
if !m.envoy.IsConnected() { if !m.envoy.IsConnected() {
return nil, nil return nil, nil
} }
id := generateID() id, eventsCh, closedCh := m.newStream(filters, true)
eventsCh := make(chan ReqEvent)
closedCh := make(chan ReqClosed, 1)
req := &request{
id: id,
filters: filters,
buffer: eventsCh,
closed: closedCh,
isQuery: true,
}
m.mu.Lock()
m.reqs[id] = req
m.activateLock(req)
m.mu.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, timeout) ctx, cancel := context.WithTimeout(m.ctx, timeout)
defer cancel() defer cancel()
@@ -177,6 +181,7 @@ func (m *RequestManager) Query(
} }
return result, &cl return result, &cl
case <-ctx.Done(): case <-ctx.Done():
m.Cancel(id)
return result, nil return result, nil
} }
} }
@@ -196,7 +201,7 @@ func (m *RequestManager) Cancel(id string) error {
req.active = false req.active = false
} }
req.once.Do(func() { req.deregisterOnce.Do(func() {
close(req.buffer) close(req.buffer)
close(req.closed) close(req.closed)
}) })
@@ -216,7 +221,7 @@ func (m *RequestManager) Close() {
if req.active { if req.active {
go m.envoy.Send(envelope.EncloseClose(id)) go m.envoy.Send(envelope.EncloseClose(id))
} }
req.once.Do(func() { req.deregisterOnce.Do(func() {
close(req.buffer) close(req.buffer)
close(req.closed) close(req.closed)
}) })
@@ -224,14 +229,14 @@ func (m *RequestManager) Close() {
} }
} }
func (m *RequestManager) activateLock(req *request) { func (m *RequestManager) activate(req *request) {
req.active = true req.active = true
go m.envoy.Send(envelope.EncloseReq(req.id, req.filters)) go m.envoy.Send(envelope.EncloseReq(req.id, req.filters))
} }
func (m *RequestManager) removeLock(req *request) { func (m *RequestManager) deregister(req *request) {
req.active = false req.active = false
req.once.Do(func() { req.deregisterOnce.Do(func() {
close(req.buffer) close(req.buffer)
close(req.closed) close(req.closed)
}) })
@@ -254,7 +259,7 @@ func (m *RequestManager) handleEvents() {
case EventConnected: case EventConnected:
for _, req := range m.reqs { for _, req := range m.reqs {
if !req.isQuery { if !req.isQuery {
m.activateLock(req) m.activate(req)
} }
} }
case EventDisconnected: case EventDisconnected:
@@ -291,6 +296,19 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
switch string(label) { switch string(label) {
case "EVENT": case "EVENT":
m.routeEvent(msg)
case "EOSE":
m.routeEOSE(msg)
case "CLOSED":
m.routeClosed(msg)
}
}
// routeEvent, routeEOSE, and routeClosed use blocking sends into req.buffer.
// This preserves every event without dropping: req.buffer feeds either an
// unbounded bufferedPipe (streams) that absorbs a slow external reader, or
// is read directly by Query's collect loop in the caller's goroutine.
func (m *RequestManager) routeEvent(msg InboxMessage) {
subID, event, err := envelope.FindSubscriptionEvent(msg.Data) subID, event, err := envelope.FindSubscriptionEvent(msg.Data)
if err != nil { if err != nil {
return return
@@ -306,41 +324,42 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
ReceivedAt: msg.ReceivedAt, ReceivedAt: msg.ReceivedAt,
Data: event, Data: event,
} }
}
case "EOSE": func (m *RequestManager) routeEOSE(msg InboxMessage) {
subID, err := envelope.FindEOSE(msg.Data) subID, err := envelope.FindEOSE(msg.Data)
if err != nil { if err != nil {
return return
} }
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock()
req, ok := m.reqs[subID] req, ok := m.reqs[subID]
if !ok { if !ok {
m.mu.Unlock()
return return
} }
if req.active && req.isQuery { if req.active && req.isQuery {
m.removeLock(req) m.deregister(req)
go m.envoy.Send(envelope.EncloseClose(subID)) go m.envoy.Send(envelope.EncloseClose(subID))
} }
m.mu.Unlock() }
case "CLOSED": func (m *RequestManager) routeClosed(msg InboxMessage) {
subID, message, err := envelope.FindClosed(msg.Data) subID, message, err := envelope.FindClosed(msg.Data)
if err != nil { if err != nil {
return return
} }
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock()
req, ok := m.reqs[subID] req, ok := m.reqs[subID]
if !ok { if !ok {
m.mu.Unlock()
return return
} }
req.closedOnce.Do(func() {
req.closed <- ReqClosed{ req.closed <- ReqClosed{
PeerID: msg.ID, PeerID: msg.ID,
ReceivedAt: msg.ReceivedAt, ReceivedAt: msg.ReceivedAt,
Data: message, Data: message,
} }
m.removeLock(req) })
m.mu.Unlock() m.deregister(req)
}
} }