Created skeleton for request manager

This commit is contained in:
Jay
2026-05-17 11:18:37 -04:00
parent 8eca61f588
commit 2a4b8ee5db
5 changed files with 395 additions and 1486 deletions
+1
View File
@@ -1 +1,2 @@
draft draft
vibe
+21 -63
View File
@@ -1,7 +1,6 @@
package prism package prism
import ( import (
"context"
"git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-roots-ws" "git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -10,63 +9,15 @@ import (
"time" "time"
) )
func TestEmbassy_TEMPLATE(t *testing.T) {
ctx := context.Background()
url := "wss://test"
connect := func(url string) error { return nil }
remove := func(url string) error { return nil }
sent := false
_ = sent
send := func(url string, data []byte) error {
sent = true
return nil
}
events := make(chan honeybee.OutboundPoolEvent)
inbox := make(chan honeybee.InboxMessage)
pool := EmbassyPlugin{
Connect: connect,
Remove: remove,
Send: send,
Events: events,
Inbox: inbox,
}
embassy := NewEmbassy(ctx, pool, nil)
embassy.Dispatch(url)
envoy := embassy.Call(url)
assert.NotNil(t, envoy)
}
func TestEmbassy_Dispatch(t *testing.T) { func TestEmbassy_Dispatch(t *testing.T) {
ctx := context.Background() p := newMockPool(t)
url := "wss://test"
connectCalled := make(chan struct{}) embassy := NewEmbassy(p.ctx, p.plugin, nil)
removeCalled := make(chan struct{}) embassy.Dispatch(p.url)
connect := func(url string) error { close(connectCalled); return nil } envoy := embassy.Call(p.url)
remove := func(url string) error { close(removeCalled); return nil }
sent := false
send := func(url string, data []byte) error {
sent = true
return nil
}
events := make(chan honeybee.OutboundPoolEvent)
inbox := make(chan honeybee.InboxMessage)
pool := EmbassyPlugin{
Connect: connect,
Remove: remove,
Send: send,
Events: events,
Inbox: inbox,
}
embassy := NewEmbassy(ctx, pool, nil)
embassy.Dispatch(url)
envoy := embassy.Call(url)
assert.NotNil(t, envoy) assert.NotNil(t, envoy)
_, ok := <-connectCalled _, ok := <-p.added
assert.False(t, ok) assert.False(t, ok)
eventSub := envoy.SubscribeEvents() eventSub := envoy.SubscribeEvents()
@@ -89,16 +40,16 @@ func TestEmbassy_Dispatch(t *testing.T) {
close(inboxDone) close(inboxDone)
}() }()
events <- honeybee.OutboundPoolEvent{ p.events <- honeybee.OutboundPoolEvent{
ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()} ID: p.url, Kind: honeybee.OutboundEventConnected, At: time.Now()}
events <- honeybee.OutboundPoolEvent{ p.events <- honeybee.OutboundPoolEvent{
ID: "wss://other", Kind: honeybee.OutboundEventConnected, At: time.Now()} ID: "wss://other", Kind: honeybee.OutboundEventConnected, At: time.Now()}
inbox <- honeybee.InboxMessage{ p.inbox <- honeybee.InboxMessage{
ID: url, ID: p.url,
Data: envelope.EncloseEvent([]byte("{}")), Data: envelope.EncloseEvent([]byte("{}")),
ReceivedAt: time.Now(), ReceivedAt: time.Now(),
} }
inbox <- honeybee.InboxMessage{ p.inbox <- honeybee.InboxMessage{
ID: "wss://other", ID: "wss://other",
Data: envelope.EncloseEvent([]byte("{}")), Data: envelope.EncloseEvent([]byte("{}")),
ReceivedAt: time.Now(), ReceivedAt: time.Now(),
@@ -116,11 +67,18 @@ func TestEmbassy_Dispatch(t *testing.T) {
"should have only gotten one inbox message") "should have only gotten one inbox message")
envoy.Send([]byte("hello")) envoy.Send([]byte("hello"))
assert.True(t, sent) Eventually(t, func() bool {
select {
default:
return false
case msg := <-p.sent:
return string(msg) == "hello"
}
}, "should have sent message")
envoy.Dismiss() envoy.Dismiss()
_, ok = <-removeCalled _, ok = <-p.removed
assert.False(t, ok) assert.False(t, ok)
_, ok = <-eventDone _, ok = <-eventDone
@@ -130,6 +88,6 @@ func TestEmbassy_Dispatch(t *testing.T) {
assert.False(t, ok) assert.False(t, ok)
// envoy no longer in embassy // envoy no longer in embassy
envoy = embassy.Call(url) envoy = embassy.Call(p.url)
assert.Nil(t, envoy) assert.Nil(t, envoy)
} }
+29 -62
View File
@@ -4,7 +4,6 @@ import (
"context" "context"
"git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component" "git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-roots-ws"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"testing" "testing"
"time" "time"
@@ -26,95 +25,63 @@ func Never(t *testing.T, condition func() bool, msg string) {
assert.Never(t, condition, NegativeTestTimeout, TestTick, msg) assert.Never(t, condition, NegativeTestTimeout, TestTick, msg)
} }
func mustEncloseReq(id string, filters [][]byte) []byte { type mockPool struct {
return []byte(envelope.EncloseReq(id, filters)) plugin EmbassyPlugin
} ctx context.Context
url string
// managerHarness wires up a real Envoy and RequestManager backed by added chan struct{}
// controllable channels. Callers drive the envoy state and inbox by writing removed chan struct{}
// to the exported channels.
type managerHarness struct {
envoy *Envoy
manager *RequestManager
events chan honeybee.OutboundPoolEvent events chan honeybee.OutboundPoolEvent
inbox chan honeybee.InboxMessage inbox chan honeybee.InboxMessage
sent chan []byte sent chan []byte
} }
func newManagerHarness(t *testing.T) *managerHarness { func newMockPool(t *testing.T) *mockPool {
t.Helper() t.Helper()
ctx := component.MustNew(context.Background(), "prism", "test") ctx := component.MustNew(context.Background(), "prism", "test")
url := "wss://test" url := "wss://test"
events := make(chan honeybee.OutboundPoolEvent, 4) added := make(chan struct{})
removed := make(chan struct{})
events := make(chan honeybee.OutboundPoolEvent, 16)
inbox := make(chan honeybee.InboxMessage, 16) inbox := make(chan honeybee.InboxMessage, 16)
sent := make(chan []byte, 16) sent := make(chan []byte, 16)
pool := EmbassyPlugin{ plugin := EmbassyPlugin{
Connect: func(string) error { return nil }, Connect: func(url string) error { close(added); return nil },
Remove: func(string) error { return nil }, Remove: func(url string) error { close(removed); return nil },
Send: func(_ string, data []byte) error { sent <- data; return nil }, Send: func(_ string, data []byte) error { sent <- data; return nil },
Events: events, Events: events,
Inbox: inbox, Inbox: inbox,
} }
embassy := NewEmbassy(ctx, pool, nil) return &mockPool{
embassy.Dispatch(url) plugin: plugin,
envoy := embassy.Call(url) ctx: ctx,
url: url,
manager := NewRequestManager(envoy) added: added,
removed: removed,
return &managerHarness{
envoy: envoy,
manager: manager,
events: events, events: events,
inbox: inbox, inbox: inbox,
sent: sent, sent: sent,
} }
} }
// connect simulates the envoy becoming connected. func (p *mockPool) connect() {
func (h *managerHarness) connect() { p.events <- honeybee.OutboundPoolEvent{
h.events <- honeybee.OutboundPoolEvent{ ID: p.url, Kind: honeybee.OutboundEventConnected, At: time.Now()}
ID: "wss://test",
Kind: honeybee.OutboundEventConnected,
At: time.Now(),
}
} }
// disconnect simulates the envoy disconnecting. func (p *mockPool) disconnect() {
func (h *managerHarness) disconnect() { p.events <- honeybee.OutboundPoolEvent{
h.events <- honeybee.OutboundPoolEvent{ ID: p.url, Kind: honeybee.OutboundEventDisconnected, At: time.Now()}
ID: "wss://test",
Kind: honeybee.OutboundEventDisconnected,
At: time.Now(),
}
} }
// sendEvent delivers an EVENT envelope for the given subID to the inbox. func (p *mockPool) receive(data []byte) {
func (h *managerHarness) sendEvent(subID string, eventData []byte) { p.inbox <- honeybee.InboxMessage{
h.inbox <- honeybee.InboxMessage{ ID: p.url,
ID: "wss://test", Data: data,
Data: envelope.EncloseSubscriptionEvent(subID, eventData),
ReceivedAt: time.Now(),
}
}
// sendEOSE delivers an EOSE envelope for the given subID to the inbox.
func (h *managerHarness) sendEOSE(subID string) {
h.inbox <- honeybee.InboxMessage{
ID: "wss://test",
Data: envelope.EncloseEOSE(subID),
ReceivedAt: time.Now(),
}
}
// sendClosed delivers a CLOSED envelope for the given subID to the inbox.
func (h *managerHarness) sendClosed(subID, message string) {
h.inbox <- honeybee.InboxMessage{
ID: "wss://test",
Data: envelope.EncloseClosed(subID, message),
ReceivedAt: time.Now(), ReceivedAt: time.Now(),
} }
} }
+149 -541
View File
@@ -13,60 +13,7 @@ import (
) )
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Parsed inbox message types // Types
// ----------------------------------------------------------------------------
type inboxEvent struct {
subID string
data []byte
receivedAt time.Time
}
type inboxEOSE struct {
subID string
receivedAt time.Time
}
type inboxClosed struct {
subID string
message string
receivedAt time.Time
}
// ----------------------------------------------------------------------------
// Session inbox (per-session typed channels)
// ----------------------------------------------------------------------------
type sessionInbox struct {
events chan inboxEvent
eose chan inboxEOSE
closed chan inboxClosed
}
const sessionInboxBuffer = 64
func newSessionInbox() *sessionInbox {
return &sessionInbox{
events: make(chan inboxEvent, sessionInboxBuffer),
eose: make(chan inboxEOSE, 1),
closed: make(chan inboxClosed, 1),
}
}
// ----------------------------------------------------------------------------
// Registration (durable subscription identity)
// ----------------------------------------------------------------------------
type registration struct {
filters [][]byte
eventsIn chan ReqEvent
eventsOut <-chan ReqEvent
closed chan ReqClosed
deregister sync.Once
}
// ----------------------------------------------------------------------------
// Output types
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type ReqEvent struct { type ReqEvent struct {
@@ -81,234 +28,12 @@ type ReqClosed struct {
Data string Data string
} }
// ----------------------------------------------------------------------------
// Session options
// ----------------------------------------------------------------------------
type sessionOptions struct {
eoseClose bool
deregister func()
inbox *sessionInbox
forwardEvents chan<- ReqEvent
forwardClosed chan<- ReqClosed
}
type SessionOption func(*sessionOptions)
func withEOSEClose() SessionOption {
return func(o *sessionOptions) {
o.eoseClose = true
}
}
func withDeregister(fn func()) SessionOption {
return func(o *sessionOptions) {
o.deregister = fn
}
}
func withSessionInbox(si *sessionInbox) SessionOption {
return func(o *sessionOptions) {
o.inbox = si
}
}
func withForwardEvents(ch chan<- ReqEvent) SessionOption {
return func(o *sessionOptions) {
o.forwardEvents = ch
}
}
func withForwardClosed(ch chan<- ReqClosed) SessionOption {
return func(o *sessionOptions) {
o.forwardClosed = ch
}
}
// ----------------------------------------------------------------------------
// Session
// ----------------------------------------------------------------------------
type session struct {
id string
req []byte
send func([]byte) error
done <-chan struct{}
terminate func()
deregister func()
eoseClose bool
inbox *sessionInbox
forwardEvents chan<- ReqEvent
forwardClosed chan<- ReqClosed
ctx context.Context
cancel context.CancelFunc
once sync.Once
}
func newSession(
id string,
req []byte,
send func([]byte) error,
done <-chan struct{},
terminate func(),
opts ...SessionOption,
) *session {
o := &sessionOptions{
deregister: func() {},
}
for _, opt := range opts {
opt(o)
}
ctx, cancel := context.WithCancel(context.Background())
return &session{
id: id,
req: req,
send: send,
done: done,
terminate: terminate,
deregister: o.deregister,
eoseClose: o.eoseClose,
inbox: o.inbox,
forwardEvents: o.forwardEvents,
forwardClosed: o.forwardClosed,
ctx: ctx,
cancel: cancel,
}
}
func (s *session) run() {
defer s.exit()
// Send step: launch send in goroutine, wait for result or done.
sent := make(chan error, 1)
go func() { sent <- s.send(s.req) }()
select {
case <-s.done:
return
case <-s.ctx.Done():
return
case err := <-sent:
if err != nil {
return
}
}
if s.inbox == nil {
return
}
// Message loop.
for {
select {
case <-s.done:
return
case <-s.ctx.Done():
s.send(envelope.EncloseClose(s.id)) //nolint:errcheck
return
case ev, ok := <-s.inbox.events:
if !ok {
return
}
if s.forwardEvents != nil {
select {
case <-s.done:
return
case <-s.ctx.Done():
return
case s.forwardEvents <- ReqEvent{ReceivedAt: ev.receivedAt, Data: ev.data}:
}
}
case _, ok := <-s.inbox.eose:
if !ok {
return
}
if s.eoseClose {
// Drain buffered events before closing.
for {
select {
case ev, ok := <-s.inbox.events:
if !ok {
s.send(envelope.EncloseClose(s.id)) //nolint:errcheck
return
}
if s.forwardEvents != nil {
select {
case <-s.done:
return
case <-s.ctx.Done():
return
case s.forwardEvents <- ReqEvent{ReceivedAt: ev.receivedAt, Data: ev.data}:
}
}
default:
s.send(envelope.EncloseClose(s.id)) //nolint:errcheck
return
}
}
}
case cl, ok := <-s.inbox.closed:
if !ok {
return
}
if s.forwardClosed != nil {
select {
case <-s.done:
case <-s.ctx.Done():
case s.forwardClosed <- ReqClosed{ReceivedAt: cl.receivedAt, Data: cl.message}:
}
}
s.doDeregister()
return
}
}
}
func (s *session) exit() {
s.once.Do(func() {
s.terminate()
})
}
func (s *session) doDeregister() {
s.once.Do(func() {
s.terminate()
})
s.deregister()
}
func (s *session) Close() {
s.cancel()
}
// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------
var encoder = base32.StdEncoding.WithPadding(base32.NoPadding)
func generateID() string {
b := make([]byte, 5)
_, err := rand.Read(b)
if err != nil {
panic(fmt.Sprintf("generateID: %v", err))
}
return encoder.EncodeToString(b)
}
// ----------------------------------------------------------------------------
// Request Manager
// ----------------------------------------------------------------------------
type RequestManager struct { type RequestManager struct {
regs map[string]*registration reqs map[string]*request
sessions map[string]*session sessions map[string]*session
inboxSubs map[string]*sessionInbox inboxSubs map[string]*sessionSub
done chan struct{} done chan struct{}
reqWg sync.WaitGroup sessionWg sync.WaitGroup
envoy *Envoy envoy *Envoy
events <-chan OutboundPoolEvent events <-chan OutboundPoolEvent
@@ -322,142 +47,114 @@ type RequestManager struct {
logger *slog.Logger logger *slog.Logger
} }
func NewRequestManager(envoy *Envoy) *RequestManager { type request struct {
id string
filters [][]byte
buffer chan ReqEvent
events chan ReqEvent
closed chan ReqClosed
once sync.Once
}
type session struct {
id string
req []byte
eose <-chan struct{}
closed <-chan struct{}
done chan struct{}
send func([]byte) error
terminate func(terminateReason)
closeOnEOSE bool
ctx context.Context
cancel context.CancelFunc
logger *slog.Logger
}
type sessionSub struct {
eose chan<- struct{}
closed chan<- struct{}
}
type terminateReason int
const (
termSendFailed terminateReason = iota
termCloseSent
termReceivedClosed
termExternal
)
// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------
var encoder = base32.StdEncoding.WithPadding(base32.NoPadding)
func generateID() string {
b := make([]byte, 5)
if _, err := rand.Read(b); err != nil {
panic(fmt.Sprintf("generateID: %v", err))
}
return encoder.EncodeToString(b)
}
// ----------------------------------------------------------------------------
// Request Manager
// ----------------------------------------------------------------------------
func NewRequestManager(e *Envoy) *RequestManager {
ctx, cancel := context.WithCancel( ctx, cancel := context.WithCancel(
component.MustExtend(envoy.Context(), "request_manager")) component.MustExtend(e.Context(), "request_manager"))
m := &RequestManager{ m := &RequestManager{
regs: make(map[string]*registration), reqs: make(map[string]*request),
sessions: make(map[string]*session), sessions: make(map[string]*session),
inboxSubs: make(map[string]*sessionInbox), inboxSubs: make(map[string]*sessionSub),
envoy: envoy,
events: envoy.SubscribeEvents(), envoy: e,
inbox: envoy.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), events: e.SubscribeEvents(),
inbox: e.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
if h := envoy.Handler(); h != nil { if h := e.Handler(); h != nil {
comp := component.FromContext(ctx) comp := component.FromContext(ctx)
m.handler = h m.handler = h
m.logger = slog.New(h).With(slog.Any("component", comp)) m.logger = slog.New(h).With(slog.Any("component", comp))
} }
m.wg.Add(2) // start event handler
go m.handleEvents() // start inbox router
go m.routeInbox()
return m return m
} }
func (m *RequestManager) Stream(filters [][]byte) (string, <-chan ReqEvent, <-chan ReqClosed) { func (m *RequestManager) Stream(
id := generateID() filters [][]byte,
) (string, <-chan ReqEvent, <-chan ReqClosed) {
evIn := make(chan ReqEvent) // generate id
evOut := make(chan ReqEvent) // create channels
cl := make(chan ReqClosed, 1) // register request
// spawn session if connected
reg := &registration{ return "", nil, nil
filters: filters,
eventsIn: evIn,
closed: cl,
}
go bufferedPipe(evIn, evOut)
reg.eventsOut = evOut
m.mu.Lock()
m.regs[id] = reg
if m.envoy.IsConnected() {
m.spawnSessionLock(id, reg)
}
m.mu.Unlock()
return id, reg.eventsOut, reg.closed
} }
func (m *RequestManager) Query(filters [][]byte, timeout time.Duration) ([]ReqEvent, *ReqClosed) { func (m *RequestManager) Query(
if !m.envoy.IsConnected() { filters [][]byte,
return nil, nil timeout time.Duration,
} ) (events []ReqEvent, closed *ReqClosed) {
// return if disconnected
ctx, cancel := context.WithTimeout(m.ctx, timeout) // generate id
defer cancel() // create channels
// spawn session
id := generateID() // collect events
si := newSessionInbox() return
// Buffered collection channels so the session can forward without blocking.
evCh := make(chan ReqEvent, sessionInboxBuffer)
clCh := make(chan ReqClosed, 1)
sessionDone := make(chan struct{})
m.mu.Lock()
m.inboxSubs[id] = si
m.mu.Unlock()
terminate := func() {
m.mu.Lock()
delete(m.inboxSubs, id)
m.mu.Unlock()
m.reqWg.Done()
close(sessionDone)
}
m.reqWg.Add(1)
s := newSession(
id,
envelope.EncloseReq(id, filters),
m.envoy.Send,
m.done,
terminate,
withEOSEClose(),
withSessionInbox(si),
withForwardEvents(evCh),
withForwardClosed(clCh),
)
go s.run()
var events []ReqEvent
var closed *ReqClosed
// Wait for the session to finish, or timeout.
select {
case <-ctx.Done():
s.Close()
<-sessionDone
case <-sessionDone:
}
// Drain whatever the session forwarded.
for {
select {
case ev := <-evCh:
events = append(events, ev)
default:
goto drained
}
}
drained:
select {
case cl := <-clCh:
closed = &cl
default:
}
return events, closed
}
func (m *RequestManager) Cancel(id string) error {
m.mu.Lock()
defer m.mu.Unlock()
s, ok := m.sessions[id]
if !ok {
return fmt.Errorf("session not found: %s", id)
}
s.Close()
return nil
} }
func (m *RequestManager) Close() { func (m *RequestManager) Close() {
@@ -465,163 +162,74 @@ func (m *RequestManager) Close() {
m.wg.Wait() m.wg.Wait()
} }
func (m *RequestManager) spawnSessionLock(id string, reg *registration) {
si := newSessionInbox()
m.inboxSubs[id] = si
terminate := func() {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.inboxSubs, id)
delete(m.sessions, id)
m.reqWg.Done()
}
deregister := func() {
m.mu.Lock()
defer m.mu.Unlock()
reg.deregister.Do(func() {
delete(m.regs, id)
close(reg.eventsIn)
reg.closed <- ReqClosed{}
close(reg.closed)
})
}
m.reqWg.Add(1)
s := newSession(
id,
envelope.EncloseReq(id, reg.filters),
m.envoy.Send,
m.done,
terminate,
withDeregister(deregister),
withSessionInbox(si),
withForwardEvents(reg.eventsIn),
withForwardClosed(reg.closed),
)
m.sessions[id] = s
go s.run()
}
func (m *RequestManager) start() { func (m *RequestManager) start() {
m.mu.Lock() // start all request sessions
defer m.mu.Unlock()
m.done = make(chan struct{})
for id, reg := range m.regs {
if _, active := m.sessions[id]; !active {
m.spawnSessionLock(id, reg)
}
}
} }
func (m *RequestManager) stop() { func (m *RequestManager) stop() {
m.mu.Lock() // stop all running sessions
done := m.done
m.mu.Unlock()
if done != nil {
close(done)
}
m.reqWg.Wait()
m.mu.Lock()
m.sessions = make(map[string]*session)
m.inboxSubs = make(map[string]*sessionInbox)
m.mu.Unlock()
} }
func (m *RequestManager) handleEvents() { func (m *RequestManager) handleEvents() {
defer m.wg.Done() defer m.wg.Done()
for { // start/stop sessions on connect/disconnect
select {
case <-m.ctx.Done():
return
case ev, ok := <-m.events:
if !ok {
return
}
switch ev.Kind {
case EventConnected:
m.start()
case EventDisconnected:
m.stop()
}
}
}
} }
func (m *RequestManager) routeInbox() { func (m *RequestManager) routeInbox() {
defer m.wg.Done() defer m.wg.Done()
for { // unpack/route inbox message
select { // events forward directly to request event buffer
case <-m.ctx.Done(): // eose goes to session
return // closed goes both to session and request
case msg, ok := <-m.inbox: // uses read lock for map lookups
if !ok { }
return
} // ----------------------------------------------------------------------------
// Session
label, err := envelope.GetLabel(msg.Data) // ----------------------------------------------------------------------------
if err != nil {
continue func newSession(
} ctx context.Context,
id string,
switch string(label) { req []byte,
case "EVENT": eose <-chan struct{},
subID, data, err := envelope.FindSubscriptionEvent(msg.Data) closed <-chan struct{},
if err != nil { done chan struct{},
continue send func(data []byte) error,
} terminate func(terminateReason),
m.mu.RLock() isQuery bool,
si, ok := m.inboxSubs[subID] handler slog.Handler,
m.mu.RUnlock() ) *session {
if !ok { ctx, cancel := context.WithCancel(component.MustExtend(ctx, "session"))
continue s := &session{
} id: id,
select { req: req,
case <-m.ctx.Done(): eose: eose,
return closed: closed,
case si.events <- inboxEvent{subID: subID, data: data, receivedAt: msg.ReceivedAt}: done: done,
} send: send,
terminate: terminate,
case "EOSE": closeOnEOSE: isQuery,
subID, err := envelope.FindEOSE(msg.Data) ctx: ctx,
if err != nil { cancel: cancel,
continue }
} // create logger if handler is supplied
m.mu.RLock() // run main loop
si, ok := m.inboxSubs[subID] return s
m.mu.RUnlock() }
if !ok {
continue func (s *session) run() {
} var tr terminateReason
select { defer s.terminate(tr)
case <-m.ctx.Done():
return // send inital req
case si.eose <- inboxEOSE{subID: subID, receivedAt: msg.ReceivedAt}:
} // run main loop
// switch on done, context, eose, and closed -- terminal paths
case "CLOSED": }
subID, message, err := envelope.FindClosed(msg.Data)
if err != nil { func (s *session) Close() {
continue s.cancel()
}
m.mu.RLock()
si, ok := m.inboxSubs[subID]
m.mu.RUnlock()
if !ok {
continue
}
select {
case <-m.ctx.Done():
return
case si.closed <- inboxClosed{subID: subID, message: message, receivedAt: msg.ReceivedAt}:
}
}
}
}
} }
+190 -815
View File
File diff suppressed because it is too large Load Diff