From 985979633808b4786dee56c0d1d86518252e6c34 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 20 Apr 2026 06:57:48 -0400 Subject: [PATCH] started responder pool --- responderpool/config.go | 159 +++++++++++++++++++++ responderpool/config_test.go | 188 +++++++++++++++++++++++++ responderpool/errors.go | 14 ++ responderpool/helpers_test.go | 56 ++++++++ responderpool/pool.go | 24 ++++ responderpool/worker.go | 141 +++++++++++++++++++ responderpool/worker_forwarder_test.go | 1 + responderpool/worker_reader_test.go | 1 + responderpool/worker_watchdog_test.go | 1 + 9 files changed, 585 insertions(+) create mode 100644 responderpool/config.go create mode 100644 responderpool/config_test.go create mode 100644 responderpool/errors.go create mode 100644 responderpool/helpers_test.go create mode 100644 responderpool/pool.go create mode 100644 responderpool/worker.go create mode 100644 responderpool/worker_forwarder_test.go create mode 100644 responderpool/worker_reader_test.go create mode 100644 responderpool/worker_watchdog_test.go diff --git a/responderpool/config.go b/responderpool/config.go new file mode 100644 index 0000000..7ba5f71 --- /dev/null +++ b/responderpool/config.go @@ -0,0 +1,159 @@ +// responderpool/config.go +package responderpool + +import ( + "git.wisehodl.dev/jay/go-honeybee/transport" + "time" +) + +// Worker Config + +type WorkerConfig struct { + MaxQueueSize int + DeadTimeout time.Duration +} + +type WorkerOption func(*WorkerConfig) error + +func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { + conf := GetDefaultWorkerConfig() + if err := applyWorkerOptions(conf, options...); err != nil { + return nil, err + } + if err := ValidateWorkerConfig(conf); err != nil { + return nil, err + } + return conf, nil +} + +func GetDefaultWorkerConfig() *WorkerConfig { + return &WorkerConfig{ + MaxQueueSize: 0, // queue can grow indefinitely by default + DeadTimeout: 0, // eviction disabled by default + } +} + +func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { + for _, option := range options { + if err := option(config); err != nil { + return err + } + } + return nil +} + +func ValidateWorkerConfig(config *WorkerConfig) error { + if err := validateMaxQueueSize(config.MaxQueueSize); err != nil { + return err + } + if err := validateDeadTimeout(config.DeadTimeout); err != nil { + return err + } + return nil +} + +func validateMaxQueueSize(value int) error { + if value < 0 { + return InvalidMaxQueueSize + } + return nil +} + +func validateDeadTimeout(value time.Duration) error { + if value < 0 { + return InvalidDeadTimeout + } + return nil +} + +// When MaxQueueSize is zero, queue limits are disabled. +func WithMaxQueueSize(value int) WorkerOption { + return func(c *WorkerConfig) error { + if err := validateMaxQueueSize(value); err != nil { + return err + } + c.MaxQueueSize = value + return nil + } +} + +// When DeadTimeout is zero, the watchdog is disabled. +func WithDeadTimeout(value time.Duration) WorkerOption { + return func(c *WorkerConfig) error { + if err := validateDeadTimeout(value); err != nil { + return err + } + c.DeadTimeout = value + return nil + } +} + +// Pool Config + +type PoolConfig struct { + ConnectionConfig *transport.ConnectionConfig + WorkerConfig *WorkerConfig +} + +type PoolOption func(*PoolConfig) error + +func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) { + conf := GetDefaultPoolConfig() + if err := applyPoolOptions(conf, options...); err != nil { + return nil, err + } + if err := ValidatePoolConfig(conf); err != nil { + return nil, err + } + return conf, nil +} + +func GetDefaultPoolConfig() *PoolConfig { + return &PoolConfig{ + ConnectionConfig: nil, + WorkerConfig: nil, + } +} + +func applyPoolOptions(config *PoolConfig, options ...PoolOption) error { + for _, option := range options { + if err := option(config); err != nil { + return err + } + } + return nil +} + +func ValidatePoolConfig(config *PoolConfig) error { + if config.ConnectionConfig != nil { + if err := transport.ValidateConnectionConfig(config.ConnectionConfig); err != nil { + return err + } + } + if config.WorkerConfig != nil { + if err := ValidateWorkerConfig(config.WorkerConfig); err != nil { + return err + } + } + return nil +} + +func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { + return func(c *PoolConfig) error { + if err := transport.ValidateConnectionConfig(cc); err != nil { + return err + } + c.ConnectionConfig = cc + return nil + } +} + +func WithWorkerConfig(wc *WorkerConfig) PoolOption { + return func(c *PoolConfig) error { + if err := ValidateWorkerConfig(wc); err != nil { + return err + } + c.WorkerConfig = wc + return nil + } +} diff --git a/responderpool/config_test.go b/responderpool/config_test.go new file mode 100644 index 0000000..f24139a --- /dev/null +++ b/responderpool/config_test.go @@ -0,0 +1,188 @@ +// responderpool/config_test.go +package responderpool + +import ( + "git.wisehodl.dev/jay/go-honeybee/transport" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestNewWorkerConfig(t *testing.T) { + conf, err := NewWorkerConfig() + assert.NoError(t, err) + assert.Equal(t, GetDefaultWorkerConfig(), conf) +} + +func TestDefaultWorkerConfig(t *testing.T) { + conf := GetDefaultWorkerConfig() + assert.Equal(t, &WorkerConfig{ + MaxQueueSize: 0, + DeadTimeout: 0, + }, conf) +} + +func TestValidateWorkerConfig(t *testing.T) { + cases := []struct { + name string + conf WorkerConfig + wantErr error + }{ + { + name: "valid defaults", + conf: *GetDefaultWorkerConfig(), + }, + { + name: "zero dead timeout disabled", + conf: WorkerConfig{DeadTimeout: 0}, + }, + { + name: "positive dead timeout", + conf: WorkerConfig{DeadTimeout: 30 * time.Second}, + }, + { + name: "negative max queue size", + conf: WorkerConfig{MaxQueueSize: -1}, + wantErr: InvalidMaxQueueSize, + }, + { + name: "negative dead timeout", + conf: WorkerConfig{DeadTimeout: -1 * time.Second}, + wantErr: InvalidDeadTimeout, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := ValidateWorkerConfig(&tc.conf) + if tc.wantErr != nil { + assert.ErrorIs(t, err, tc.wantErr) + return + } + assert.NoError(t, err) + }) + } +} + +func TestWithMaxQueueSize(t *testing.T) { + conf := &WorkerConfig{} + + err := applyWorkerOptions(conf, WithMaxQueueSize(10)) + assert.NoError(t, err) + assert.Equal(t, 10, conf.MaxQueueSize) + + err = applyWorkerOptions(conf, WithMaxQueueSize(0)) + assert.NoError(t, err) + + err = applyWorkerOptions(conf, WithMaxQueueSize(-1)) + assert.ErrorIs(t, err, InvalidMaxQueueSize) +} + +func TestWithDeadTimeout(t *testing.T) { + conf := &WorkerConfig{} + + err := applyWorkerOptions(conf, WithDeadTimeout(30*time.Second)) + assert.NoError(t, err) + assert.Equal(t, 30*time.Second, conf.DeadTimeout) + + err = applyWorkerOptions(conf, WithDeadTimeout(0)) + assert.NoError(t, err) + + err = applyWorkerOptions(conf, WithDeadTimeout(-1*time.Second)) + assert.ErrorIs(t, err, InvalidDeadTimeout) +} + +func TestNewPoolConfig(t *testing.T) { + conf, err := NewPoolConfig() + assert.NoError(t, err) + assert.Equal(t, GetDefaultPoolConfig(), conf) +} + +func TestDefaultPoolConfig(t *testing.T) { + conf := GetDefaultPoolConfig() + assert.Equal(t, &PoolConfig{ + ConnectionConfig: nil, + WorkerConfig: nil, + }, conf) +} + +func TestValidatePoolConfig(t *testing.T) { + cases := []struct { + name string + conf PoolConfig + wantErrText string + }{ + { + name: "valid empty", + conf: PoolConfig{}, + }, + { + name: "valid defaults", + conf: *GetDefaultPoolConfig(), + }, + { + name: "valid with configs", + conf: PoolConfig{ + ConnectionConfig: &transport.ConnectionConfig{}, + WorkerConfig: &WorkerConfig{}, + }, + }, + { + name: "invalid connection config", + conf: PoolConfig{ + ConnectionConfig: &transport.ConnectionConfig{ + Retry: &transport.RetryConfig{ + InitialDelay: 10 * time.Second, + MaxDelay: 1 * time.Second, + }, + }, + }, + wantErrText: "initial delay may not exceed maximum delay", + }, + { + name: "invalid worker config", + conf: PoolConfig{ + WorkerConfig: &WorkerConfig{MaxQueueSize: -1}, + }, + wantErrText: "maximum queue size cannot be negative", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := ValidatePoolConfig(&tc.conf) + if tc.wantErrText != "" { + assert.ErrorContains(t, err, tc.wantErrText) + return + } + assert.NoError(t, err) + }) + } +} + +func TestWithConnectionConfig(t *testing.T) { + conf := &PoolConfig{} + + err := applyPoolOptions(conf, WithConnectionConfig(&transport.ConnectionConfig{})) + assert.NoError(t, err) + assert.NotNil(t, conf.ConnectionConfig) + + err = applyPoolOptions(conf, WithConnectionConfig(&transport.ConnectionConfig{ + Retry: &transport.RetryConfig{ + InitialDelay: 10 * time.Second, + MaxDelay: 1 * time.Second, + }, + })) + assert.Error(t, err) +} + +func TestWithWorkerConfig(t *testing.T) { + conf := &PoolConfig{} + + err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{DeadTimeout: 30 * time.Second})) + assert.NoError(t, err) + assert.Equal(t, 30*time.Second, conf.WorkerConfig.DeadTimeout) + + err = applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{MaxQueueSize: -1})) + assert.Error(t, err) +} diff --git a/responderpool/errors.go b/responderpool/errors.go new file mode 100644 index 0000000..c702df1 --- /dev/null +++ b/responderpool/errors.go @@ -0,0 +1,14 @@ +package responderpool + +import "errors" + +var ( + // Pool errors + ErrPoolClosed = errors.New("pool is closed") + ErrPeerNotFound = errors.New("peer not found") + ErrPeerExists = errors.New("peer already exists") + + // Config errors + InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") + InvalidDeadTimeout = errors.New("dead timeout cannot be negative") +) diff --git a/responderpool/helpers_test.go b/responderpool/helpers_test.go new file mode 100644 index 0000000..0c15dd8 --- /dev/null +++ b/responderpool/helpers_test.go @@ -0,0 +1,56 @@ +package responderpool + +import ( + "fmt" + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + "git.wisehodl.dev/jay/go-honeybee/transport" + "github.com/stretchr/testify/assert" + "io" + "testing" +) + +func setupReaderTestConnection(t *testing.T) ( + conn *transport.Connection, + mock *honeybeetest.MockSocket, + incoming chan honeybeetest.MockIncomingData, + outgoing chan honeybeetest.MockOutgoingData, +) { + t.Helper() + + incoming = make(chan honeybeetest.MockIncomingData, 10) + outgoing = make(chan honeybeetest.MockOutgoingData, 10) + mock = honeybeetest.NewMockSocket() + + mock.CloseFunc = func() error { + mock.Once.Do(func() { close(mock.Closed) }) + return nil + } + + mock.ReadMessageFunc = func() (int, []byte, error) { + select { + case data, ok := <-incoming: + if !ok { + return 0, nil, io.EOF + } + return data.MsgType, data.Data, data.Err + case <-mock.Closed: + return 0, nil, io.EOF + } + } + + mock.WriteMessageFunc = func(msgType int, data []byte) error { + select { + case outgoing <- honeybeetest.MockOutgoingData{MsgType: msgType, Data: data}: + return nil + case <-mock.Closed: + return io.EOF + default: + return fmt.Errorf("mock outgoing channel unavailable") + } + } + + var err error + conn, err = transport.NewConnectionFromSocket(mock, nil, nil) + assert.NoError(t, err) + return +} diff --git a/responderpool/pool.go b/responderpool/pool.go new file mode 100644 index 0000000..72faedc --- /dev/null +++ b/responderpool/pool.go @@ -0,0 +1,24 @@ +package responderpool + +import ( + "time" +) + +type PoolEventKind string + +const ( + EventPeerDisconnected PoolEventKind = "disconnected" + EventPeerDropped PoolEventKind = "dropped" + EventPeerEvicted PoolEventKind = "evicted" +) + +type PoolEvent struct { + ID string + Kind PoolEventKind +} + +type InboxMessage struct { + ID string + Data []byte + ReceivedAt time.Time +} diff --git a/responderpool/worker.go b/responderpool/worker.go new file mode 100644 index 0000000..bb8016a --- /dev/null +++ b/responderpool/worker.go @@ -0,0 +1,141 @@ +package responderpool + +import ( + "container/list" + "context" + "errors" + "git.wisehodl.dev/jay/go-honeybee/transport" + "time" +) + +type onExitFunc func(id string, kind PoolEventKind) + +type ReceivedMessage struct { + data []byte + receivedAt time.Time +} + +func RunReader( + ctx context.Context, + id string, + conn *transport.Connection, + messages chan<- ReceivedMessage, + heartbeat chan<- struct{}, + onPeerClose onExitFunc, +) { + for { + select { + case <-ctx.Done(): + return + case data, ok := <-conn.Incoming(): + if !ok { + // determine exit kind + // by default, the peer dropped unexpectedly + kind := EventPeerDropped + select { + case err := <-conn.Errors(): + if errors.Is(err, transport.ErrPeerClosedClean) { + kind = EventPeerDisconnected + } + default: + } + + onPeerClose(id, kind) + return + } + + messages <- ReceivedMessage{data: data, receivedAt: time.Now()} + + select { + case heartbeat <- struct{}{}: + case <-ctx.Done(): + return + } + } + } +} + +func RunForwarder( + ctx context.Context, + id string, + messages <-chan ReceivedMessage, + inbox chan<- InboxMessage, + maxQueueSize int, +) { + queue := list.New() + + for { + var out chan<- InboxMessage + var next ReceivedMessage + + // enable inbox if it is populated + if queue.Len() > 0 { + out = inbox + + // read the first message in the queue + next = queue.Front().Value.(ReceivedMessage) + } + + select { + case <-ctx.Done(): + return + case msg := <-messages: + // limit queue size if maximum is configured + if maxQueueSize > 0 && queue.Len() >= maxQueueSize { + // drop oldest message + queue.Remove(queue.Front()) + } + // add new message + queue.PushBack(msg) + // send next message to inbox + case out <- InboxMessage{ + ID: id, + Data: next.data, + ReceivedAt: next.receivedAt, + }: + // drop message from queue + queue.Remove(queue.Front()) + } + } +} + +func RunWatchdog( + ctx context.Context, + id string, + timeout time.Duration, + heartbeat <-chan struct{}, + onTimeout onExitFunc, +) { + // disable watchdog timeout if not configured + if timeout <= 0 { + // wait for cancel and exit + select { + case <-ctx.Done(): + } + return + } + + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-heartbeat: + // drain the timer channel and reset + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(timeout) + // timer completed + case <-timer.C: + // evict inactive peer + onTimeout(id, EventPeerEvicted) + return + } + } +} diff --git a/responderpool/worker_forwarder_test.go b/responderpool/worker_forwarder_test.go new file mode 100644 index 0000000..1d0ef5c --- /dev/null +++ b/responderpool/worker_forwarder_test.go @@ -0,0 +1 @@ +package responderpool diff --git a/responderpool/worker_reader_test.go b/responderpool/worker_reader_test.go new file mode 100644 index 0000000..1d0ef5c --- /dev/null +++ b/responderpool/worker_reader_test.go @@ -0,0 +1 @@ +package responderpool diff --git a/responderpool/worker_watchdog_test.go b/responderpool/worker_watchdog_test.go new file mode 100644 index 0000000..1d0ef5c --- /dev/null +++ b/responderpool/worker_watchdog_test.go @@ -0,0 +1 @@ +package responderpool