From eec6b2ff69721165fc17a383681b86af99c5a41b Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 11 May 2026 10:20:15 -0400 Subject: [PATCH] clean up req code skeleton --- post.go | 8 +- queryreq_test.go | 1 + req.go | 262 ++++++++++++++++++++++++++++++--------------- reqmanager_test.go | 1 + streamreq_test.go | 1 + 5 files changed, 184 insertions(+), 89 deletions(-) create mode 100644 queryreq_test.go create mode 100644 reqmanager_test.go create mode 100644 streamreq_test.go diff --git a/post.go b/post.go index 1d17a10..1c78dcb 100644 --- a/post.go +++ b/post.go @@ -80,7 +80,7 @@ type Postmaster struct { // Courier type Courier struct { - task chan courierTask + tasks chan courierTask sendFunc func(data Envelope) error // state @@ -324,7 +324,7 @@ func NewCourier( component.MustExtend(ctx, "courier")) c := &Courier{ - task: make(chan courierTask, 64), + tasks: make(chan courierTask, 64), sendFunc: sendFunc, ctx: ctx, cancel: cancel, @@ -370,7 +370,7 @@ func (c *Courier) Close() { func (c *Courier) order(task courierTask) { select { case <-c.ctx.Done(): - case c.task <- task: + case c.tasks <- task: } } @@ -381,7 +381,7 @@ func (c *Courier) run() { select { case <-c.ctx.Done(): return - case task := <-c.task: + case task := <-c.tasks: task.dispatch(c) c.maybeSend() } diff --git a/queryreq_test.go b/queryreq_test.go new file mode 100644 index 0000000..8828353 --- /dev/null +++ b/queryreq_test.go @@ -0,0 +1 @@ +package prism diff --git a/req.go b/req.go index 1a37515..98356e7 100644 --- a/req.go +++ b/req.go @@ -2,6 +2,7 @@ package prism import ( "context" + "encoding/base32" "log/slog" "sync" "time" @@ -43,7 +44,7 @@ type ReqOption func(*reqConfig) // Request Manager type ReqManager struct { - subs map[string]*Request + subs map[string]Request byPeer map[string]map[string]struct{} // peerID -> subID set postmaster *Postmaster @@ -52,17 +53,18 @@ type ReqManager struct { isConnected func(peerID string) bool // Adapter.IsConnected poolEvents <-chan PoolEvent // Adapter.Subscribe - inbox <-chan InboundLetter // Clerk.Subscribe + poolInbox <-chan InboundLetter // Clerk.Subscribe - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - logger *slog.Logger + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + handler slog.Handler + logger *slog.Logger } type Request interface { - command(reqCommand) + order(reqTask) Peers() []string Close() } @@ -73,16 +75,17 @@ type request struct { id string req Envelope - cmds chan reqCommand + tasks chan reqTask closing bool done chan struct{} + buffer chan ReqEvent events chan ReqEvent messages chan ReqMessage postmaster *Postmaster journals chan<- JournalEntry - isConnected func() bool + isConnected func(peerID string) bool onClose func() sendCtx context.Context @@ -118,69 +121,77 @@ type queryPeer struct { closeOnce sync.Once } -// Commands +// Request Tasks -type reqCommand interface{ reqCommand() } // gates command channel +type reqTask interface{ reqTask() } // gates task channel -type cmdRecordReqOutcome struct { +type taskRecordReqOutcome struct { peerID string outcome LetterOutcome } -func (cmdRecordReqOutcome) reqCommand() {} +func (taskRecordReqOutcome) reqTask() {} -type cmdRecordCloseOutcome struct { +type taskRecordCloseOutcome struct { peerID string outcome LetterOutcome } -func (cmdRecordCloseOutcome) reqCommand() {} +func (taskRecordCloseOutcome) reqTask() {} -type cmdReceiveEvent struct { +type taskReceiveEvent struct { peerID string at time.Time data Envelope } -func (cmdReceiveEvent) reqCommand() {} +func (taskReceiveEvent) reqTask() {} -type cmdReceiveEOSE struct { +type taskReceiveEOSE struct { peerID string at time.Time } -func (cmdReceiveEOSE) reqCommand() {} +func (taskReceiveEOSE) reqTask() {} -type cmdReceiveClosed struct { +type taskReceiveClosed struct { peerID string at time.Time message string } -func (cmdReceiveClosed) reqCommand() {} +func (taskReceiveClosed) reqTask() {} -type cmdClosePeer struct{ peerID string } +type taskClosePeer struct{ peerID string } -func (cmdClosePeer) reqCommand() {} +func (taskClosePeer) reqTask() {} -type cmdCloseReq struct{} +type taskCloseReq struct{} -func (cmdCloseReq) reqCommand() {} +func (taskCloseReq) reqTask() {} -type cmdHandleReconnect struct{ peerID string } +type taskHandleReconnect struct{ peerID string } -func (cmdHandleReconnect) reqCommand() {} +func (taskHandleReconnect) reqTask() {} -type cmdHandleEOSETimeout struct{ peerID string } +type taskHandleEOSETimeout struct{ peerID string } -func (cmdHandleEOSETimeout) reqCommand() {} +func (taskHandleEOSETimeout) reqTask() {} // ---------------------------------------------------------------------------- // Request Options // ---------------------------------------------------------------------------- func newReqConfig(opts ...ReqOption) reqConfig { - return reqConfig{} + cfg := reqConfig{ + id: "", + label: defaultLabel, + eoseTimeout: defaultEOSETimeout, + } + for _, opt := range opts { + opt(&cfg) + } + return cfg } func WithID(id string) ReqOption { @@ -205,47 +216,117 @@ func WithEOSETimeout(timeout time.Duration) ReqOption { // Request Manager // ---------------------------------------------------------------------------- -func NewReqManager() *ReqManager { +func NewReqManager( + ctx context.Context, + postmaster *Postmaster, + isConnected func(string) bool, + poolEvents <-chan PoolEvent, + poolInbox <-chan InboundLetter, + collector *JournalCollector, + handler slog.Handler, +) *ReqManager { return nil } -func (m *ReqManager) OpenStream() {} +func (m *ReqManager) OpenStream( + filters [][]byte, + peers []string, + opts ...ReqOption, +) ( + id string, + events <-chan ReqEvent, + messages <-chan ReqMessage, + err error, +) { + return "", nil, nil, nil +} -func (m *ReqManager) OpenQuery() {} +func (m *ReqManager) OpenQuery( + filters [][]byte, + peers []string, + opts ...ReqOption, +) ( + id string, + events <-chan ReqEvent, + messages <-chan ReqMessage, + err error, +) { + return "", nil, nil, nil +} -func (m *ReqManager) CloseReq() {} +func (m *ReqManager) CloseReq(id string) error { + return nil +} func (m *ReqManager) Close() {} -func (m *ReqManager) makeOnClose() {} +func (m *ReqManager) makeOnClose(subID, peers []string) func() { + return func() {} +} -func (m *ReqManager) routeInbox() {} +func (m *ReqManager) routeInbox() { + // parses envelope label and sub ID from the letter + // looks up in sub registry + // calls req.order() +} -func (m *ReqManager) routeEvents() {} +func (m *ReqManager) routeEvents() { + // reads PoolEvent + // looks up in m.byPeer + // calls req.order() on each matching request +} // Helpers -func cleanPeers() {} +func cleanPeers(peers []string) []string { + return nil +} -var encoder struct{} +var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) -func generateID() {} +func generateID(prefix string) string { + return "" +} // ---------------------------------------------------------------------------- // Base Request // ---------------------------------------------------------------------------- -func (r *request) emit() {} +func (r *request) emit(entry JournalEntry) { + // send into journal entry channel + // selects on r.done and r.journals +} -func (r *request) command() {} +func (r *request) order(task reqTask) { + // send into task queue + // selects on r.done and r.tasks +} -func (r *request) send() {} +func (r *request) send( + peerID string, + data Envelope, + makeOutcomeTask func(peerID string, outcome LetterOutcome) reqTask, +) error { + return nil +} // ---------------------------------------------------------------------------- // Stream Request // ---------------------------------------------------------------------------- -func NewStreamReq() *StreamReq { +func NewStreamReq( + ctx context.Context, + id string, + filters [][]byte, + peers []string, + postmaster *Postmaster, + isConnected func(string) bool, + journals chan<- JournalEntry, + onClose func(), + handler slog.Handler, +) *StreamReq { + // start buffered pipe to event output + // pipe return drives channel closures return nil } @@ -264,31 +345,43 @@ func (r *StreamReq) sendClose(peerID string) error { } func (r *StreamReq) run() { - // buffers command queue internally - // switches on command type + // switches on task type } -func (r *StreamReq) applyRecordReqOutcome(cmd cmdRecordReqOutcome) {} +func (r *StreamReq) applyRecordReqOutcome(task taskRecordReqOutcome) {} -func (r *StreamReq) applyRecordCloseOutcome(cmd cmdRecordCloseOutcome) {} +func (r *StreamReq) applyRecordCloseOutcome(task taskRecordCloseOutcome) {} -func (r *StreamReq) applyReceiveEvent(cmd cmdReceiveEvent) {} +func (r *StreamReq) applyReceiveEvent(task taskReceiveEvent) {} -func (r *StreamReq) applyReceiveEOSE(cmd cmdReceiveEOSE) {} +func (r *StreamReq) applyReceiveEOSE(task taskReceiveEOSE) {} -func (r *StreamReq) applyReceiveClosed(cmd cmdReceiveClosed) {} +func (r *StreamReq) applyReceiveClosed(task taskReceiveClosed) {} -func (r *StreamReq) applyClosePeer(cmd cmdClosePeer) {} +func (r *StreamReq) applyClosePeer(task taskClosePeer) {} -func (r *StreamReq) applyCloseReq(cmd cmdCloseReq) {} +func (r *StreamReq) applyCloseReq(task taskCloseReq) {} -func (r *StreamReq) applyHandleReconnect(cmd cmdHandleReconnect) {} +func (r *StreamReq) applyHandleReconnect(task taskHandleReconnect) {} // ---------------------------------------------------------------------------- // Query Request // ---------------------------------------------------------------------------- -func NewQueryReq() *QueryReq { +func NewQueryReq( + ctx context.Context, + id string, + filters [][]byte, + peers []string, + postmaster *Postmaster, + isConnected func(string) bool, + journals chan<- JournalEntry, + eoseTimeout time.Duration, + onClose func(), + handler slog.Handler, +) *QueryReq { + // start buffered pipe to event output + // pipe return drives channel closures return nil } @@ -307,62 +400,61 @@ func (r *QueryReq) sendClose(peerID string) error { } func (r *QueryReq) run() { - // buffers command queue internally - // switches on command type + // switches on task type } -func (r *QueryReq) applyRecordReqOutcome(cmd cmdRecordReqOutcome) {} +func (r *QueryReq) applyRecordReqOutcome(task taskRecordReqOutcome) {} -func (r *QueryReq) applyRecordCloseOutcome(cmd cmdRecordCloseOutcome) {} +func (r *QueryReq) applyRecordCloseOutcome(task taskRecordCloseOutcome) {} -func (r *QueryReq) applyReceiveEvent(cmd cmdReceiveEvent) {} +func (r *QueryReq) applyReceiveEvent(task taskReceiveEvent) {} -func (r *QueryReq) applyReceiveEOSE(cmd cmdReceiveEOSE) {} +func (r *QueryReq) applyReceiveEOSE(task taskReceiveEOSE) {} -func (r *QueryReq) applyReceiveClosed(cmd cmdReceiveClosed) {} +func (r *QueryReq) applyReceiveClosed(task taskReceiveClosed) {} -func (r *QueryReq) applyClosePeer(cmd cmdClosePeer) {} +func (r *QueryReq) applyClosePeer(task taskClosePeer) {} -func (r *QueryReq) applyCloseReq(cmd cmdCloseReq) {} +func (r *QueryReq) applyCloseReq(task taskCloseReq) {} -func (r *QueryReq) applyHandleEOSETimeout(cmd cmdHandleEOSETimeout) {} +func (r *QueryReq) applyHandleEOSETimeout(task taskHandleEOSETimeout) {} // ---------------------------------------------------------------------------- -// Commands +// Request Tasks // ---------------------------------------------------------------------------- -func newRecordReqOutcome(peerID string, outcome LetterOutcome) cmdRecordReqOutcome { - return cmdRecordReqOutcome{peerID: peerID, outcome: outcome} +func newRecordReqOutcome(peerID string, outcome LetterOutcome) taskRecordReqOutcome { + return taskRecordReqOutcome{peerID: peerID, outcome: outcome} } -func newRecordCloseOutcome(peerID string, outcome LetterOutcome) cmdRecordCloseOutcome { - return cmdRecordCloseOutcome{peerID: peerID, outcome: outcome} +func newRecordCloseOutcome(peerID string, outcome LetterOutcome) taskRecordCloseOutcome { + return taskRecordCloseOutcome{peerID: peerID, outcome: outcome} } -func newReceiveEvent(peerID string, at time.Time, data Envelope) cmdReceiveEvent { - return cmdReceiveEvent{peerID: peerID, at: at, data: data} +func newReceiveEvent(peerID string, at time.Time, data Envelope) taskReceiveEvent { + return taskReceiveEvent{peerID: peerID, at: at, data: data} } -func newReceiveEOSE(peerID string, at time.Time) cmdReceiveEOSE { - return cmdReceiveEOSE{peerID: peerID, at: at} +func newReceiveEOSE(peerID string, at time.Time) taskReceiveEOSE { + return taskReceiveEOSE{peerID: peerID, at: at} } -func newReceiveClosed(peerID string, at time.Time, message string) cmdReceiveClosed { - return cmdReceiveClosed{peerID: peerID, at: at, message: message} +func newReceiveClosed(peerID string, at time.Time, message string) taskReceiveClosed { + return taskReceiveClosed{peerID: peerID, at: at, message: message} } -func newClosePeer(peerID string) cmdClosePeer { - return cmdClosePeer{peerID: peerID} +func newClosePeer(peerID string) taskClosePeer { + return taskClosePeer{peerID: peerID} } -func newCloseReq() cmdCloseReq { - return cmdCloseReq{} +func newCloseReq() taskCloseReq { + return taskCloseReq{} } -func newHandleReconnect(peerID string) cmdHandleReconnect { - return cmdHandleReconnect{peerID: peerID} +func newHandleReconnect(peerID string) taskHandleReconnect { + return taskHandleReconnect{peerID: peerID} } -func newHandleEOSETimeout(peerID string) cmdHandleEOSETimeout { - return cmdHandleEOSETimeout{peerID: peerID} +func newHandleEOSETimeout(peerID string) taskHandleEOSETimeout { + return taskHandleEOSETimeout{peerID: peerID} } diff --git a/reqmanager_test.go b/reqmanager_test.go new file mode 100644 index 0000000..8828353 --- /dev/null +++ b/reqmanager_test.go @@ -0,0 +1 @@ +package prism diff --git a/streamreq_test.go b/streamreq_test.go new file mode 100644 index 0000000..8828353 --- /dev/null +++ b/streamreq_test.go @@ -0,0 +1 @@ +package prism