session: ignores eose if stream; unify send and message loop into single for/select
This commit is contained in:
+13
-12
@@ -223,21 +223,22 @@ func (s *session) run() {
|
|||||||
sent := make(chan error, 1)
|
sent := make(chan error, 1)
|
||||||
go func() { sent <- s.send(s.req) }()
|
go func() { sent <- s.send(s.req) }()
|
||||||
|
|
||||||
select {
|
for {
|
||||||
case err := <-sent:
|
select {
|
||||||
if err != nil {
|
case err := <-sent:
|
||||||
s.terminate(termSendFailed)
|
if err != nil {
|
||||||
|
s.terminate(termSendFailed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-s.done:
|
||||||
|
s.terminate(termExternal)
|
||||||
return
|
return
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
s.terminate(termExternal)
|
||||||
|
return
|
||||||
|
case <-s.eose:
|
||||||
}
|
}
|
||||||
case <-s.done:
|
|
||||||
s.terminate(termExternal)
|
|
||||||
return
|
|
||||||
case <-s.ctx.Done():
|
|
||||||
s.terminate(termExternal)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: main message loop (eose, closed, done, ctx) -- deferred to later tests
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *session) Close() {
|
func (s *session) Close() {
|
||||||
|
|||||||
+42
-10
@@ -12,6 +12,7 @@ import (
|
|||||||
// Helpers
|
// Helpers
|
||||||
|
|
||||||
type mockSessionHarness struct {
|
type mockSessionHarness struct {
|
||||||
|
ctx context.Context
|
||||||
id string
|
id string
|
||||||
filters [][]byte
|
filters [][]byte
|
||||||
req []byte
|
req []byte
|
||||||
@@ -25,6 +26,7 @@ type mockSessionHarness struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newMockSessionHarness() *mockSessionHarness {
|
func newMockSessionHarness() *mockSessionHarness {
|
||||||
|
ctx := component.MustNew(context.Background(), "prism", "test")
|
||||||
filters := [][]byte{[]byte(`{}`)}
|
filters := [][]byte{[]byte(`{}`)}
|
||||||
id := "TESTREQ"
|
id := "TESTREQ"
|
||||||
sent := make(chan []byte, 2)
|
sent := make(chan []byte, 2)
|
||||||
@@ -36,6 +38,7 @@ func newMockSessionHarness() *mockSessionHarness {
|
|||||||
terminate := func(r terminateReason) { terminatedWith <- r }
|
terminate := func(r terminateReason) { terminatedWith <- r }
|
||||||
|
|
||||||
return &mockSessionHarness{
|
return &mockSessionHarness{
|
||||||
|
ctx: ctx,
|
||||||
id: id,
|
id: id,
|
||||||
filters: filters,
|
filters: filters,
|
||||||
req: envelope.EncloseReq(id, filters),
|
req: envelope.EncloseReq(id, filters),
|
||||||
@@ -57,10 +60,8 @@ func newMockSessionHarness() *mockSessionHarness {
|
|||||||
func TestRequestManager_Session(t *testing.T) {
|
func TestRequestManager_Session(t *testing.T) {
|
||||||
t.Run("sends req on start", func(t *testing.T) {
|
t.Run("sends req on start", func(t *testing.T) {
|
||||||
h := newMockSessionHarness()
|
h := newMockSessionHarness()
|
||||||
|
|
||||||
ctx := component.MustNew(context.Background(), "prism", "test")
|
|
||||||
s := newSession(
|
s := newSession(
|
||||||
ctx, h.id, h.req, h.eose, h.closed, h.done,
|
h.ctx, h.id, h.req, h.eose, h.closed, h.done,
|
||||||
h.send, h.terminate, false, nil)
|
h.send, h.terminate, false, nil)
|
||||||
go s.run()
|
go s.run()
|
||||||
|
|
||||||
@@ -79,12 +80,10 @@ func TestRequestManager_Session(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("terminates on failed req send", func(t *testing.T) {
|
t.Run("terminates on failed req send", func(t *testing.T) {
|
||||||
h := newMockSessionHarness()
|
h := newMockSessionHarness()
|
||||||
|
|
||||||
send := func([]byte) error { return fmt.Errorf("send failed") }
|
send := func([]byte) error { return fmt.Errorf("send failed") }
|
||||||
|
|
||||||
ctx := component.MustNew(context.Background(), "prism", "test")
|
|
||||||
s := newSession(
|
s := newSession(
|
||||||
ctx, h.id, h.req, h.eose, h.closed, h.done,
|
h.ctx, h.id, h.req, h.eose, h.closed, h.done,
|
||||||
send, h.terminate, false, nil)
|
send, h.terminate, false, nil)
|
||||||
go s.run()
|
go s.run()
|
||||||
|
|
||||||
@@ -99,10 +98,43 @@ func TestRequestManager_Session(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ignores eose if stream", func(t *testing.T) {
|
t.Run("ignores eose if stream", func(t *testing.T) {
|
||||||
// construct a session with closeOnEOSE = false
|
h := newMockSessionHarness()
|
||||||
// send a value into the eose channel
|
s := newSession(
|
||||||
// assert terminate is never called
|
h.ctx, h.id, h.req, h.eose, h.closed, h.done,
|
||||||
// assert a subsequent event still causes the session to run normally
|
h.send, h.terminate, false, nil)
|
||||||
|
go s.run()
|
||||||
|
|
||||||
|
// wait for initial REQ send before proceeding
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-h.sent:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected initial send")
|
||||||
|
|
||||||
|
h.eose <- struct{}{}
|
||||||
|
|
||||||
|
Never(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-h.terminatedWith:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "terminate should not be called on eose for stream")
|
||||||
|
|
||||||
|
// session is still running; done closes it cleanly
|
||||||
|
close(h.done)
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case r := <-h.terminatedWith:
|
||||||
|
return r == termExternal
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected termExternal after done closed")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("sends close on eose if query", func(t *testing.T) {
|
t.Run("sends close on eose if query", func(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user