package prism import ( "context" "fmt" "git.wisehodl.dev/jay/go-honeybee" "git.wisehodl.dev/jay/go-mana-component" "git.wisehodl.dev/jay/go-mana-prism/observer" "git.wisehodl.dev/jay/go-roots-ws" "log/slog" "sync" "time" ) // ---------------------------------------------------------------------------- // Types // ---------------------------------------------------------------------------- type EmbassyPlugin struct { Connect func(url string) error Remove func(url string) error Send func(url string, data []byte) error Events <-chan honeybee.PoolEvent Inbox <-chan honeybee.InboxMessage } type EmbassyEventKind int const ( EventConnected EmbassyEventKind = iota EventDisconnected EventEmbassyUnknown ) func mapEmbassyEvent(kind honeybee.PoolEventKind) EmbassyEventKind { switch kind { case honeybee.EventConnected: return EventConnected case honeybee.EventDisconnected: return EventDisconnected default: return EventEmbassyUnknown } } type OutboundPoolEvent struct { ID string Kind EmbassyEventKind At time.Time } type InboxMessage struct { ID string Data []byte ReceivedAt time.Time } // ---------------------------------------------------------------------------- // Options // ---------------------------------------------------------------------------- type EmbassyConfig struct { handler slog.Handler observer observer.Observer } type EmbassyOption func(*EmbassyConfig) func WithEmbassyHandler(h slog.Handler) EmbassyOption { return func(c *EmbassyConfig) { c.handler = h } } func WithEmbassyObserver(o observer.Observer) EmbassyOption { return func(c *EmbassyConfig) { c.observer = o } } // ---------------------------------------------------------------------------- // Embassy // ---------------------------------------------------------------------------- type Embassy struct { pool EmbassyPlugin envoys map[string]*Envoy eventSubs map[string]chan<- OutboundPoolEvent inboxSubs map[string]chan<- InboxMessage ctx context.Context cancel context.CancelFunc mu sync.RWMutex wg sync.WaitGroup observer observer.Observer handler slog.Handler logger *slog.Logger } func NewEmbassy( ctx context.Context, pool EmbassyPlugin, opts ...EmbassyOption, ) *Embassy { ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy")) cfg := EmbassyConfig{observer: observer.NullObserver{}} for _, o := range opts { o(&cfg) } e := &Embassy{ pool: pool, envoys: make(map[string]*Envoy), eventSubs: make(map[string]chan<- OutboundPoolEvent), inboxSubs: make(map[string]chan<- InboxMessage), ctx: ctx, cancel: cancel, observer: cfg.observer, handler: cfg.handler, } if e.handler != nil { comp := component.FromContext(ctx) e.logger = slog.New(cfg.handler).With(slog.Any("component", comp)) } e.wg.Add(2) go e.routeEvents() go e.routeInbox() return e } func (e *Embassy) Dispatch(url string) error { url, err := honeybee.NormalizeURL(url) if err != nil { return fmt.Errorf("invalid url: %w", err) } e.mu.RLock() _, exists := e.envoys[url] if exists { e.mu.RUnlock() return fmt.Errorf("already dispatched: %s", url) } e.mu.RUnlock() e.mu.Lock() e.pool.Connect(url) terminate := func() { e.dismiss(url) } send := func(data []byte) error { return e.send(url, data) } events := e.subscribeEventsLock(url) inbox := e.subscribeInboxLock(url) e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.observer, e.handler) e.mu.Unlock() return nil } func (e *Embassy) Envoys() []string { e.mu.RLock() defer e.mu.RUnlock() var envoys []string for url := range e.envoys { envoys = append(envoys, url) } return envoys } func (e *Embassy) Call(url string) *Envoy { url, err := honeybee.NormalizeURL(url) if err != nil { return nil } e.mu.RLock() defer e.mu.RUnlock() envoy, ok := e.envoys[url] if !ok { return nil } return envoy } func (e *Embassy) Close() { e.cancel() e.wg.Wait() } func (e *Embassy) dismiss(url string) { e.mu.Lock() defer e.mu.Unlock() e.pool.Remove(url) e.unsubscribeEventsLock(url) e.unsubscribeInboxLock(url) delete(e.envoys, url) } func (e *Embassy) send(url string, data []byte) error { return e.pool.Send(url, data) } func (e *Embassy) subscribeEventsLock(url string) <-chan OutboundPoolEvent { ch := make(chan OutboundPoolEvent) e.eventSubs[url] = ch return ch } func (e *Embassy) unsubscribeEventsLock(url string) { ch, ok := e.eventSubs[url] if !ok { return } close(ch) delete(e.eventSubs, url) } func (e *Embassy) subscribeInboxLock(url string) <-chan InboxMessage { ch := make(chan InboxMessage) e.inboxSubs[url] = ch return ch } func (e *Embassy) unsubscribeInboxLock(url string) { ch, ok := e.inboxSubs[url] if !ok { return } close(ch) delete(e.inboxSubs, url) } func (e *Embassy) routeEvents() { defer e.wg.Done() for { select { case <-e.ctx.Done(): return case ev, ok := <-e.pool.Events: if !ok { return } url, err := honeybee.NormalizeURL(ev.ID) if err != nil { continue } e.mu.RLock() sub, ok := e.eventSubs[url] e.mu.RUnlock() if !ok { continue } select { case <-e.ctx.Done(): return case sub <- OutboundPoolEvent{ ID: ev.ID, Kind: mapEmbassyEvent(ev.Kind), At: ev.At, }: } } } } func (e *Embassy) routeInbox() { defer e.wg.Done() for { select { case <-e.ctx.Done(): return case ev, ok := <-e.pool.Inbox: if !ok { return } url, err := honeybee.NormalizeURL(ev.ID) if err != nil { continue } e.mu.RLock() sub, ok := e.inboxSubs[url] e.mu.RUnlock() if !ok { continue } select { case <-e.ctx.Done(): return case sub <- InboxMessage{ ID: ev.ID, Data: ev.Data, ReceivedAt: ev.ReceivedAt}: } } } } // ---------------------------------------------------------------------------- // Envoy // ---------------------------------------------------------------------------- type Envoy struct { url string connected bool terminate func() queue chan []byte send func(data []byte) error events <-chan OutboundPoolEvent inbox <-chan InboxMessage eventSubs []chan<- OutboundPoolEvent labelledInboxSubs map[string][]chan<- InboxMessage inboxSubs []chan<- InboxMessage ctx context.Context cancel context.CancelFunc mu sync.RWMutex wg sync.WaitGroup observer observer.Observer handler slog.Handler logger *slog.Logger } func newEnvoy( ctx context.Context, url string, terminate func(), send func(data []byte) error, events <-chan OutboundPoolEvent, inbox <-chan InboxMessage, observer observer.Observer, handler slog.Handler, ) *Envoy { ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy")) e := &Envoy{ url: url, terminate: terminate, queue: make(chan []byte), send: send, events: events, inbox: inbox, labelledInboxSubs: make(map[string][]chan<- InboxMessage), ctx: ctx, cancel: cancel, observer: observer, } if handler != nil { comp := component.FromContext(ctx) e.handler = handler.WithAttrs([]slog.Attr{slog.String("peer", url)}) e.logger = slog.New(e.handler).With(slog.Any("component", comp)) } e.wg.Add(2) go e.publishEvents() go e.routeInbox() return e } func (e *Envoy) IsConnected() bool { e.mu.RLock() defer e.mu.RUnlock() return e.connected } func (e *Envoy) Context() context.Context { return e.ctx } func (e *Envoy) PeerID() string { return e.url } func (e *Envoy) Handler() slog.Handler { return e.handler } func (e *Envoy) Observer() observer.Observer { return e.observer } func (e *Envoy) Dismiss() { e.terminate() e.cancel() e.wg.Wait() e.mu.Lock() defer e.mu.Unlock() for _, sub := range e.eventSubs { close(sub) } for _, sub := range e.inboxSubs { close(sub) } e.eventSubs = nil e.inboxSubs = nil e.labelledInboxSubs = make(map[string][]chan<- InboxMessage) } func (e *Envoy) Send(data []byte) error { return e.send(data) } func (e *Envoy) SubscribeEvents() <-chan OutboundPoolEvent { e.mu.Lock() defer e.mu.Unlock() ch := make(chan OutboundPoolEvent) e.eventSubs = append(e.eventSubs, ch) return ch } func (e *Envoy) SubscribeInbox(labels []string) <-chan InboxMessage { e.mu.Lock() defer e.mu.Unlock() ch := make(chan InboxMessage) e.inboxSubs = append(e.inboxSubs, ch) for _, label := range labels { if _, ok := e.labelledInboxSubs[label]; !ok { e.labelledInboxSubs[label] = make([]chan<- InboxMessage, 0) } e.labelledInboxSubs[label] = append(e.labelledInboxSubs[label], ch) } return ch } func (e *Envoy) publishEvents() { defer e.wg.Done() for { select { case <-e.ctx.Done(): return case ev, ok := <-e.events: if !ok { return } e.mu.Lock() switch ev.Kind { case EventConnected: e.connected = true case EventDisconnected: e.connected = false } subs := e.eventSubs e.mu.Unlock() for _, ch := range subs { select { case <-e.ctx.Done(): return case ch <- ev: } } } } } func (e *Envoy) routeInbox() { defer e.wg.Done() for { select { case <-e.ctx.Done(): return case msg, ok := <-e.inbox: if !ok { return } label, err := envelope.GetLabel(msg.Data) if err != nil { continue } e.mu.RLock() subs, ok := e.labelledInboxSubs[string(label)] e.mu.RUnlock() if !ok { continue } for _, ch := range subs { select { case <-e.ctx.Done(): return case ch <- msg: } } } } }