Created public api, cleaned up internals.

This commit is contained in:
Jay
2026-04-19 14:23:10 -04:00
parent d2528d3ac7
commit dfd28d65bc
7 changed files with 158 additions and 126 deletions
+85
View File
@@ -0,0 +1,85 @@
package honeybee
import (
"context"
"log/slog"
"git.wisehodl.dev/jay/go-honeybee/initiatorpool"
"git.wisehodl.dev/jay/go-honeybee/transport"
)
// Connection types
type Connection = transport.Connection
type ConnectionConfig = transport.ConnectionConfig
type RetryConfig = transport.RetryConfig
type ConnectionOption = transport.ConnectionOption
// Initator Pool types
type InitiatorPool = initiatorpool.Pool
type InitiatorPoolConfig = initiatorpool.PoolConfig
type InitiatorPoolOption = initiatorpool.PoolOption
type InitiatorWorkerConfig = initiatorpool.WorkerConfig
type InitiatorWorkerOption = initiatorpool.WorkerOption
type InitiatorInboxMessage = initiatorpool.InboxMessage
type InitiatorPoolEvent = initiatorpool.PoolEvent
type InitiatorPoolEventKind = initiatorpool.PoolEventKind
// Pool event constants
const (
EventConnected = initiatorpool.EventConnected
EventDisconnected = initiatorpool.EventDisconnected
)
// Connection constructors
func NewConnection(url string, config *ConnectionConfig, logger *slog.Logger) (*Connection, error) {
return transport.NewConnection(url, config, logger)
}
func NewConnectionConfig(opts ...ConnectionOption) (*ConnectionConfig, error) {
return transport.NewConnectionConfig(opts...)
}
// Connection options
var (
WithoutRetry = transport.WithoutRetry
WithRetryMaxRetries = transport.WithRetryMaxRetries
WithRetryInitialDelay = transport.WithRetryInitialDelay
WithRetryMaxDelay = transport.WithRetryMaxDelay
WithRetryJitterFactor = transport.WithRetryJitterFactor
WithWriteTimeout = transport.WithWriteTimeout
WithCloseHandler = transport.WithCloseHandler
)
// Initiator Pool constructors
func NewInitiatorPool(ctx context.Context, config *InitiatorPoolConfig, logger *slog.Logger) (*InitiatorPool, error) {
return initiatorpool.NewPool(ctx, config, logger)
}
func NewInitiatorPoolConfig(opts ...InitiatorPoolOption) (*InitiatorPoolConfig, error) {
return initiatorpool.NewPoolConfig(opts...)
}
func NewInitiatorWorkerConfig(opts ...InitiatorWorkerOption) (*InitiatorWorkerConfig, error) {
return initiatorpool.NewWorkerConfig(opts...)
}
// Initiator Pool options
var (
WithConnectionConfig = initiatorpool.WithConnectionConfig
WithWorkerConfig = initiatorpool.WithWorkerConfig
WithWorkerFactory = initiatorpool.WithWorkerFactory
)
// Initiator Worker options
var (
WithKeepaliveTimeout = initiatorpool.WithKeepaliveTimeout
WithMaxQueueSize = initiatorpool.WithMaxQueueSize
)
+11 -3
View File
@@ -100,8 +100,15 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger,
return p, nil return p, nil
} }
func (p *Pool) Peers() map[string]*Peer { func (p *Pool) Peers() []string {
return p.peers p.mu.RLock()
defer p.mu.RUnlock()
ids := make([]string, 0, len(p.peers))
for i, _ := range p.peers {
ids = append(ids, i)
}
return ids
} }
func (p *Pool) Inbox() chan InboxMessage { func (p *Pool) Inbox() chan InboxMessage {
@@ -131,8 +138,9 @@ func (p *Pool) Close() {
} }
p.closed = true p.closed = true
p.cancel() p.cancel() // closes all workers
// remove all peers
p.peers = make(map[string]*Peer) p.peers = make(map[string]*Peer)
p.mu.Unlock() p.mu.Unlock()
+47 -105
View File
@@ -4,31 +4,31 @@ import (
"context" "context"
"fmt" "fmt"
"git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types" "git.wisehodl.dev/jay/go-honeybee/types"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net/http" "net/http"
"testing" "testing"
"time"
) )
// TODO: Worker must connect and emit events. func setupPool(t *testing.T) (*Pool, *honeybeetest.MockDialer) {
func _TestPoolConnect(t *testing.T) { t.Helper()
t.Run("successfully adds connection", func(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket()
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
pool, err := NewPool(context.Background(), nil, nil) pool, err := NewPool(context.Background(), nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
dialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return honeybeetest.NewMockSocket(), nil, nil
},
}
pool.dialer = dialer
return pool, dialer
}
pool.dialer = mockDialer func TestPoolConnect(t *testing.T) {
t.Run("successfully adds connection", func(t *testing.T) {
pool, _ := setupPool(t)
err = pool.Connect("wss://test") err := pool.Connect("wss://test")
assert.NoError(t, err) assert.NoError(t, err)
honeybeetest.Eventually(t, func() bool { honeybeetest.Eventually(t, func() bool {
@@ -40,25 +40,15 @@ func _TestPoolConnect(t *testing.T) {
} }
}, "expected event") }, "expected event")
_, exists := pool.peers["wss://test"] assert.Contains(t, pool.Peers(), "wss://test")
assert.True(t, exists)
pool.Close() pool.Close()
}) })
t.Run("does not add duplicate", func(t *testing.T) { t.Run("does not add duplicate", func(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket() pool, _ := setupPool(t)
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
pool, err := NewPool(context.Background(), nil, nil) err := pool.Connect("wss://test")
assert.NoError(t, err)
pool.dialer = mockDialer
err = pool.Connect("wss://test")
assert.NoError(t, err) assert.NoError(t, err)
// trailing slash normalizes to same key // trailing slash normalizes to same key
@@ -66,119 +56,71 @@ func _TestPoolConnect(t *testing.T) {
assert.Error(t, err) assert.Error(t, err)
assert.ErrorIs(t, err, ErrPeerExists) assert.ErrorIs(t, err, ErrPeerExists)
pool.mu.RLock() assert.Len(t, pool.Peers(), 1)
assert.Len(t, pool.peers, 1)
pool.mu.RUnlock()
pool.Close()
})
t.Run("fails to add connection", func(t *testing.T) {
pool, err := NewPool(
context.Background(),
&PoolConfig{
ConnectionConfig: &transport.ConnectionConfig{
Retry: &transport.RetryConfig{
MaxRetries: 1,
InitialDelay: 1 * time.Millisecond,
MaxDelay: 5 * time.Millisecond,
}},
}, nil)
assert.NoError(t, err)
pool.dialer = &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return nil, nil, fmt.Errorf("dial failed")
},
}
err = pool.Connect("wss://test")
assert.Error(t, err)
pool.mu.RLock()
assert.Len(t, pool.peers, 0)
pool.mu.RUnlock()
select {
case event := <-pool.events:
t.Fatalf("unexpected event: %+v", event)
default:
}
pool.Close() pool.Close()
}) })
} }
// TODO: Worker must stop connection and emit events func TestPoolClose(t *testing.T) {
func _TestPoolRemove(t *testing.T) { t.Run("channels close after pool close", func(t *testing.T) {
pool, _ := NewPool(context.Background(), nil, nil)
pool.Close()
_, ok := <-pool.Inbox()
assert.False(t, ok)
_, ok = <-pool.Events()
assert.False(t, ok)
_, ok = <-pool.Errors()
assert.False(t, ok)
})
t.Run("connect after close returns error", func(t *testing.T) {
pool, _ := NewPool(context.Background(), nil, nil)
pool.Close()
err := pool.Connect("wss://test")
assert.ErrorIs(t, err, ErrPoolClosed)
})
}
func TestPoolRemove(t *testing.T) {
t.Run("removes known url", func(t *testing.T) { t.Run("removes known url", func(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket() pool, _ := setupPool(t)
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
pool, err := NewPool(context.Background(), nil, nil)
assert.NoError(t, err)
pool.dialer = mockDialer
pool.Connect("wss://test") pool.Connect("wss://test")
expectEvent(t, pool.events, "wss://test", EventConnected) expectEvent(t, pool.events, "wss://test", EventConnected)
err = pool.Remove("wss://test/") err := pool.Remove("wss://test/")
assert.NoError(t, err) assert.NoError(t, err)
// expect a disconnected event // expect a disconnected event
expectEvent(t, pool.events, "wss://test", EventDisconnected) expectEvent(t, pool.events, "wss://test", EventDisconnected)
// connection no longer in pool // connection no longer in pool
pool.mu.Lock() assert.NotContains(t, pool.Peers(), "wss://test")
defer pool.mu.Unlock()
_, ok := pool.peers["wss://peer2"]
assert.False(t, ok, "connection is still in pool")
}) })
t.Run("unknown url returns error", func(t *testing.T) { t.Run("unknown url returns error", func(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket() pool, _ := setupPool(t)
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
pool, err := NewPool(context.Background(), nil, nil)
assert.NoError(t, err)
pool.dialer = mockDialer
// remove unknown connection // remove unknown connection
err = pool.Remove("wss://unknown") err := pool.Remove("wss://unknown")
assert.ErrorIs(t, err, ErrPeerNotFound) assert.ErrorIs(t, err, ErrPeerNotFound)
}) })
t.Run("closed pool returns error", func(t *testing.T) { t.Run("closed pool returns error", func(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket() pool, _ := setupPool(t)
mockDialer := &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
return mockSocket, nil, nil
},
}
pool, err := NewPool(context.Background(), nil, nil)
assert.NoError(t, err)
pool.dialer = mockDialer
// close pool // close pool
pool.Close() pool.Close()
// attempt to remove connection // attempt to remove connection
err = pool.Remove("wss://test") err := pool.Remove("wss://test")
assert.ErrorIs(t, err, ErrPoolClosed) assert.ErrorIs(t, err, ErrPoolClosed)
}) })
} }
// TODO: update worker to be responsible for send func TestPoolSend(t *testing.T) {
func _TestPoolSend(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket() mockSocket := honeybeetest.NewMockSocket()
outgoingData := make(chan honeybeetest.MockOutgoingData, 10) outgoingData := make(chan honeybeetest.MockOutgoingData, 10)
mockSocket.WriteMessageFunc = func(msgType int, data []byte) error { mockSocket.WriteMessageFunc = func(msgType int, data []byte) error {
+5 -1
View File
@@ -8,6 +8,7 @@ import (
"git.wisehodl.dev/jay/go-honeybee/types" "git.wisehodl.dev/jay/go-honeybee/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"net/http" "net/http"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@@ -57,11 +58,14 @@ func TestRunDialer(t *testing.T) {
mockSocket := honeybeetest.NewMockSocket() mockSocket := honeybeetest.NewMockSocket()
connConfig := &transport.ConnectionConfig{Retry: nil} // disable retry connConfig := &transport.ConnectionConfig{Retry: nil} // disable retry
started := make(chan struct{})
startOnce := sync.Once{}
wctx := WorkerContext{ wctx := WorkerContext{
Errors: make(chan error, 1), Errors: make(chan error, 1),
Dialer: &honeybeetest.MockDialer{ Dialer: &honeybeetest.MockDialer{
DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) { DialContextFunc: func(context.Context, string, http.Header) (types.Socket, *http.Response, error) {
dialCount.Add(1) dialCount.Add(1)
startOnce.Do(func() { close(started) })
<-gate <-gate
return mockSocket, nil, nil return mockSocket, nil, nil
}, },
@@ -73,7 +77,7 @@ func TestRunDialer(t *testing.T) {
dial <- struct{}{} dial <- struct{}{}
// wait for dial to start blocking on gate // wait for dial to start blocking on gate
time.Sleep(20 * time.Millisecond) <-started
// flood dial while dialer is blocked // flood dial while dialer is blocked
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
+2 -2
View File
@@ -15,14 +15,14 @@ func TestRunKeepalive(t *testing.T) {
defer cancel() defer cancel()
w := &DefaultWorker{ w := &DefaultWorker{
Config: &WorkerConfig{KeepaliveTimeout: 100 * time.Millisecond}, Config: &WorkerConfig{KeepaliveTimeout: 200 * time.Millisecond},
Heartbeat: heartbeat, Heartbeat: heartbeat,
} }
go w.RunKeepalive(ctx, keepalive) go w.RunKeepalive(ctx, keepalive)
// send heartbeats faster than the timeout // send heartbeats faster than the timeout
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
time.Sleep(30 * time.Millisecond) time.Sleep(20 * time.Millisecond)
w.Heartbeat <- struct{}{} w.Heartbeat <- struct{}{}
} }
+2 -8
View File
@@ -147,15 +147,9 @@ func WithWriteTimeout(value time.Duration) ConnectionOption {
} }
} }
// WithRetry enables retry with default parameters (infinite retries, func WithoutRetry() ConnectionOption {
// 1s initial delay, 5s max delay, 0.5 jitter factor).
//
// If passed after granular retry options (WithRetryMaxRetries, etc.),
// it will overwrite them. Use either WithRetry alone or the granular
// options; not both.
func WithRetry() ConnectionOption {
return func(c *ConnectionConfig) error { return func(c *ConnectionConfig) error {
c.Retry = GetDefaultRetryConfig() c.Retry = nil
return nil return nil
} }
} }
+4 -5
View File
@@ -107,13 +107,12 @@ func TestWithWriteTimeout(t *testing.T) {
} }
func TestWithRetry(t *testing.T) { func TestWithRetry(t *testing.T) {
t.Run("default", func(t *testing.T) { t.Run("without retry", func(t *testing.T) {
conf := &ConnectionConfig{} conf := GetDefaultConnectionConfig()
opt := WithRetry() opt := WithoutRetry()
err := applyConnectionOptions(conf, opt) err := applyConnectionOptions(conf, opt)
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, conf.Retry) assert.Nil(t, conf.Retry)
assert.Equal(t, conf.Retry, GetDefaultRetryConfig())
}) })
t.Run("with attempts", func(t *testing.T) { t.Run("with attempts", func(t *testing.T) {