510 lines
9.6 KiB
Go
510 lines
9.6 KiB
Go
package prism
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
honeybee "git.wisehodl.dev/jay/go-honeybee/outbound"
|
|
"git.wisehodl.dev/jay/go-mana-component"
|
|
"git.wisehodl.dev/jay/go-mana-prism/observer"
|
|
"git.wisehodl.dev/jay/go-roots-ws"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Types
|
|
// ----------------------------------------------------------------------------
|
|
|
|
type EmbassyPlugin struct {
|
|
Connect func(url string) error
|
|
Remove func(url string) error
|
|
Send func(url string, data []byte) error
|
|
Events <-chan honeybee.PoolEvent
|
|
Inbox <-chan honeybee.InboxMessage
|
|
}
|
|
|
|
type EmbassyEventKind int
|
|
|
|
const (
|
|
EventConnected EmbassyEventKind = iota
|
|
EventDisconnected
|
|
EventEmbassyUnknown
|
|
)
|
|
|
|
func mapEmbassyEvent(kind honeybee.PoolEventKind) EmbassyEventKind {
|
|
switch kind {
|
|
case honeybee.EventConnected:
|
|
return EventConnected
|
|
case honeybee.EventDisconnected:
|
|
return EventDisconnected
|
|
default:
|
|
return EventEmbassyUnknown
|
|
}
|
|
}
|
|
|
|
type OutboundPoolEvent struct {
|
|
ID string
|
|
Kind EmbassyEventKind
|
|
At time.Time
|
|
}
|
|
|
|
type InboxMessage struct {
|
|
ID string
|
|
Data []byte
|
|
ReceivedAt time.Time
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Options
|
|
// ----------------------------------------------------------------------------
|
|
|
|
type EmbassyConfig struct {
|
|
handler slog.Handler
|
|
observer observer.Observer
|
|
}
|
|
|
|
type EmbassyOption func(*EmbassyConfig)
|
|
|
|
func WithEmbassyHandler(h slog.Handler) EmbassyOption {
|
|
return func(c *EmbassyConfig) {
|
|
c.handler = h
|
|
}
|
|
}
|
|
|
|
func WithEmbassyObserver(o observer.Observer) EmbassyOption {
|
|
return func(c *EmbassyConfig) {
|
|
c.observer = o
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Embassy
|
|
// ----------------------------------------------------------------------------
|
|
|
|
type Embassy struct {
|
|
pool EmbassyPlugin
|
|
envoys map[string]*Envoy
|
|
eventSubs map[string]chan<- OutboundPoolEvent
|
|
inboxSubs map[string]chan<- InboxMessage
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
mu sync.RWMutex
|
|
wg sync.WaitGroup
|
|
observer observer.Observer
|
|
handler slog.Handler
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func NewEmbassy(
|
|
ctx context.Context,
|
|
pool EmbassyPlugin,
|
|
opts ...EmbassyOption,
|
|
) *Embassy {
|
|
ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy"))
|
|
cfg := EmbassyConfig{observer: observer.NullObserver{}}
|
|
for _, o := range opts {
|
|
o(&cfg)
|
|
}
|
|
|
|
e := &Embassy{
|
|
pool: pool,
|
|
envoys: make(map[string]*Envoy),
|
|
eventSubs: make(map[string]chan<- OutboundPoolEvent),
|
|
inboxSubs: make(map[string]chan<- InboxMessage),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
observer: cfg.observer,
|
|
handler: cfg.handler,
|
|
}
|
|
|
|
if e.handler != nil {
|
|
comp := component.FromContext(ctx)
|
|
e.logger = slog.New(cfg.handler).With(slog.Any("component", comp))
|
|
}
|
|
|
|
e.wg.Add(2)
|
|
go e.routeEvents()
|
|
go e.routeInbox()
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Embassy) Dispatch(url string) error {
|
|
url, err := honeybee.NormalizeURL(url)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid url: %w", err)
|
|
}
|
|
|
|
e.mu.RLock()
|
|
_, exists := e.envoys[url]
|
|
if exists {
|
|
e.mu.RUnlock()
|
|
return fmt.Errorf("already dispatched: %s", url)
|
|
}
|
|
e.mu.RUnlock()
|
|
|
|
e.mu.Lock()
|
|
|
|
e.pool.Connect(url)
|
|
|
|
terminate := func() { e.dismiss(url) }
|
|
send := func(data []byte) error { return e.send(url, data) }
|
|
events := e.subscribeEventsLock(url)
|
|
inbox := e.subscribeInboxLock(url)
|
|
|
|
e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.observer, e.handler)
|
|
|
|
e.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *Embassy) Envoys() []string {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
var envoys []string
|
|
for url := range e.envoys {
|
|
envoys = append(envoys, url)
|
|
}
|
|
return envoys
|
|
}
|
|
|
|
func (e *Embassy) Call(url string) *Envoy {
|
|
url, err := honeybee.NormalizeURL(url)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
|
|
envoy, ok := e.envoys[url]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return envoy
|
|
}
|
|
|
|
func (e *Embassy) Close() {
|
|
e.cancel()
|
|
e.wg.Wait()
|
|
}
|
|
|
|
func (e *Embassy) dismiss(url string) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
e.pool.Remove(url)
|
|
e.unsubscribeEventsLock(url)
|
|
e.unsubscribeInboxLock(url)
|
|
delete(e.envoys, url)
|
|
}
|
|
|
|
func (e *Embassy) send(url string, data []byte) error {
|
|
return e.pool.Send(url, data)
|
|
}
|
|
|
|
func (e *Embassy) subscribeEventsLock(url string) <-chan OutboundPoolEvent {
|
|
ch := make(chan OutboundPoolEvent)
|
|
e.eventSubs[url] = ch
|
|
return ch
|
|
}
|
|
|
|
func (e *Embassy) unsubscribeEventsLock(url string) {
|
|
ch, ok := e.eventSubs[url]
|
|
if !ok {
|
|
return
|
|
}
|
|
close(ch)
|
|
delete(e.eventSubs, url)
|
|
}
|
|
|
|
func (e *Embassy) subscribeInboxLock(url string) <-chan InboxMessage {
|
|
ch := make(chan InboxMessage)
|
|
e.inboxSubs[url] = ch
|
|
return ch
|
|
}
|
|
|
|
func (e *Embassy) unsubscribeInboxLock(url string) {
|
|
ch, ok := e.inboxSubs[url]
|
|
if !ok {
|
|
return
|
|
}
|
|
close(ch)
|
|
delete(e.inboxSubs, url)
|
|
}
|
|
|
|
func (e *Embassy) routeEvents() {
|
|
defer e.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case ev, ok := <-e.pool.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
url, err := honeybee.NormalizeURL(ev.ID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
sub, ok := e.eventSubs[url]
|
|
e.mu.RUnlock()
|
|
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case sub <- OutboundPoolEvent{
|
|
ID: ev.ID, Kind: mapEmbassyEvent(ev.Kind), At: ev.At,
|
|
}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Embassy) routeInbox() {
|
|
defer e.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case ev, ok := <-e.pool.Inbox:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
url, err := honeybee.NormalizeURL(ev.ID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
sub, ok := e.inboxSubs[url]
|
|
e.mu.RUnlock()
|
|
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case sub <- InboxMessage{
|
|
ID: ev.ID, Data: ev.Data, ReceivedAt: ev.ReceivedAt}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Envoy
|
|
// ----------------------------------------------------------------------------
|
|
|
|
type Envoy struct {
|
|
url string
|
|
connected bool
|
|
terminate func()
|
|
queue chan []byte
|
|
send func(data []byte) error
|
|
events <-chan OutboundPoolEvent
|
|
inbox <-chan InboxMessage
|
|
eventSubs []chan<- OutboundPoolEvent
|
|
labelledInboxSubs map[string][]chan<- InboxMessage
|
|
inboxSubs []chan<- InboxMessage
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
mu sync.RWMutex
|
|
wg sync.WaitGroup
|
|
observer observer.Observer
|
|
handler slog.Handler
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func newEnvoy(
|
|
ctx context.Context,
|
|
url string,
|
|
terminate func(),
|
|
send func(data []byte) error,
|
|
events <-chan OutboundPoolEvent,
|
|
inbox <-chan InboxMessage,
|
|
observer observer.Observer,
|
|
handler slog.Handler,
|
|
) *Envoy {
|
|
ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy"))
|
|
|
|
e := &Envoy{
|
|
url: url,
|
|
terminate: terminate,
|
|
queue: make(chan []byte),
|
|
send: send,
|
|
events: events,
|
|
inbox: inbox,
|
|
labelledInboxSubs: make(map[string][]chan<- InboxMessage),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
observer: observer,
|
|
handler: handler,
|
|
}
|
|
|
|
if handler != nil {
|
|
comp := component.FromContext(ctx)
|
|
e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url)
|
|
}
|
|
|
|
e.wg.Add(2)
|
|
go e.publishEvents()
|
|
go e.routeInbox()
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Envoy) IsConnected() bool {
|
|
e.mu.RLock()
|
|
defer e.mu.RUnlock()
|
|
return e.connected
|
|
}
|
|
|
|
func (e *Envoy) Context() context.Context {
|
|
return e.ctx
|
|
}
|
|
|
|
func (e *Envoy) PeerID() string {
|
|
return e.url
|
|
}
|
|
|
|
func (e *Envoy) Handler() slog.Handler {
|
|
return e.handler
|
|
}
|
|
|
|
func (e *Envoy) Observer() observer.Observer {
|
|
return e.observer
|
|
}
|
|
|
|
func (e *Envoy) Dismiss() {
|
|
e.terminate()
|
|
e.cancel()
|
|
e.wg.Wait()
|
|
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
for _, sub := range e.eventSubs {
|
|
close(sub)
|
|
}
|
|
|
|
for _, sub := range e.inboxSubs {
|
|
close(sub)
|
|
}
|
|
|
|
e.eventSubs = nil
|
|
e.inboxSubs = nil
|
|
e.labelledInboxSubs = make(map[string][]chan<- InboxMessage)
|
|
}
|
|
|
|
func (e *Envoy) Send(data []byte) error {
|
|
return e.send(data)
|
|
}
|
|
|
|
func (e *Envoy) SubscribeEvents() <-chan OutboundPoolEvent {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
ch := make(chan OutboundPoolEvent)
|
|
e.eventSubs = append(e.eventSubs, ch)
|
|
return ch
|
|
}
|
|
|
|
func (e *Envoy) SubscribeInbox(labels []string) <-chan InboxMessage {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
ch := make(chan InboxMessage)
|
|
e.inboxSubs = append(e.inboxSubs, ch)
|
|
for _, label := range labels {
|
|
if _, ok := e.labelledInboxSubs[label]; !ok {
|
|
e.labelledInboxSubs[label] = make([]chan<- InboxMessage, 0)
|
|
}
|
|
e.labelledInboxSubs[label] = append(e.labelledInboxSubs[label], ch)
|
|
}
|
|
return ch
|
|
}
|
|
|
|
func (e *Envoy) publishEvents() {
|
|
defer e.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case ev, ok := <-e.events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
e.mu.Lock()
|
|
switch ev.Kind {
|
|
case EventConnected:
|
|
e.connected = true
|
|
case EventDisconnected:
|
|
e.connected = false
|
|
}
|
|
subs := e.eventSubs
|
|
e.mu.Unlock()
|
|
|
|
for _, ch := range subs {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case ch <- ev:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Envoy) routeInbox() {
|
|
defer e.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case msg, ok := <-e.inbox:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
label, err := envelope.GetLabel(msg.Data)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
e.mu.RLock()
|
|
subs, ok := e.labelledInboxSubs[string(label)]
|
|
e.mu.RUnlock()
|
|
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
for _, ch := range subs {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case ch <- msg:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|