Wrote embassy

This commit is contained in:
Jay
2026-05-09 17:36:03 -04:00
parent e14f2b83a5
commit e909e140a8
6 changed files with 477 additions and 15 deletions
+184 -7
View File
@@ -2,7 +2,9 @@ package prism
import ( import (
"context" "context"
"fmt"
"git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-honeybee"
"git.wisehodl.dev/jay/go-mana-component"
"log/slog" "log/slog"
"sync" "sync"
"time" "time"
@@ -39,6 +41,7 @@ type PoolEventKind = int
const ( const (
EventConnected PoolEventKind = iota EventConnected PoolEventKind = iota
EventDisconnected EventDisconnected
EventAdded
EventRemoved EventRemoved
) )
@@ -48,6 +51,15 @@ type PoolEvent struct {
At time.Time At time.Time
} }
func NewPoolEvent(id string, kind PoolEventKind, at time.Time) PoolEvent {
return PoolEvent{ID: id, Kind: kind, At: at}
}
var convertPoolEvent = map[honeybee.OutboundPoolEventKind]PoolEventKind{
honeybee.OutboundEventConnected: EventConnected,
honeybee.OutboundEventDisconnected: EventDisconnected,
}
// Adapter // Adapter
type Adapter interface { type Adapter interface {
@@ -92,40 +104,205 @@ type Hotel struct {
// Embassy (Outbound Adapter) // Embassy (Outbound Adapter)
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func NewEmbassy() *Embassy { func NewEmbassy(
return nil ctx context.Context,
pool EmbassyPlugin,
jc *JournalCollector,
handler slog.Handler,
) *Embassy {
ctx, cancel := context.WithCancel(
component.MustNew(ctx, "prism", "embassy"))
e := &Embassy{
pool: pool,
peers: make(map[string]bool),
eventSubs: make([]chan PoolEvent, 0),
ctx: ctx,
cancel: cancel,
}
if jc != nil {
e.journals = make(chan JournalEntry, 16)
jc.Enroll(e.journals)
}
if handler != nil {
c, ok := component.Get(ctx)
if ok {
e.logger = slog.New(handler).With(slog.Any("component", c))
}
}
e.wg.Add(1)
go e.runEventRouter()
return e
} }
func (e *Embassy) Dispatch(url string) error { func (e *Embassy) Dispatch(url string) error {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return fmt.Errorf("invalid url: %s", url)
}
if err := e.pool.Connect(url); err != nil {
return fmt.Errorf("dispatch: %w", err)
}
e.mu.Lock()
e.peers[url] = false
subs := e.eventSubs
e.mu.Unlock()
for _, ch := range subs {
select {
case <-e.ctx.Done():
return fmt.Errorf("closing")
case ch <- NewPoolEvent(url, EventAdded, time.Now()):
}
}
return nil return nil
} }
func (e *Embassy) Dismiss(url string) error { func (e *Embassy) Dismiss(url string) error {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return fmt.Errorf("invalid url: %s", url)
}
if err := e.pool.Remove(url); err != nil {
return fmt.Errorf("dismiss: %w", err)
}
e.mu.Lock()
delete(e.peers, url)
subs := e.eventSubs
e.mu.Unlock()
for _, ch := range subs {
select {
case <-e.ctx.Done():
return fmt.Errorf("closing")
case ch <- NewPoolEvent(url, EventRemoved, time.Now()):
}
}
return nil return nil
} }
func (e *Embassy) Close() {} func (e *Embassy) Close() {
e.mu.Lock()
peers := e.peers
e.peers = make(map[string]bool)
e.mu.Unlock()
// dismiss peers
for peer, _ := range peers {
e.Dismiss(peer)
}
e.cancel()
e.wg.Wait()
e.mu.Lock()
subs := e.eventSubs
e.eventSubs = make([]chan PoolEvent, 0)
e.mu.Unlock()
// close subs
for _, sub := range subs {
close(sub)
}
}
func (e *Embassy) Peers() []string { func (e *Embassy) Peers() []string {
return nil e.mu.RLock()
defer e.mu.RUnlock()
peers := make([]string, 0, len(e.peers))
for p, _ := range e.peers {
peers = append(peers, p)
}
return peers
} }
func (e *Embassy) HasPeer(id string) bool { func (e *Embassy) HasPeer(url string) bool {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return false return false
}
e.mu.RLock()
defer e.mu.RUnlock()
_, ok := e.peers[url]
return ok
} }
func (e *Embassy) IsConnected(id string) bool { func (e *Embassy) IsConnected(url string) bool {
url, err := honeybee.NormalizeURL(url)
if err != nil {
return false return false
}
e.mu.RLock()
defer e.mu.RUnlock()
connected, _ := e.peers[url]
return connected
} }
func (e *Embassy) Subscribe() <-chan PoolEvent { func (e *Embassy) Subscribe() <-chan PoolEvent {
return nil e.mu.Lock()
defer e.mu.Unlock()
ch := make(chan PoolEvent, 16)
e.eventSubs = append(e.eventSubs, ch)
return ch
} }
func (e *Embassy) Send(id string, data Envelope) error { func (e *Embassy) Send(id string, data Envelope) error {
return nil return nil
} }
// Internal
func (e *Embassy) runEventRouter() {
defer e.wg.Done()
for {
select {
case <-e.ctx.Done():
return
case ev, ok := <-e.pool.Events:
if !ok {
return
}
kind := convertPoolEvent[ev.Kind]
e.mu.Lock()
switch kind {
case EventConnected:
e.peers[ev.ID] = true
case EventDisconnected:
e.peers[ev.ID] = false
}
subs := e.eventSubs
e.mu.Unlock()
for _, ch := range subs {
select {
case <-e.ctx.Done():
case ch <- NewPoolEvent(ev.ID, kind, ev.At):
}
}
}
}
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Hotel (Inbound Adapter) // Hotel (Inbound Adapter)
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
+245
View File
@@ -0,0 +1,245 @@
package prism
import (
"context"
// "fmt"
"git.wisehodl.dev/jay/go-honeybee"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestEmbassyPoolEvents(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, nil, nil)
sub := e.Subscribe()
t.Run("added then removed", func(t *testing.T) {
err := e.Dispatch("wss://test")
assert.NoError(t, err)
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventAdded
}
}, "expected added event")
err = e.Dismiss("wss://test")
assert.NoError(t, err)
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventRemoved
}
}, "expected removed event")
})
t.Run("connected", func(t *testing.T) {
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventConnected,
At: time.Now(),
}
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventConnected
}
}, "expected connected event")
})
t.Run("disconnected", func(t *testing.T) {
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventDisconnected,
At: time.Now(),
}
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub:
return ev.Kind == EventDisconnected
}
}, "expected disconnected event")
})
}
func TestEmbassyPeerRegistry(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, nil, nil)
// add
e.Dispatch("wss://test")
assert.True(t, e.HasPeer("wss://test"))
assert.False(t, e.IsConnected("wss://test"))
// connect
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventConnected,
At: time.Now(),
}
Eventually(t, func() bool {
exists := e.HasPeer("wss://test")
connected := e.IsConnected("wss://test")
return exists && connected
}, "expected: exists, connected")
// disconnect
eventsCh <- honeybee.OutboundPoolEvent{
ID: "wss://test",
Kind: honeybee.OutboundEventDisconnected,
At: time.Now(),
}
Eventually(t, func() bool {
exists := e.HasPeer("wss://test")
connected := e.IsConnected("wss://test")
return exists && !connected
}, "expected: exists, disconnected")
// remove
e.Dismiss("wss://test")
assert.False(t, e.HasPeer("wss://test"))
assert.False(t, e.IsConnected("wss://test"))
}
func TestEmbassyPeers(t *testing.T) {
ctx := context.Background()
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: nil,
}
e := NewEmbassy(ctx, pool, nil, nil)
assert.Len(t, e.Peers(), 0)
e.Dispatch("wss://test1")
e.Dispatch("wss://test2")
assert.Len(t, e.Peers(), 2)
e.Dismiss("wss://test2")
assert.Len(t, e.Peers(), 1)
}
func TestEmbassySubFanout(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, nil, nil)
sub1 := e.Subscribe()
sub2 := e.Subscribe()
e.Dispatch("wss://test")
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub1:
return ev.Kind == EventAdded
}
}, "expected added event on sub1")
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub2:
return ev.Kind == EventAdded
}
}, "expected added event on sub2")
}
func TestEmbassyClose(t *testing.T) {
ctx := context.Background()
eventsCh := make(chan honeybee.OutboundPoolEvent, 1)
pool := EmbassyPlugin{
Connect: func(id string) error { return nil },
Remove: func(id string) error { return nil },
Send: func(id string, data []byte) error { return nil },
Events: eventsCh,
}
e := NewEmbassy(ctx, pool, nil, nil)
sub1 := e.Subscribe()
sub2 := e.Subscribe()
e.Dispatch("wss://test")
e.Close()
// peer gets removed
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub1:
return ev.ID == "wss://test" && ev.Kind == EventRemoved
}
}, "expected peer removed")
Eventually(t, func() bool {
select {
default:
return false
case ev := <-sub2:
return ev.ID == "wss://test" && ev.Kind == EventRemoved
}
}, "expected peer removed")
// peer list is empty
assert.False(t, e.HasPeer("wss://test"))
assert.Len(t, e.Peers(), 0)
// subs close
Eventually(t, func() bool {
_, ok1 := <-sub1
_, ok2 := <-sub2
return !ok1 && !ok2
}, "subs should close")
}
+10 -3
View File
@@ -3,7 +3,14 @@ module git.wisehodl.dev/jay/go-mana-prism
go 1.25.0 go 1.25.0
require ( require (
git.wisehodl.dev/jay/go-honeybee v0.2.0 // indirect git.wisehodl.dev/jay/go-honeybee v0.2.0
git.wisehodl.dev/jay/go-roots-ws v0.1.0 // indirect github.com/stretchr/testify v1.11.1
github.com/gorilla/websocket v1.5.3 // indirect )
require (
git.wisehodl.dev/jay/go-mana-component v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
) )
+12 -2
View File
@@ -1,6 +1,16 @@
git.wisehodl.dev/jay/go-honeybee v0.2.0 h1:bF+/7WQzJnGBv5VuPBkWjshWWMbK4PZy8gia7AtVxt0= git.wisehodl.dev/jay/go-honeybee v0.2.0 h1:bF+/7WQzJnGBv5VuPBkWjshWWMbK4PZy8gia7AtVxt0=
git.wisehodl.dev/jay/go-honeybee v0.2.0/go.mod h1:Xf3atUWJ2JgWVYpTBBxSgzL3ELdAo0znpqwpBZk9DlA= git.wisehodl.dev/jay/go-honeybee v0.2.0/go.mod h1:Xf3atUWJ2JgWVYpTBBxSgzL3ELdAo0znpqwpBZk9DlA=
git.wisehodl.dev/jay/go-roots-ws v0.1.0 h1:p1veCkpOmL26N//Qz7ekJOYj1Ck30ai4OKq9dxLjodk= git.wisehodl.dev/jay/go-mana-component v0.1.0 h1:wWYN5MzC9Hq3tEt4z7FjrwNuQz3rZY3RWAmgmNE8EZE=
git.wisehodl.dev/jay/go-roots-ws v0.1.0/go.mod h1:ANQOOP13lHs2uQwYhrSQGAlL7+zR6QvbLzNPmNBJssQ= git.wisehodl.dev/jay/go-mana-component v0.1.0/go.mod h1:r2ZaTjKzwV5JJfC5boikxtjAKusPrzlJU/7qul0EUqA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+23
View File
@@ -0,0 +1,23 @@
package prism
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
const (
TestTimeout = 2 * time.Second
TestTick = 10 * time.Millisecond
NegativeTestTimeout = 100 * time.Millisecond
)
func Eventually(t *testing.T, condition func() bool, msg string) {
t.Helper()
assert.Eventually(t, condition, TestTimeout, TestTick, msg)
}
func Never(t *testing.T, condition func() bool, msg string) {
t.Helper()
assert.Never(t, condition, NegativeTestTimeout, TestTick, msg)
}
+1 -1
View File
@@ -53,7 +53,7 @@ func NewJournalCollector() *JournalCollector {
return nil return nil
} }
func (c *JournalCollector) Enroll() {} func (c *JournalCollector) Enroll(ch <-chan JournalEntry) {}
func (c *JournalCollector) Close() {} func (c *JournalCollector) Close() {}