allow queries to wait for peers to connect within its timeout
This commit is contained in:
+1
-7
@@ -192,10 +192,6 @@ func (m *RequestManager) Query(
|
|||||||
timeout time.Duration,
|
timeout time.Duration,
|
||||||
opts ...RequestOption,
|
opts ...RequestOption,
|
||||||
) ([]ReqEvent, *ReqClosed, error) {
|
) ([]ReqEvent, *ReqClosed, error) {
|
||||||
if !m.envoy.IsConnected() {
|
|
||||||
return nil, nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
id, eventsCh, closedCh, err := m.newStream(filters, true, opts...)
|
id, eventsCh, closedCh, err := m.newStream(filters, true, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@@ -295,9 +291,7 @@ func (m *RequestManager) handleEvents() {
|
|||||||
switch ev.Kind {
|
switch ev.Kind {
|
||||||
case EventConnected:
|
case EventConnected:
|
||||||
for _, req := range m.reqs {
|
for _, req := range m.reqs {
|
||||||
if !req.isQuery {
|
m.activate(req)
|
||||||
m.activate(req)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
case EventDisconnected:
|
case EventDisconnected:
|
||||||
for _, req := range m.reqs {
|
for _, req := range m.reqs {
|
||||||
|
|||||||
+29
-6
@@ -485,17 +485,40 @@ func TestRequestManager_Query(t *testing.T) {
|
|||||||
assert.Nil(t, closed)
|
assert.Nil(t, closed)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("returns nil nil when disconnected", func(t *testing.T) {
|
t.Run("connects within timeout and returns events", func(t *testing.T) {
|
||||||
_, envoy := newMockEnvoy(t)
|
p, envoy := newMockEnvoy(t)
|
||||||
// do not connect
|
// do NOT connect
|
||||||
|
|
||||||
m := NewRequestManager(envoy)
|
m := NewRequestManager(envoy)
|
||||||
t.Cleanup(func() { m.Close() })
|
t.Cleanup(func() { m.Close() })
|
||||||
|
|
||||||
events, closed, err := m.Query([][]byte{[]byte(`{}`)}, TestTimeout)
|
filters := [][]byte{[]byte(`{}`)}
|
||||||
assert.NoError(t, err)
|
eventData := []byte(`{"id":"abc"}`)
|
||||||
|
|
||||||
assert.Nil(t, events)
|
go func() {
|
||||||
|
// wait to connect
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
p.connect()
|
||||||
|
|
||||||
|
// listen for REQ on pool side to extract subscription id
|
||||||
|
reqBytes := <-p.sent
|
||||||
|
subID, _, err := envelope.FindReq(reqBytes)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("FindReq: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// send event and eose
|
||||||
|
p.receive(envelope.EncloseSubscriptionEvent(subID, eventData))
|
||||||
|
p.receive(envelope.EncloseEOSE(subID))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// start the query while the peer is disconnected
|
||||||
|
// because it connects within the timeout, the query should still
|
||||||
|
// return events
|
||||||
|
events, closed, err := m.Query(filters, TestTimeout)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, events, 1)
|
||||||
assert.Nil(t, closed)
|
assert.Nil(t, closed)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user