Initial buildout of worker pattern.

This commit is contained in:
Jay
2026-04-17 13:29:49 -04:00
parent 8a9253a9a4
commit 4407e4f202
7 changed files with 272 additions and 102 deletions

180
config.go
View File

@@ -8,35 +8,42 @@ import (
// Types
type CloseHandler func(code int, text string) error
type WorkerFactory func(
id string,
conn *Connection,
onReconnect func() (*Connection, error),
) Worker
// Pool Config
// Initiator Pool Config
type PoolConfig struct {
type InitiatorPoolConfig struct {
ConnectionConfig *ConnectionConfig
IdleTimeout time.Duration
WorkerFactory WorkerFactory
WorkerConfig *InitiatorWorkerConfig
}
type PoolOption func(*PoolConfig) error
type InitiatorPoolOption func(*InitiatorPoolConfig) error
func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) {
conf := GetDefaultPoolConfig()
if err := applyPoolOptions(conf, options...); err != nil {
func NewInitiatorPoolConfig(options ...InitiatorPoolOption) (*InitiatorPoolConfig, error) {
conf := GetDefaultInitiatorPoolConfig()
if err := applyInitiatorPoolOptions(conf, options...); err != nil {
return nil, err
}
if err := validatePoolConfig(conf); err != nil {
if err := validateInitiatorPoolConfig(conf); err != nil {
return nil, err
}
return conf, nil
}
func GetDefaultPoolConfig() *PoolConfig {
return &PoolConfig{
IdleTimeout: 20 * time.Second,
func GetDefaultInitiatorPoolConfig() *InitiatorPoolConfig {
return &InitiatorPoolConfig{
ConnectionConfig: nil,
WorkerFactory: nil,
WorkerConfig: nil,
}
}
func applyPoolOptions(config *PoolConfig, options ...PoolOption) error {
func applyInitiatorPoolOptions(config *InitiatorPoolConfig, options ...InitiatorPoolOption) error {
for _, option := range options {
if err := option(config); err != nil {
return err
@@ -45,11 +52,8 @@ func applyPoolOptions(config *PoolConfig, options ...PoolOption) error {
return nil
}
func validatePoolConfig(config *PoolConfig) error {
err := validateIdleTimeout(config.IdleTimeout)
if err != nil {
return err
}
func validateInitiatorPoolConfig(config *InitiatorPoolConfig) error {
var err error
if config.ConnectionConfig != nil {
err = validateConnectionConfig(config.ConnectionConfig)
@@ -58,30 +62,18 @@ func validatePoolConfig(config *PoolConfig) error {
}
}
return nil
}
func validateIdleTimeout(value time.Duration) error {
if value < 0 {
return errors.InvalidIdleTimeout
}
return nil
}
// When IdleTimeout is set to zero, idle timeouts are disabled.
func WithIdleTimeout(value time.Duration) PoolOption {
return func(c *PoolConfig) error {
err := validateIdleTimeout(value)
if config.WorkerConfig != nil {
err = validateInitiatorWorkerConfig(config.WorkerConfig)
if err != nil {
return err
}
c.IdleTimeout = value
return nil
}
return nil
}
func WithConnectionConfig(cc *ConnectionConfig) PoolOption {
return func(c *PoolConfig) error {
func WithInitiatorConnectionConfig(cc *ConnectionConfig) InitiatorPoolOption {
return func(c *InitiatorPoolConfig) error {
err := validateConnectionConfig(cc)
if err != nil {
return err
@@ -91,6 +83,32 @@ func WithConnectionConfig(cc *ConnectionConfig) PoolOption {
}
}
func WithInitiatorWorkerConfig(wc *InitiatorWorkerConfig) InitiatorPoolOption {
return func(c *InitiatorPoolConfig) error {
err := validateInitiatorWorkerConfig(wc)
if err != nil {
return err
}
c.WorkerConfig = wc
return nil
}
}
func WithInitiatorWorkerFactory(wf WorkerFactory) InitiatorPoolOption {
return func(c *InitiatorPoolConfig) error {
c.WorkerFactory = wf
return nil
}
}
// Responder Pool Config
type ResponderPoolConfig struct {
ConnectionConfig *ConnectionConfig
WorkerFactory WorkerFactory
WorkerConfig *ResponderWorkerConfig
}
// Connection Config
type ConnectionConfig struct {
@@ -310,3 +328,95 @@ func WithRetryJitterFactor(value float64) ConnectionOption {
return nil
}
}
// Initiator Worker Config
type InitiatorWorkerConfig struct {
IdleTimeout time.Duration
MaxQueueSize int
}
type InitiatorWorkerOption func(*InitiatorWorkerConfig) error
func NewInitiatorWorkerConfig(options ...InitiatorWorkerOption) (*InitiatorWorkerConfig, error) {
conf := GetDefaultInitiatorWorkerConfig()
if err := applyInitiatorWorkerOptions(conf, options...); err != nil {
return nil, err
}
if err := validateInitiatorWorkerConfig(conf); err != nil {
return nil, err
}
return conf, nil
}
func GetDefaultInitiatorWorkerConfig() *InitiatorWorkerConfig {
return &InitiatorWorkerConfig{
IdleTimeout: 20 * time.Second,
MaxQueueSize: 0, // disabled by default
}
}
func applyInitiatorWorkerOptions(config *InitiatorWorkerConfig, options ...InitiatorWorkerOption) error {
for _, option := range options {
if err := option(config); err != nil {
return err
}
}
return nil
}
func validateInitiatorWorkerConfig(config *InitiatorWorkerConfig) error {
err := validateIdleTimeout(config.IdleTimeout)
if err != nil {
return err
}
err = validateMaxQueueSize(config.MaxQueueSize)
if err != nil {
return err
}
return nil
}
func validateMaxQueueSize(value int) error {
if value < 0 {
return errors.InvalidMaxQueueSize
}
return nil
}
func validateIdleTimeout(value time.Duration) error {
if value < 0 {
return errors.InvalidIdleTimeout
}
return nil
}
// When IdleTimeout is set to zero, idle timeouts are disabled.
func WithIdleTimeout(value time.Duration) InitiatorWorkerOption {
return func(c *InitiatorWorkerConfig) error {
err := validateIdleTimeout(value)
if err != nil {
return err
}
c.IdleTimeout = value
return nil
}
}
// When MaxQueueSize is set to zero, queue limits are disabled.
func WithMaxQueueSize(value int) InitiatorWorkerOption {
return func(c *InitiatorWorkerConfig) error {
err := validateMaxQueueSize(value)
if err != nil {
return err
}
c.MaxQueueSize = value
return nil
}
}
// Responder Worker Config
type ResponderWorkerConfig struct{}

View File

@@ -1,118 +1,82 @@
package honeybee
import (
"git.wisehodl.dev/jay/go-honeybee/errors"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestNewPoolConfig(t *testing.T) {
conf, err := NewPoolConfig()
conf, err := NewInitiatorPoolConfig()
assert.NoError(t, err)
assert.Equal(t, conf, &PoolConfig{
IdleTimeout: 20 * time.Second,
assert.Equal(t, conf, &InitiatorPoolConfig{
ConnectionConfig: nil,
WorkerConfig: nil,
WorkerFactory: nil,
})
// errors propagate
_, err = NewPoolConfig(WithIdleTimeout(-1))
assert.Error(t, err)
}
func TestDefaultPoolConfig(t *testing.T) {
conf := GetDefaultPoolConfig()
conf := GetDefaultInitiatorPoolConfig()
assert.Equal(t, conf, &PoolConfig{
IdleTimeout: 20 * time.Second,
assert.Equal(t, conf, &InitiatorPoolConfig{
ConnectionConfig: nil,
WorkerConfig: nil,
WorkerFactory: nil,
})
}
func TestApplyPoolOptions(t *testing.T) {
conf := &PoolConfig{}
err := applyPoolOptions(
conf := &InitiatorPoolConfig{}
err := applyInitiatorPoolOptions(
conf,
WithIdleTimeout(15),
WithConnectionConfig(&ConnectionConfig{}),
WithInitiatorConnectionConfig(&ConnectionConfig{}),
)
assert.NoError(t, err)
assert.Equal(t, time.Duration(15), conf.IdleTimeout)
assert.Equal(t, 0*time.Second, conf.ConnectionConfig.WriteTimeout)
// errors propagate
err = applyPoolOptions(
conf,
WithIdleTimeout(-1),
)
assert.ErrorIs(t, err, errors.InvalidIdleTimeout)
}
func TestWithIdleTimeout(t *testing.T) {
conf := &PoolConfig{}
opt := WithIdleTimeout(30)
err := applyPoolOptions(conf, opt)
assert.NoError(t, err)
assert.Equal(t, conf.IdleTimeout, time.Duration(30))
// zero allowed
conf = &PoolConfig{}
opt = WithIdleTimeout(0)
err = applyPoolOptions(conf, opt)
assert.NoError(t, err)
assert.Equal(t, conf.IdleTimeout, time.Duration(0))
// negative disallowed
conf = &PoolConfig{}
opt = WithIdleTimeout(-30)
err = applyPoolOptions(conf, opt)
assert.ErrorIs(t, err, errors.InvalidIdleTimeout)
assert.ErrorContains(t, err, "idle timeout cannot be negative")
}
func TestWithConnectionConfig(t *testing.T) {
conf := &PoolConfig{}
opt := WithConnectionConfig(&ConnectionConfig{WriteTimeout: 1 * time.Second})
err := applyPoolOptions(conf, opt)
conf := &InitiatorPoolConfig{}
opt := WithInitiatorConnectionConfig(&ConnectionConfig{WriteTimeout: 1 * time.Second})
err := applyInitiatorPoolOptions(conf, opt)
assert.NoError(t, err)
assert.NotNil(t, conf.ConnectionConfig)
assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout)
// invalid config is rejected
conf = &PoolConfig{}
opt = WithConnectionConfig(&ConnectionConfig{WriteTimeout: -1 * time.Second})
err = applyPoolOptions(conf, opt)
conf = &InitiatorPoolConfig{}
opt = WithInitiatorConnectionConfig(&ConnectionConfig{WriteTimeout: -1 * time.Second})
err = applyInitiatorPoolOptions(conf, opt)
assert.Error(t, err)
}
func TestValidatePoolConfig(t *testing.T) {
cases := []struct {
name string
conf PoolConfig
conf InitiatorPoolConfig
wantErr error
wantErrText string
}{
{
name: "valid empty",
conf: *&PoolConfig{},
conf: *&InitiatorPoolConfig{},
},
{
name: "valid defaults",
conf: *GetDefaultPoolConfig(),
conf: *GetDefaultInitiatorPoolConfig(),
},
{
name: "valid complete",
conf: PoolConfig{
IdleTimeout: 15 * time.Second,
conf: InitiatorPoolConfig{
ConnectionConfig: &ConnectionConfig{},
},
},
{
name: "invalid connection config",
conf: PoolConfig{
conf: InitiatorPoolConfig{
ConnectionConfig: &ConnectionConfig{
Retry: &RetryConfig{
InitialDelay: 10 * time.Second,
@@ -126,7 +90,7 @@ func TestValidatePoolConfig(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := validatePoolConfig(&tc.conf)
err := validateInitiatorPoolConfig(&tc.conf)
if tc.wantErr != nil || tc.wantErrText != "" {
if tc.wantErr != nil {

View File

@@ -14,6 +14,7 @@ var (
InvalidRetryInitialDelay = errors.New("initial delay must be positive")
InvalidRetryMaxDelay = errors.New("max delay must be positive")
InvalidRetryJitterFactor = errors.New("jitter factor must be between 0.0 and 1.0")
InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative")
)
func NewConfigError(text string) error {

View File

@@ -62,7 +62,7 @@ type pool struct {
errors chan error
done chan struct{}
config *PoolConfig
config *InitiatorPoolConfig
logger *slog.Logger
mu sync.RWMutex
@@ -147,12 +147,12 @@ type InitiatorPool struct {
dialer Dialer
}
func NewInitiatorPool(config *PoolConfig, logger *slog.Logger) (*InitiatorPool, error) {
func NewInitiatorPool(config *InitiatorPoolConfig, logger *slog.Logger) (*InitiatorPool, error) {
if config == nil {
config = GetDefaultPoolConfig()
config = GetDefaultInitiatorPoolConfig()
}
if err := validatePoolConfig(config); err != nil {
if err := validateInitiatorPoolConfig(config); err != nil {
return nil, err
}

View File

@@ -70,7 +70,7 @@ func TestPoolConnect(t *testing.T) {
t.Run("fails to add connection", func(t *testing.T) {
pool, err := NewInitiatorPool(
&PoolConfig{
&InitiatorPoolConfig{
ConnectionConfig: &ConnectionConfig{
Retry: &RetryConfig{
MaxRetries: 1,

View File

@@ -1,18 +1,107 @@
package honeybee
import (
"log/slog"
"sync"
"time"
)
// Types
// Worker Implementation
type Worker interface{}
type Worker interface {
Start(
ctx *WorkerContext,
wg *sync.WaitGroup,
)
}
type WorkerContext struct {
Inbox chan<- InboxMessage
Events chan<- PoolEvent
Errors chan<- error
Stop <-chan struct{}
PoolDone <-chan struct{}
Logger *slog.Logger
}
// Base Struct
type worker struct{}
type worker struct {
id string
}
func (w *worker) runForwarder(
messages <-chan []byte,
inbox chan<- []byte,
stop <-chan struct{},
poolDone <-chan struct{},
maxQueueSize int,
) {
}
// Initiator Worker
type InitiatorWorker struct{}
type InitiatorWorker struct {
*worker
config *InitiatorWorkerConfig
onReconnect func() (*Connection, error)
}
func newInitiatorWorker(
id string,
config *InitiatorWorkerConfig,
onReconnect func() (*Connection, error),
logger *slog.Logger,
) (*InitiatorWorker, error) {
w := &InitiatorWorker{
worker: &worker{
id: id,
logger: logger,
},
config: config,
onReconnect: onReconnect,
}
return w, nil
}
func (w *InitiatorWorker) Start(
inbox chan<- InboxMessage,
events chan<- PoolEvent,
stop <-chan struct{},
poolDone <-chan struct{},
wg *sync.WaitGroup,
) {
}
func runReader(conn *Connection,
messages chan<- []byte,
heartbeat chan<- time.Time,
reconnect chan<- struct{},
newConn <-chan *Connection,
stop <-chan struct{},
poolDone <-chan struct{},
) {
}
func runHealthMonitor(
heartbeat <-chan time.Time,
stop <-chan struct{},
poolDone <-chan struct{},
) {
}
func runReconnector(
reconnect <-chan struct{},
newConn chan<- *Connection,
stop <-chan struct{},
poolDone <-chan struct{},
) {
}
// Responder Worker

View File

@@ -1 +1,7 @@
package honeybee
import (
// "github.com/stretchr/testify/assert"
// "testing"
// "time"
)