From 093a56ea56f54c380e8d0019966c7634cba36c2c Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 20 May 2026 08:46:13 -0400 Subject: [PATCH] gut inbound and queue. promote outbound to honeybee. --- outbound/config.go => config.go | 2 +- ...config_pool_test.go => config_pool_test.go | 2 +- outbound/errors.go => errors.go | 2 +- outbound/helper_test.go => helper_test.go | 2 +- inbound/config.go | 246 ---------- inbound/config_test.go | 207 --------- inbound/errors.go | 17 - inbound/helpers_test.go | 24 - inbound/pool.go | 423 ------------------ inbound/pool_test.go | 400 ----------------- inbound/worker.go | 340 -------------- inbound/worker_forwarder_test.go | 33 -- inbound/worker_reader_test.go | 213 --------- inbound/worker_test.go | 266 ----------- inbound/worker_watchdog_test.go | 135 ------ outbound/pool.go => pool.go | 2 +- outbound/pool_test.go => pool_test.go | 2 +- queue/queue.go | 131 ------ queue/queue_test.go | 105 ----- outbound/worker.go => worker.go | 2 +- ...er_dialer_test.go => worker_dialer_test.go | 2 +- ...palive_test.go => worker_keepalive_test.go | 2 +- ...worker_send_test.go => worker_send_test.go | 2 +- ...er_test.go => worker_session_inner_test.go | 2 +- ..._session_test.go => worker_session_test.go | 2 +- ...rker_start_test.go => worker_start_test.go | 2 +- 26 files changed, 13 insertions(+), 2553 deletions(-) rename outbound/config.go => config.go (99%) rename outbound/config_pool_test.go => config_pool_test.go (99%) rename outbound/errors.go => errors.go (98%) rename outbound/helper_test.go => helper_test.go (96%) delete mode 100644 inbound/config.go delete mode 100644 inbound/config_test.go delete mode 100644 inbound/errors.go delete mode 100644 inbound/helpers_test.go delete mode 100644 inbound/pool.go delete mode 100644 inbound/pool_test.go delete mode 100644 inbound/worker.go delete mode 100644 inbound/worker_forwarder_test.go delete mode 100644 inbound/worker_reader_test.go delete mode 100644 inbound/worker_test.go delete mode 100644 inbound/worker_watchdog_test.go rename outbound/pool.go => pool.go (99%) rename outbound/pool_test.go => pool_test.go (99%) delete mode 100644 queue/queue.go delete mode 100644 queue/queue_test.go rename outbound/worker.go => worker.go (99%) rename outbound/worker_dialer_test.go => worker_dialer_test.go (99%) rename outbound/worker_keepalive_test.go => worker_keepalive_test.go (99%) rename outbound/worker_send_test.go => worker_send_test.go (99%) rename outbound/worker_session_inner_test.go => worker_session_inner_test.go (99%) rename outbound/worker_session_test.go => worker_session_test.go (99%) rename outbound/worker_start_test.go => worker_start_test.go (99%) diff --git a/outbound/config.go b/config.go similarity index 99% rename from outbound/config.go rename to config.go index dae398e..896e923 100644 --- a/outbound/config.go +++ b/config.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/config_pool_test.go b/config_pool_test.go similarity index 99% rename from outbound/config_pool_test.go rename to config_pool_test.go index df503fb..25b4ba7 100644 --- a/outbound/config_pool_test.go +++ b/config_pool_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "git.wisehodl.dev/jay/go-honeybee/transport" diff --git a/outbound/errors.go b/errors.go similarity index 98% rename from outbound/errors.go rename to errors.go index e308151..7933fee 100644 --- a/outbound/errors.go +++ b/errors.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import "errors" import "fmt" diff --git a/outbound/helper_test.go b/helper_test.go similarity index 96% rename from outbound/helper_test.go rename to helper_test.go index 0cda7c1..cab276b 100644 --- a/outbound/helper_test.go +++ b/helper_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "git.wisehodl.dev/jay/go-honeybee/honeybeetest" diff --git a/inbound/config.go b/inbound/config.go deleted file mode 100644 index 967ec5f..0000000 --- a/inbound/config.go +++ /dev/null @@ -1,246 +0,0 @@ -package inbound - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/transport" - "log/slog" - "time" -) - -// Pool Config - -type WorkerFactory func( - ctx context.Context, - id string, - conn *transport.Connection, - config *WorkerConfig, - logger *slog.Logger, -) (Worker, error) - -type PoolConfig struct { - InboxBufferSize int - EventsBufferSize int - LoggingEnabled bool - LogLevel *slog.Level - ConnectionConfig *transport.ConnectionConfig - WorkerConfig *WorkerConfig - WorkerFactory WorkerFactory -} - -type PoolOption func(*PoolConfig) error - -func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) { - conf := GetDefaultPoolConfig() - if err := applyPoolOptions(conf, options...); err != nil { - return nil, err - } - if err := ValidatePoolConfig(conf); err != nil { - return nil, err - } - return conf, nil -} - -func GetDefaultPoolConfig() *PoolConfig { - return &PoolConfig{ - InboxBufferSize: 256, - EventsBufferSize: 10, - LoggingEnabled: true, - LogLevel: nil, - ConnectionConfig: nil, - WorkerConfig: nil, - WorkerFactory: nil, - } -} - -func applyPoolOptions(config *PoolConfig, options ...PoolOption) error { - for _, option := range options { - if err := option(config); err != nil { - return err - } - } - return nil -} - -func ValidatePoolConfig(config *PoolConfig) error { - if config.ConnectionConfig != nil { - if err := transport.ValidateConnectionConfig(config.ConnectionConfig); err != nil { - return err - } - } - if config.WorkerConfig != nil { - if err := ValidateWorkerConfig(config.WorkerConfig); err != nil { - return err - } - } - return nil -} - -func validateBufferSize(value int) error { - if value < 1 { - return InvalidBufferSize - } - return nil -} - -func WithInboxBufferSize(value int) PoolOption { - return func(c *PoolConfig) error { - if err := validateBufferSize(value); err != nil { - return err - } - c.InboxBufferSize = value - return nil - } -} - -func WithEventsBufferSize(value int) PoolOption { - return func(c *PoolConfig) error { - if err := validateBufferSize(value); err != nil { - return err - } - c.EventsBufferSize = value - return nil - } -} - -func WithPoolLoggingEnabled(value bool) PoolOption { - return func(c *PoolConfig) error { - c.LoggingEnabled = value - return nil - } -} - -func WithPoolLogLevel(level slog.Level) PoolOption { - return func(c *PoolConfig) error { - l := level - c.LogLevel = &l - return nil - } -} - -func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { - return func(c *PoolConfig) error { - if err := transport.ValidateConnectionConfig(cc); err != nil { - return err - } - c.ConnectionConfig = cc - return nil - } -} - -func WithWorkerConfig(wc *WorkerConfig) PoolOption { - return func(c *PoolConfig) error { - if err := ValidateWorkerConfig(wc); err != nil { - return err - } - c.WorkerConfig = wc - return nil - } -} - -func WithWorkerFactory(wf WorkerFactory) PoolOption { - return func(c *PoolConfig) error { - c.WorkerFactory = wf - return nil - } -} - -// Worker Config - -type WorkerConfig struct { - MaxQueueSize int - InactivityTimeout time.Duration - LoggingEnabled bool - LogLevel *slog.Level -} - -type WorkerOption func(*WorkerConfig) error - -func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { - conf := GetDefaultWorkerConfig() - if err := applyWorkerOptions(conf, options...); err != nil { - return nil, err - } - if err := ValidateWorkerConfig(conf); err != nil { - return nil, err - } - return conf, nil -} - -func GetDefaultWorkerConfig() *WorkerConfig { - return &WorkerConfig{ - MaxQueueSize: 0, // queue can grow indefinitely by default - InactivityTimeout: 0, // eviction disabled by default - LoggingEnabled: true, - LogLevel: nil, - } -} - -func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { - for _, option := range options { - if err := option(config); err != nil { - return err - } - } - return nil -} - -func ValidateWorkerConfig(config *WorkerConfig) error { - if err := validateMaxQueueSize(config.MaxQueueSize); err != nil { - return err - } - if err := validateInactivityTimeout(config.InactivityTimeout); err != nil { - return err - } - return nil -} - -func validateMaxQueueSize(value int) error { - if value < 0 { - return InvalidMaxQueueSize - } - return nil -} - -func validateInactivityTimeout(value time.Duration) error { - if value < 0 { - return InvalidInactivityTimeout - } - return nil -} - -// When MaxQueueSize is zero, queue limits are disabled. -func WithMaxQueueSize(value int) WorkerOption { - return func(c *WorkerConfig) error { - if err := validateMaxQueueSize(value); err != nil { - return err - } - c.MaxQueueSize = value - return nil - } -} - -// When InactivityTimeout is zero, the watchdog is disabled. -func WithInactivityTimeout(value time.Duration) WorkerOption { - return func(c *WorkerConfig) error { - if err := validateInactivityTimeout(value); err != nil { - return err - } - c.InactivityTimeout = value - return nil - } -} - -func WithWorkerLoggingEnabled(value bool) WorkerOption { - return func(c *WorkerConfig) error { - c.LoggingEnabled = value - return nil - } -} - -func WithWorkerLogLevel(level slog.Level) WorkerOption { - return func(c *WorkerConfig) error { - l := level - c.LogLevel = &l - return nil - } -} diff --git a/inbound/config_test.go b/inbound/config_test.go deleted file mode 100644 index ef46f01..0000000 --- a/inbound/config_test.go +++ /dev/null @@ -1,207 +0,0 @@ -// responderpool/config_test.go -package inbound - -import ( - "git.wisehodl.dev/jay/go-honeybee/transport" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -func TestNewWorkerConfig(t *testing.T) { - conf, err := NewWorkerConfig() - assert.NoError(t, err) - assert.Equal(t, GetDefaultWorkerConfig(), conf) -} - -func TestDefaultWorkerConfig(t *testing.T) { - conf := GetDefaultWorkerConfig() - assert.Equal(t, &WorkerConfig{ - MaxQueueSize: 0, - InactivityTimeout: 0, - LoggingEnabled: true, - LogLevel: nil, - }, conf) -} - -func TestValidateWorkerConfig(t *testing.T) { - cases := []struct { - name string - conf WorkerConfig - wantErr error - }{ - { - name: "valid defaults", - conf: *GetDefaultWorkerConfig(), - }, - { - name: "zero inactivity timeout disabled", - conf: WorkerConfig{InactivityTimeout: 0}, - }, - { - name: "positive inactivity timeout", - conf: WorkerConfig{InactivityTimeout: 30 * time.Second}, - }, - { - name: "negative max queue size", - conf: WorkerConfig{MaxQueueSize: -1}, - wantErr: InvalidMaxQueueSize, - }, - { - name: "negative inactivity timeout", - conf: WorkerConfig{InactivityTimeout: -1 * time.Second}, - wantErr: InvalidInactivityTimeout, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - err := ValidateWorkerConfig(&tc.conf) - if tc.wantErr != nil { - assert.ErrorIs(t, err, tc.wantErr) - return - } - assert.NoError(t, err) - }) - } -} - -func TestWithMaxQueueSize(t *testing.T) { - conf := &WorkerConfig{} - - err := applyWorkerOptions(conf, WithMaxQueueSize(10)) - assert.NoError(t, err) - assert.Equal(t, 10, conf.MaxQueueSize) - - err = applyWorkerOptions(conf, WithMaxQueueSize(0)) - assert.NoError(t, err) - - err = applyWorkerOptions(conf, WithMaxQueueSize(-1)) - assert.ErrorIs(t, err, InvalidMaxQueueSize) -} - -func TestWithInactivityTimeout(t *testing.T) { - conf := &WorkerConfig{} - - err := applyWorkerOptions(conf, WithInactivityTimeout(30*time.Second)) - assert.NoError(t, err) - assert.Equal(t, 30*time.Second, conf.InactivityTimeout) - - err = applyWorkerOptions(conf, WithInactivityTimeout(0)) - assert.NoError(t, err) - - err = applyWorkerOptions(conf, WithInactivityTimeout(-1*time.Second)) - assert.ErrorIs(t, err, InvalidInactivityTimeout) -} - -func TestNewPoolConfig(t *testing.T) { - conf, err := NewPoolConfig() - assert.NoError(t, err) - assert.Equal(t, GetDefaultPoolConfig(), conf) -} - -func TestDefaultPoolConfig(t *testing.T) { - conf := GetDefaultPoolConfig() - assert.Equal(t, &PoolConfig{ - InboxBufferSize: 256, - EventsBufferSize: 10, - LoggingEnabled: true, - LogLevel: nil, - ConnectionConfig: nil, - WorkerConfig: nil, - WorkerFactory: nil, - }, conf) -} - -func TestValidatePoolConfig(t *testing.T) { - cases := []struct { - name string - conf PoolConfig - wantErrText string - }{ - { - name: "valid empty", - conf: PoolConfig{}, - }, - { - name: "valid defaults", - conf: *GetDefaultPoolConfig(), - }, - { - name: "valid with configs", - conf: PoolConfig{ - ConnectionConfig: &transport.ConnectionConfig{}, - WorkerConfig: &WorkerConfig{}, - }, - }, - { - name: "invalid connection config", - conf: PoolConfig{ - ConnectionConfig: &transport.ConnectionConfig{ - Retry: &transport.RetryConfig{ - InitialDelay: 10 * time.Second, - MaxDelay: 1 * time.Second, - }, - }, - }, - wantErrText: "initial delay may not exceed maximum delay", - }, - { - name: "invalid worker config", - conf: PoolConfig{ - WorkerConfig: &WorkerConfig{MaxQueueSize: -1}, - }, - wantErrText: "maximum queue size cannot be negative", - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - err := ValidatePoolConfig(&tc.conf) - if tc.wantErrText != "" { - assert.ErrorContains(t, err, tc.wantErrText) - return - } - assert.NoError(t, err) - }) - } -} - -func TestWithBufferSizes(t *testing.T) { - conf := &PoolConfig{} - - err := applyPoolOptions(conf, - WithInboxBufferSize(100), - WithEventsBufferSize(20), - ) - assert.NoError(t, err) - assert.Equal(t, 100, conf.InboxBufferSize) - assert.Equal(t, 20, conf.EventsBufferSize) -} - -func TestWithConnectionConfig(t *testing.T) { - conf := &PoolConfig{} - - err := applyPoolOptions(conf, WithConnectionConfig(&transport.ConnectionConfig{})) - assert.NoError(t, err) - assert.NotNil(t, conf.ConnectionConfig) - - err = applyPoolOptions(conf, WithConnectionConfig(&transport.ConnectionConfig{ - Retry: &transport.RetryConfig{ - InitialDelay: 10 * time.Second, - MaxDelay: 1 * time.Second, - }, - })) - assert.Error(t, err) -} - -func TestWithWorkerConfig(t *testing.T) { - conf := &PoolConfig{} - - err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{InactivityTimeout: 30 * time.Second})) - assert.NoError(t, err) - assert.Equal(t, 30*time.Second, conf.WorkerConfig.InactivityTimeout) - - err = applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{MaxQueueSize: -1})) - assert.Error(t, err) -} diff --git a/inbound/errors.go b/inbound/errors.go deleted file mode 100644 index 6fbe42f..0000000 --- a/inbound/errors.go +++ /dev/null @@ -1,17 +0,0 @@ -package inbound - -import "errors" - -var ( - // Pool errors - PoolError = errors.New("pool error") - ErrInvalidPoolID = errors.New("pool id cannot be empty") - ErrPoolClosed = errors.New("pool is closed") - ErrPeerNotFound = errors.New("peer not found") - ErrPeerExists = errors.New("peer already exists") - - // Config errors - InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") - InvalidInactivityTimeout = errors.New("inactivity timeout cannot be negative") - InvalidBufferSize = errors.New("buffer size must be greater than zero") -) diff --git a/inbound/helpers_test.go b/inbound/helpers_test.go deleted file mode 100644 index d1cf8b4..0000000 --- a/inbound/helpers_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package inbound - -import ( - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/transport" - "github.com/stretchr/testify/assert" - "testing" -) - -func setupTestConnection(t *testing.T) ( - conn *transport.Connection, - socket *honeybeetest.MockSocket, - incoming chan honeybeetest.MockIncomingData, - outgoing chan honeybeetest.MockOutgoingData, -) { - t.Helper() - - socket, incoming, outgoing = honeybeetest.SetupTestSocket(t) - - var err error - conn, err = transport.NewConnectionFromSocket(socket, nil, nil) - assert.NoError(t, err) - return -} diff --git a/inbound/pool.go b/inbound/pool.go deleted file mode 100644 index 3ca2e9e..0000000 --- a/inbound/pool.go +++ /dev/null @@ -1,423 +0,0 @@ -package inbound - -import ( - "context" - "fmt" - "git.wisehodl.dev/jay/go-honeybee/logging" - "git.wisehodl.dev/jay/go-honeybee/transport" - "git.wisehodl.dev/jay/go-honeybee/types" - "log/slog" - "sync" - "sync/atomic" - "time" -) - -// Re-exported types for consumer convenience - -type Socket = types.Socket -type InboxMessage = types.InboxMessage - -var NormalizeURL = transport.NormalizeURL - -// Types - -type PoolEventKind string - -const ( - EventDisconnected PoolEventKind = "disconnected" - EventDroppedClose PoolEventKind = "dropped_close" - EventDroppedError PoolEventKind = "dropped_error" - EventEvictedPolicy PoolEventKind = "evicted_policy" -) - -var workerToPoolEvent = map[WorkerExitKind]PoolEventKind{ - ExitDisconnected: EventDisconnected, - ExitUnexpectedClose: EventDroppedClose, - ExitReadError: EventDroppedError, - ExitPolicy: EventEvictedPolicy, -} - -type OnExitFunction func(kind WorkerExitKind) - -type PoolEvent struct { - ID string - Kind PoolEventKind - At time.Time -} - -type PoolStats struct { - ChanInbox int - ChanEvents int - - TotalReceived uint64 - TotalSent uint64 - - PeerCount int - PeerStats []PeerStats -} - -type PeerStats struct { - ID string - Worker WorkerStats - Connection transport.ConnectionStats -} - -type PoolPlugin struct { - Inbox chan<- types.InboxMessage - Events chan<- PoolEvent - InboxCounter *atomic.Uint64 - OnExit OnExitFunction - Handler slog.Handler -} - -// Pool - -type Peer struct { - id string - conn *transport.Connection - worker Worker - done chan struct{} -} - -type Pool struct { - ctx context.Context - cancel context.CancelFunc - - id string - - peers map[string]*Peer - inbox chan types.InboxMessage - events chan PoolEvent - - inboxCounter *atomic.Uint64 - outgoingCount *atomic.Uint64 - - config *PoolConfig - handler slog.Handler - logger *slog.Logger - - mu sync.RWMutex - wg sync.WaitGroup - closed bool -} - -func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Handler) (*Pool, error) { - if id == "" { - return nil, ErrInvalidPoolID - } - - if config == nil { - config = GetDefaultPoolConfig() - } - - // If a custom factory is supplied, config.WorkerConfig is not used. - // The factory function should be non-blocking or else Connect() may cause - // deadlocks. - if config.WorkerFactory == nil { - config.WorkerFactory = func( - ctx context.Context, - id string, - conn *transport.Connection, - config *WorkerConfig, - logger *slog.Logger, - ) (Worker, error) { - return NewWorker(ctx, id, conn, config, logger) - } - } - - if err := ValidatePoolConfig(config); err != nil { - return nil, err - } - - pctx, cancel := context.WithCancel(ctx) - - var logger *slog.Logger - if handler != nil && config.LoggingEnabled { - logger = logging.NewInboundPoolLogger( - logging.WrapOrDefault(config.LogLevel, handler), id) - } - - return &Pool{ - ctx: pctx, - cancel: cancel, - id: id, - peers: make(map[string]*Peer), - inbox: make(chan types.InboxMessage, config.InboxBufferSize), - events: make(chan PoolEvent, config.EventsBufferSize), - inboxCounter: &atomic.Uint64{}, - outgoingCount: &atomic.Uint64{}, - config: config, - handler: handler, - logger: logger, - }, nil -} - -func (p *Pool) Peers() []string { - p.mu.RLock() - defer p.mu.RUnlock() - - ids := make([]string, 0, len(p.peers)) - for id := range p.peers { - ids = append(ids, id) - } - - return ids -} - -func (p *Pool) Inbox() <-chan types.InboxMessage { - return p.inbox -} - -func (p *Pool) Events() <-chan PoolEvent { - return p.events -} - -func (p *Pool) Stats() PoolStats { - p.mu.RLock() - defer p.mu.RUnlock() - - count := len(p.peers) - peerStats := make([]PeerStats, 0, count) - for id, peer := range p.peers { - peerStats = append(peerStats, PeerStats{ - ID: id, - Worker: peer.worker.Stats(), - Connection: peer.conn.Stats(), - }) - } - - return PoolStats{ - ChanInbox: len(p.inbox), - ChanEvents: len(p.events), - - TotalReceived: p.inboxCounter.Load(), - TotalSent: p.outgoingCount.Load(), - - PeerCount: len(p.peers), - PeerStats: peerStats, - } -} - -func (p *Pool) PeerStats(id string) (PeerStats, error) { - p.mu.RLock() - defer p.mu.RUnlock() - - peer, exists := p.peers[id] - if !exists { - return PeerStats{}, ErrPeerNotFound - } - - return PeerStats{ - ID: id, - Worker: peer.worker.Stats(), - Connection: peer.conn.Stats(), - }, nil -} - -func (p *Pool) Close() { - if p.logger != nil { - p.logger.Debug("closing") - } - - p.mu.Lock() - if p.closed { - p.mu.Unlock() - return - } - - p.closed = true - p.cancel() - - // close all connections - for _, peer := range p.peers { - peer.worker.Stop() - peer.conn.Close() - } - - // remove all peers - p.peers = make(map[string]*Peer) - - p.mu.Unlock() - - go func() { - p.wg.Wait() - close(p.inbox) - close(p.events) - - if p.logger != nil { - p.logger.Info("closed") - } - }() -} - -func (p *Pool) Add(id string, socket types.Socket) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return ErrPoolClosed - } - - if _, exists := p.peers[id]; exists { - return ErrPeerExists - } - - return p.addLocked(id, socket) -} - -func (p *Pool) Replace(id string, socket types.Socket) error { - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return ErrPoolClosed - } - - if peer, exists := p.peers[id]; exists { - p.removeLocked(peer) - - if p.logger != nil { - p.logger.Info("removed peer", "peer", id) - } - - } else { - return ErrPeerNotFound - } - - return p.addLocked(id, socket) -} - -func (p *Pool) Remove(id string) error { - if p.logger != nil { - p.logger.Debug("removing peer", "peer", id) - } - - p.mu.Lock() - defer p.mu.Unlock() - - if p.closed { - return ErrPoolClosed - } - - peer, exists := p.peers[id] - if !exists { - return ErrPeerNotFound - } - - p.removeLocked(peer) - - if p.logger != nil { - p.logger.Info("removed peer", "peer", id) - } - - return nil -} - -func (p *Pool) Send(id string, data []byte) error { - p.mu.RLock() - defer p.mu.RUnlock() - - if p.closed { - return ErrPoolClosed - } - - peer, exists := p.peers[id] - if !exists { - return ErrPeerNotFound - } - - err := peer.worker.Send(data) - if err != nil { - return err - } - - p.outgoingCount.Add(1) - return nil -} - -// addLocked constructs and registers a peer. Caller must hold p.mu write lock. -func (p *Pool) addLocked(id string, socket types.Socket) error { - var logger *slog.Logger - if p.handler != nil && p.config.ConnectionConfig.LoggingEnabled { - logger = logging.NewConnectionLogger( - logging.WrapOrDefault(p.config.ConnectionConfig.LogLevel, p.handler), p.id, id) - } - - conn, err := transport.NewConnectionFromSocket( - socket, p.config.ConnectionConfig, logger) - if err != nil { - return err - } - - wctx, cancel := context.WithCancel(p.ctx) - if p.handler != nil && p.config.WorkerConfig.LoggingEnabled { - logger = logging.NewInboundWorkerLogger( - logging.WrapOrDefault(p.config.WorkerConfig.LogLevel, p.handler), p.id, id) - } - - // The worker factory must be non-blocking to avoid deadlocks - worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig, logger) - if err != nil { - cancel() - conn.Close() - return fmt.Errorf("%w: %w", PoolError, err) - } - - var once sync.Once - onExit := func(kind WorkerExitKind) { - once.Do(func() { - p.mu.Lock() - delete(p.peers, id) - p.mu.Unlock() - - conn.Close() - - select { - case p.events <- PoolEvent{ID: id, Kind: workerToPoolEvent[kind], At: time.Now()}: - case <-p.ctx.Done(): - return - } - }) - } - - pool := PoolPlugin{ - Inbox: p.inbox, - Events: p.events, - InboxCounter: p.inboxCounter, - OnExit: onExit, - Handler: p.handler, - } - - peer := &Peer{ - id: id, - conn: conn, - worker: worker, - done: make(chan struct{}), - } - - p.wg.Add(1) - go func() { - defer cancel() - defer close(peer.done) - worker.Start(pool) - p.wg.Done() - }() - - p.peers[id] = peer - - if p.logger != nil { - p.logger.Info("added peer", "peer", id) - } - - return nil -} - -// removeLocked closes and unregisters a peer. Caller must hold p.mu write lock. -func (p *Pool) removeLocked(peer *Peer) { - delete(p.peers, peer.id) - peer.worker.Stop() - go func() { - <-peer.done - peer.conn.Close() - }() -} diff --git a/inbound/pool_test.go b/inbound/pool_test.go deleted file mode 100644 index aa5e19c..0000000 --- a/inbound/pool_test.go +++ /dev/null @@ -1,400 +0,0 @@ -package inbound - -import ( - "context" - "fmt" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" - "slices" - "testing" - "time" -) - -// Helpers - -func setupPool(t *testing.T) *Pool { - t.Helper() - pool, err := NewPool(context.Background(), "pool-1", nil, nil) - assert.NoError(t, err) - return pool -} - -func expectEvent( - t *testing.T, - events <-chan PoolEvent, - expectedURL string, - expectedKind PoolEventKind, -) { - t.Helper() - honeybeetest.Eventually(t, func() bool { - select { - case e := <-events: - return e.ID == expectedURL && e.Kind == expectedKind && !e.At.IsZero() - default: - return false - } - }, fmt.Sprintf("expected event: URL=%q, Kind=%q", expectedURL, expectedKind)) -} - -// Tests - -func TestPoolID(t *testing.T) { - _, err := NewPool(context.Background(), "", nil, nil) - assert.ErrorIs(t, err, ErrInvalidPoolID) -} - -func TestPoolAdd(t *testing.T) { - t.Run("successfully adds peer", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.NoError(t, err) - }) - - t.Run("peer appears in Peers after add", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.NoError(t, err) - - assert.Contains(t, pool.Peers(), "peer-1") - }) - - t.Run("duplicate id returns ErrPeerExists", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket1, _, _ := honeybeetest.SetupTestSocket(t) - socket2, _, _ := honeybeetest.SetupTestSocket(t) - - err := pool.Add("peer-1", socket1) - assert.NoError(t, err) - - err = pool.Add("peer-1", socket2) - assert.ErrorIs(t, err, ErrPeerExists) - }) - - t.Run("closed pool returns ErrPoolClosed", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.ErrorIs(t, err, ErrPoolClosed) - }) -} - -func TestPoolReplace(t *testing.T) { - t.Run("replaces existing peer", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket1, _, _ := honeybeetest.SetupTestSocket(t) - socket2, _, _ := honeybeetest.SetupTestSocket(t) - - err := pool.Add("peer-1", socket1) - assert.NoError(t, err) - - err = pool.Replace("peer-1", socket2) - assert.NoError(t, err) - - assert.Contains(t, pool.Peers(), "peer-1") - }) - - t.Run("unknown id returns ErrPeerNotFound", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Replace("unknown", socket) - assert.ErrorIs(t, err, ErrPeerNotFound) - }) - - t.Run("closed pool returns ErrPoolClosed", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Replace("peer-1", socket) - assert.ErrorIs(t, err, ErrPoolClosed) - }) - - t.Run("no event emitted for replaced peer", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket1, _, _ := honeybeetest.SetupTestSocket(t) - socket2, _, _ := honeybeetest.SetupTestSocket(t) - - err := pool.Add("peer-1", socket1) - assert.NoError(t, err) - - err = pool.Replace("peer-1", socket2) - assert.NoError(t, err) - - honeybeetest.Never(t, func() bool { - select { - case <-pool.Events(): - return true - default: - return false - } - }, "no event expected on replace") - }) -} - -func TestPoolRemove(t *testing.T) { - t.Run("removes known peer", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.NoError(t, err) - - err = pool.Remove("peer-1") - assert.NoError(t, err) - - assert.NotContains(t, pool.Peers(), "peer-1") - }) - - t.Run("unknown id returns ErrPeerNotFound", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - err := pool.Remove("unknown") - assert.ErrorIs(t, err, ErrPeerNotFound) - }) - - t.Run("closed pool returns ErrPoolClosed", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - - err := pool.Remove("peer-1") - assert.ErrorIs(t, err, ErrPoolClosed) - }) - - t.Run("no event emitted on remove", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.NoError(t, err) - - err = pool.Remove("peer-1") - assert.NoError(t, err) - - honeybeetest.Never(t, func() bool { - select { - case e := <-pool.Events(): - fmt.Printf("got event: %v", e) - return true - default: - return false - } - }, "no event expected on remove") - }) -} - -func TestPoolSend(t *testing.T) { - t.Run("data reaches socket", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, outgoing := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.NoError(t, err) - - err = pool.Send("peer-1", []byte("hello")) - assert.NoError(t, err) - - honeybeetest.ExpectWrite(t, outgoing, websocket.TextMessage, []byte("hello")) - }) - - t.Run("unknown id returns ErrPeerNotFound", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - err := pool.Send("unknown", []byte("hello")) - assert.ErrorIs(t, err, ErrPeerNotFound) - }) - - t.Run("closed pool returns ErrPoolClosed", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - - err := pool.Send("peer-1", []byte("hello")) - assert.ErrorIs(t, err, ErrPoolClosed) - }) -} - -func TestPoolClose(t *testing.T) { - t.Run("inbox and events channels close after pool close", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - - _, ok := <-pool.Inbox() - assert.False(t, ok) - _, ok = <-pool.Events() - assert.False(t, ok) - }) - - t.Run("add after close returns ErrPoolClosed", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - err := pool.Add("peer-1", socket) - assert.ErrorIs(t, err, ErrPoolClosed) - }) - - t.Run("close is idempotent", func(t *testing.T) { - pool := setupPool(t) - pool.Close() - pool.Close() - }) -} - -func TestPoolPeers(t *testing.T) { - t.Run("reflects active peers after add", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket1, _, _ := honeybeetest.SetupTestSocket(t) - socket2, _, _ := honeybeetest.SetupTestSocket(t) - - pool.Add("peer-1", socket1) - pool.Add("peer-2", socket2) - - peers := pool.Peers() - assert.Contains(t, peers, "peer-1") - assert.Contains(t, peers, "peer-2") - }) - - t.Run("loses entry after remove", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - pool.Add("peer-1", socket) - pool.Remove("peer-1") - - assert.NotContains(t, pool.Peers(), "peer-1") - }) - - t.Run("loses entry after peer self-disconnects", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, incoming, _ := honeybeetest.SetupTestSocket(t) - pool.Add("peer-1", socket) - - close(incoming) - - honeybeetest.Eventually(t, func() bool { - return !slices.Contains(pool.Peers(), "peer-1") - }, "expected peer to be removed after self-disconnect") - }) -} - -func TestPoolEvents(t *testing.T) { - t.Run("EventPeerDisconnected emitted on clean close", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, incoming, _ := honeybeetest.SetupTestSocket(t) - pool.Add("peer-1", socket) - - incoming <- honeybeetest.MockIncomingData{ - Err: &websocket.CloseError{Code: websocket.CloseNormalClosure}, - } - - expectEvent(t, pool.Events(), "peer-1", EventDisconnected) - - honeybeetest.Eventually(t, func() bool { - return !slices.Contains(pool.Peers(), "peer-1") - }, "expected peer auto-removed") - }) - - t.Run("EventPeerDropped emitted on unexpected close", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, incoming, _ := honeybeetest.SetupTestSocket(t) - pool.Add("peer-1", socket) - - incoming <- honeybeetest.MockIncomingData{ - Err: &websocket.CloseError{Code: websocket.CloseProtocolError}, - } - - expectEvent(t, pool.Events(), "peer-1", EventDroppedClose) - - honeybeetest.Eventually(t, func() bool { - return !slices.Contains(pool.Peers(), "peer-1") - }, "expected peer auto-removed") - }) - - t.Run("EventPeerEvicted emitted on watchdog timeout", func(t *testing.T) { - config, err := NewPoolConfig( - WithWorkerConfig(&WorkerConfig{InactivityTimeout: 20 * time.Millisecond}), - ) - assert.NoError(t, err) - - pool, err := NewPool(context.Background(), "pool-1", config, nil) - assert.NoError(t, err) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - pool.Add("peer-1", socket) - - expectEvent(t, pool.Events(), "peer-1", EventEvictedPolicy) - - honeybeetest.Eventually(t, func() bool { - return !slices.Contains(pool.Peers(), "peer-1") - }, "expected peer auto-removed") - }) - - t.Run("no event emitted on Remove", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket, _, _ := honeybeetest.SetupTestSocket(t) - pool.Add("peer-1", socket) - pool.Remove("peer-1") - - honeybeetest.Never(t, func() bool { - select { - case <-pool.Events(): - return true - default: - return false - } - }, "no event expected on Remove") - }) - - t.Run("no event emitted on Replace of old peer", func(t *testing.T) { - pool := setupPool(t) - defer pool.Close() - - socket1, _, _ := honeybeetest.SetupTestSocket(t) - socket2, _, _ := honeybeetest.SetupTestSocket(t) - - pool.Add("peer-1", socket1) - pool.Replace("peer-1", socket2) - - honeybeetest.Never(t, func() bool { - select { - case <-pool.Events(): - return true - default: - return false - } - }, "no event expected on Replace") - }) -} diff --git a/inbound/worker.go b/inbound/worker.go deleted file mode 100644 index ec7f73d..0000000 --- a/inbound/worker.go +++ /dev/null @@ -1,340 +0,0 @@ -package inbound - -import ( - "context" - "errors" - "git.wisehodl.dev/jay/go-honeybee/queue" - "git.wisehodl.dev/jay/go-honeybee/transport" - "git.wisehodl.dev/jay/go-honeybee/types" - "log/slog" - "sync" - "sync/atomic" - "time" -) - -type Worker interface { - Start(pool PoolPlugin) - Stop() - Send(data []byte) error - Stats() WorkerStats -} - -type WorkerExitKind string - -const ( - ExitDisconnected WorkerExitKind = "disconnected" - ExitUnexpectedClose WorkerExitKind = "unexpected_close" - ExitReadError WorkerExitKind = "read_error" - ExitPolicy WorkerExitKind = "policy" -) - -type WorkerStats struct { - ChanIncoming int - ChanQueue int - ChanForwarder int - BufferDepth int64 - - TotalProcessed uint64 - TotalDropped uint64 - TotalSent uint64 -} - -type DefaultWorker struct { - id string - conn *transport.Connection - - heartbeat chan struct{} - toQueue chan types.ReceivedMessage - toForwarder chan types.ReceivedMessage - - processedCount *atomic.Uint64 - droppedCount *atomic.Uint64 - outgoingCount *atomic.Uint64 - bufferDepth *atomic.Int64 - - ctx context.Context - cancel context.CancelFunc - config *WorkerConfig - logger *slog.Logger -} - -func NewWorker( - ctx context.Context, - id string, - conn *transport.Connection, - config *WorkerConfig, - logger *slog.Logger, -) (*DefaultWorker, error) { - if config == nil { - config = GetDefaultWorkerConfig() - } - if err := ValidateWorkerConfig(config); err != nil { - return nil, err - } - - wctx, cancel := context.WithCancel(ctx) - return &DefaultWorker{ - id: id, - conn: conn, - heartbeat: make(chan struct{}), - toQueue: make(chan types.ReceivedMessage, 256), - toForwarder: make(chan types.ReceivedMessage, 256), - processedCount: &atomic.Uint64{}, - droppedCount: &atomic.Uint64{}, - outgoingCount: &atomic.Uint64{}, - bufferDepth: &atomic.Int64{}, - config: config, - ctx: wctx, - cancel: cancel, - logger: logger, - }, nil -} - -func (w *DefaultWorker) Start(pool PoolPlugin) { - if w.logger != nil { - w.logger.Debug("starting") - } - - var wg sync.WaitGroup - wg.Add(5) - - go func() { - defer wg.Done() - RunReader(w.ctx, pool.OnExit, w.conn, w.toQueue, w.heartbeat, w.logger) - }() - - go func() { - defer wg.Done() - RunHeartbeatForwarder(w.ctx, w.conn, w.heartbeat, w.logger) - }() - - go func() { - defer wg.Done() - queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount, w.bufferDepth) - }() - - go func() { - defer wg.Done() - RunForwarder(w.id, w.ctx, w.toForwarder, pool.Inbox, w.processedCount, pool.InboxCounter) - }() - - go func() { - defer wg.Done() - RunWatchdog(w.ctx, pool.OnExit, w.heartbeat, w.config.InactivityTimeout, w.logger) - }() - - if w.logger != nil { - w.logger.Info("started") - } - - wg.Wait() - - if w.logger != nil { - w.logger.Info("stopped") - } -} - -func (w *DefaultWorker) Stop() { - if w.logger != nil { - w.logger.Debug("shutting down") - } - w.cancel() -} - -func (w *DefaultWorker) Send(data []byte) error { - if err := w.conn.Send(data); err != nil { - return err - } - - select { - case w.heartbeat <- struct{}{}: - case <-w.ctx.Done(): - } - - w.outgoingCount.Add(1) - - return nil -} - -func (w *DefaultWorker) Stats() WorkerStats { - return WorkerStats{ - ChanIncoming: len(w.conn.Incoming()), - ChanQueue: len(w.toQueue), - ChanForwarder: len(w.toForwarder), - BufferDepth: w.bufferDepth.Load(), - - TotalProcessed: w.processedCount.Load(), - TotalDropped: w.droppedCount.Load(), - TotalSent: w.outgoingCount.Load(), - } -} - -func RunReader( - ctx context.Context, - onPeerClose OnExitFunction, - - conn *transport.Connection, - messages chan<- types.ReceivedMessage, - heartbeat chan<- struct{}, - - logger *slog.Logger, -) { - for { - select { - case <-ctx.Done(): - return - case data, ok := <-conn.Incoming(): - if !ok { - var err error - // determine exit kind - // by default, the peer dropped unexpectedly - kind := ExitUnexpectedClose - select { - // the peer-side error is sent before the connection is closed, - // so a non-blocking call here is correct - // if an error is not sent, then assume the default event kind - case err = <-conn.Errors(): - if errors.Is(err, transport.ErrPeerClosedClean) { - kind = ExitDisconnected - } - if errors.Is(err, transport.ErrPeerClosedUnexpected) { - kind = ExitUnexpectedClose - } - if errors.Is(err, transport.ErrReadError) { - kind = ExitReadError - } - default: - } - - if logger != nil { - if kind == ExitUnexpectedClose || kind == ExitReadError { - logger.Error("reader: peer dropped", "event", kind, "error", err) - } else { - logger.Info("reader: peer disconnected", "event", kind) - } - } - - onPeerClose(kind) - return - } - - messages <- types.ReceivedMessage{Data: data, ReceivedAt: time.Now()} - - select { - case heartbeat <- struct{}{}: - case <-ctx.Done(): - return - } - } - } -} - -func RunHeartbeatForwarder( - ctx context.Context, - conn *transport.Connection, - heartbeat chan<- struct{}, - logger *slog.Logger, -) { - for { - select { - case <-ctx.Done(): - return - case <-conn.Heartbeat(): - select { - case heartbeat <- struct{}{}: - if logger != nil { - logger.Debug("ping-pong heartbeat") - } - case <-ctx.Done(): - return - } - } - } -} - -func RunForwarder( - id string, - ctx context.Context, - messages <-chan types.ReceivedMessage, - inbox chan<- types.InboxMessage, - workerProcessedCount *atomic.Uint64, - poolInboxCount *atomic.Uint64, -) { - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-messages: - if !ok { - return - } - select { - case <-ctx.Done(): - return - - case inbox <- types.InboxMessage{ - ID: id, - Data: msg.Data, - ReceivedAt: msg.ReceivedAt, - }: - workerProcessedCount.Add(1) - poolInboxCount.Add(1) - } - } - } -} - -func RunWatchdog( - ctx context.Context, - onInactive OnExitFunction, - heartbeat <-chan struct{}, - timeout time.Duration, - logger *slog.Logger, -) { - // disable watchdog timeout if not configured - if timeout <= 0 { - if logger != nil { - logger.Debug("watchdog: disabled") - } - // drain heartbeats - // wait for cancel and exit - for { - select { - case <-heartbeat: - case <-ctx.Done(): - return - } - } - } - - if logger != nil { - logger.Debug("watchdog: enabled", "timeout", timeout) - } - - timer := time.NewTimer(timeout) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-heartbeat: - // drain the timer channel and reset - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - timer.Reset(timeout) - // timer completed - case <-timer.C: - // signal peer is inactive - if logger != nil { - logger.Info("watchdog: no activity observed") - } - onInactive(ExitPolicy) - return - } - } -} diff --git a/inbound/worker_forwarder_test.go b/inbound/worker_forwarder_test.go deleted file mode 100644 index c6f8272..0000000 --- a/inbound/worker_forwarder_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package inbound - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/types" - "sync/atomic" - "testing" - "time" -) - -func TestRunForwarder(t *testing.T) { - t.Run("message passes through to inbox", func(t *testing.T) { - id := "wss://test" - messages := make(chan types.ReceivedMessage, 1) - inbox := make(chan types.InboxMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go RunForwarder(id, ctx, messages, inbox, &atomic.Uint64{}, &atomic.Uint64{}) - - messages <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} - - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-inbox: - return string(msg.Data) == "hello" && msg.ID == "wss://test" - default: - return false - } - }, "expected message") - }) -} diff --git a/inbound/worker_reader_test.go b/inbound/worker_reader_test.go deleted file mode 100644 index 6c0b860..0000000 --- a/inbound/worker_reader_test.go +++ /dev/null @@ -1,213 +0,0 @@ -package inbound - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/transport" - "git.wisehodl.dev/jay/go-honeybee/types" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" - "io" - "sync/atomic" - "testing" - "time" -) - -func TestRunReader(t *testing.T) { - t.Run("message forwarded with correct data and non-zero receivedAt", func(t *testing.T) { - conn, _, incoming, _ := setupTestConnection(t) - defer conn.Close() - - messages := make(chan types.ReceivedMessage, 1) - heartbeat := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go RunReader(ctx, func(WorkerExitKind) {}, conn, messages, heartbeat, nil) - - before := time.Now() - incoming <- honeybeetest.MockIncomingData{MsgType: websocket.TextMessage, Data: []byte("hello")} - - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-messages: - return string(msg.Data) == "hello" && msg.ReceivedAt.After(before) - default: - return false - } - }, "expected message") - }) - - t.Run("heartbeat sent per forwarded message", func(t *testing.T) { - conn, _, incoming, _ := setupTestConnection(t) - defer conn.Close() - - messages := make(chan types.ReceivedMessage, 10) - heartbeat := make(chan struct{}, 10) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - count := atomic.Int32{} - go func() { - for range heartbeat { - count.Add(1) - } - }() - go func() { - for range messages { - } - }() - go RunReader(ctx, func(WorkerExitKind) {}, conn, messages, heartbeat, nil) - - const n = 3 - for i := 0; i < n; i++ { - incoming <- honeybeetest.MockIncomingData{MsgType: websocket.TextMessage, Data: []byte("msg")} - } - - honeybeetest.Eventually(t, func() bool { - return count.Load() == n - }, "expected heartbeats") - }) - - t.Run("clean close calls onPeerClose with ExitCleanDisconnect", func(t *testing.T) { - mock := honeybeetest.NewMockSocket() - mock.CloseFunc = func() error { - mock.Once.Do(func() { close(mock.Closed) }) - return nil - } - mock.ReadMessageFunc = func() (int, []byte, error) { - return 0, nil, &websocket.CloseError{Code: websocket.CloseNormalClosure} - } - - conn, err := transport.NewConnectionFromSocket(mock, nil, nil) - assert.NoError(t, err) - - messages := make(chan types.ReceivedMessage, 1) - heartbeat := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var gotKind WorkerExitKind - done := make(chan struct{}) - go RunReader(ctx, func(kind WorkerExitKind) { - gotKind = kind - close(done) - }, conn, messages, heartbeat, nil) - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected onPeerClose") - - assert.Equal(t, ExitDisconnected, gotKind) - }) - - t.Run("unexpected close calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) { - mock := honeybeetest.NewMockSocket() - mock.CloseFunc = func() error { - mock.Once.Do(func() { close(mock.Closed) }) - return nil - } - mock.ReadMessageFunc = func() (int, []byte, error) { - return 0, nil, &websocket.CloseError{Code: websocket.CloseProtocolError} - } - - conn, err := transport.NewConnectionFromSocket(mock, nil, nil) - assert.NoError(t, err) - - messages := make(chan types.ReceivedMessage, 1) - heartbeat := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var gotKind WorkerExitKind - done := make(chan struct{}) - go RunReader(ctx, func(kind WorkerExitKind) { - gotKind = kind - close(done) - }, conn, messages, heartbeat, nil) - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected onPeerClose") - - assert.Equal(t, ExitUnexpectedClose, gotKind) - }) - - t.Run("read error calls onPeerClose with ExitUnexpectedDrop", func(t *testing.T) { - mock := honeybeetest.NewMockSocket() - mock.CloseFunc = func() error { - mock.Once.Do(func() { close(mock.Closed) }) - return nil - } - mock.ReadMessageFunc = func() (int, []byte, error) { - return 0, nil, io.EOF - } - - conn, err := transport.NewConnectionFromSocket(mock, nil, nil) - assert.NoError(t, err) - - messages := make(chan types.ReceivedMessage, 1) - heartbeat := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var gotKind WorkerExitKind - done := make(chan struct{}) - go RunReader(ctx, func(kind WorkerExitKind) { - gotKind = kind - close(done) - }, conn, messages, heartbeat, nil) - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected onPeerClose") - - assert.Equal(t, ExitReadError, gotKind) - }) - - t.Run("ctx.Done exits without calling onPeerClose", func(t *testing.T) { - conn, _, _, _ := setupTestConnection(t) - defer conn.Close() - - messages := make(chan types.ReceivedMessage, 1) - heartbeat := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) - - called := atomic.Bool{} - done := make(chan struct{}) - go func() { - RunReader(ctx, func(WorkerExitKind) { - called.Store(true) - }, conn, messages, heartbeat, nil) - close(done) - }() - - cancel() - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected RunReader to exit") - - assert.False(t, called.Load()) - }) -} diff --git a/inbound/worker_test.go b/inbound/worker_test.go deleted file mode 100644 index ee5b725..0000000 --- a/inbound/worker_test.go +++ /dev/null @@ -1,266 +0,0 @@ -package inbound - -import ( - "context" - "fmt" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/transport" - "git.wisehodl.dev/jay/go-honeybee/types" - "github.com/gorilla/websocket" - "github.com/stretchr/testify/assert" - "sync" - "sync/atomic" - "testing" - "time" -) - -type workerTestVars struct { - worker *DefaultWorker - conn *transport.Connection - incoming chan honeybeetest.MockIncomingData - outgoing chan honeybeetest.MockOutgoingData - pool PoolPlugin - inbox chan types.InboxMessage - events chan PoolEvent - exitKind *atomic.Value - wg *sync.WaitGroup -} - -func setupWorkerTest(t *testing.T) workerTestVars { - t.Helper() - - conn, _, incoming, outgoing := setupTestConnection(t) - - ctx, cancel := context.WithCancel(context.Background()) - var err error - worker, err := NewWorker(ctx, "peer-1", conn, nil, nil) - assert.NoError(t, err) - worker.cancel = cancel - - inbox := make(chan types.InboxMessage, 256) - events := make(chan PoolEvent, 10) - exitKind := &atomic.Value{} - - var once sync.Once - pool := PoolPlugin{ - Inbox: inbox, - Events: events, - OnExit: func(kind WorkerExitKind) { - once.Do(func() { exitKind.Store(kind) }) - }, - InboxCounter: &atomic.Uint64{}, - } - - wg := &sync.WaitGroup{} - wg.Add(1) - - return workerTestVars{ - worker: worker, - conn: conn, - incoming: incoming, - outgoing: outgoing, - pool: pool, - inbox: inbox, - events: events, - exitKind: exitKind, - wg: wg, - } -} - -func TestWorkerStart(t *testing.T) { - t.Run("socket data arrives on inbox", func(t *testing.T) { - v := setupWorkerTest(t) - defer v.worker.Stop() - - go v.worker.Start(v.pool) - - v.incoming <- honeybeetest.MockIncomingData{ - MsgType: websocket.TextMessage, - Data: []byte("hello"), - } - - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-v.inbox: - return msg.ID == "peer-1" && string(msg.Data) == "hello" - default: - return false - } - }, "expected message on inbox") - }) - - t.Run("clean peer close calls OnExit with ExitCleanDisconnect", func(t *testing.T) { - v := setupWorkerTest(t) - defer v.worker.Stop() - - go v.worker.Start(v.pool) - - v.incoming <- honeybeetest.MockIncomingData{ - Err: &websocket.CloseError{Code: websocket.CloseNormalClosure}, - } - - honeybeetest.Eventually(t, func() bool { - val := v.exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitDisconnected - }, "expected ExitCleanDisconnect") - }) - - t.Run("unexpected peer close calls OnExit with ExitUnexpectedDrop", func(t *testing.T) { - v := setupWorkerTest(t) - defer v.worker.Stop() - - go v.worker.Start(v.pool) - - v.incoming <- honeybeetest.MockIncomingData{ - Err: &websocket.CloseError{Code: websocket.CloseProtocolError}, - } - - honeybeetest.Eventually(t, func() bool { - val := v.exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitUnexpectedClose - }, "expected ExitUnexpectedDrop") - }) - - t.Run("watchdog timeout calls OnExit with ExitInactive", func(t *testing.T) { - conn, _, _, _ := setupTestConnection(t) - - ctx, cancel := context.WithCancel(context.Background()) - worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{ - InactivityTimeout: 20 * time.Millisecond, - }, nil) - assert.NoError(t, err) - worker.cancel = cancel - defer worker.Stop() - - exitKind := &atomic.Value{} - var once sync.Once - pool := PoolPlugin{ - Inbox: make(chan types.InboxMessage, 256), - Events: make(chan PoolEvent, 10), - OnExit: func(kind WorkerExitKind) { - once.Do(func() { exitKind.Store(kind) }) - }, - } - - var wg sync.WaitGroup - wg.Add(1) - go func() { - worker.Start(pool) - wg.Done() - }() - - honeybeetest.Eventually(t, func() bool { - val := exitKind.Load() - return val != nil && val.(WorkerExitKind) == ExitPolicy - }, "expected ExitInactive") - }) -} - -func TestWorkerStop(t *testing.T) { - v := setupWorkerTest(t) - - go func() { v.worker.Start(v.pool); v.wg.Done() }() - - v.worker.Stop() - - done := make(chan struct{}) - go func() { v.wg.Wait(); close(done) }() - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected wg to drain") - - // does not call onExit - assert.Nil(t, v.exitKind.Load()) -} - -func TestWorkerSend(t *testing.T) { - t.Run("Send delivers data to socket", func(t *testing.T) { - v := setupWorkerTest(t) - defer v.worker.Stop() - - go v.worker.Start(v.pool) - - err := v.worker.Send([]byte("hello")) - assert.NoError(t, err) - - honeybeetest.ExpectWrite(t, v.outgoing, websocket.TextMessage, []byte("hello")) - }) - - t.Run("Send produces heartbeats", func(t *testing.T) { - v := setupWorkerTest(t) - defer v.worker.Stop() - - count := atomic.Int32{} - go func() { - for range v.worker.heartbeat { - count.Add(1) - } - }() - - // do not start the worker, allow heartbeats to be drained manually - - for i := 0; i < 3; i++ { - err := v.worker.Send([]byte(fmt.Sprintf("msg-%d", i))) - assert.NoError(t, err) - } - - honeybeetest.Eventually(t, func() bool { - return count.Load() == 3 - }, "expected heartbeats") - }) - - t.Run("Send returns error after connection closed", func(t *testing.T) { - v := setupWorkerTest(t) - defer v.worker.Stop() - - go v.worker.Start(v.pool) - - v.conn.Close() - - honeybeetest.Eventually(t, func() bool { - return v.conn.State() == transport.StateClosed - }, "expected connection closed") - - err := v.worker.Send([]byte("hello")) - assert.Error(t, err) - }) -} - -func TestHeartbeatForwarder(t *testing.T) { - t.Run("connection level heartbeat propagates", func(t *testing.T) { - socket, _, _ := honeybeetest.SetupTestSocket(t) - var pongHandler func(string) error - socket.SetPongHandlerFunc = func(h func(string) error) { pongHandler = h } - - conn, err := transport.NewConnectionFromSocket(socket, nil, nil) - assert.NoError(t, err) - - heartbeat := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go RunHeartbeatForwarder(ctx, conn, heartbeat, nil) - - honeybeetest.Eventually(t, func() bool { - return pongHandler != nil - }, "expected Connection to register PongHandler") - - if pongHandler == nil { - t.Fatal("pong handler was never set") - } - - pongHandler("") // Trigger pong - - select { - case <-heartbeat: - case <-time.After(time.Second): - t.Fatal("pong did not propagate to worker heartbeat") - } - }) -} diff --git a/inbound/worker_watchdog_test.go b/inbound/worker_watchdog_test.go deleted file mode 100644 index 1726a93..0000000 --- a/inbound/worker_watchdog_test.go +++ /dev/null @@ -1,135 +0,0 @@ -package inbound - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "github.com/stretchr/testify/assert" - "sync/atomic" - "testing" - "time" -) - -func TestRunWatchdog(t *testing.T) { - t.Run("heartbeat resets timer, onInactive not called", func(t *testing.T) { - heartbeat := make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - called := atomic.Bool{} - go RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 200*time.Millisecond, nil) - - for i := 0; i < 5; i++ { - time.Sleep(20 * time.Millisecond) - heartbeat <- struct{}{} - } - - honeybeetest.Never(t, func() bool { - return called.Load() - }, "unexpected onInactive call") - }) - - t.Run("timeout fires onInactive exactly once", func(t *testing.T) { - heartbeat := make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var gotKind WorkerExitKind - count := atomic.Int32{} - done := make(chan struct{}) - go RunWatchdog(ctx, func(kind WorkerExitKind) { - count.Add(1) - gotKind = kind - close(done) - }, heartbeat, 20*time.Millisecond, nil) - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected onInactive") - - assert.Equal(t, int32(1), count.Load()) - assert.Equal(t, ExitPolicy, gotKind) - }) - - t.Run("ctx.Done exits without calling onInactive", func(t *testing.T) { - heartbeat := make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) - - called := atomic.Bool{} - done := make(chan struct{}) - go func() { - RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 20*time.Second, nil) - close(done) - }() - - cancel() - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected RunWatchdog to exit") - - assert.False(t, called.Load()) - }) - - t.Run("zero timeout exits on ctx.Done without firing onInactive", func(t *testing.T) { - heartbeat := make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) - - called := atomic.Bool{} - done := make(chan struct{}) - go func() { - RunWatchdog(ctx, func(WorkerExitKind) { called.Store(true) }, heartbeat, 0, nil) - close(done) - }() - - cancel() - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected RunWatchdog to exit") - - assert.False(t, called.Load()) - }) - - t.Run("disabled keepalive drains heartbeats without blocking", func(t *testing.T) { - heartbeat := make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - done := make(chan struct{}) - go func() { - RunWatchdog(ctx, func(WorkerExitKind) {}, heartbeat, 0, nil) - close(done) - }() - - // these must not block - for i := 0; i < 5; i++ { - heartbeat <- struct{}{} - } - - cancel() - - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected RunWatchdog to exit") - }) -} diff --git a/outbound/pool.go b/pool.go similarity index 99% rename from outbound/pool.go rename to pool.go index b47f3fb..c4bc77c 100644 --- a/outbound/pool.go +++ b/pool.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/pool_test.go b/pool_test.go similarity index 99% rename from outbound/pool_test.go rename to pool_test.go index 75dcd8f..e4a9de6 100644 --- a/outbound/pool_test.go +++ b/pool_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/queue/queue.go b/queue/queue.go deleted file mode 100644 index 43b4b0f..0000000 --- a/queue/queue.go +++ /dev/null @@ -1,131 +0,0 @@ -package queue - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/types" - "sync/atomic" -) - -func RunQueue( - id string, - ctx context.Context, - in <-chan types.ReceivedMessage, - out chan<- types.ReceivedMessage, - maxQueueSize int, - droppedCount *atomic.Uint64, - bufferDepth *atomic.Int64, -) { - var next types.ReceivedMessage - var queue messageQueue - if maxQueueSize > 0 { - queue = newBoundedRing(maxQueueSize) - } else { - queue = newUnboundedRing(1024) - } - - for { - var outOrNil chan<- types.ReceivedMessage - - // enable out channel if queue is populated - if queue.len() > 0 { - outOrNil = out - next = queue.peek() - } - - select { - case <-ctx.Done(): - return - case msg := <-in: - // limit queue size if maximum is configured - if maxQueueSize > 0 && queue.len() >= maxQueueSize { - // drop oldest message - _ = queue.pop() - droppedCount.Add(1) - bufferDepth.Add(-1) - } - // add new message - queue.push(msg) - bufferDepth.Add(1) - // send next message to out channel - case outOrNil <- next: - _ = queue.pop() - bufferDepth.Add(-1) - } - } -} - -// Ring Buffer Queue - -type messageQueue interface { - push(types.ReceivedMessage) - pop() types.ReceivedMessage - peek() types.ReceivedMessage - len() int -} - -type ring struct { - buf []types.ReceivedMessage - head int - size int -} - -func (r *ring) len() int { return r.size } - -func (r *ring) pop() types.ReceivedMessage { - m := r.buf[r.head] - var zero types.ReceivedMessage - r.buf[r.head] = zero // release reference for GC - r.head = (r.head + 1) % len(r.buf) - r.size-- - return m -} - -func (r *ring) peek() types.ReceivedMessage { - m := r.buf[r.head] - return m -} - -// shared write at logical tail; caller guarantees space exists -func (r *ring) writeTail(m types.ReceivedMessage) { - r.buf[(r.head+r.size)%len(r.buf)] = m - r.size++ -} - -// Bounded ring - -type boundedRing struct{ ring } - -func newBoundedRing(cap int) *boundedRing { - return &boundedRing{ring{buf: make([]types.ReceivedMessage, cap)}} -} - -func (b *boundedRing) push(m types.ReceivedMessage) { - if b.size == len(b.buf) { - b.buf[b.head] = m - b.head = (b.head + 1) % len(b.buf) - return - } - b.writeTail(m) -} - -// Unbounded Ring - -type unboundedRing struct{ ring } - -func newUnboundedRing(initialCap int) *unboundedRing { - if initialCap < 1 { - initialCap = 1 - } - return &unboundedRing{ring{buf: make([]types.ReceivedMessage, initialCap)}} -} - -func (u *unboundedRing) push(m types.ReceivedMessage) { - if u.size == len(u.buf) { - bigger := make([]types.ReceivedMessage, len(u.buf)*2) - n := copy(bigger, u.buf[u.head:]) - copy(bigger[n:], u.buf[:u.head]) - u.buf = bigger - u.head = 0 - } - u.writeTail(m) -} diff --git a/queue/queue_test.go b/queue/queue_test.go deleted file mode 100644 index 5711ebe..0000000 --- a/queue/queue_test.go +++ /dev/null @@ -1,105 +0,0 @@ -package queue - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/types" - "github.com/stretchr/testify/assert" - "sync/atomic" - "testing" - "time" -) - -func TestRunQueue(t *testing.T) { - t.Run("message passes through to inbox", func(t *testing.T) { - id := "wss://test" - inChan := make(chan types.ReceivedMessage, 1) - outChan := make(chan types.ReceivedMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}, &atomic.Int64{}) - - inChan <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} - - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-outChan: - return string(msg.Data) == "hello" - default: - return false - } - }, "expected message") - }) - - t.Run("oldest message dropped when queue is full", func(t *testing.T) { - id := "wss://test" - inChan := make(chan types.ReceivedMessage, 1) - outChan := make(chan types.ReceivedMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - gate := make(chan struct{}) - gatedInbox := make(chan types.ReceivedMessage) - - // gate the inbox from receiving messages until the gate is opened - go func() { - <-gate - for msg := range gatedInbox { - outChan <- msg - } - }() - - go RunQueue(id, ctx, inChan, gatedInbox, 2, &atomic.Uint64{}, &atomic.Int64{}) - - // send three messages while the gated inbox is blocked - inChan <- types.ReceivedMessage{Data: []byte("first"), ReceivedAt: time.Now()} - inChan <- types.ReceivedMessage{Data: []byte("second"), ReceivedAt: time.Now()} - inChan <- types.ReceivedMessage{Data: []byte("third"), ReceivedAt: time.Now()} - - // allow time for the first message to be dropped - time.Sleep(20 * time.Millisecond) - - // close the gate, draining messages into the inbox - close(gate) - - // receive messages from the inbox - var received []string - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-outChan: - received = append(received, string(msg.Data)) - default: - } - return len(received) == 2 - }, "expected messages") - - // first message was dropped - assert.Equal(t, []string{"second", "third"}, received) - - }) - - t.Run("exits on context cancellation", func(t *testing.T) { - id := "wss://test" - inChan := make(chan types.ReceivedMessage, 1) - outChan := make(chan types.ReceivedMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - done := make(chan struct{}) - go func() { - RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}, &atomic.Int64{}) - close(done) - }() - - cancel() - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected done signal") - }) -} diff --git a/outbound/worker.go b/worker.go similarity index 99% rename from outbound/worker.go rename to worker.go index aca3fc0..bfb344f 100644 --- a/outbound/worker.go +++ b/worker.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/worker_dialer_test.go b/worker_dialer_test.go similarity index 99% rename from outbound/worker_dialer_test.go rename to worker_dialer_test.go index 3da774a..1b26a91 100644 --- a/outbound/worker_dialer_test.go +++ b/worker_dialer_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/worker_keepalive_test.go b/worker_keepalive_test.go similarity index 99% rename from outbound/worker_keepalive_test.go rename to worker_keepalive_test.go index f311558..54985da 100644 --- a/outbound/worker_keepalive_test.go +++ b/worker_keepalive_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/worker_send_test.go b/worker_send_test.go similarity index 99% rename from outbound/worker_send_test.go rename to worker_send_test.go index 6e06ccb..4785a68 100644 --- a/outbound/worker_send_test.go +++ b/worker_send_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/worker_session_inner_test.go b/worker_session_inner_test.go similarity index 99% rename from outbound/worker_session_inner_test.go rename to worker_session_inner_test.go index d3777d5..7ed810b 100644 --- a/outbound/worker_session_inner_test.go +++ b/worker_session_inner_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/worker_session_test.go b/worker_session_test.go similarity index 99% rename from outbound/worker_session_test.go rename to worker_session_test.go index 298312d..147cbdd 100644 --- a/outbound/worker_session_test.go +++ b/worker_session_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context" diff --git a/outbound/worker_start_test.go b/worker_start_test.go similarity index 99% rename from outbound/worker_start_test.go rename to worker_start_test.go index 2f79cab..4157f7e 100644 --- a/outbound/worker_start_test.go +++ b/worker_start_test.go @@ -1,4 +1,4 @@ -package outbound +package honeybee import ( "context"