402 lines
7.9 KiB
Go
402 lines
7.9 KiB
Go
package prism
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/base32"
|
|
"fmt"
|
|
"git.wisehodl.dev/jay/go-mana-component"
|
|
"git.wisehodl.dev/jay/go-roots-ws"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Types
|
|
// ----------------------------------------------------------------------------
|
|
|
|
type ReqEvent struct {
|
|
PeerID string
|
|
ReceivedAt time.Time
|
|
Data []byte
|
|
}
|
|
|
|
type ReqClosed struct {
|
|
PeerID string
|
|
ReceivedAt time.Time
|
|
Data string
|
|
}
|
|
|
|
type RequestManager struct {
|
|
reqs map[string]*request
|
|
|
|
envoy *Envoy
|
|
events <-chan OutboundPoolEvent
|
|
inbox <-chan InboxMessage
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
mu sync.RWMutex
|
|
wg sync.WaitGroup
|
|
handler slog.Handler
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// ID Generation
|
|
// ----------------------------------------------------------------------------
|
|
|
|
var b32 = base32.StdEncoding.WithPadding(base32.NoPadding)
|
|
|
|
func generateID() string {
|
|
b := make([]byte, 5)
|
|
if _, err := rand.Read(b); err != nil {
|
|
panic(fmt.Sprintf("generateID: %v", err))
|
|
}
|
|
return b32.EncodeToString(b)
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Options
|
|
// ----------------------------------------------------------------------------
|
|
|
|
type RequestOption func(*requestOptions)
|
|
|
|
type requestOptions struct {
|
|
id string
|
|
label string
|
|
}
|
|
|
|
// WithID sets an explicit subscription ID. Returns an error from Stream or
|
|
// Query if the ID is already in use.
|
|
func WithID(id string) RequestOption {
|
|
return func(o *requestOptions) { o.id = id }
|
|
}
|
|
|
|
// WithLabel sets the prefix for the generated subscription ID. The default
|
|
// prefix is "req". The counter is shared across all labels.
|
|
func WithLabel(label string) RequestOption {
|
|
return func(o *requestOptions) { o.label = label }
|
|
}
|
|
|
|
type request struct {
|
|
id string
|
|
filters [][]byte
|
|
|
|
isQuery bool
|
|
active bool
|
|
|
|
buffer chan ReqEvent
|
|
events chan ReqEvent
|
|
closed chan ReqClosed
|
|
|
|
deregisterOnce sync.Once
|
|
closedOnce sync.Once
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Request Manager
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func NewRequestManager(e *Envoy) *RequestManager {
|
|
ctx, cancel := context.WithCancel(
|
|
component.MustExtend(e.Context(), "request_manager"))
|
|
|
|
m := &RequestManager{
|
|
reqs: make(map[string]*request),
|
|
|
|
envoy: e,
|
|
events: e.SubscribeEvents(),
|
|
inbox: e.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}),
|
|
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
if h := e.Handler(); h != nil {
|
|
comp := component.FromContext(ctx)
|
|
m.handler = h
|
|
m.logger = slog.New(h).With(slog.Any("component", comp))
|
|
}
|
|
|
|
m.wg.Add(2)
|
|
go m.handleEvents()
|
|
go m.routeInbox()
|
|
|
|
return m
|
|
}
|
|
|
|
func (m *RequestManager) Stream(
|
|
filters [][]byte,
|
|
opts ...RequestOption,
|
|
) (string, <-chan ReqEvent, <-chan ReqClosed, error) {
|
|
id, events, closed, err := m.newStream(filters, false, opts...)
|
|
return id, events, closed, err
|
|
}
|
|
|
|
func (m *RequestManager) newStream(
|
|
filters [][]byte,
|
|
isQuery bool,
|
|
opts ...RequestOption,
|
|
) (string, <-chan ReqEvent, <-chan ReqClosed, error) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
var o requestOptions
|
|
for _, opt := range opts {
|
|
opt(&o)
|
|
}
|
|
|
|
var id string
|
|
if o.id != "" {
|
|
if _, exists := m.reqs[o.id]; exists {
|
|
return "", nil, nil, fmt.Errorf("Stream: id %q already in use", o.id)
|
|
}
|
|
id = o.id
|
|
} else {
|
|
label := o.label
|
|
if label == "" {
|
|
label = "REQ"
|
|
}
|
|
id = fmt.Sprintf("%s:%s", label, generateID())
|
|
}
|
|
buffer := make(chan ReqEvent, 64)
|
|
closed := make(chan ReqClosed, 1)
|
|
|
|
events := make(chan ReqEvent)
|
|
go func() {
|
|
bufferedPipe(buffer, events)
|
|
close(events)
|
|
}()
|
|
|
|
req := &request{
|
|
id: id,
|
|
filters: filters,
|
|
buffer: buffer,
|
|
events: events,
|
|
closed: closed,
|
|
isQuery: isQuery,
|
|
}
|
|
|
|
m.reqs[id] = req
|
|
if m.envoy.IsConnected() {
|
|
m.activate(req)
|
|
}
|
|
|
|
return id, events, closed, nil
|
|
}
|
|
|
|
func (m *RequestManager) Query(
|
|
filters [][]byte,
|
|
timeout time.Duration,
|
|
opts ...RequestOption,
|
|
) ([]ReqEvent, *ReqClosed, error) {
|
|
id, eventsCh, closedCh, err := m.newStream(filters, true, opts...)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(m.ctx, timeout)
|
|
defer cancel()
|
|
|
|
var result []ReqEvent
|
|
for {
|
|
select {
|
|
case ev, ok := <-eventsCh:
|
|
if !ok {
|
|
return result, nil, nil
|
|
}
|
|
result = append(result, ev)
|
|
case cl, ok := <-closedCh:
|
|
if !ok {
|
|
return result, nil, nil
|
|
}
|
|
return result, &cl, nil
|
|
case <-ctx.Done():
|
|
m.Cancel(id)
|
|
return result, nil, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *RequestManager) Cancel(id string) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
req, ok := m.reqs[id]
|
|
if !ok {
|
|
return fmt.Errorf("Cancel: unknown id %q", id)
|
|
}
|
|
|
|
if req.active {
|
|
go m.envoy.Send(envelope.EncloseClose(id))
|
|
req.active = false
|
|
}
|
|
|
|
req.deregisterOnce.Do(func() {
|
|
close(req.buffer)
|
|
close(req.closed)
|
|
})
|
|
delete(m.reqs, id)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *RequestManager) Close() {
|
|
m.cancel()
|
|
m.wg.Wait()
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
for id, req := range m.reqs {
|
|
if req.active {
|
|
go m.envoy.Send(envelope.EncloseClose(id))
|
|
}
|
|
req.deregisterOnce.Do(func() {
|
|
close(req.buffer)
|
|
close(req.closed)
|
|
})
|
|
delete(m.reqs, id)
|
|
}
|
|
}
|
|
|
|
func (m *RequestManager) activate(req *request) {
|
|
req.active = true
|
|
go m.envoy.Send(envelope.EncloseReq(req.id, req.filters))
|
|
}
|
|
|
|
func (m *RequestManager) deregister(req *request) {
|
|
req.active = false
|
|
req.deregisterOnce.Do(func() {
|
|
close(req.buffer)
|
|
close(req.closed)
|
|
})
|
|
delete(m.reqs, req.id)
|
|
}
|
|
|
|
func (m *RequestManager) handleEvents() {
|
|
defer m.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
return
|
|
case ev, ok := <-m.events:
|
|
if !ok {
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
switch ev.Kind {
|
|
case EventConnected:
|
|
for _, req := range m.reqs {
|
|
m.activate(req)
|
|
}
|
|
case EventDisconnected:
|
|
for _, req := range m.reqs {
|
|
req.active = false
|
|
}
|
|
}
|
|
m.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *RequestManager) routeInbox() {
|
|
defer m.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
return
|
|
case msg, ok := <-m.inbox:
|
|
if !ok {
|
|
return
|
|
}
|
|
m.dispatchInbox(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *RequestManager) dispatchInbox(msg InboxMessage) {
|
|
label, err := envelope.GetLabel(msg.Data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
switch string(label) {
|
|
case "EVENT":
|
|
m.routeEvent(msg)
|
|
case "EOSE":
|
|
m.routeEOSE(msg)
|
|
case "CLOSED":
|
|
m.routeClosed(msg)
|
|
}
|
|
}
|
|
|
|
// routeEvent, routeEOSE, and routeClosed use blocking sends into req.buffer,
|
|
// which reads eagerly into a slice buffer and cannot block the router.
|
|
func (m *RequestManager) routeEvent(msg InboxMessage) {
|
|
subID, event, err := envelope.FindSubscriptionEvent(msg.Data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
m.mu.RLock()
|
|
req, ok := m.reqs[subID]
|
|
m.mu.RUnlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
req.buffer <- ReqEvent{
|
|
PeerID: msg.ID,
|
|
ReceivedAt: msg.ReceivedAt,
|
|
Data: event,
|
|
}
|
|
}
|
|
|
|
// routeEvent, routeEOSE, and routeClosed are always called sequentially from
|
|
// the same routeInbox goroutine via dispatchInbox. This makes it safe for
|
|
// routeEOSE to close req.buffer: no concurrent routeEvent send can race it.
|
|
func (m *RequestManager) routeEOSE(msg InboxMessage) {
|
|
subID, err := envelope.FindEOSE(msg.Data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
req, ok := m.reqs[subID]
|
|
if !ok {
|
|
return
|
|
}
|
|
if req.active && req.isQuery {
|
|
// manually cleanup query state
|
|
// specifically, do not close req.closed or events can be missed
|
|
req.active = false
|
|
close(req.buffer)
|
|
delete(m.reqs, req.id)
|
|
go m.envoy.Send(envelope.EncloseClose(subID))
|
|
}
|
|
}
|
|
|
|
func (m *RequestManager) routeClosed(msg InboxMessage) {
|
|
subID, message, err := envelope.FindClosed(msg.Data)
|
|
if err != nil {
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
req, ok := m.reqs[subID]
|
|
if !ok {
|
|
return
|
|
}
|
|
req.closedOnce.Do(func() {
|
|
req.closed <- ReqClosed{
|
|
PeerID: msg.ID,
|
|
ReceivedAt: msg.ReceivedAt,
|
|
Data: message,
|
|
}
|
|
})
|
|
m.deregister(req)
|
|
}
|