diff --git a/honeybee.go b/honeybee.go index 94ce177..439dee8 100644 --- a/honeybee.go +++ b/honeybee.go @@ -24,6 +24,10 @@ type RetryConfig = transport.RetryConfig type ConnectionOption = transport.ConnectionOption type ConnectionStats = transport.ConnectionStats +// Common Types + +type InboxMessage = types.InboxMessage + // Outbound Pool types type OutboundPool = outbound.Pool @@ -31,7 +35,6 @@ type OutboundPoolConfig = outbound.PoolConfig type OutboundPoolOption = outbound.PoolOption type OutboundWorkerConfig = outbound.WorkerConfig type OutboundWorkerOption = outbound.WorkerOption -type OutboundInboxMessage = outbound.InboxMessage type OutboundPoolEvent = outbound.PoolEvent type OutboundPoolEventKind = outbound.PoolEventKind type OutboundPoolStats = outbound.PoolStats @@ -54,7 +57,6 @@ type InboundWorkerOption = inbound.WorkerOption type InboundWorkerFactory = inbound.WorkerFactory type InboundWorker = inbound.Worker type InboundWorkerExitKind = inbound.WorkerExitKind -type InboundInboxMessage = inbound.InboxMessage type InboundPoolEvent = inbound.PoolEvent type InboundPoolEventKind = inbound.PoolEventKind type InboundPoolStats = inbound.PoolStats diff --git a/inbound/pool.go b/inbound/pool.go index 5f3ecf7..3242980 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -9,7 +9,6 @@ import ( "log/slog" "sync" "sync/atomic" - "time" ) // Types @@ -55,14 +54,8 @@ type PeerStats struct { Connection transport.ConnectionStats } -type InboxMessage struct { - ID string - Data []byte - ReceivedAt time.Time -} - type PoolPlugin struct { - Inbox chan<- InboxMessage + Inbox chan<- types.InboxMessage Events chan<- PoolEvent Errors chan<- error InboxCounter *atomic.Uint64 @@ -86,7 +79,7 @@ type Pool struct { id string peers map[string]*Peer - inbox chan InboxMessage + inbox chan types.InboxMessage events chan PoolEvent errors chan error @@ -143,7 +136,7 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha cancel: cancel, id: id, peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, config.InboxBufferSize), + inbox: make(chan types.InboxMessage, config.InboxBufferSize), events: make(chan PoolEvent, config.EventsBufferSize), errors: make(chan error, config.ErrorsBufferSize), inboxCounter: &atomic.Uint64{}, @@ -166,7 +159,7 @@ func (p *Pool) Peers() []string { return ids } -func (p *Pool) Inbox() <-chan InboxMessage { +func (p *Pool) Inbox() <-chan types.InboxMessage { return p.inbox } diff --git a/inbound/worker.go b/inbound/worker.go index 9e521d1..40eaa76 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -252,7 +252,7 @@ func RunForwarder( id string, ctx context.Context, messages <-chan types.ReceivedMessage, - inbox chan<- InboxMessage, + inbox chan<- types.InboxMessage, workerProcessedCount *atomic.Uint64, poolInboxCount *atomic.Uint64, ) { @@ -268,7 +268,7 @@ func RunForwarder( case <-ctx.Done(): return - case inbox <- InboxMessage{ + case inbox <- types.InboxMessage{ ID: id, Data: msg.Data, ReceivedAt: msg.ReceivedAt, diff --git a/inbound/worker_forwarder_test.go b/inbound/worker_forwarder_test.go index f89157c..c6f8272 100644 --- a/inbound/worker_forwarder_test.go +++ b/inbound/worker_forwarder_test.go @@ -13,7 +13,7 @@ func TestRunForwarder(t *testing.T) { t.Run("message passes through to inbox", func(t *testing.T) { id := "wss://test" messages := make(chan types.ReceivedMessage, 1) - inbox := make(chan InboxMessage, 1) + inbox := make(chan types.InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/inbound/worker_test.go b/inbound/worker_test.go index 2a9dff1..ee5b725 100644 --- a/inbound/worker_test.go +++ b/inbound/worker_test.go @@ -5,6 +5,7 @@ import ( "fmt" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/transport" + "git.wisehodl.dev/jay/go-honeybee/types" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "sync" @@ -19,7 +20,7 @@ type workerTestVars struct { incoming chan honeybeetest.MockIncomingData outgoing chan honeybeetest.MockOutgoingData pool PoolPlugin - inbox chan InboxMessage + inbox chan types.InboxMessage events chan PoolEvent exitKind *atomic.Value wg *sync.WaitGroup @@ -36,7 +37,7 @@ func setupWorkerTest(t *testing.T) workerTestVars { assert.NoError(t, err) worker.cancel = cancel - inbox := make(chan InboxMessage, 256) + inbox := make(chan types.InboxMessage, 256) events := make(chan PoolEvent, 10) exitKind := &atomic.Value{} @@ -134,7 +135,7 @@ func TestWorkerStart(t *testing.T) { exitKind := &atomic.Value{} var once sync.Once pool := PoolPlugin{ - Inbox: make(chan InboxMessage, 256), + Inbox: make(chan types.InboxMessage, 256), Events: make(chan PoolEvent, 10), OnExit: func(kind WorkerExitKind) { once.Do(func() { exitKind.Store(kind) }) diff --git a/outbound/pool.go b/outbound/pool.go index 3777b0e..658c2ce 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -8,7 +8,6 @@ import ( "log/slog" "sync" "sync/atomic" - "time" ) // Types @@ -42,15 +41,9 @@ type PeerStats struct { Worker WorkerStats } -type InboxMessage struct { - ID string - Data []byte - ReceivedAt time.Time -} - type PoolPlugin struct { ID string - Inbox chan<- InboxMessage + Inbox chan<- types.InboxMessage Events chan<- PoolEvent Errors chan<- error InboxCounter *atomic.Uint64 @@ -73,7 +66,7 @@ type Pool struct { id string peers map[string]*Peer - inbox chan InboxMessage + inbox chan types.InboxMessage events chan PoolEvent errors chan error @@ -127,7 +120,7 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha cancel: cancel, id: id, peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, config.InboxBufferSize), + inbox: make(chan types.InboxMessage, config.InboxBufferSize), events: make(chan PoolEvent, config.EventsBufferSize), errors: make(chan error, config.ErrorsBufferSize), inboxCounter: &atomic.Uint64{}, @@ -150,7 +143,7 @@ func (p *Pool) Peers() []string { return ids } -func (p *Pool) Inbox() <-chan InboxMessage { +func (p *Pool) Inbox() <-chan types.InboxMessage { return p.inbox } diff --git a/outbound/worker.go b/outbound/worker.go index a3bc613..8ac0132 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -407,7 +407,7 @@ func RunForwarder( id string, ctx context.Context, messages <-chan types.ReceivedMessage, - inbox chan<- InboxMessage, + inbox chan<- types.InboxMessage, workerProcessedCount *atomic.Uint64, poolInboxCount *atomic.Uint64, ) { @@ -423,7 +423,7 @@ func RunForwarder( case <-ctx.Done(): return - case inbox <- InboxMessage{ + case inbox <- types.InboxMessage{ ID: id, Data: msg.Data, ReceivedAt: msg.ReceivedAt, diff --git a/outbound/worker_forwarder_test.go b/outbound/worker_forwarder_test.go index 60a4a72..7b1e519 100644 --- a/outbound/worker_forwarder_test.go +++ b/outbound/worker_forwarder_test.go @@ -13,7 +13,7 @@ func TestRunForwarder(t *testing.T) { t.Run("message passes through to inbox", func(t *testing.T) { id := "wss://test" messages := make(chan types.ReceivedMessage, 1) - inbox := make(chan InboxMessage, 1) + inbox := make(chan types.InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/outbound/worker_start_test.go b/outbound/worker_start_test.go index e8b1f50..6a67d35 100644 --- a/outbound/worker_start_test.go +++ b/outbound/worker_start_test.go @@ -16,13 +16,13 @@ import ( ) func makeWorkerContext(t *testing.T) ( - inbox chan InboxMessage, + inbox chan types.InboxMessage, events chan PoolEvent, errors chan error, pool PoolPlugin, ) { t.Helper() - inbox = make(chan InboxMessage, 256) + inbox = make(chan types.InboxMessage, 256) events = make(chan PoolEvent, 10) errors = make(chan error, 10) pool = PoolPlugin{ diff --git a/types/types.go b/types/types.go index 0ff4f46..5847f1b 100644 --- a/types/types.go +++ b/types/types.go @@ -29,3 +29,9 @@ type ReceivedMessage struct { Data []byte ReceivedAt time.Time } + +type InboxMessage struct { + ID string + Data []byte + ReceivedAt time.Time +}