From de63405f95e46d7b0bf92358c147e8118fcff89d Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 8 May 2026 18:00:06 -0400 Subject: [PATCH] wrote revised library skeleton --- adapter.go | 168 ++++++++++++++++++++++++ clerk.go | 61 +++++++++ journal.go | 264 ++++++++++++++++++++++++++++++++++++++ post.go | 116 +++++++++++++++++ req.go | 368 +++++++++++++++++++++++++++++++++++++++++++++++++++++ sub.go | 1 - 6 files changed, 977 insertions(+), 1 deletion(-) create mode 100644 req.go delete mode 100644 sub.go diff --git a/adapter.go b/adapter.go index 8828353..373f524 100644 --- a/adapter.go +++ b/adapter.go @@ -1 +1,169 @@ package prism + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee" + "log/slog" + "sync" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +type Envelope = []byte +type PoolSendFunc = func(id string, data Envelope) error + +// Pool Plugins + +type EmbassyPlugin struct { + Connect func(id string) error + Remove func(id string) error + Send PoolSendFunc + Events <-chan honeybee.OutboundPoolEvent +} + +type HotelPlugin struct { + Add func(id string, socket honeybee.Socket) error + Replace func(id string, socket honeybee.Socket) error + Remove func(id string) error + Send PoolSendFunc + Events <-chan honeybee.InboundPoolEvent +} + +// Events + +type PoolEventKind = int + +const ( + EventConnected PoolEventKind = iota + EventDisconnected + EventRemoved +) + +type PoolEvent struct { + ID string + Kind PoolEventKind + At time.Time +} + +// Adapter + +type Adapter interface { + Peers() []string + HasPeer(id string) bool + IsConnected(id string) bool + Subscribe() <-chan PoolEvent + Send(id string, data Envelope) error +} + +// Embassy + +type Embassy struct { + pool EmbassyPlugin + peers map[string]bool // peerID: isConnected + journals chan JournalEntry + eventSubs []chan PoolEvent + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + logger *slog.Logger +} + +// Hotel + +type Hotel struct { + pool HotelPlugin + peers map[string]bool // peerID: isConnected + journals chan JournalEntry + eventSubs []chan PoolEvent + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + logger *slog.Logger +} + +// ---------------------------------------------------------------------------- +// Embassy (Outbound Adapter) +// ---------------------------------------------------------------------------- + +func NewEmbassy() *Embassy { + return nil +} + +func (e *Embassy) Dispatch(url string) error { + return nil +} + +func (e *Embassy) Dismiss(url string) error { + return nil +} + +func (e *Embassy) Close() {} + +func (e *Embassy) Peers() []string { + return nil +} + +func (e *Embassy) HasPeer(id string) bool { + return false +} + +func (e *Embassy) IsConnected(id string) bool { + return false +} + +func (e *Embassy) Subscribe() <-chan PoolEvent { + return nil +} + +func (e *Embassy) Send(id string, data Envelope) error { + return nil +} + +// ---------------------------------------------------------------------------- +// Hotel (Inbound Adapter) +// ---------------------------------------------------------------------------- + +func NewHotel() *Hotel { + return nil +} + +func (h *Hotel) Welcome(id string, socket honeybee.Socket) error { + return nil +} + +func (h *Hotel) WelcomeBack(id string, socket honeybee.Socket) error { + return nil +} + +func (h *Hotel) Farewell(id string) error { + return nil +} + +func (h *Hotel) Close() {} + +func (h *Hotel) Peers() []string { + return nil +} + +func (h *Hotel) HasPeer(id string) bool { + return false +} + +func (h *Hotel) IsConnected(id string) bool { + return false +} + +func (h *Hotel) Subscribe() <-chan PoolEvent { + return nil +} + +func (h *Hotel) Send(id string, data Envelope) error { + return nil +} diff --git a/clerk.go b/clerk.go index 8828353..74a03e7 100644 --- a/clerk.go +++ b/clerk.go @@ -1 +1,62 @@ package prism + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee" + "log/slog" + "sync" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +// Letters + +type InboundLetter struct { + ID string + Data Envelope + At time.Time +} + +// Clerk + +type Clerk struct { + inbox <-chan honeybee.InboxMessage + + // wiring phase + mu sync.Mutex + started bool + pending []clerkSub + known map[string]struct{} + + // runtime phase + routes clerkRoutes + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + logger *slog.Logger +} + +type clerkSub struct { + ch chan InboundLetter + labels map[string]struct{} +} + +type clerkRoutes = map[string][]chan InboundLetter + +// ---------------------------------------------------------------------------- +// Clerk +// ---------------------------------------------------------------------------- + +func NewClerk() *Clerk { + return nil +} + +func (c *Clerk) Subscribe() {} + +func (c *Clerk) Start() {} + +func (c *Clerk) Close() {} diff --git a/journal.go b/journal.go index 8828353..d53a133 100644 --- a/journal.go +++ b/journal.go @@ -1 +1,265 @@ package prism + +import ( + "sync" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +type JournalAuthor string + +const ( + AuthorEmbassy JournalAuthor = "prism.embassy" + AuthorReqManager JournalAuthor = "prism.req_manager" + AuthorStreamReq JournalAuthor = "prism.stream_req" + AuthorQueryReq JournalAuthor = "prism.query_req" +) + +// JournalCollector + +type JournalCollector struct { + entries chan JournalEntry + mu sync.Mutex + wg sync.WaitGroup + closing bool +} + +// JournalEntry + +type JournalEntry interface { + PeerID() string + SealedAt() time.Time + Author() JournalAuthor +} + +type entry struct { + peerID string + sealedAt time.Time + author JournalAuthor +} + +func (e *entry) PeerID() string { return e.peerID } +func (e *entry) SealedAt() time.Time { return e.sealedAt } +func (e *entry) Author() JournalAuthor { return e.author } + +// ---------------------------------------------------------------------------- +// Journal Collector +// ---------------------------------------------------------------------------- + +func NewJournalCollector() *JournalCollector { + return nil +} + +func (c *JournalCollector) Enroll() {} + +func (c *JournalCollector) Close() {} + +func (c *JournalCollector) Entries() {} + +// ---------------------------------------------------------------------------- +// Journal Entries +// ---------------------------------------------------------------------------- + +func newEntry(peerID string, author JournalAuthor) *entry { + return &entry{ + peerID: peerID, + author: author, + sealedAt: time.Now(), + } +} + +// PeerConnected + +type PeerConnectedJournal struct { + *entry + Data PeerConnectedData +} + +type PeerConnectedData struct { + At time.Time +} + +func NewPeerConnectedJournal( + peerID string, author JournalAuthor, data PeerConnectedData, +) PeerConnectedJournal { + return PeerConnectedJournal{entry: newEntry(peerID, author), Data: data} +} + +// PeerDisconnected + +type PeerDisconnectedJournal struct { + *entry + Data PeerDisconnectedData +} + +type PeerDisconnectedData struct { + At time.Time +} + +func NewPeerDisconnectedJournal( + peerID string, author JournalAuthor, data PeerDisconnectedData, +) PeerDisconnectedJournal { + return PeerDisconnectedJournal{entry: newEntry(peerID, author), Data: data} +} + +// ReqQueued + +type ReqQueuedJournal struct { + *entry + Data ReqQueuedData +} + +type ReqQueuedData struct { + SubID string + LetterID uint64 + QueuedAt time.Time + Err error +} + +func NewReqQueuedJournal( + peerID string, author JournalAuthor, data ReqQueuedData, +) ReqQueuedJournal { + return ReqQueuedJournal{entry: newEntry(peerID, author), Data: data} +} + +// CloseQueued + +type CloseQueuedJournal struct { + *entry + Data CloseQueuedData +} + +type CloseQueuedData struct { + SubID string + LetterID uint64 + QueuedAt time.Time + Err error +} + +func NewCloseQueuedJournal( + peerID string, author JournalAuthor, data CloseQueuedData, +) CloseQueuedJournal { + return CloseQueuedJournal{entry: newEntry(peerID, author), Data: data} +} + +// ReqSendOutcome + +type ReqSendOutcomeJournal struct { + *entry + Data ReqSendOutcomeData +} + +type ReqSendOutcomeData struct { + SubID string + LetterID uint64 + Outcome LetterOutcomeKind + SentAt time.Time + MissedAt time.Time + RetryCount int + Err error +} + +func NewReqSendOutcomeJournal( + peerID string, author JournalAuthor, data ReqSendOutcomeData, +) ReqSendOutcomeJournal { + return ReqSendOutcomeJournal{entry: newEntry(peerID, author), Data: data} +} + +// CloseSendOutcome + +type CloseSendOutcomeJournal struct { + *entry + Data CloseSendOutcomeData +} + +type CloseSendOutcomeData struct { + SubID string + LetterID uint64 + Outcome LetterOutcomeKind + SentAt time.Time + MissedAt time.Time + RetryCount int + Err error +} + +func NewCloseSendOutcomeJournal( + peerID string, author JournalAuthor, data CloseSendOutcomeData, +) CloseSendOutcomeJournal { + return CloseSendOutcomeJournal{entry: newEntry(peerID, author), Data: data} +} + +// ReceivedEOSE + +type ReceivedEOSEJournal struct { + *entry + Data ReceivedEOSEData +} + +type ReceivedEOSEData struct { + SubID string + At time.Time +} + +func NewReceivedEOSEJournal( + peerID string, author JournalAuthor, data ReceivedEOSEData, +) ReceivedEOSEJournal { + return ReceivedEOSEJournal{entry: newEntry(peerID, author), Data: data} +} + +// MissedEOSE + +type MissedEOSEJournal struct { + *entry + Data MissedEOSEData +} + +type MissedEOSEData struct { + SubID string + At time.Time +} + +func NewMissedEOSEJournal( + peerID string, author JournalAuthor, data MissedEOSEData, +) MissedEOSEJournal { + return MissedEOSEJournal{entry: newEntry(peerID, author), Data: data} +} + +// ReceivedClosed + +type ReceivedClosedJournal struct { + *entry + Data ReceivedClosedData +} + +type ReceivedClosedData struct { + SubID string + At time.Time + Message string +} + +func NewReceivedClosedJournal( + peerID string, author JournalAuthor, data ReceivedClosedData, +) ReceivedClosedJournal { + return ReceivedClosedJournal{entry: newEntry(peerID, author), Data: data} +} + +// ReqClosed + +type ReqClosedJournal struct { + *entry + Data ReqClosedData +} + +type ReqClosedData struct { + SubID string + At time.Time +} + +func NewReqClosedJournal( + peerID string, author JournalAuthor, data ReqClosedData, +) ReqClosedJournal { + return ReqClosedJournal{entry: newEntry(peerID, author), Data: data} +} diff --git a/post.go b/post.go index 8828353..c4ef679 100644 --- a/post.go +++ b/post.go @@ -1 +1,117 @@ package prism + +import ( + "container/list" + "context" + "log/slog" + "sync" + "sync/atomic" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +// Letters + +type LetterID = uint64 + +type OutboundLetter struct { +} + +type letterRecord struct { +} + +type LetterOutcomeKind int + +type LetterOutcome struct { +} + +// Postmaster + +type Postmaster struct { + couriers map[string]*Courier + letters map[LetterID]letterRecord + events <-chan PoolEvent // Adapter.Subscribe + send PoolSendFunc // Adapter.Send + counter atomic.Uint64 + + ctx context.Context + cancel context.CancelFunc + mu sync.Mutex + wg sync.WaitGroup + cfg postmasterConfig + logger *slog.Logger +} + +// Courier + +type Courier struct { + id string // peer id + master *Postmaster + cmd chan courierCommand + send func(data Envelope) error + + // state + queue list.List + connected bool + sending bool + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + logger *slog.Logger +} + +// Commands + +type courierCommand interface { + apply(c *Courier) +} + +// Options + +type PostmasterOption func(*postmasterConfig) + +type postmasterConfig struct{} + +// ---------------------------------------------------------------------------- +// Postmaster +// ---------------------------------------------------------------------------- + +func NewPostmaster( + pool *Adapter, + send PoolSendFunc, + opts ...PostmasterOption, +) *Postmaster { + return nil +} + +func (m *Postmaster) Send( + ctx context.Context, + peerID string, + data Envelope, + deadline time.Duration, + onOutcome func(LetterOutcome), // should be non-blocking +) (LetterID, error) { + return 0, nil +} + +func (m *Postmaster) Close() {} + +// ---------------------------------------------------------------------------- +// Courier +// ---------------------------------------------------------------------------- + +func NewCourier() *Courier { + return nil +} + +func (c *Courier) Enqueue(letter OutboundLetter) {} + +func (c *Courier) Close() {} + +// ---------------------------------------------------------------------------- +// Commands +// ---------------------------------------------------------------------------- diff --git a/req.go b/req.go new file mode 100644 index 0000000..1a37515 --- /dev/null +++ b/req.go @@ -0,0 +1,368 @@ +package prism + +import ( + "context" + "log/slog" + "sync" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +// Outputs + +type ReqEvent struct { + PeerID string + ReceivedAt time.Time + Event []byte +} + +type ReqMessage struct { + PeerID string + ReceivedAt time.Time + Message string +} + +// Options + +const ( + defaultLabel = "REQ" + defaultEOSETimeout = 30 * time.Second +) + +type reqConfig struct { + id string + label string + eoseTimeout time.Duration +} + +type ReqOption func(*reqConfig) + +// Request Manager + +type ReqManager struct { + subs map[string]*Request + byPeer map[string]map[string]struct{} // peerID -> subID set + + postmaster *Postmaster + collector *JournalCollector + journals chan JournalEntry // JournalCollector.Enroll + + isConnected func(peerID string) bool // Adapter.IsConnected + poolEvents <-chan PoolEvent // Adapter.Subscribe + inbox <-chan InboundLetter // Clerk.Subscribe + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + logger *slog.Logger +} + +type Request interface { + command(reqCommand) + Peers() []string + Close() +} + +// Base Request + +type request struct { + id string + req Envelope + + cmds chan reqCommand + closing bool + done chan struct{} + + events chan ReqEvent + messages chan ReqMessage + + postmaster *Postmaster + journals chan<- JournalEntry + isConnected func() bool + onClose func() + + sendCtx context.Context + wg sync.WaitGroup + logger *slog.Logger +} + +// Stream Request + +type StreamReq struct { + *request + peers map[string]*streamPeer +} + +type streamPeer struct { + closeSent bool + closed bool + closeOnce sync.Once +} + +// Query Request + +type QueryReq struct { + *request + peers map[string]*queryPeer + eoseTimeout time.Duration +} + +type queryPeer struct { + eoseTimer *time.Timer + closeSent bool + closed bool + closeOnce sync.Once +} + +// Commands + +type reqCommand interface{ reqCommand() } // gates command channel + +type cmdRecordReqOutcome struct { + peerID string + outcome LetterOutcome +} + +func (cmdRecordReqOutcome) reqCommand() {} + +type cmdRecordCloseOutcome struct { + peerID string + outcome LetterOutcome +} + +func (cmdRecordCloseOutcome) reqCommand() {} + +type cmdReceiveEvent struct { + peerID string + at time.Time + data Envelope +} + +func (cmdReceiveEvent) reqCommand() {} + +type cmdReceiveEOSE struct { + peerID string + at time.Time +} + +func (cmdReceiveEOSE) reqCommand() {} + +type cmdReceiveClosed struct { + peerID string + at time.Time + message string +} + +func (cmdReceiveClosed) reqCommand() {} + +type cmdClosePeer struct{ peerID string } + +func (cmdClosePeer) reqCommand() {} + +type cmdCloseReq struct{} + +func (cmdCloseReq) reqCommand() {} + +type cmdHandleReconnect struct{ peerID string } + +func (cmdHandleReconnect) reqCommand() {} + +type cmdHandleEOSETimeout struct{ peerID string } + +func (cmdHandleEOSETimeout) reqCommand() {} + +// ---------------------------------------------------------------------------- +// Request Options +// ---------------------------------------------------------------------------- + +func newReqConfig(opts ...ReqOption) reqConfig { + return reqConfig{} +} + +func WithID(id string) ReqOption { + return func(c *reqConfig) { + c.id = id + } +} + +func WithLabel(label string) ReqOption { + return func(c *reqConfig) { + c.label = label + } +} + +func WithEOSETimeout(timeout time.Duration) ReqOption { + return func(c *reqConfig) { + c.eoseTimeout = timeout + } +} + +// ---------------------------------------------------------------------------- +// Request Manager +// ---------------------------------------------------------------------------- + +func NewReqManager() *ReqManager { + return nil +} + +func (m *ReqManager) OpenStream() {} + +func (m *ReqManager) OpenQuery() {} + +func (m *ReqManager) CloseReq() {} + +func (m *ReqManager) Close() {} + +func (m *ReqManager) makeOnClose() {} + +func (m *ReqManager) routeInbox() {} + +func (m *ReqManager) routeEvents() {} + +// Helpers + +func cleanPeers() {} + +var encoder struct{} + +func generateID() {} + +// ---------------------------------------------------------------------------- +// Base Request +// ---------------------------------------------------------------------------- + +func (r *request) emit() {} + +func (r *request) command() {} + +func (r *request) send() {} + +// ---------------------------------------------------------------------------- +// Stream Request +// ---------------------------------------------------------------------------- + +func NewStreamReq() *StreamReq { + return nil +} + +func (r *StreamReq) Peers() []string { + return nil +} + +func (r *StreamReq) Close() {} + +func (r *StreamReq) sendReq(peerID string) error { + return nil +} + +func (r *StreamReq) sendClose(peerID string) error { + return nil +} + +func (r *StreamReq) run() { + // buffers command queue internally + // switches on command type +} + +func (r *StreamReq) applyRecordReqOutcome(cmd cmdRecordReqOutcome) {} + +func (r *StreamReq) applyRecordCloseOutcome(cmd cmdRecordCloseOutcome) {} + +func (r *StreamReq) applyReceiveEvent(cmd cmdReceiveEvent) {} + +func (r *StreamReq) applyReceiveEOSE(cmd cmdReceiveEOSE) {} + +func (r *StreamReq) applyReceiveClosed(cmd cmdReceiveClosed) {} + +func (r *StreamReq) applyClosePeer(cmd cmdClosePeer) {} + +func (r *StreamReq) applyCloseReq(cmd cmdCloseReq) {} + +func (r *StreamReq) applyHandleReconnect(cmd cmdHandleReconnect) {} + +// ---------------------------------------------------------------------------- +// Query Request +// ---------------------------------------------------------------------------- + +func NewQueryReq() *QueryReq { + return nil +} + +func (r *QueryReq) Peers() []string { + return nil +} + +func (r *QueryReq) Close() {} + +func (r *QueryReq) sendReq(peerID string) error { + return nil +} + +func (r *QueryReq) sendClose(peerID string) error { + return nil +} + +func (r *QueryReq) run() { + // buffers command queue internally + // switches on command type +} + +func (r *QueryReq) applyRecordReqOutcome(cmd cmdRecordReqOutcome) {} + +func (r *QueryReq) applyRecordCloseOutcome(cmd cmdRecordCloseOutcome) {} + +func (r *QueryReq) applyReceiveEvent(cmd cmdReceiveEvent) {} + +func (r *QueryReq) applyReceiveEOSE(cmd cmdReceiveEOSE) {} + +func (r *QueryReq) applyReceiveClosed(cmd cmdReceiveClosed) {} + +func (r *QueryReq) applyClosePeer(cmd cmdClosePeer) {} + +func (r *QueryReq) applyCloseReq(cmd cmdCloseReq) {} + +func (r *QueryReq) applyHandleEOSETimeout(cmd cmdHandleEOSETimeout) {} + +// ---------------------------------------------------------------------------- +// Commands +// ---------------------------------------------------------------------------- + +func newRecordReqOutcome(peerID string, outcome LetterOutcome) cmdRecordReqOutcome { + return cmdRecordReqOutcome{peerID: peerID, outcome: outcome} +} + +func newRecordCloseOutcome(peerID string, outcome LetterOutcome) cmdRecordCloseOutcome { + return cmdRecordCloseOutcome{peerID: peerID, outcome: outcome} +} + +func newReceiveEvent(peerID string, at time.Time, data Envelope) cmdReceiveEvent { + return cmdReceiveEvent{peerID: peerID, at: at, data: data} +} + +func newReceiveEOSE(peerID string, at time.Time) cmdReceiveEOSE { + return cmdReceiveEOSE{peerID: peerID, at: at} +} + +func newReceiveClosed(peerID string, at time.Time, message string) cmdReceiveClosed { + return cmdReceiveClosed{peerID: peerID, at: at, message: message} +} + +func newClosePeer(peerID string) cmdClosePeer { + return cmdClosePeer{peerID: peerID} +} + +func newCloseReq() cmdCloseReq { + return cmdCloseReq{} +} + +func newHandleReconnect(peerID string) cmdHandleReconnect { + return cmdHandleReconnect{peerID: peerID} +} + +func newHandleEOSETimeout(peerID string) cmdHandleEOSETimeout { + return cmdHandleEOSETimeout{peerID: peerID} +} diff --git a/sub.go b/sub.go deleted file mode 100644 index 8828353..0000000 --- a/sub.go +++ /dev/null @@ -1 +0,0 @@ -package prism