started responder pool

This commit is contained in:
Jay
2026-04-20 06:57:48 -04:00
parent 3066802f62
commit 9859796338
9 changed files with 585 additions and 0 deletions
+159
View File
@@ -0,0 +1,159 @@
// responderpool/config.go
package responderpool
import (
"git.wisehodl.dev/jay/go-honeybee/transport"
"time"
)
// Worker Config
type WorkerConfig struct {
MaxQueueSize int
DeadTimeout time.Duration
}
type WorkerOption func(*WorkerConfig) error
func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) {
conf := GetDefaultWorkerConfig()
if err := applyWorkerOptions(conf, options...); err != nil {
return nil, err
}
if err := ValidateWorkerConfig(conf); err != nil {
return nil, err
}
return conf, nil
}
func GetDefaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
MaxQueueSize: 0, // queue can grow indefinitely by default
DeadTimeout: 0, // eviction disabled by default
}
}
func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error {
for _, option := range options {
if err := option(config); err != nil {
return err
}
}
return nil
}
func ValidateWorkerConfig(config *WorkerConfig) error {
if err := validateMaxQueueSize(config.MaxQueueSize); err != nil {
return err
}
if err := validateDeadTimeout(config.DeadTimeout); err != nil {
return err
}
return nil
}
func validateMaxQueueSize(value int) error {
if value < 0 {
return InvalidMaxQueueSize
}
return nil
}
func validateDeadTimeout(value time.Duration) error {
if value < 0 {
return InvalidDeadTimeout
}
return nil
}
// When MaxQueueSize is zero, queue limits are disabled.
func WithMaxQueueSize(value int) WorkerOption {
return func(c *WorkerConfig) error {
if err := validateMaxQueueSize(value); err != nil {
return err
}
c.MaxQueueSize = value
return nil
}
}
// When DeadTimeout is zero, the watchdog is disabled.
func WithDeadTimeout(value time.Duration) WorkerOption {
return func(c *WorkerConfig) error {
if err := validateDeadTimeout(value); err != nil {
return err
}
c.DeadTimeout = value
return nil
}
}
// Pool Config
type PoolConfig struct {
ConnectionConfig *transport.ConnectionConfig
WorkerConfig *WorkerConfig
}
type PoolOption func(*PoolConfig) error
func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) {
conf := GetDefaultPoolConfig()
if err := applyPoolOptions(conf, options...); err != nil {
return nil, err
}
if err := ValidatePoolConfig(conf); err != nil {
return nil, err
}
return conf, nil
}
func GetDefaultPoolConfig() *PoolConfig {
return &PoolConfig{
ConnectionConfig: nil,
WorkerConfig: nil,
}
}
func applyPoolOptions(config *PoolConfig, options ...PoolOption) error {
for _, option := range options {
if err := option(config); err != nil {
return err
}
}
return nil
}
func ValidatePoolConfig(config *PoolConfig) error {
if config.ConnectionConfig != nil {
if err := transport.ValidateConnectionConfig(config.ConnectionConfig); err != nil {
return err
}
}
if config.WorkerConfig != nil {
if err := ValidateWorkerConfig(config.WorkerConfig); err != nil {
return err
}
}
return nil
}
func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
return func(c *PoolConfig) error {
if err := transport.ValidateConnectionConfig(cc); err != nil {
return err
}
c.ConnectionConfig = cc
return nil
}
}
func WithWorkerConfig(wc *WorkerConfig) PoolOption {
return func(c *PoolConfig) error {
if err := ValidateWorkerConfig(wc); err != nil {
return err
}
c.WorkerConfig = wc
return nil
}
}
+188
View File
@@ -0,0 +1,188 @@
// responderpool/config_test.go
package responderpool
import (
"git.wisehodl.dev/jay/go-honeybee/transport"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestNewWorkerConfig(t *testing.T) {
conf, err := NewWorkerConfig()
assert.NoError(t, err)
assert.Equal(t, GetDefaultWorkerConfig(), conf)
}
func TestDefaultWorkerConfig(t *testing.T) {
conf := GetDefaultWorkerConfig()
assert.Equal(t, &WorkerConfig{
MaxQueueSize: 0,
DeadTimeout: 0,
}, conf)
}
func TestValidateWorkerConfig(t *testing.T) {
cases := []struct {
name string
conf WorkerConfig
wantErr error
}{
{
name: "valid defaults",
conf: *GetDefaultWorkerConfig(),
},
{
name: "zero dead timeout disabled",
conf: WorkerConfig{DeadTimeout: 0},
},
{
name: "positive dead timeout",
conf: WorkerConfig{DeadTimeout: 30 * time.Second},
},
{
name: "negative max queue size",
conf: WorkerConfig{MaxQueueSize: -1},
wantErr: InvalidMaxQueueSize,
},
{
name: "negative dead timeout",
conf: WorkerConfig{DeadTimeout: -1 * time.Second},
wantErr: InvalidDeadTimeout,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := ValidateWorkerConfig(&tc.conf)
if tc.wantErr != nil {
assert.ErrorIs(t, err, tc.wantErr)
return
}
assert.NoError(t, err)
})
}
}
func TestWithMaxQueueSize(t *testing.T) {
conf := &WorkerConfig{}
err := applyWorkerOptions(conf, WithMaxQueueSize(10))
assert.NoError(t, err)
assert.Equal(t, 10, conf.MaxQueueSize)
err = applyWorkerOptions(conf, WithMaxQueueSize(0))
assert.NoError(t, err)
err = applyWorkerOptions(conf, WithMaxQueueSize(-1))
assert.ErrorIs(t, err, InvalidMaxQueueSize)
}
func TestWithDeadTimeout(t *testing.T) {
conf := &WorkerConfig{}
err := applyWorkerOptions(conf, WithDeadTimeout(30*time.Second))
assert.NoError(t, err)
assert.Equal(t, 30*time.Second, conf.DeadTimeout)
err = applyWorkerOptions(conf, WithDeadTimeout(0))
assert.NoError(t, err)
err = applyWorkerOptions(conf, WithDeadTimeout(-1*time.Second))
assert.ErrorIs(t, err, InvalidDeadTimeout)
}
func TestNewPoolConfig(t *testing.T) {
conf, err := NewPoolConfig()
assert.NoError(t, err)
assert.Equal(t, GetDefaultPoolConfig(), conf)
}
func TestDefaultPoolConfig(t *testing.T) {
conf := GetDefaultPoolConfig()
assert.Equal(t, &PoolConfig{
ConnectionConfig: nil,
WorkerConfig: nil,
}, conf)
}
func TestValidatePoolConfig(t *testing.T) {
cases := []struct {
name string
conf PoolConfig
wantErrText string
}{
{
name: "valid empty",
conf: PoolConfig{},
},
{
name: "valid defaults",
conf: *GetDefaultPoolConfig(),
},
{
name: "valid with configs",
conf: PoolConfig{
ConnectionConfig: &transport.ConnectionConfig{},
WorkerConfig: &WorkerConfig{},
},
},
{
name: "invalid connection config",
conf: PoolConfig{
ConnectionConfig: &transport.ConnectionConfig{
Retry: &transport.RetryConfig{
InitialDelay: 10 * time.Second,
MaxDelay: 1 * time.Second,
},
},
},
wantErrText: "initial delay may not exceed maximum delay",
},
{
name: "invalid worker config",
conf: PoolConfig{
WorkerConfig: &WorkerConfig{MaxQueueSize: -1},
},
wantErrText: "maximum queue size cannot be negative",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := ValidatePoolConfig(&tc.conf)
if tc.wantErrText != "" {
assert.ErrorContains(t, err, tc.wantErrText)
return
}
assert.NoError(t, err)
})
}
}
func TestWithConnectionConfig(t *testing.T) {
conf := &PoolConfig{}
err := applyPoolOptions(conf, WithConnectionConfig(&transport.ConnectionConfig{}))
assert.NoError(t, err)
assert.NotNil(t, conf.ConnectionConfig)
err = applyPoolOptions(conf, WithConnectionConfig(&transport.ConnectionConfig{
Retry: &transport.RetryConfig{
InitialDelay: 10 * time.Second,
MaxDelay: 1 * time.Second,
},
}))
assert.Error(t, err)
}
func TestWithWorkerConfig(t *testing.T) {
conf := &PoolConfig{}
err := applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{DeadTimeout: 30 * time.Second}))
assert.NoError(t, err)
assert.Equal(t, 30*time.Second, conf.WorkerConfig.DeadTimeout)
err = applyPoolOptions(conf, WithWorkerConfig(&WorkerConfig{MaxQueueSize: -1}))
assert.Error(t, err)
}
+14
View File
@@ -0,0 +1,14 @@
package responderpool
import "errors"
var (
// Pool errors
ErrPoolClosed = errors.New("pool is closed")
ErrPeerNotFound = errors.New("peer not found")
ErrPeerExists = errors.New("peer already exists")
// Config errors
InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative")
InvalidDeadTimeout = errors.New("dead timeout cannot be negative")
)
+56
View File
@@ -0,0 +1,56 @@
package responderpool
import (
"fmt"
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"git.wisehodl.dev/jay/go-honeybee/transport"
"github.com/stretchr/testify/assert"
"io"
"testing"
)
func setupReaderTestConnection(t *testing.T) (
conn *transport.Connection,
mock *honeybeetest.MockSocket,
incoming chan honeybeetest.MockIncomingData,
outgoing chan honeybeetest.MockOutgoingData,
) {
t.Helper()
incoming = make(chan honeybeetest.MockIncomingData, 10)
outgoing = make(chan honeybeetest.MockOutgoingData, 10)
mock = honeybeetest.NewMockSocket()
mock.CloseFunc = func() error {
mock.Once.Do(func() { close(mock.Closed) })
return nil
}
mock.ReadMessageFunc = func() (int, []byte, error) {
select {
case data, ok := <-incoming:
if !ok {
return 0, nil, io.EOF
}
return data.MsgType, data.Data, data.Err
case <-mock.Closed:
return 0, nil, io.EOF
}
}
mock.WriteMessageFunc = func(msgType int, data []byte) error {
select {
case outgoing <- honeybeetest.MockOutgoingData{MsgType: msgType, Data: data}:
return nil
case <-mock.Closed:
return io.EOF
default:
return fmt.Errorf("mock outgoing channel unavailable")
}
}
var err error
conn, err = transport.NewConnectionFromSocket(mock, nil, nil)
assert.NoError(t, err)
return
}
+24
View File
@@ -0,0 +1,24 @@
package responderpool
import (
"time"
)
type PoolEventKind string
const (
EventPeerDisconnected PoolEventKind = "disconnected"
EventPeerDropped PoolEventKind = "dropped"
EventPeerEvicted PoolEventKind = "evicted"
)
type PoolEvent struct {
ID string
Kind PoolEventKind
}
type InboxMessage struct {
ID string
Data []byte
ReceivedAt time.Time
}
+141
View File
@@ -0,0 +1,141 @@
package responderpool
import (
"container/list"
"context"
"errors"
"git.wisehodl.dev/jay/go-honeybee/transport"
"time"
)
type onExitFunc func(id string, kind PoolEventKind)
type ReceivedMessage struct {
data []byte
receivedAt time.Time
}
func RunReader(
ctx context.Context,
id string,
conn *transport.Connection,
messages chan<- ReceivedMessage,
heartbeat chan<- struct{},
onPeerClose onExitFunc,
) {
for {
select {
case <-ctx.Done():
return
case data, ok := <-conn.Incoming():
if !ok {
// determine exit kind
// by default, the peer dropped unexpectedly
kind := EventPeerDropped
select {
case err := <-conn.Errors():
if errors.Is(err, transport.ErrPeerClosedClean) {
kind = EventPeerDisconnected
}
default:
}
onPeerClose(id, kind)
return
}
messages <- ReceivedMessage{data: data, receivedAt: time.Now()}
select {
case heartbeat <- struct{}{}:
case <-ctx.Done():
return
}
}
}
}
func RunForwarder(
ctx context.Context,
id string,
messages <-chan ReceivedMessage,
inbox chan<- InboxMessage,
maxQueueSize int,
) {
queue := list.New()
for {
var out chan<- InboxMessage
var next ReceivedMessage
// enable inbox if it is populated
if queue.Len() > 0 {
out = inbox
// read the first message in the queue
next = queue.Front().Value.(ReceivedMessage)
}
select {
case <-ctx.Done():
return
case msg := <-messages:
// limit queue size if maximum is configured
if maxQueueSize > 0 && queue.Len() >= maxQueueSize {
// drop oldest message
queue.Remove(queue.Front())
}
// add new message
queue.PushBack(msg)
// send next message to inbox
case out <- InboxMessage{
ID: id,
Data: next.data,
ReceivedAt: next.receivedAt,
}:
// drop message from queue
queue.Remove(queue.Front())
}
}
}
func RunWatchdog(
ctx context.Context,
id string,
timeout time.Duration,
heartbeat <-chan struct{},
onTimeout onExitFunc,
) {
// disable watchdog timeout if not configured
if timeout <= 0 {
// wait for cancel and exit
select {
case <-ctx.Done():
}
return
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-heartbeat:
// drain the timer channel and reset
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(timeout)
// timer completed
case <-timer.C:
// evict inactive peer
onTimeout(id, EventPeerEvicted)
return
}
}
}
+1
View File
@@ -0,0 +1 @@
package responderpool
+1
View File
@@ -0,0 +1 @@
package responderpool
+1
View File
@@ -0,0 +1 @@
package responderpool