Add slog attributes at pool, worker, and connection levels.

This commit is contained in:
Jay
2026-04-23 17:58:40 -04:00
parent 91717457dd
commit 727dc18b57
17 changed files with 488 additions and 248 deletions
+4 -4
View File
@@ -97,8 +97,8 @@ var (
// Outbound Pool constructors // Outbound Pool constructors
func NewOutboundPool(ctx context.Context, config *OutboundPoolConfig, logger *slog.Logger) (*OutboundPool, error) { func NewOutboundPool(ctx context.Context, id string, config *OutboundPoolConfig, handler slog.Handler) (*OutboundPool, error) {
return outbound.NewPool(ctx, config, logger) return outbound.NewPool(ctx, id, config, handler)
} }
func NewOutboundPoolConfig(opts ...OutboundPoolOption) (*OutboundPoolConfig, error) { func NewOutboundPoolConfig(opts ...OutboundPoolOption) (*OutboundPoolConfig, error) {
@@ -129,8 +129,8 @@ var (
// Inbound Pool constructors // Inbound Pool constructors
func NewInboundPool(ctx context.Context, config *InboundPoolConfig, logger *slog.Logger) (*InboundPool, error) { func NewInboundPool(ctx context.Context, id string, config *InboundPoolConfig, handler slog.Handler) (*InboundPool, error) {
return inbound.NewPool(ctx, config, logger) return inbound.NewPool(ctx, id, config, handler)
} }
func NewInboundPoolConfig(opts ...InboundPoolOption) (*InboundPoolConfig, error) { func NewInboundPoolConfig(opts ...InboundPoolOption) (*InboundPoolConfig, error) {
+114
View File
@@ -4,6 +4,8 @@ import (
"bytes" "bytes"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"io" "io"
"log/slog"
"strings"
"testing" "testing"
"time" "time"
) )
@@ -29,6 +31,12 @@ type MockOutgoingData struct {
Data []byte Data []byte
} }
type ExpectedLog struct {
Level slog.Level
Msg string
Attrs map[string]any
}
// Setup // Setup
func SetupTestSocket(t *testing.T) ( func SetupTestSocket(t *testing.T) (
@@ -117,3 +125,109 @@ func Never(t *testing.T, condition func() bool, msg string) {
t.Helper() t.Helper()
assert.Never(t, condition, NegativeTestTimeout, TestTick, msg) assert.Never(t, condition, NegativeTestTimeout, TestTick, msg)
} }
// Logging Helpers
func AssertLogSequence(t *testing.T, records []slog.Record, expected []ExpectedLog) {
t.Helper()
recIndex := 0
for expIndex, exp := range expected {
found := false
for recIndex < len(records) {
rec := records[recIndex]
if rec.Level == exp.Level && strings.Contains(rec.Message, exp.Msg) {
allAttrsMatch := true
for key, expectedValue := range exp.Attrs {
if !AssertAttributePresent(t, rec, key, expectedValue) {
allAttrsMatch = false
break
}
}
if allAttrsMatch {
found = true
recIndex++
break
}
}
recIndex++
}
if !found {
t.Fatalf(
"expected log not found: index=%d level=%v msg=%q attrs=%v",
expIndex, exp.Level, exp.Msg, exp.Attrs,
)
}
}
}
func FindLogRecord(records []slog.Record, level slog.Level, msgSnippet string) *slog.Record {
for i := range records {
if records[i].Level == level && strings.Contains(records[i].Message, msgSnippet) {
return &records[i]
}
}
return nil
}
func AssertAttributePresent(t *testing.T, record slog.Record, key string, expectedValue any) bool {
t.Helper()
var found bool
var actualValue any
record.Attrs(func(attr slog.Attr) bool {
if attr.Key == key {
found = true
actualValue = attr.Value.Any()
return false
}
return true
})
if !found {
t.Fatalf("attribute %q not found in log record", key)
return false
}
if !logValuesEqual(actualValue, expectedValue) {
t.Errorf("attribute %q: expected=%v actual=%v", key, expectedValue, actualValue)
return false
}
return true
}
func logValuesEqual(a, b any) bool {
if a == b {
return true
}
aInt, aOk := toInt64(a)
bInt, bOk := toInt64(b)
if aOk && bOk {
return aInt == bInt
}
return false
}
func toInt64(v any) (int64, bool) {
switch val := v.(type) {
case int:
return int64(val), true
case int64:
return val, true
case int32:
return int64(val), true
case int16:
return int64(val), true
case int8:
return int64(val), true
default:
return 0, false
}
}
+16 -7
View File
@@ -79,20 +79,24 @@ func (m *MockSocket) SetCloseHandler(h func(code int, text string) error) {
// Logging mocks // Logging mocks
type MockSlogHandler struct { type MockSlogHandler struct {
records []slog.Record records *[]slog.Record
attrs []slog.Attr
mu sync.RWMutex mu sync.RWMutex
} }
func NewMockSlogHandler() *MockSlogHandler { func NewMockSlogHandler() *MockSlogHandler {
records := make([]slog.Record, 0)
return &MockSlogHandler{ return &MockSlogHandler{
records: make([]slog.Record, 0), records: &records,
attrs: make([]slog.Attr, 0),
} }
} }
func (m *MockSlogHandler) Handle(ctx context.Context, record slog.Record) error { func (m *MockSlogHandler) Handle(ctx context.Context, record slog.Record) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
m.records = append(m.records, record) record.AddAttrs(m.attrs...)
*m.records = append(*m.records, record)
return nil return nil
} }
@@ -101,7 +105,12 @@ func (m *MockSlogHandler) Enabled(ctx context.Context, level slog.Level) bool {
} }
func (m *MockSlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { func (m *MockSlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
return m m.mu.RLock()
defer m.mu.RUnlock()
return &MockSlogHandler{
records: m.records, // shared records slice
attrs: append(m.attrs, attrs...),
}
} }
func (m *MockSlogHandler) WithGroup(name string) slog.Handler { func (m *MockSlogHandler) WithGroup(name string) slog.Handler {
@@ -111,13 +120,13 @@ func (m *MockSlogHandler) WithGroup(name string) slog.Handler {
func (m *MockSlogHandler) GetRecords() []slog.Record { func (m *MockSlogHandler) GetRecords() []slog.Record {
m.mu.RLock() m.mu.RLock()
defer m.mu.RUnlock() defer m.mu.RUnlock()
result := make([]slog.Record, len(m.records)) result := make([]slog.Record, len(*m.records))
copy(result, m.records) copy(result, *m.records)
return result return result
} }
func (m *MockSlogHandler) Clear() { func (m *MockSlogHandler) Clear() {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
m.records = make([]slog.Record, 0) *m.records = make([]slog.Record, 0)
} }
+2
View File
@@ -4,6 +4,7 @@ package inbound
import ( import (
"context" "context"
"git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/transport"
"log/slog"
"time" "time"
) )
@@ -96,6 +97,7 @@ type WorkerFactory func(
id string, id string,
conn *transport.Connection, conn *transport.Connection,
config *WorkerConfig, config *WorkerConfig,
logger *slog.Logger,
) (Worker, error) ) (Worker, error)
type PoolConfig struct { type PoolConfig struct {
+1
View File
@@ -5,6 +5,7 @@ import "errors"
var ( var (
// Pool errors // Pool errors
PoolError = errors.New("pool error") PoolError = errors.New("pool error")
ErrInvalidPoolID = errors.New("pool id cannot be empty")
ErrPoolClosed = errors.New("pool is closed") ErrPoolClosed = errors.New("pool is closed")
ErrPeerNotFound = errors.New("peer not found") ErrPeerNotFound = errors.New("peer not found")
ErrPeerExists = errors.New("peer already exists") ErrPeerExists = errors.New("peer already exists")
+32 -11
View File
@@ -3,6 +3,7 @@ package inbound
import ( import (
"context" "context"
"fmt" "fmt"
"git.wisehodl.dev/jay/go-honeybee/logging"
"git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types" "git.wisehodl.dev/jay/go-honeybee/types"
"log/slog" "log/slog"
@@ -43,8 +44,8 @@ type PoolPlugin struct {
Inbox chan<- InboxMessage Inbox chan<- InboxMessage
Events chan<- PoolEvent Events chan<- PoolEvent
Errors chan<- error Errors chan<- error
Logger *slog.Logger
OnExit OnExitFunction OnExit OnExitFunction
Handler slog.Handler
} }
// Pool // Pool
@@ -60,12 +61,15 @@ type Pool struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
id string
peers map[string]*Peer peers map[string]*Peer
inbox chan InboxMessage inbox chan InboxMessage
events chan PoolEvent events chan PoolEvent
errors chan error errors chan error
config *PoolConfig config *PoolConfig
handler slog.Handler
logger *slog.Logger logger *slog.Logger
mu sync.RWMutex mu sync.RWMutex
@@ -73,7 +77,11 @@ type Pool struct {
closed bool closed bool
} }
func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Pool, error) { func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Handler) (*Pool, error) {
if id == "" {
return nil, ErrInvalidPoolID
}
if config == nil { if config == nil {
config = GetDefaultPoolConfig() config = GetDefaultPoolConfig()
} }
@@ -82,13 +90,15 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Poo
// The factory function should be non-blocking or else Connect() may cause // The factory function should be non-blocking or else Connect() may cause
// deadlocks. // deadlocks.
if config.WorkerFactory == nil { if config.WorkerFactory == nil {
// TODO: Construct worker logger
config.WorkerFactory = func( config.WorkerFactory = func(
ctx context.Context, ctx context.Context,
id string, id string,
conn *transport.Connection, conn *transport.Connection,
config *WorkerConfig, config *WorkerConfig,
logger *slog.Logger,
) (Worker, error) { ) (Worker, error) {
return NewWorker(ctx, id, conn, config) return NewWorker(ctx, id, conn, config, logger)
} }
} }
@@ -98,14 +108,21 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Poo
pctx, cancel := context.WithCancel(ctx) pctx, cancel := context.WithCancel(ctx)
var logger *slog.Logger
if handler != nil {
logger = logging.NewInboundPoolLogger(handler, id)
}
return &Pool{ return &Pool{
ctx: pctx, ctx: pctx,
cancel: cancel, cancel: cancel,
id: id,
peers: make(map[string]*Peer), peers: make(map[string]*Peer),
inbox: make(chan InboxMessage, config.InboxBufferSize), inbox: make(chan InboxMessage, config.InboxBufferSize),
events: make(chan PoolEvent, config.EventsBufferSize), events: make(chan PoolEvent, config.EventsBufferSize),
errors: make(chan error, config.ErrorsBufferSize), errors: make(chan error, config.ErrorsBufferSize),
config: config, config: config,
handler: handler,
logger: logger, logger: logger,
}, nil }, nil
} }
@@ -231,15 +248,24 @@ func (p *Pool) Send(id string, data []byte) error {
// addLocked constructs and registers a peer. Caller must hold p.mu write lock. // addLocked constructs and registers a peer. Caller must hold p.mu write lock.
func (p *Pool) addLocked(id string, socket types.Socket) error { func (p *Pool) addLocked(id string, socket types.Socket) error {
var logger *slog.Logger
if p.handler != nil {
logger = logging.NewConnectionLogger(p.handler, p.id, id)
}
conn, err := transport.NewConnectionFromSocket( conn, err := transport.NewConnectionFromSocket(
socket, p.config.ConnectionConfig, p.logger) socket, p.config.ConnectionConfig, logger)
if err != nil { if err != nil {
return err return err
} }
// The worker factory must be non-blocking to avoid deadlocks // The worker factory must be non-blocking to avoid deadlocks
wctx, cancel := context.WithCancel(p.ctx) wctx, cancel := context.WithCancel(p.ctx)
worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig) if p.handler != nil {
logger = logging.NewInboundWorkerLogger(p.handler, p.id, id)
}
worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig, logger)
if err != nil { if err != nil {
cancel() cancel()
conn.Close() conn.Close()
@@ -263,17 +289,12 @@ func (p *Pool) addLocked(id string, socket types.Socket) error {
}) })
} }
var logger *slog.Logger
if p.logger != nil {
logger = p.logger.With("id", id)
}
pool := PoolPlugin{ pool := PoolPlugin{
Inbox: p.inbox, Inbox: p.inbox,
Events: p.events, Events: p.events,
Errors: p.errors, Errors: p.errors,
Logger: logger,
OnExit: onExit, OnExit: onExit,
Handler: p.handler,
} }
peer := &Peer{ peer := &Peer{
+7 -2
View File
@@ -15,7 +15,7 @@ import (
func setupPool(t *testing.T) *Pool { func setupPool(t *testing.T) *Pool {
t.Helper() t.Helper()
pool, err := NewPool(context.Background(), nil, nil) pool, err := NewPool(context.Background(), "pool-1", nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
return pool return pool
} }
@@ -39,6 +39,11 @@ func expectEvent(
// Tests // Tests
func TestPoolID(t *testing.T) {
_, err := NewPool(context.Background(), "", nil, nil)
assert.ErrorIs(t, err, ErrInvalidPoolID)
}
func TestPoolAdd(t *testing.T) { func TestPoolAdd(t *testing.T) {
t.Run("successfully adds peer", func(t *testing.T) { t.Run("successfully adds peer", func(t *testing.T) {
pool := setupPool(t) pool := setupPool(t)
@@ -341,7 +346,7 @@ func TestPoolEvents(t *testing.T) {
) )
assert.NoError(t, err) assert.NoError(t, err)
pool, err := NewPool(context.Background(), config, nil) pool, err := NewPool(context.Background(), "pool-1", config, nil)
assert.NoError(t, err) assert.NoError(t, err)
defer pool.Close() defer pool.Close()
+4
View File
@@ -6,6 +6,7 @@ import (
"git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/queue"
"git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types" "git.wisehodl.dev/jay/go-honeybee/types"
"log/slog"
"sync" "sync"
"time" "time"
) )
@@ -31,6 +32,7 @@ type DefaultWorker struct {
config *WorkerConfig config *WorkerConfig
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
logger *slog.Logger
} }
func NewWorker( func NewWorker(
@@ -38,6 +40,7 @@ func NewWorker(
id string, id string,
conn *transport.Connection, conn *transport.Connection,
config *WorkerConfig, config *WorkerConfig,
logger *slog.Logger,
) (*DefaultWorker, error) { ) (*DefaultWorker, error) {
if config == nil { if config == nil {
config = GetDefaultWorkerConfig() config = GetDefaultWorkerConfig()
@@ -54,6 +57,7 @@ func NewWorker(
config: config, config: config,
ctx: wctx, ctx: wctx,
cancel: cancel, cancel: cancel,
logger: logger,
}, nil }, nil
} }
+2 -2
View File
@@ -32,7 +32,7 @@ func setupWorkerTest(t *testing.T) workerTestVars {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
var err error var err error
worker, err := NewWorker(ctx, "peer-1", conn, nil) worker, err := NewWorker(ctx, "peer-1", conn, nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
worker.cancel = cancel worker.cancel = cancel
@@ -125,7 +125,7 @@ func TestWorkerStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{ worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{
InactivityTimeout: 20 * time.Millisecond, InactivityTimeout: 20 * time.Millisecond,
}) }, nil)
assert.NoError(t, err) assert.NoError(t, err)
worker.cancel = cancel worker.cancel = cancel
defer worker.Stop() defer worker.Stop()
+77
View File
@@ -0,0 +1,77 @@
package logging
import (
"log/slog"
)
// Constants
const KEY_MODULE = "module"
const KEY_COMPONENT = "component"
const KEY_POOL_ID = "pool_id"
const KEY_PEER_ID = "peer_id"
const MODULE_NAME = "honeybee"
const COMPONENT_OUTBOUND_POOL = "outbound_pool"
const COMPONENT_OUTBOUND_WORKER = "outbound_worker"
const COMPONENT_INBOUND_POOL = "inbound_pool"
const COMPONENT_INBOUND_WORKER = "inbound_worker"
const COMPONENT_CONNECTION = "connection"
// Outbound loggers
func NewOutboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger {
return newLogger(handler,
KEY_MODULE, MODULE_NAME,
KEY_COMPONENT, COMPONENT_OUTBOUND_POOL,
KEY_POOL_ID, poolID,
)
}
func NewOutboundWorkerLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger {
return newLogger(handler,
KEY_MODULE, MODULE_NAME,
KEY_COMPONENT, COMPONENT_OUTBOUND_WORKER,
KEY_POOL_ID, poolID,
KEY_PEER_ID, peerID,
)
}
// Inbound loggers
func NewInboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger {
return newLogger(handler,
KEY_MODULE, MODULE_NAME,
KEY_COMPONENT, COMPONENT_INBOUND_POOL,
KEY_POOL_ID, poolID,
)
}
func NewInboundWorkerLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger {
return newLogger(handler,
KEY_MODULE, MODULE_NAME,
KEY_COMPONENT, COMPONENT_INBOUND_WORKER,
KEY_POOL_ID, poolID,
KEY_PEER_ID, peerID,
)
}
// Connection logger
func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger {
return newLogger(handler,
KEY_MODULE, MODULE_NAME,
KEY_COMPONENT, COMPONENT_CONNECTION,
KEY_POOL_ID, poolID,
KEY_PEER_ID, peerID,
)
}
// Helpers
func newLogger(handler slog.Handler, attrs ...any) *slog.Logger {
return slog.New(handler).With(attrs...)
}
+84
View File
@@ -0,0 +1,84 @@
package logging
import (
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
// "github.com/stretchr/testify/assert"
"log/slog"
"testing"
)
// Helpers
func log(level slog.Level, msg string, attrs map[string]any) honeybeetest.ExpectedLog {
return honeybeetest.ExpectedLog{Level: level, Msg: msg, Attrs: attrs}
}
// Tests
func TestOutboundLogger(t *testing.T) {
const POOL_ID = "pool-1"
const PEER_ID = "wss://test"
handler := honeybeetest.NewMockSlogHandler()
poolLogger := NewOutboundPoolLogger(handler, POOL_ID)
workerLogger := NewOutboundWorkerLogger(handler, POOL_ID, PEER_ID)
connLogger := NewConnectionLogger(handler, POOL_ID, PEER_ID)
poolLogger.Info("test")
workerLogger.Info("test")
connLogger.Info("test")
honeybeetest.Eventually(t, func() bool {
return len(handler.GetRecords()) == 3
}, "expected a log record")
records := handler.GetRecords()
honeybeetest.AssertAttributePresent(t, records[0], KEY_MODULE, MODULE_NAME)
honeybeetest.AssertAttributePresent(t, records[0], KEY_COMPONENT, COMPONENT_OUTBOUND_POOL)
honeybeetest.AssertAttributePresent(t, records[0], KEY_POOL_ID, POOL_ID)
honeybeetest.AssertAttributePresent(t, records[1], KEY_MODULE, MODULE_NAME)
honeybeetest.AssertAttributePresent(t, records[1], KEY_COMPONENT, COMPONENT_OUTBOUND_WORKER)
honeybeetest.AssertAttributePresent(t, records[1], KEY_POOL_ID, POOL_ID)
honeybeetest.AssertAttributePresent(t, records[1], KEY_PEER_ID, PEER_ID)
honeybeetest.AssertAttributePresent(t, records[2], KEY_MODULE, MODULE_NAME)
honeybeetest.AssertAttributePresent(t, records[2], KEY_COMPONENT, COMPONENT_CONNECTION)
honeybeetest.AssertAttributePresent(t, records[2], KEY_POOL_ID, POOL_ID)
honeybeetest.AssertAttributePresent(t, records[2], KEY_PEER_ID, PEER_ID)
}
func TestInboundLogger(t *testing.T) {
const POOL_ID = "pool-1"
const PEER_ID = "peer-1"
handler := honeybeetest.NewMockSlogHandler()
poolLogger := NewInboundPoolLogger(handler, POOL_ID)
workerLogger := NewInboundWorkerLogger(handler, POOL_ID, PEER_ID)
connLogger := NewConnectionLogger(handler, POOL_ID, PEER_ID)
poolLogger.Info("test")
workerLogger.Info("test")
connLogger.Info("test")
honeybeetest.Eventually(t, func() bool {
return len(handler.GetRecords()) == 3
}, "expected a log record")
records := handler.GetRecords()
honeybeetest.AssertAttributePresent(t, records[0], KEY_MODULE, MODULE_NAME)
honeybeetest.AssertAttributePresent(t, records[0], KEY_COMPONENT, COMPONENT_INBOUND_POOL)
honeybeetest.AssertAttributePresent(t, records[0], KEY_POOL_ID, POOL_ID)
honeybeetest.AssertAttributePresent(t, records[1], KEY_MODULE, MODULE_NAME)
honeybeetest.AssertAttributePresent(t, records[1], KEY_COMPONENT, COMPONENT_INBOUND_WORKER)
honeybeetest.AssertAttributePresent(t, records[1], KEY_POOL_ID, POOL_ID)
honeybeetest.AssertAttributePresent(t, records[1], KEY_PEER_ID, PEER_ID)
honeybeetest.AssertAttributePresent(t, records[2], KEY_MODULE, MODULE_NAME)
honeybeetest.AssertAttributePresent(t, records[2], KEY_COMPONENT, COMPONENT_CONNECTION)
honeybeetest.AssertAttributePresent(t, records[2], KEY_POOL_ID, POOL_ID)
honeybeetest.AssertAttributePresent(t, records[2], KEY_PEER_ID, PEER_ID)
}
+6 -1
View File
@@ -3,12 +3,17 @@ package outbound
import ( import (
"context" "context"
"git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/transport"
"log/slog"
"time" "time"
) )
// Types // Types
type WorkerFactory func(ctx context.Context, id string) (Worker, error) type WorkerFactory func(
ctx context.Context,
id string,
logger *slog.Logger,
) (Worker, error)
// Pool Config // Pool Config
+1
View File
@@ -10,6 +10,7 @@ var (
InvalidBufferSize = errors.New("buffer size must be greater than zero") InvalidBufferSize = errors.New("buffer size must be greater than zero")
// Pool errors // Pool errors
ErrInvalidPoolID = errors.New("pool id cannot be empty")
ErrPoolClosed = errors.New("pool is closed") ErrPoolClosed = errors.New("pool is closed")
ErrPeerNotFound = errors.New("peer not found") ErrPeerNotFound = errors.New("peer not found")
ErrPeerExists = errors.New("peer already exists") ErrPeerExists = errors.New("peer already exists")
+28 -11
View File
@@ -2,6 +2,7 @@ package outbound
import ( import (
"context" "context"
"git.wisehodl.dev/jay/go-honeybee/logging"
"git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types" "git.wisehodl.dev/jay/go-honeybee/types"
"log/slog" "log/slog"
@@ -30,12 +31,13 @@ type InboxMessage struct {
} }
type PoolPlugin struct { type PoolPlugin struct {
ID string
Inbox chan<- InboxMessage Inbox chan<- InboxMessage
Events chan<- PoolEvent Events chan<- PoolEvent
Errors chan<- error Errors chan<- error
Logger *slog.Logger
Dialer types.Dialer Dialer types.Dialer
ConnectionConfig *transport.ConnectionConfig ConnectionConfig *transport.ConnectionConfig
Handler slog.Handler
} }
// Pool // Pool
@@ -49,6 +51,8 @@ type Pool struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
id string
peers map[string]*Peer peers map[string]*Peer
inbox chan InboxMessage inbox chan InboxMessage
events chan PoolEvent events chan PoolEvent
@@ -56,6 +60,7 @@ type Pool struct {
dialer types.Dialer dialer types.Dialer
config *PoolConfig config *PoolConfig
handler slog.Handler
logger *slog.Logger logger *slog.Logger
mu sync.RWMutex mu sync.RWMutex
@@ -63,8 +68,12 @@ type Pool struct {
closed bool closed bool
} }
func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger, func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Handler,
) (*Pool, error) { ) (*Pool, error) {
if id == "" {
return nil, ErrInvalidPoolID
}
if config == nil { if config == nil {
config = GetDefaultPoolConfig() config = GetDefaultPoolConfig()
} }
@@ -74,8 +83,8 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger,
// deadlocks. // deadlocks.
if config.WorkerFactory == nil { if config.WorkerFactory == nil {
config.WorkerFactory = func( config.WorkerFactory = func(
ctx context.Context, id string) (Worker, error) { ctx context.Context, id string, logger *slog.Logger) (Worker, error) {
return NewWorker(ctx, id, config.WorkerConfig) return NewWorker(ctx, id, config.WorkerConfig, logger)
} }
} }
@@ -85,15 +94,22 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger,
pctx, cancel := context.WithCancel(ctx) pctx, cancel := context.WithCancel(ctx)
var logger *slog.Logger
if handler != nil {
logger = logging.NewOutboundPoolLogger(handler, id)
}
return &Pool{ return &Pool{
ctx: pctx, ctx: pctx,
cancel: cancel, cancel: cancel,
id: id,
peers: make(map[string]*Peer), peers: make(map[string]*Peer),
inbox: make(chan InboxMessage, config.InboxBufferSize), inbox: make(chan InboxMessage, config.InboxBufferSize),
events: make(chan PoolEvent, config.EventsBufferSize), events: make(chan PoolEvent, config.EventsBufferSize),
errors: make(chan error, config.ErrorsBufferSize), errors: make(chan error, config.ErrorsBufferSize),
dialer: transport.NewDialer(), dialer: transport.NewDialer(),
config: config, config: config,
handler: handler,
logger: logger, logger: logger,
}, nil }, nil
} }
@@ -168,24 +184,25 @@ func (p *Pool) Connect(id string) error {
return NewPoolError(ErrPeerExists) return NewPoolError(ErrPeerExists)
} }
var logger *slog.Logger
if p.handler != nil {
logger = logging.NewOutboundWorkerLogger(p.handler, p.id, id)
}
// The worker factory must be non-blocking to avoid deadlocks // The worker factory must be non-blocking to avoid deadlocks
worker, err := p.config.WorkerFactory(p.ctx, id) worker, err := p.config.WorkerFactory(p.ctx, id, logger)
if err != nil { if err != nil {
return err return err
} }
var logger *slog.Logger
if p.logger != nil {
logger = p.logger.With("id", id)
}
pool := PoolPlugin{ pool := PoolPlugin{
ID: p.id,
Inbox: p.inbox, Inbox: p.inbox,
Events: p.events, Events: p.events,
Errors: p.errors, Errors: p.errors,
Logger: logger,
Dialer: p.dialer, Dialer: p.dialer,
ConnectionConfig: p.config.ConnectionConfig, ConnectionConfig: p.config.ConnectionConfig,
Handler: p.handler,
} }
p.wg.Add(1) p.wg.Add(1)
+9 -4
View File
@@ -15,7 +15,7 @@ import (
func setupPool(t *testing.T) (*Pool, *honeybeetest.MockDialer) { func setupPool(t *testing.T) (*Pool, *honeybeetest.MockDialer) {
t.Helper() t.Helper()
pool, err := NewPool(context.Background(), nil, nil) pool, err := NewPool(context.Background(), "pool-1", nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
dialer := &honeybeetest.MockDialer{ dialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) { DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
@@ -45,6 +45,11 @@ func expectEvent(
// Tests // Tests
func TestPoolID(t *testing.T) {
_, err := NewPool(context.Background(), "", nil, nil)
assert.ErrorIs(t, err, ErrInvalidPoolID)
}
func TestPoolConnect(t *testing.T) { func TestPoolConnect(t *testing.T) {
t.Run("successfully adds connection", func(t *testing.T) { t.Run("successfully adds connection", func(t *testing.T) {
pool, _ := setupPool(t) pool, _ := setupPool(t)
@@ -85,7 +90,7 @@ func TestPoolConnect(t *testing.T) {
func TestPoolClose(t *testing.T) { func TestPoolClose(t *testing.T) {
t.Run("channels close after pool close", func(t *testing.T) { t.Run("channels close after pool close", func(t *testing.T) {
pool, _ := NewPool(context.Background(), nil, nil) pool, _ := NewPool(context.Background(), "pool-1", nil, nil)
pool.Close() pool.Close()
_, ok := <-pool.Inbox() _, ok := <-pool.Inbox()
assert.False(t, ok) assert.False(t, ok)
@@ -96,7 +101,7 @@ func TestPoolClose(t *testing.T) {
}) })
t.Run("connect after close returns error", func(t *testing.T) { t.Run("connect after close returns error", func(t *testing.T) {
pool, _ := NewPool(context.Background(), nil, nil) pool, _ := NewPool(context.Background(), "pool-1", nil, nil)
pool.Close() pool.Close()
err := pool.Connect("wss://test") err := pool.Connect("wss://test")
assert.ErrorIs(t, err, ErrPoolClosed) assert.ErrorIs(t, err, ErrPoolClosed)
@@ -154,7 +159,7 @@ func TestPoolSend(t *testing.T) {
}, },
} }
pool, err := NewPool(context.Background(), nil, nil) pool, err := NewPool(context.Background(), "pool-1", nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
pool.dialer = mockDialer pool.dialer = mockDialer
+11 -2
View File
@@ -2,9 +2,11 @@ package outbound
import ( import (
"context" "context"
"git.wisehodl.dev/jay/go-honeybee/logging"
"git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/queue"
"git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types" "git.wisehodl.dev/jay/go-honeybee/types"
"log/slog"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -25,13 +27,14 @@ type DefaultWorker struct {
config *WorkerConfig config *WorkerConfig
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
logger *slog.Logger
} }
func NewWorker( func NewWorker(
ctx context.Context, ctx context.Context,
id string, id string,
config *WorkerConfig, config *WorkerConfig,
logger *slog.Logger,
) (*DefaultWorker, error) { ) (*DefaultWorker, error) {
if config == nil { if config == nil {
config = GetDefaultWorkerConfig() config = GetDefaultWorkerConfig()
@@ -47,6 +50,7 @@ func NewWorker(
heartbeat: make(chan struct{}), heartbeat: make(chan struct{}),
ctx: wctx, ctx: wctx,
cancel: wcancel, cancel: wcancel,
logger: logger,
} }
return w, nil return w, nil
@@ -333,7 +337,12 @@ func connect(
ctx context.Context, ctx context.Context,
pool PoolPlugin, pool PoolPlugin,
) (*transport.Connection, error) { ) (*transport.Connection, error) {
conn, err := transport.NewConnection(id, pool.ConnectionConfig, pool.Logger) var logger *slog.Logger
if pool.Handler != nil {
logger = logging.NewConnectionLogger(pool.Handler, pool.ID, id)
}
conn, err := transport.NewConnection(id, pool.ConnectionConfig, logger)
if err != nil { if err != nil {
return nil, err return nil, err
} }
+53 -167
View File
@@ -6,7 +6,6 @@ import (
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
"strings"
"testing" "testing"
"time" "time"
@@ -18,121 +17,8 @@ import (
// Helpers // Helpers
type expectedLog struct { func log(level slog.Level, msg string, attrs map[string]any) honeybeetest.ExpectedLog {
level slog.Level return honeybeetest.ExpectedLog{Level: level, Msg: msg, Attrs: attrs}
msg string
attrs map[string]any
}
func assertLogSequence(t *testing.T, records []slog.Record, expected []expectedLog) {
t.Helper()
recIndex := 0
for expIndex, exp := range expected {
found := false
// Search forward through records
for recIndex < len(records) {
rec := records[recIndex]
if rec.Level == exp.level && strings.Contains(rec.Message, exp.msg) {
allAttrsMatch := true
for key, expectedValue := range exp.attrs {
if !assertAttributePresent(t, rec, key, expectedValue) {
allAttrsMatch = false
break
}
}
if allAttrsMatch {
found = true
recIndex++ // Consume this record
break
}
}
recIndex++ // Move to next record
}
if !found {
t.Fatalf(
"expected log not found: index=%d level=%v msg=%q attrs=%v",
expIndex, exp.level, exp.msg, exp.attrs)
}
}
}
func findLogRecord(
records []slog.Record, level slog.Level, msgSnippet string,
) *slog.Record {
for i := range records {
if records[i].Level == level && strings.Contains(records[i].Message, msgSnippet) {
return &records[i]
}
}
return nil
}
func assertAttributePresent(
t *testing.T, record slog.Record, key string, expectedValue any,
) bool {
t.Helper()
var found bool
var actualValue any
record.Attrs(func(attr slog.Attr) bool {
if attr.Key == key {
found = true
actualValue = attr.Value.Any()
return false
}
return true
})
if !found {
t.Fatalf("attribute %q not found in log record", key)
}
if !valuesEqual(actualValue, expectedValue) {
t.Errorf("attribute %q mismatch: expected=%v actual=%v", key, expectedValue, actualValue)
return false
}
return true
}
func valuesEqual(a, b any) bool {
// Direct equality
if a == b {
return true
}
// Handle int/int64 conversions
aInt, aIsInt := toInt64(a)
bInt, bIsInt := toInt64(b)
if aIsInt && bIsInt {
return aInt == bInt
}
return false
}
func toInt64(v any) (int64, bool) {
switch val := v.(type) {
case int:
return int64(val), true
case int64:
return val, true
case int32:
return int64(val), true
case int16:
return int64(val), true
case int8:
return int64(val), true
default:
return 0, false
}
} }
// Tests // Tests
@@ -159,14 +45,14 @@ func TestConnectLogging(t *testing.T) {
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
expected := []expectedLog{ expected := []honeybeetest.ExpectedLog{
{slog.LevelInfo, "connecting", map[string]any{}}, log(slog.LevelInfo, "connecting", map[string]any{}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 1}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 1}),
{slog.LevelInfo, "dial successful", map[string]any{"attempt": 1}}, log(slog.LevelInfo, "dial successful", map[string]any{"attempt": 1}),
{slog.LevelInfo, "connected", map[string]any{}}, log(slog.LevelInfo, "connected", map[string]any{}),
} }
assertLogSequence(t, records, expected) honeybeetest.AssertLogSequence(t, records, expected)
}) })
t.Run("max retries failure", func(t *testing.T) { t.Run("max retries failure", func(t *testing.T) {
@@ -198,18 +84,18 @@ func TestConnectLogging(t *testing.T) {
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
expected := []expectedLog{ expected := []honeybeetest.ExpectedLog{
{slog.LevelInfo, "connecting", map[string]any{}}, log(slog.LevelInfo, "connecting", map[string]any{}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 1}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 1}),
{slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}}, log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 2}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 2}),
{slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}}, log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 3}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 3}),
{slog.LevelError, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}}, log(slog.LevelError, "dial failed, max retries reached", map[string]any{"attempt": 3, "error": dialErr}),
{slog.LevelError, "connection failed", map[string]any{"error": dialErr}}, log(slog.LevelError, "connection failed", map[string]any{"error": dialErr}),
} }
assertLogSequence(t, records, expected) honeybeetest.AssertLogSequence(t, records, expected)
}) })
t.Run("success after retry", func(t *testing.T) { t.Run("success after retry", func(t *testing.T) {
@@ -247,18 +133,18 @@ func TestConnectLogging(t *testing.T) {
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
expected := []expectedLog{ expected := []honeybeetest.ExpectedLog{
{slog.LevelInfo, "connecting", map[string]any{}}, log(slog.LevelInfo, "connecting", map[string]any{}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 1}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 1}),
{slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}}, log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 1, "error": dialErr}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 2}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 2}),
{slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}}, log(slog.LevelWarn, "dial failed, retrying", map[string]any{"attempt": 2, "error": dialErr}),
{slog.LevelInfo, "dialing", map[string]any{"attempt": 3}}, log(slog.LevelInfo, "dialing", map[string]any{"attempt": 3}),
{slog.LevelInfo, "dial successful", map[string]any{"attempt": 3}}, log(slog.LevelInfo, "dial successful", map[string]any{"attempt": 3}),
{slog.LevelInfo, "connected", map[string]any{}}, log(slog.LevelInfo, "connected", map[string]any{}),
} }
assertLogSequence(t, records, expected) honeybeetest.AssertLogSequence(t, records, expected)
}) })
} }
@@ -274,18 +160,18 @@ func TestCloseLogging(t *testing.T) {
conn.Close() conn.Close()
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelInfo, "closed") != nil mockHandler.GetRecords(), slog.LevelInfo, "closed") != nil
}, "expected log") }, "expected log")
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
expected := []expectedLog{ expected := []honeybeetest.ExpectedLog{
{slog.LevelInfo, "shutting down", map[string]any{}}, log(slog.LevelInfo, "shutting down", map[string]any{}),
{slog.LevelInfo, "closed", map[string]any{}}, log(slog.LevelInfo, "closed", map[string]any{}),
} }
assertLogSequence(t, records, expected) honeybeetest.AssertLogSequence(t, records, expected)
}) })
t.Run("close with socket error", func(t *testing.T) { t.Run("close with socket error", func(t *testing.T) {
@@ -304,18 +190,18 @@ func TestCloseLogging(t *testing.T) {
conn.Close() conn.Close()
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelError, "socket close failed") != nil mockHandler.GetRecords(), slog.LevelError, "socket close failed") != nil
}, "expected log") }, "expected log")
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
expected := []expectedLog{ expected := []honeybeetest.ExpectedLog{
{slog.LevelInfo, "shutting down", map[string]any{}}, log(slog.LevelInfo, "shutting down", map[string]any{}),
{slog.LevelError, "socket close failed", map[string]any{"error": closeErr}}, log(slog.LevelError, "socket close failed", map[string]any{"error": closeErr}),
} }
assertLogSequence(t, records, expected) honeybeetest.AssertLogSequence(t, records, expected)
}) })
} }
@@ -337,14 +223,14 @@ func TestReaderLogging(t *testing.T) {
defer conn.Close() defer conn.Close()
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") != nil mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") != nil
}, "expected log") }, "expected log")
record := findLogRecord(mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer") record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelInfo, "connection closed by peer")
assert.NotNil(t, record) assert.NotNil(t, record)
assertAttributePresent(t, *record, "code", websocket.CloseNormalClosure) honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseNormalClosure)
assertAttributePresent(t, *record, "text", "goodbye") honeybeetest.AssertAttributePresent(t, *record, "text", "goodbye")
}) })
@@ -365,14 +251,14 @@ func TestReaderLogging(t *testing.T) {
defer conn.Close() defer conn.Close()
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelError, "unexpected close") != nil mockHandler.GetRecords(), slog.LevelError, "unexpected close") != nil
}, "expected log") }, "expected log")
record := findLogRecord(mockHandler.GetRecords(), slog.LevelError, "unexpected close") record := honeybeetest.FindLogRecord(mockHandler.GetRecords(), slog.LevelError, "unexpected close")
assert.NotNil(t, record) assert.NotNil(t, record)
assertAttributePresent(t, *record, "code", websocket.CloseProtocolError) honeybeetest.AssertAttributePresent(t, *record, "code", websocket.CloseProtocolError)
assertAttributePresent(t, *record, "text", "bad protocol") honeybeetest.AssertAttributePresent(t, *record, "text", "bad protocol")
}) })
@@ -390,7 +276,7 @@ func TestReaderLogging(t *testing.T) {
defer conn.Close() defer conn.Close()
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelError, "read error") != nil mockHandler.GetRecords(), slog.LevelError, "read error") != nil
}, "expected log") }, "expected log")
}) })
@@ -416,15 +302,15 @@ func TestWriterLogging(t *testing.T) {
assert.ErrorContains(t, err, "failed to set write deadline: deadline error") assert.ErrorContains(t, err, "failed to set write deadline: deadline error")
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelError, "write deadline error") != nil mockHandler.GetRecords(), slog.LevelError, "write deadline error") != nil
}, "expected log") }, "expected log")
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
record := findLogRecord(records, slog.LevelError, "write deadline error") record := honeybeetest.FindLogRecord(records, slog.LevelError, "write deadline error")
assert.NotNil(t, record) assert.NotNil(t, record)
assertAttributePresent(t, *record, "error", deadlineErr) honeybeetest.AssertAttributePresent(t, *record, "error", deadlineErr)
conn.Close() conn.Close()
}) })
@@ -446,15 +332,15 @@ func TestWriterLogging(t *testing.T) {
assert.ErrorContains(t, err, "write error") assert.ErrorContains(t, err, "write error")
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
return findLogRecord( return honeybeetest.FindLogRecord(
mockHandler.GetRecords(), slog.LevelError, "write error") != nil mockHandler.GetRecords(), slog.LevelError, "write error") != nil
}, "expected log") }, "expected log")
records := mockHandler.GetRecords() records := mockHandler.GetRecords()
record := findLogRecord(records, slog.LevelError, "write error") record := honeybeetest.FindLogRecord(records, slog.LevelError, "write error")
assert.NotNil(t, record) assert.NotNil(t, record)
assertAttributePresent(t, *record, "error", writeErr) honeybeetest.AssertAttributePresent(t, *record, "error", writeErr)
conn.Close() conn.Close()
}) })