From 6066639863108a0213bbaa9a7850f4aff39aeaa0 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 18 May 2026 16:44:46 -0400 Subject: [PATCH] fix inbox sub close panic on envoy --- embassy.go | 55 ++++++++++++++++++++++++++------------------------- envoy_test.go | 10 ++++++++++ 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/embassy.go b/embassy.go index 7ac30eb..ce08e52 100644 --- a/embassy.go +++ b/embassy.go @@ -69,15 +69,16 @@ type Embassy struct { } type Envoy struct { - url string - connected bool - terminate func() - queue chan []byte - send func(data []byte) error - events <-chan OutboundPoolEvent - inbox <-chan InboxMessage - eventSubs []chan<- OutboundPoolEvent - inboxSubs map[string][]chan<- InboxMessage + url string + connected bool + terminate func() + queue chan []byte + send func(data []byte) error + events <-chan OutboundPoolEvent + inbox <-chan InboxMessage + eventSubs []chan<- OutboundPoolEvent + labelledInboxSubs map[string][]chan<- InboxMessage + inboxSubs []chan<- InboxMessage ctx context.Context cancel context.CancelFunc @@ -312,15 +313,15 @@ func newEnvoy( ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy")) e := &Envoy{ - url: url, - terminate: terminate, - queue: make(chan []byte), - send: send, - events: events, - inbox: inbox, - inboxSubs: make(map[string][]chan<- InboxMessage), - ctx: ctx, - cancel: cancel, + url: url, + terminate: terminate, + queue: make(chan []byte), + send: send, + events: events, + inbox: inbox, + labelledInboxSubs: make(map[string][]chan<- InboxMessage), + ctx: ctx, + cancel: cancel, } if handler != nil { @@ -362,14 +363,13 @@ func (e *Envoy) Dismiss() { close(sub) } - for _, subs := range e.inboxSubs { - for _, sub := range subs { - close(sub) - } + for _, sub := range e.inboxSubs { + close(sub) } e.eventSubs = nil - e.inboxSubs = make(map[string][]chan<- InboxMessage) + e.inboxSubs = nil + e.labelledInboxSubs = make(map[string][]chan<- InboxMessage) } func (e *Envoy) Send(data []byte) error { @@ -388,11 +388,12 @@ func (e *Envoy) SubscribeInbox(labels []string) <-chan InboxMessage { e.mu.Lock() defer e.mu.Unlock() ch := make(chan InboxMessage) + e.inboxSubs = append(e.inboxSubs, ch) for _, label := range labels { - if _, ok := e.inboxSubs[label]; !ok { - e.inboxSubs[label] = make([]chan<- InboxMessage, 0) + if _, ok := e.labelledInboxSubs[label]; !ok { + e.labelledInboxSubs[label] = make([]chan<- InboxMessage, 0) } - e.inboxSubs[label] = append(e.inboxSubs[label], ch) + e.labelledInboxSubs[label] = append(e.labelledInboxSubs[label], ch) } return ch } @@ -448,7 +449,7 @@ func (e *Envoy) routeInbox() { } e.mu.RLock() - subs, ok := e.inboxSubs[string(label)] + subs, ok := e.labelledInboxSubs[string(label)] e.mu.RUnlock() if !ok { diff --git a/envoy_test.go b/envoy_test.go index 5d1408a..8c020ab 100644 --- a/envoy_test.go +++ b/envoy_test.go @@ -22,11 +22,21 @@ func TestEnvoy_Dismiss(t *testing.T) { } envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil) + + eventSub := envoy.SubscribeEvents() + inboxSub := envoy.SubscribeInbox([]string{"A", "B"}) + envoy.Dismiss() mu.RLock() defer mu.RUnlock() assert.True(t, terminated) + + _, ok := <-eventSub + assert.False(t, ok) + + _, ok = <-inboxSub + assert.False(t, ok) } func TestEnvoy_Send(t *testing.T) {