From 727dc18b5728f064443923b4a008305a5631558d Mon Sep 17 00:00:00 2001 From: Jay Date: Thu, 23 Apr 2026 17:58:40 -0400 Subject: [PATCH] Add slog attributes at pool, worker, and connection levels. --- honeybee.go | 8 +- honeybeetest/helpers.go | 114 ++++++++++++++++++++ honeybeetest/mocks.go | 23 ++-- inbound/config.go | 2 + inbound/errors.go | 9 +- inbound/pool.go | 79 +++++++++----- inbound/pool_test.go | 9 +- inbound/worker.go | 4 + inbound/worker_test.go | 4 +- logging/logging.go | 77 +++++++++++++ logging/logging_test.go | 84 +++++++++++++++ outbound/config.go | 7 +- outbound/errors.go | 7 +- outbound/pool.go | 63 +++++++---- outbound/pool_test.go | 13 ++- outbound/worker.go | 13 ++- transport/logging_test.go | 220 +++++++++----------------------------- 17 files changed, 488 insertions(+), 248 deletions(-) create mode 100644 logging/logging.go create mode 100644 logging/logging_test.go diff --git a/honeybee.go b/honeybee.go index 33bc3ea..66e76ae 100644 --- a/honeybee.go +++ b/honeybee.go @@ -97,8 +97,8 @@ var ( // Outbound Pool constructors -func NewOutboundPool(ctx context.Context, config *OutboundPoolConfig, logger *slog.Logger) (*OutboundPool, error) { - return outbound.NewPool(ctx, config, logger) +func NewOutboundPool(ctx context.Context, id string, config *OutboundPoolConfig, handler slog.Handler) (*OutboundPool, error) { + return outbound.NewPool(ctx, id, config, handler) } func NewOutboundPoolConfig(opts ...OutboundPoolOption) (*OutboundPoolConfig, error) { @@ -129,8 +129,8 @@ var ( // Inbound Pool constructors -func NewInboundPool(ctx context.Context, config *InboundPoolConfig, logger *slog.Logger) (*InboundPool, error) { - return inbound.NewPool(ctx, config, logger) +func NewInboundPool(ctx context.Context, id string, config *InboundPoolConfig, handler slog.Handler) (*InboundPool, error) { + return inbound.NewPool(ctx, id, config, handler) } func NewInboundPoolConfig(opts ...InboundPoolOption) (*InboundPoolConfig, error) { diff --git a/honeybeetest/helpers.go b/honeybeetest/helpers.go index 5ad036d..ba94fd0 100644 --- a/honeybeetest/helpers.go +++ b/honeybeetest/helpers.go @@ -4,6 +4,8 @@ import ( "bytes" "github.com/stretchr/testify/assert" "io" + "log/slog" + "strings" "testing" "time" ) @@ -29,6 +31,12 @@ type MockOutgoingData struct { Data []byte } +type ExpectedLog struct { + Level slog.Level + Msg string + Attrs map[string]any +} + // Setup func SetupTestSocket(t *testing.T) ( @@ -117,3 +125,109 @@ func Never(t *testing.T, condition func() bool, msg string) { t.Helper() assert.Never(t, condition, NegativeTestTimeout, TestTick, msg) } + +// Logging Helpers + +func AssertLogSequence(t *testing.T, records []slog.Record, expected []ExpectedLog) { + t.Helper() + + recIndex := 0 + for expIndex, exp := range expected { + found := false + + for recIndex < len(records) { + rec := records[recIndex] + + if rec.Level == exp.Level && strings.Contains(rec.Message, exp.Msg) { + allAttrsMatch := true + for key, expectedValue := range exp.Attrs { + if !AssertAttributePresent(t, rec, key, expectedValue) { + allAttrsMatch = false + break + } + } + + if allAttrsMatch { + found = true + recIndex++ + break + } + } + + recIndex++ + } + + if !found { + t.Fatalf( + "expected log not found: index=%d level=%v msg=%q attrs=%v", + expIndex, exp.Level, exp.Msg, exp.Attrs, + ) + } + } +} + +func FindLogRecord(records []slog.Record, level slog.Level, msgSnippet string) *slog.Record { + for i := range records { + if records[i].Level == level && strings.Contains(records[i].Message, msgSnippet) { + return &records[i] + } + } + return nil +} + +func AssertAttributePresent(t *testing.T, record slog.Record, key string, expectedValue any) bool { + t.Helper() + + var found bool + var actualValue any + + record.Attrs(func(attr slog.Attr) bool { + if attr.Key == key { + found = true + actualValue = attr.Value.Any() + return false + } + return true + }) + + if !found { + t.Fatalf("attribute %q not found in log record", key) + return false + } + + if !logValuesEqual(actualValue, expectedValue) { + t.Errorf("attribute %q: expected=%v actual=%v", key, expectedValue, actualValue) + return false + } + + return true +} + +func logValuesEqual(a, b any) bool { + if a == b { + return true + } + aInt, aOk := toInt64(a) + bInt, bOk := toInt64(b) + if aOk && bOk { + return aInt == bInt + } + return false +} + +func toInt64(v any) (int64, bool) { + switch val := v.(type) { + case int: + return int64(val), true + case int64: + return val, true + case int32: + return int64(val), true + case int16: + return int64(val), true + case int8: + return int64(val), true + default: + return 0, false + } +} diff --git a/honeybeetest/mocks.go b/honeybeetest/mocks.go index dc7af91..a29420c 100644 --- a/honeybeetest/mocks.go +++ b/honeybeetest/mocks.go @@ -79,20 +79,24 @@ func (m *MockSocket) SetCloseHandler(h func(code int, text string) error) { // Logging mocks type MockSlogHandler struct { - records []slog.Record + records *[]slog.Record + attrs []slog.Attr mu sync.RWMutex } func NewMockSlogHandler() *MockSlogHandler { + records := make([]slog.Record, 0) return &MockSlogHandler{ - records: make([]slog.Record, 0), + records: &records, + attrs: make([]slog.Attr, 0), } } func (m *MockSlogHandler) Handle(ctx context.Context, record slog.Record) error { m.mu.Lock() defer m.mu.Unlock() - m.records = append(m.records, record) + record.AddAttrs(m.attrs...) + *m.records = append(*m.records, record) return nil } @@ -101,7 +105,12 @@ func (m *MockSlogHandler) Enabled(ctx context.Context, level slog.Level) bool { } func (m *MockSlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - return m + m.mu.RLock() + defer m.mu.RUnlock() + return &MockSlogHandler{ + records: m.records, // shared records slice + attrs: append(m.attrs, attrs...), + } } func (m *MockSlogHandler) WithGroup(name string) slog.Handler { @@ -111,13 +120,13 @@ func (m *MockSlogHandler) WithGroup(name string) slog.Handler { func (m *MockSlogHandler) GetRecords() []slog.Record { m.mu.RLock() defer m.mu.RUnlock() - result := make([]slog.Record, len(m.records)) - copy(result, m.records) + result := make([]slog.Record, len(*m.records)) + copy(result, *m.records) return result } func (m *MockSlogHandler) Clear() { m.mu.Lock() defer m.mu.Unlock() - m.records = make([]slog.Record, 0) + *m.records = make([]slog.Record, 0) } diff --git a/inbound/config.go b/inbound/config.go index ceb5e61..b8bf6b5 100644 --- a/inbound/config.go +++ b/inbound/config.go @@ -4,6 +4,7 @@ package inbound import ( "context" "git.wisehodl.dev/jay/go-honeybee/transport" + "log/slog" "time" ) @@ -96,6 +97,7 @@ type WorkerFactory func( id string, conn *transport.Connection, config *WorkerConfig, + logger *slog.Logger, ) (Worker, error) type PoolConfig struct { diff --git a/inbound/errors.go b/inbound/errors.go index 9acbd7b..6fbe42f 100644 --- a/inbound/errors.go +++ b/inbound/errors.go @@ -4,10 +4,11 @@ import "errors" var ( // Pool errors - PoolError = errors.New("pool error") - ErrPoolClosed = errors.New("pool is closed") - ErrPeerNotFound = errors.New("peer not found") - ErrPeerExists = errors.New("peer already exists") + PoolError = errors.New("pool error") + ErrInvalidPoolID = errors.New("pool id cannot be empty") + ErrPoolClosed = errors.New("pool is closed") + ErrPeerNotFound = errors.New("peer not found") + ErrPeerExists = errors.New("peer already exists") // Config errors InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") diff --git a/inbound/pool.go b/inbound/pool.go index 4ccefed..d18889f 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -3,6 +3,7 @@ package inbound import ( "context" "fmt" + "git.wisehodl.dev/jay/go-honeybee/logging" "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" "log/slog" @@ -40,11 +41,11 @@ type InboxMessage struct { } type PoolPlugin struct { - Inbox chan<- InboxMessage - Events chan<- PoolEvent - Errors chan<- error - Logger *slog.Logger - OnExit OnExitFunction + Inbox chan<- InboxMessage + Events chan<- PoolEvent + Errors chan<- error + OnExit OnExitFunction + Handler slog.Handler } // Pool @@ -60,20 +61,27 @@ type Pool struct { ctx context.Context cancel context.CancelFunc + id string + peers map[string]*Peer inbox chan InboxMessage events chan PoolEvent errors chan error - config *PoolConfig - logger *slog.Logger + config *PoolConfig + handler slog.Handler + logger *slog.Logger mu sync.RWMutex wg sync.WaitGroup closed bool } -func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Pool, error) { +func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Handler) (*Pool, error) { + if id == "" { + return nil, ErrInvalidPoolID + } + if config == nil { config = GetDefaultPoolConfig() } @@ -82,13 +90,15 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Poo // The factory function should be non-blocking or else Connect() may cause // deadlocks. if config.WorkerFactory == nil { + // TODO: Construct worker logger config.WorkerFactory = func( ctx context.Context, id string, conn *transport.Connection, config *WorkerConfig, + logger *slog.Logger, ) (Worker, error) { - return NewWorker(ctx, id, conn, config) + return NewWorker(ctx, id, conn, config, logger) } } @@ -98,15 +108,22 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Poo pctx, cancel := context.WithCancel(ctx) + var logger *slog.Logger + if handler != nil { + logger = logging.NewInboundPoolLogger(handler, id) + } + return &Pool{ - ctx: pctx, - cancel: cancel, - peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, config.InboxBufferSize), - events: make(chan PoolEvent, config.EventsBufferSize), - errors: make(chan error, config.ErrorsBufferSize), - config: config, - logger: logger, + ctx: pctx, + cancel: cancel, + id: id, + peers: make(map[string]*Peer), + inbox: make(chan InboxMessage, config.InboxBufferSize), + events: make(chan PoolEvent, config.EventsBufferSize), + errors: make(chan error, config.ErrorsBufferSize), + config: config, + handler: handler, + logger: logger, }, nil } @@ -231,15 +248,24 @@ func (p *Pool) Send(id string, data []byte) error { // addLocked constructs and registers a peer. Caller must hold p.mu write lock. func (p *Pool) addLocked(id string, socket types.Socket) error { + var logger *slog.Logger + if p.handler != nil { + logger = logging.NewConnectionLogger(p.handler, p.id, id) + } + conn, err := transport.NewConnectionFromSocket( - socket, p.config.ConnectionConfig, p.logger) + socket, p.config.ConnectionConfig, logger) if err != nil { return err } // The worker factory must be non-blocking to avoid deadlocks wctx, cancel := context.WithCancel(p.ctx) - worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig) + if p.handler != nil { + logger = logging.NewInboundWorkerLogger(p.handler, p.id, id) + } + + worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig, logger) if err != nil { cancel() conn.Close() @@ -263,17 +289,12 @@ func (p *Pool) addLocked(id string, socket types.Socket) error { }) } - var logger *slog.Logger - if p.logger != nil { - logger = p.logger.With("id", id) - } - pool := PoolPlugin{ - Inbox: p.inbox, - Events: p.events, - Errors: p.errors, - Logger: logger, - OnExit: onExit, + Inbox: p.inbox, + Events: p.events, + Errors: p.errors, + OnExit: onExit, + Handler: p.handler, } peer := &Peer{ diff --git a/inbound/pool_test.go b/inbound/pool_test.go index d485d6a..5cf473f 100644 --- a/inbound/pool_test.go +++ b/inbound/pool_test.go @@ -15,7 +15,7 @@ import ( func setupPool(t *testing.T) *Pool { t.Helper() - pool, err := NewPool(context.Background(), nil, nil) + pool, err := NewPool(context.Background(), "pool-1", nil, nil) assert.NoError(t, err) return pool } @@ -39,6 +39,11 @@ func expectEvent( // Tests +func TestPoolID(t *testing.T) { + _, err := NewPool(context.Background(), "", nil, nil) + assert.ErrorIs(t, err, ErrInvalidPoolID) +} + func TestPoolAdd(t *testing.T) { t.Run("successfully adds peer", func(t *testing.T) { pool := setupPool(t) @@ -341,7 +346,7 @@ func TestPoolEvents(t *testing.T) { ) assert.NoError(t, err) - pool, err := NewPool(context.Background(), config, nil) + pool, err := NewPool(context.Background(), "pool-1", config, nil) assert.NoError(t, err) defer pool.Close() diff --git a/inbound/worker.go b/inbound/worker.go index 2ca50c2..2e749f2 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -6,6 +6,7 @@ import ( "git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" + "log/slog" "sync" "time" ) @@ -31,6 +32,7 @@ type DefaultWorker struct { config *WorkerConfig ctx context.Context cancel context.CancelFunc + logger *slog.Logger } func NewWorker( @@ -38,6 +40,7 @@ func NewWorker( id string, conn *transport.Connection, config *WorkerConfig, + logger *slog.Logger, ) (*DefaultWorker, error) { if config == nil { config = GetDefaultWorkerConfig() @@ -54,6 +57,7 @@ func NewWorker( config: config, ctx: wctx, cancel: cancel, + logger: logger, }, nil } diff --git a/inbound/worker_test.go b/inbound/worker_test.go index 1fe4ef1..afe9683 100644 --- a/inbound/worker_test.go +++ b/inbound/worker_test.go @@ -32,7 +32,7 @@ func setupWorkerTest(t *testing.T) workerTestVars { ctx, cancel := context.WithCancel(context.Background()) var err error - worker, err := NewWorker(ctx, "peer-1", conn, nil) + worker, err := NewWorker(ctx, "peer-1", conn, nil, nil) assert.NoError(t, err) worker.cancel = cancel @@ -125,7 +125,7 @@ func TestWorkerStart(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{ InactivityTimeout: 20 * time.Millisecond, - }) + }, nil) assert.NoError(t, err) worker.cancel = cancel defer worker.Stop() diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 0000000..637699d --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,77 @@ +package logging + +import ( + "log/slog" +) + +// Constants + +const KEY_MODULE = "module" +const KEY_COMPONENT = "component" +const KEY_POOL_ID = "pool_id" +const KEY_PEER_ID = "peer_id" + +const MODULE_NAME = "honeybee" + +const COMPONENT_OUTBOUND_POOL = "outbound_pool" +const COMPONENT_OUTBOUND_WORKER = "outbound_worker" + +const COMPONENT_INBOUND_POOL = "inbound_pool" +const COMPONENT_INBOUND_WORKER = "inbound_worker" + +const COMPONENT_CONNECTION = "connection" + +// Outbound loggers + +func NewOutboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger { + return newLogger(handler, + KEY_MODULE, MODULE_NAME, + KEY_COMPONENT, COMPONENT_OUTBOUND_POOL, + KEY_POOL_ID, poolID, + ) +} + +func NewOutboundWorkerLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger { + return newLogger(handler, + KEY_MODULE, MODULE_NAME, + KEY_COMPONENT, COMPONENT_OUTBOUND_WORKER, + KEY_POOL_ID, poolID, + KEY_PEER_ID, peerID, + ) +} + +// Inbound loggers + +func NewInboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger { + return newLogger(handler, + KEY_MODULE, MODULE_NAME, + KEY_COMPONENT, COMPONENT_INBOUND_POOL, + KEY_POOL_ID, poolID, + ) +} + +func NewInboundWorkerLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger { + return newLogger(handler, + KEY_MODULE, MODULE_NAME, + KEY_COMPONENT, COMPONENT_INBOUND_WORKER, + KEY_POOL_ID, poolID, + KEY_PEER_ID, peerID, + ) +} + +// Connection logger + +func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger { + return newLogger(handler, + KEY_MODULE, MODULE_NAME, + KEY_COMPONENT, COMPONENT_CONNECTION, + KEY_POOL_ID, poolID, + KEY_PEER_ID, peerID, + ) +} + +// Helpers + +func newLogger(handler slog.Handler, attrs ...any) *slog.Logger { + return slog.New(handler).With(attrs...) +} diff --git a/logging/logging_test.go b/logging/logging_test.go new file mode 100644 index 0000000..dd90ea4 --- /dev/null +++ b/logging/logging_test.go @@ -0,0 +1,84 @@ +package logging + +import ( + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + // "github.com/stretchr/testify/assert" + "log/slog" + "testing" +) + +// Helpers + +func log(level slog.Level, msg string, attrs map[string]any) honeybeetest.ExpectedLog { + return honeybeetest.ExpectedLog{Level: level, Msg: msg, Attrs: attrs} +} + +// Tests + +func TestOutboundLogger(t *testing.T) { + const POOL_ID = "pool-1" + const PEER_ID = "wss://test" + + handler := honeybeetest.NewMockSlogHandler() + poolLogger := NewOutboundPoolLogger(handler, POOL_ID) + workerLogger := NewOutboundWorkerLogger(handler, POOL_ID, PEER_ID) + connLogger := NewConnectionLogger(handler, POOL_ID, PEER_ID) + + poolLogger.Info("test") + workerLogger.Info("test") + connLogger.Info("test") + + honeybeetest.Eventually(t, func() bool { + return len(handler.GetRecords()) == 3 + }, "expected a log record") + + records := handler.GetRecords() + + honeybeetest.AssertAttributePresent(t, records[0], KEY_MODULE, MODULE_NAME) + honeybeetest.AssertAttributePresent(t, records[0], KEY_COMPONENT, COMPONENT_OUTBOUND_POOL) + honeybeetest.AssertAttributePresent(t, records[0], KEY_POOL_ID, POOL_ID) + + honeybeetest.AssertAttributePresent(t, records[1], KEY_MODULE, MODULE_NAME) + honeybeetest.AssertAttributePresent(t, records[1], KEY_COMPONENT, COMPONENT_OUTBOUND_WORKER) + honeybeetest.AssertAttributePresent(t, records[1], KEY_POOL_ID, POOL_ID) + honeybeetest.AssertAttributePresent(t, records[1], KEY_PEER_ID, PEER_ID) + + honeybeetest.AssertAttributePresent(t, records[2], KEY_MODULE, MODULE_NAME) + honeybeetest.AssertAttributePresent(t, records[2], KEY_COMPONENT, COMPONENT_CONNECTION) + honeybeetest.AssertAttributePresent(t, records[2], KEY_POOL_ID, POOL_ID) + honeybeetest.AssertAttributePresent(t, records[2], KEY_PEER_ID, PEER_ID) +} + +func TestInboundLogger(t *testing.T) { + const POOL_ID = "pool-1" + const PEER_ID = "peer-1" + + handler := honeybeetest.NewMockSlogHandler() + poolLogger := NewInboundPoolLogger(handler, POOL_ID) + workerLogger := NewInboundWorkerLogger(handler, POOL_ID, PEER_ID) + connLogger := NewConnectionLogger(handler, POOL_ID, PEER_ID) + + poolLogger.Info("test") + workerLogger.Info("test") + connLogger.Info("test") + + honeybeetest.Eventually(t, func() bool { + return len(handler.GetRecords()) == 3 + }, "expected a log record") + + records := handler.GetRecords() + + honeybeetest.AssertAttributePresent(t, records[0], KEY_MODULE, MODULE_NAME) + honeybeetest.AssertAttributePresent(t, records[0], KEY_COMPONENT, COMPONENT_INBOUND_POOL) + honeybeetest.AssertAttributePresent(t, records[0], KEY_POOL_ID, POOL_ID) + + honeybeetest.AssertAttributePresent(t, records[1], KEY_MODULE, MODULE_NAME) + honeybeetest.AssertAttributePresent(t, records[1], KEY_COMPONENT, COMPONENT_INBOUND_WORKER) + honeybeetest.AssertAttributePresent(t, records[1], KEY_POOL_ID, POOL_ID) + honeybeetest.AssertAttributePresent(t, records[1], KEY_PEER_ID, PEER_ID) + + honeybeetest.AssertAttributePresent(t, records[2], KEY_MODULE, MODULE_NAME) + honeybeetest.AssertAttributePresent(t, records[2], KEY_COMPONENT, COMPONENT_CONNECTION) + honeybeetest.AssertAttributePresent(t, records[2], KEY_POOL_ID, POOL_ID) + honeybeetest.AssertAttributePresent(t, records[2], KEY_PEER_ID, PEER_ID) +} diff --git a/outbound/config.go b/outbound/config.go index 4aa0558..c05eff1 100644 --- a/outbound/config.go +++ b/outbound/config.go @@ -3,12 +3,17 @@ package outbound import ( "context" "git.wisehodl.dev/jay/go-honeybee/transport" + "log/slog" "time" ) // Types -type WorkerFactory func(ctx context.Context, id string) (Worker, error) +type WorkerFactory func( + ctx context.Context, + id string, + logger *slog.Logger, +) (Worker, error) // Pool Config diff --git a/outbound/errors.go b/outbound/errors.go index 45accc5..dbed788 100644 --- a/outbound/errors.go +++ b/outbound/errors.go @@ -10,9 +10,10 @@ var ( InvalidBufferSize = errors.New("buffer size must be greater than zero") // Pool errors - ErrPoolClosed = errors.New("pool is closed") - ErrPeerNotFound = errors.New("peer not found") - ErrPeerExists = errors.New("peer already exists") + ErrInvalidPoolID = errors.New("pool id cannot be empty") + ErrPoolClosed = errors.New("pool is closed") + ErrPeerNotFound = errors.New("peer not found") + ErrPeerExists = errors.New("peer already exists") // Worker errors ErrConnectionUnavailable = errors.New("connection unavailable") diff --git a/outbound/pool.go b/outbound/pool.go index d0ad314..f8415ce 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -2,6 +2,7 @@ package outbound import ( "context" + "git.wisehodl.dev/jay/go-honeybee/logging" "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" "log/slog" @@ -30,12 +31,13 @@ type InboxMessage struct { } type PoolPlugin struct { + ID string Inbox chan<- InboxMessage Events chan<- PoolEvent Errors chan<- error - Logger *slog.Logger Dialer types.Dialer ConnectionConfig *transport.ConnectionConfig + Handler slog.Handler } // Pool @@ -49,22 +51,29 @@ type Pool struct { ctx context.Context cancel context.CancelFunc + id string + peers map[string]*Peer inbox chan InboxMessage events chan PoolEvent errors chan error - dialer types.Dialer - config *PoolConfig - logger *slog.Logger + dialer types.Dialer + config *PoolConfig + handler slog.Handler + logger *slog.Logger mu sync.RWMutex wg sync.WaitGroup closed bool } -func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger, +func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Handler, ) (*Pool, error) { + if id == "" { + return nil, ErrInvalidPoolID + } + if config == nil { config = GetDefaultPoolConfig() } @@ -74,8 +83,8 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger, // deadlocks. if config.WorkerFactory == nil { config.WorkerFactory = func( - ctx context.Context, id string) (Worker, error) { - return NewWorker(ctx, id, config.WorkerConfig) + ctx context.Context, id string, logger *slog.Logger) (Worker, error) { + return NewWorker(ctx, id, config.WorkerConfig, logger) } } @@ -85,16 +94,23 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger, pctx, cancel := context.WithCancel(ctx) + var logger *slog.Logger + if handler != nil { + logger = logging.NewOutboundPoolLogger(handler, id) + } + return &Pool{ - ctx: pctx, - cancel: cancel, - peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, config.InboxBufferSize), - events: make(chan PoolEvent, config.EventsBufferSize), - errors: make(chan error, config.ErrorsBufferSize), - dialer: transport.NewDialer(), - config: config, - logger: logger, + ctx: pctx, + cancel: cancel, + id: id, + peers: make(map[string]*Peer), + inbox: make(chan InboxMessage, config.InboxBufferSize), + events: make(chan PoolEvent, config.EventsBufferSize), + errors: make(chan error, config.ErrorsBufferSize), + dialer: transport.NewDialer(), + config: config, + handler: handler, + logger: logger, }, nil } @@ -168,24 +184,25 @@ func (p *Pool) Connect(id string) error { return NewPoolError(ErrPeerExists) } + var logger *slog.Logger + if p.handler != nil { + logger = logging.NewOutboundWorkerLogger(p.handler, p.id, id) + } + // The worker factory must be non-blocking to avoid deadlocks - worker, err := p.config.WorkerFactory(p.ctx, id) + worker, err := p.config.WorkerFactory(p.ctx, id, logger) if err != nil { return err } - var logger *slog.Logger - if p.logger != nil { - logger = p.logger.With("id", id) - } - pool := PoolPlugin{ + ID: p.id, Inbox: p.inbox, Events: p.events, Errors: p.errors, - Logger: logger, Dialer: p.dialer, ConnectionConfig: p.config.ConnectionConfig, + Handler: p.handler, } p.wg.Add(1) diff --git a/outbound/pool_test.go b/outbound/pool_test.go index 92e8fc2..acb69ae 100644 --- a/outbound/pool_test.go +++ b/outbound/pool_test.go @@ -15,7 +15,7 @@ import ( func setupPool(t *testing.T) (*Pool, *honeybeetest.MockDialer) { t.Helper() - pool, err := NewPool(context.Background(), nil, nil) + pool, err := NewPool(context.Background(), "pool-1", nil, nil) assert.NoError(t, err) dialer := &honeybeetest.MockDialer{ DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) { @@ -45,6 +45,11 @@ func expectEvent( // Tests +func TestPoolID(t *testing.T) { + _, err := NewPool(context.Background(), "", nil, nil) + assert.ErrorIs(t, err, ErrInvalidPoolID) +} + func TestPoolConnect(t *testing.T) { t.Run("successfully adds connection", func(t *testing.T) { pool, _ := setupPool(t) @@ -85,7 +90,7 @@ func TestPoolConnect(t *testing.T) { func TestPoolClose(t *testing.T) { t.Run("channels close after pool close", func(t *testing.T) { - pool, _ := NewPool(context.Background(), nil, nil) + pool, _ := NewPool(context.Background(), "pool-1", nil, nil) pool.Close() _, ok := <-pool.Inbox() assert.False(t, ok) @@ -96,7 +101,7 @@ func TestPoolClose(t *testing.T) { }) t.Run("connect after close returns error", func(t *testing.T) { - pool, _ := NewPool(context.Background(), nil, nil) + pool, _ := NewPool(context.Background(), "pool-1", nil, nil) pool.Close() err := pool.Connect("wss://test") assert.ErrorIs(t, err, ErrPoolClosed) @@ -154,7 +159,7 @@ func TestPoolSend(t *testing.T) { }, } - pool, err := NewPool(context.Background(), nil, nil) + pool, err := NewPool(context.Background(), "pool-1", nil, nil) assert.NoError(t, err) pool.dialer = mockDialer diff --git a/outbound/worker.go b/outbound/worker.go index 57fce15..8ae8c10 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -2,9 +2,11 @@ package outbound import ( "context" + "git.wisehodl.dev/jay/go-honeybee/logging" "git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" + "log/slog" "sync" "sync/atomic" "time" @@ -25,13 +27,14 @@ type DefaultWorker struct { config *WorkerConfig ctx context.Context cancel context.CancelFunc + logger *slog.Logger } func NewWorker( ctx context.Context, id string, config *WorkerConfig, - + logger *slog.Logger, ) (*DefaultWorker, error) { if config == nil { config = GetDefaultWorkerConfig() @@ -47,6 +50,7 @@ func NewWorker( heartbeat: make(chan struct{}), ctx: wctx, cancel: wcancel, + logger: logger, } return w, nil @@ -333,7 +337,12 @@ func connect( ctx context.Context, pool PoolPlugin, ) (*transport.Connection, error) { - conn, err := transport.NewConnection(id, pool.ConnectionConfig, pool.Logger) + var logger *slog.Logger + if pool.Handler != nil { + logger = logging.NewConnectionLogger(pool.Handler, pool.ID, id) + } + + conn, err := transport.NewConnection(id, pool.ConnectionConfig, logger) if err != nil { return nil, err } diff --git a/transport/logging_test.go b/transport/logging_test.go index fecdf1e..6129b43 100644 --- a/transport/logging_test.go +++ b/transport/logging_test.go @@ -6,7 +6,6 @@ import ( "io" "log/slog" "net/http" - "strings" "testing" "time" @@ -18,121 +17,8 @@ import ( // Helpers -type expectedLog struct { - level slog.Level - msg string - attrs map[string]any -} - -func assertLogSequence(t *testing.T, records []slog.Record, expected []expectedLog) { - t.Helper() - - recIndex := 0 - for expIndex, exp := range expected { - found := false - - // Search forward through records - for recIndex < len(records) { - rec := records[recIndex] - - if rec.Level == exp.level && strings.Contains(rec.Message, exp.msg) { - allAttrsMatch := true - for key, expectedValue := range exp.attrs { - if !assertAttributePresent(t, rec, key, expectedValue) { - allAttrsMatch = false - break - } - } - - if allAttrsMatch { - found = true - recIndex++ // Consume this record - break - } - } - - recIndex++ // Move to next record - } - - if !found { - t.Fatalf( - "expected log not found: index=%d level=%v msg=%q attrs=%v", - expIndex, exp.level, exp.msg, exp.attrs) - } - } -} - -func findLogRecord( - records []slog.Record, level slog.Level, msgSnippet string, -) *slog.Record { - for i := range records { - if records[i].Level == level && strings.Contains(records[i].Message, msgSnippet) { - return &records[i] - } - } - return nil -} - -func assertAttributePresent( - t *testing.T, record slog.Record, key string, expectedValue any, -) bool { - t.Helper() - - var found bool - var actualValue any - - record.Attrs(func(attr slog.Attr) bool { - if attr.Key == key { - found = true - actualValue = attr.Value.Any() - return false - } - return true - }) - - if !found { - t.Fatalf("attribute %q not found in log record", key) - } - - if !valuesEqual(actualValue, expectedValue) { - t.Errorf("attribute %q mismatch: expected=%v actual=%v", key, expectedValue, actualValue) - return false - } - - return true -} - -func valuesEqual(a, b any) bool { - // Direct equality - if a == b { - return true - } - - // Handle int/int64 conversions - aInt, aIsInt := toInt64(a) - bInt, bIsInt := toInt64(b) - if aIsInt && bIsInt { - return aInt == bInt - } - - return false -} - -func toInt64(v any) (int64, bool) { - switch val := v.(type) { - case int: - return int64(val), true - case int64: - return val, true - case int32: - return int64(val), true - case int16: - return int64(val), true - case int8: - return int64(val), true - default: - return 0, false - } +func log(level slog.Level, msg string, attrs map[string]any) honeybeetest.ExpectedLog { + return honeybeetest.ExpectedLog{Level: level, Msg: msg, Attrs: attrs} } // Tests @@ -159,14 +45,14 @@ func TestConnectLogging(t *testing.T) { records := mockHandler.GetRecords() - expected := []expectedLog{ - {slog.LevelInfo, "connecting", map[string]any{}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 1}}, - {slog.LevelInfo, "dial successful", map[string]any{"attempt": 1}}, - {slog.LevelInfo, "connected", map[string]any{}}, + expected := []honeybeetest.ExpectedLog{ + log(slog.LevelInfo, "connecting", map[string]any{}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 1}), + log(slog.LevelInfo, "dial successful", map[string]any{"attempt": 1}), + log(slog.LevelInfo, "connected", map[string]any{}), } - assertLogSequence(t, records, expected) + honeybeetest.AssertLogSequence(t, records, expected) }) t.Run("max retries failure", func(t *testing.T) { @@ -198,18 +84,18 @@ func TestConnectLogging(t *testing.T) { records := mockHandler.GetRecords() - expected := []expectedLog{ - {slog.LevelInfo, "connecting", map[string]any{}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 1}}, - {slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 2}}, - {slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 3}}, - {slog.LevelError, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}}, - {slog.LevelError, "connection failed", map[string]any{"error": dialErr}}, + expected := []honeybeetest.ExpectedLog{ + log(slog.LevelInfo, "connecting", map[string]any{}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 1}), + log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 2}), + log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 3}), + log(slog.LevelError, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}), + log(slog.LevelError, "connection failed", map[string]any{"error": dialErr}), } - assertLogSequence(t, records, expected) + honeybeetest.AssertLogSequence(t, records, expected) }) t.Run("success after retry", func(t *testing.T) { @@ -247,18 +133,18 @@ func TestConnectLogging(t *testing.T) { records := mockHandler.GetRecords() - expected := []expectedLog{ - {slog.LevelInfo, "connecting", map[string]any{}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 1}}, - {slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 2}}, - {slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}}, - {slog.LevelInfo, "dialing", map[string]any{"attempt": 3}}, - {slog.LevelInfo, "dial successful", map[string]any{"attempt": 3}}, - {slog.LevelInfo, "connected", map[string]any{}}, + expected := []honeybeetest.ExpectedLog{ + log(slog.LevelInfo, "connecting", map[string]any{}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 1}), + log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 2}), + log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}), + log(slog.LevelInfo, "dialing", map[string]any{"attempt": 3}), + log(slog.LevelInfo, "dial successful", map[string]any{"attempt": 3}), + log(slog.LevelInfo, "connected", map[string]any{}), } - assertLogSequence(t, records, expected) + honeybeetest.AssertLogSequence(t, records, expected) }) } @@ -274,18 +160,18 @@ func TestCloseLogging(t *testing.T) { conn.Close() honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelInfo, "closed") != nil }, "expected log") records := mockHandler.GetRecords() - expected := []expectedLog{ - {slog.LevelInfo, "shutting down", map[string]any{}}, - {slog.LevelInfo, "closed", map[string]any{}}, + expected := []honeybeetest.ExpectedLog{ + log(slog.LevelInfo, "shutting down", map[string]any{}), + log(slog.LevelInfo, "closed", map[string]any{}), } - assertLogSequence(t, records, expected) + honeybeetest.AssertLogSequence(t, records, expected) }) t.Run("close with socket error", func(t *testing.T) { @@ -304,18 +190,18 @@ func TestCloseLogging(t *testing.T) { conn.Close() honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelError, "socket close failed") != nil }, "expected log") records := mockHandler.GetRecords() - expected := []expectedLog{ - {slog.LevelInfo, "shutting down", map[string]any{}}, - {slog.LevelError, "socket close failed", map[string]any{"error": closeErr}}, + expected := []honeybeetest.ExpectedLog{ + log(slog.LevelInfo, "shutting down", map[string]any{}), + log(slog.LevelError, "socket close failed", map[string]any{"error": closeErr}), } - assertLogSequence(t, records, expected) + honeybeetest.AssertLogSequence(t, records, expected) }) } @@ -337,14 +223,14 @@ func TestReaderLogging(t *testing.T) { defer conn.Close() honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") != nil }, "expected log") - record := findLogRecord(mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") + record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") assert.NotNil(t, record) - assertAttributePresent(t, *record, "code", websocket.CloseNormalClosure) - assertAttributePresent(t, *record, "text", "goodbye") + honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseNormalClosure) + honeybeetest.AssertAttributePresent(t, *record, "text", "goodbye") }) @@ -365,14 +251,14 @@ func TestReaderLogging(t *testing.T) { defer conn.Close() honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelError, "unexpected close") != nil }, "expected log") - record := findLogRecord(mockHandler.GetRecords(), slog.LevelError, "unexpected close") + record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelError, "unexpected close") assert.NotNil(t, record) - assertAttributePresent(t, *record, "code", websocket.CloseProtocolError) - assertAttributePresent(t, *record, "text", "bad protocol") + honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseProtocolError) + honeybeetest.AssertAttributePresent(t, *record, "text", "bad protocol") }) @@ -390,7 +276,7 @@ func TestReaderLogging(t *testing.T) { defer conn.Close() honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelError, "read error") != nil }, "expected log") }) @@ -416,15 +302,15 @@ func TestWriterLogging(t *testing.T) { assert.ErrorContains(t, err, "failed to set write deadline: deadline error") honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelError, "write deadline error") != nil }, "expected log") records := mockHandler.GetRecords() - record := findLogRecord(records, slog.LevelError, "write deadline error") + record := honeybeetest.FindLogRecord(records, slog.LevelError, "write deadline error") assert.NotNil(t, record) - assertAttributePresent(t, *record, "error", deadlineErr) + honeybeetest.AssertAttributePresent(t, *record, "error", deadlineErr) conn.Close() }) @@ -446,15 +332,15 @@ func TestWriterLogging(t *testing.T) { assert.ErrorContains(t, err, "write error") honeybeetest.Eventually(t, func() bool { - return findLogRecord( + return honeybeetest.FindLogRecord( mockHandler.GetRecords(), slog.LevelError, "write error") != nil }, "expected log") records := mockHandler.GetRecords() - record := findLogRecord(records, slog.LevelError, "write error") + record := honeybeetest.FindLogRecord(records, slog.LevelError, "write error") assert.NotNil(t, record) - assertAttributePresent(t, *record, "error", writeErr) + honeybeetest.AssertAttributePresent(t, *record, "error", writeErr) conn.Close() })