diff --git a/initiatorpool/config.go b/initiatorpool/config.go index 2c7f038..7816668 100644 --- a/initiatorpool/config.go +++ b/initiatorpool/config.go @@ -8,7 +8,7 @@ import ( // Types -type WorkerFactory func(ctx context.Context, id string) (*Worker, error) +type WorkerFactory func(ctx context.Context, id string) (Worker, error) // Pool Config diff --git a/initiatorpool/pool.go b/initiatorpool/pool.go index ed85c78..0ca1232 100644 --- a/initiatorpool/pool.go +++ b/initiatorpool/pool.go @@ -13,7 +13,7 @@ import ( type Peer struct { id string - worker *Worker + worker Worker } type WorkerContext struct { @@ -74,7 +74,7 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger, // deadlocks. if config.WorkerFactory == nil { config.WorkerFactory = func( - ctx context.Context, id string) (*Worker, error) { + ctx context.Context, id string) (Worker, error) { return NewWorker(ctx, id, config.WorkerConfig) } } diff --git a/initiatorpool/worker.go b/initiatorpool/worker.go index 4c7074b..f5db150 100644 --- a/initiatorpool/worker.go +++ b/initiatorpool/worker.go @@ -11,20 +11,26 @@ import ( // Worker -type receivedMessage struct { +type Worker interface { + Start(wctx WorkerContext, wg *sync.WaitGroup) + Stop() + Send(data []byte) error +} + +type ReceivedMessage struct { data []byte receivedAt time.Time } -type Worker struct { - ctx context.Context - cancel context.CancelFunc +type DefaultWorker struct { + Ctx context.Context + Cancel context.CancelFunc - id string - config *WorkerConfig + Id string + Config *WorkerConfig - conn atomic.Pointer[transport.Connection] - heartbeat chan struct{} + Conn atomic.Pointer[transport.Connection] + Heartbeat chan struct{} } func NewWorker( @@ -32,7 +38,7 @@ func NewWorker( id string, config *WorkerConfig, -) (*Worker, error) { +) (*DefaultWorker, error) { if config == nil { config = GetDefaultWorkerConfig() } @@ -43,68 +49,68 @@ func NewWorker( } wctx, cancel := context.WithCancel(ctx) - w := &Worker{ - ctx: wctx, - cancel: cancel, - id: id, - config: config, - heartbeat: make(chan struct{}), + w := &DefaultWorker{ + Ctx: wctx, + Cancel: cancel, + Id: id, + Config: config, + Heartbeat: make(chan struct{}), } return w, nil } -func (w *Worker) Start( +func (w *DefaultWorker) Start( wctx WorkerContext, wg *sync.WaitGroup, ) { dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) - messages := make(chan receivedMessage, 256) + messages := make(chan ReceivedMessage, 256) keepalive := make(chan struct{}, 1) var owg sync.WaitGroup owg.Add(4) - go func() { defer owg.Done(); w.runDialer(w.ctx, wctx, dial, newConn) }() - go func() { defer owg.Done(); w.runKeepalive(w.ctx, keepalive) }() - go func() { defer owg.Done(); w.runForwarder(w.ctx, messages, wctx.Inbox, w.config.MaxQueueSize) }() - go func() { defer owg.Done(); w.runSession(w.ctx, wctx, messages, dial, keepalive, newConn) }() + go func() { defer owg.Done(); w.RunDialer(w.Ctx, wctx, dial, newConn) }() + go func() { defer owg.Done(); w.RunKeepalive(w.Ctx, keepalive) }() + go func() { defer owg.Done(); w.RunForwarder(w.Ctx, messages, wctx.Inbox, w.Config.MaxQueueSize) }() + go func() { defer owg.Done(); w.RunSession(w.Ctx, wctx, messages, dial, keepalive, newConn) }() owg.Wait() wg.Done() } -func (w *Worker) Stop() { - w.cancel() +func (w *DefaultWorker) Stop() { + w.Cancel() } -func (w *Worker) Send(data []byte) error { - conn := w.conn.Load() +func (w *DefaultWorker) Send(data []byte) error { + conn := w.Conn.Load() if conn == nil { // connection not established by session - return NewWorkerError(w.id, ErrConnectionUnavailable) + return NewWorkerError(w.Id, ErrConnectionUnavailable) } err := conn.Send(data) if err != nil { - return NewWorkerError(w.id, err) + return NewWorkerError(w.Id, err) } select { - case w.heartbeat <- struct{}{}: - case <-w.ctx.Done(): + case w.Heartbeat <- struct{}{}: + case <-w.Ctx.Done(): } return nil } -func (w *Worker) runSession( +func (w *DefaultWorker) RunSession( ctx context.Context, wctx WorkerContext, - messages chan<- receivedMessage, + messages chan<- ReceivedMessage, dial chan<- struct{}, keepalive <-chan struct{}, @@ -135,8 +141,8 @@ func (w *Worker) runSession( } // set up new connection - w.conn.Store(conn) - wctx.Events <- PoolEvent{ID: w.id, Kind: EventConnected} + w.Conn.Store(conn) + wctx.Events <- PoolEvent{ID: w.Id, Kind: EventConnected} // set up session sessionDone := make(chan struct{}) @@ -150,19 +156,19 @@ func (w *Worker) runSession( wg.Add(2) go func() { defer wg.Done() - w.runReader(conn, messages, sessionDone, onStop) + w.RunReader(conn, messages, sessionDone, onStop) }() go func() { defer wg.Done() - w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + w.RunStopMonitor(ctx, conn, keepalive, sessionDone, onStop) }() // complete session wg.Wait() // tear down connection - w.conn.Store(nil) - wctx.Events <- PoolEvent{ID: w.id, Kind: EventDisconnected} + w.Conn.Store(nil) + wctx.Events <- PoolEvent{ID: w.Id, Kind: EventDisconnected} // exit if worker is shutting down select { @@ -176,9 +182,9 @@ func (w *Worker) runSession( } -func (w *Worker) runReader( +func (w *DefaultWorker) RunReader( conn *transport.Connection, - messages chan<- receivedMessage, + messages chan<- ReceivedMessage, sessionDone <-chan struct{}, onStop func(), ) { @@ -198,14 +204,14 @@ func (w *Worker) runReader( } // send message forward - messages <- receivedMessage{ + messages <- ReceivedMessage{ data: data, receivedAt: time.Now(), } // send heartbeat select { - case w.heartbeat <- struct{}{}: + case w.Heartbeat <- struct{}{}: case <-sessionDone: return } @@ -213,7 +219,7 @@ func (w *Worker) runReader( } } -func (w *Worker) runStopMonitor( +func (w *DefaultWorker) RunStopMonitor( ctx context.Context, conn *transport.Connection, keepalive <-chan struct{}, @@ -232,9 +238,9 @@ func (w *Worker) runStopMonitor( } } -func (w *Worker) runForwarder( +func (w *DefaultWorker) RunForwarder( ctx context.Context, - messages <-chan receivedMessage, + messages <-chan ReceivedMessage, inbox chan<- InboxMessage, maxQueueSize int, ) { @@ -242,14 +248,14 @@ func (w *Worker) runForwarder( for { var out chan<- InboxMessage - var next receivedMessage + var next ReceivedMessage // enable inbox if it is populated if queue.Len() > 0 { out = inbox // read the first message in the queue - next = queue.Front().Value.(receivedMessage) + next = queue.Front().Value.(ReceivedMessage) } select { @@ -265,7 +271,7 @@ func (w *Worker) runForwarder( queue.PushBack(msg) // send next message to inbox case out <- InboxMessage{ - ID: w.id, + ID: w.Id, Data: next.data, ReceivedAt: next.receivedAt, }: @@ -275,12 +281,12 @@ func (w *Worker) runForwarder( } } -func (w *Worker) runKeepalive( +func (w *DefaultWorker) RunKeepalive( ctx context.Context, keepalive chan<- struct{}, ) { // disable keepalive timeout if not configured - if w.config.KeepaliveTimeout <= 0 { + if w.Config.KeepaliveTimeout <= 0 { // wait for cancel and exit select { case <-ctx.Done(): @@ -288,14 +294,14 @@ func (w *Worker) runKeepalive( return } - timer := time.NewTimer(w.config.KeepaliveTimeout) + timer := time.NewTimer(w.Config.KeepaliveTimeout) defer timer.Stop() for { select { case <-ctx.Done(): return - case <-w.heartbeat: + case <-w.Heartbeat: // drain the timer channel and reset if !timer.Stop() { select { @@ -303,7 +309,7 @@ func (w *Worker) runKeepalive( default: } } - timer.Reset(w.config.KeepaliveTimeout) + timer.Reset(w.Config.KeepaliveTimeout) // timer completed case <-timer.C: // send keepalive signal, then reset the timer @@ -311,16 +317,16 @@ func (w *Worker) runKeepalive( case keepalive <- struct{}{}: default: } - timer.Reset(w.config.KeepaliveTimeout) + timer.Reset(w.Config.KeepaliveTimeout) } } } -func (w *Worker) dial( +func (w *DefaultWorker) Dial( ctx context.Context, wctx WorkerContext, ) (*transport.Connection, error) { - conn, err := transport.NewConnection(w.id, wctx.ConnectionConfig, wctx.Logger) + conn, err := transport.NewConnection(w.Id, wctx.ConnectionConfig, wctx.Logger) if err != nil { return nil, err } @@ -329,7 +335,7 @@ func (w *Worker) dial( return conn, conn.Connect(ctx) } -func (w *Worker) runDialer( +func (w *DefaultWorker) RunDialer( ctx context.Context, wctx WorkerContext, @@ -354,7 +360,7 @@ func (w *Worker) runDialer( }() // dial a new connection - conn, err := w.dial(ctx, wctx) + conn, err := w.Dial(ctx, wctx) close(done) // send error if dial failed and continue diff --git a/initiatorpool/worker_dialer_test.go b/initiatorpool/worker_dialer_test.go index a3876ea..73d6fe9 100644 --- a/initiatorpool/worker_dialer_test.go +++ b/initiatorpool/worker_dialer_test.go @@ -15,7 +15,7 @@ import ( func TestRunDialer(t *testing.T) { t.Run("successful dial delivers connection to newConn", func(t *testing.T) { - w := &Worker{id: "wss://test"} + w := &DefaultWorker{Id: "wss://test"} dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -31,7 +31,7 @@ func TestRunDialer(t *testing.T) { }, } - go w.runDialer(ctx, wctx, dial, newConn) + go w.RunDialer(ctx, wctx, dial, newConn) dial <- struct{}{} honeybeetest.Eventually(t, func() bool { @@ -46,7 +46,7 @@ func TestRunDialer(t *testing.T) { t.Run("concurrent dial signals are drained; only one connection produced.", func(t *testing.T) { - w := &Worker{id: "wss://test"} + w := &DefaultWorker{Id: "wss://test"} dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -69,7 +69,7 @@ func TestRunDialer(t *testing.T) { ConnectionConfig: connConfig, } - go w.runDialer(ctx, wctx, dial, newConn) + go w.RunDialer(ctx, wctx, dial, newConn) dial <- struct{}{} // wait for dial to start blocking on gate @@ -107,7 +107,7 @@ func TestRunDialer(t *testing.T) { }) t.Run("dial failure emits error, succeeds on next signal", func(t *testing.T) { - w := &Worker{id: "wss://test"} + w := &DefaultWorker{Id: "wss://test"} errors := make(chan error, 1) dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) @@ -135,7 +135,7 @@ func TestRunDialer(t *testing.T) { ConnectionConfig: connConfig, } - go w.runDialer(ctx, wctx, dial, newConn) + go w.RunDialer(ctx, wctx, dial, newConn) dial <- struct{}{} honeybeetest.Eventually(t, func() bool { @@ -160,7 +160,7 @@ func TestRunDialer(t *testing.T) { }) t.Run("exits on context cancellation", func(t *testing.T) { - w := &Worker{id: "wss://test"} + w := &DefaultWorker{Id: "wss://test"} dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -169,7 +169,7 @@ func TestRunDialer(t *testing.T) { done := make(chan struct{}) go func() { - w.runDialer(ctx, wctx, dial, newConn) + w.RunDialer(ctx, wctx, dial, newConn) close(done) }() @@ -186,7 +186,7 @@ func TestRunDialer(t *testing.T) { }) t.Run("context cancelled during in-progress dial exits without delivering connection", func(t *testing.T) { - w := &Worker{id: "wss://test"} + w := &DefaultWorker{Id: "wss://test"} dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) ctx, cancel := context.WithCancel(context.Background()) @@ -207,7 +207,7 @@ func TestRunDialer(t *testing.T) { done := make(chan struct{}) go func() { - w.runDialer(ctx, wctx, dial, newConn) + w.RunDialer(ctx, wctx, dial, newConn) close(done) }() diff --git a/initiatorpool/worker_forwarder_test.go b/initiatorpool/worker_forwarder_test.go index 45db6fc..fd752db 100644 --- a/initiatorpool/worker_forwarder_test.go +++ b/initiatorpool/worker_forwarder_test.go @@ -10,15 +10,15 @@ import ( func TestRunForwarder(t *testing.T) { t.Run("message passes through to inbox", func(t *testing.T) { - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) inbox := make(chan InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{id: "wss://test"} - go w.runForwarder(ctx, messages, inbox, 0) + w := &DefaultWorker{Id: "wss://test"} + go w.RunForwarder(ctx, messages, inbox, 0) - messages <- receivedMessage{data: []byte("hello"), receivedAt: time.Now()} + messages <- ReceivedMessage{data: []byte("hello"), receivedAt: time.Now()} honeybeetest.Eventually(t, func() bool { select { @@ -31,7 +31,7 @@ func TestRunForwarder(t *testing.T) { }) t.Run("oldest message dropped when queue is full", func(t *testing.T) { - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) inbox := make(chan InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -47,13 +47,13 @@ func TestRunForwarder(t *testing.T) { } }() - w := &Worker{id: "wss://test"} - go w.runForwarder(ctx, messages, gatedInbox, 2) + w := &DefaultWorker{Id: "wss://test"} + go w.RunForwarder(ctx, messages, gatedInbox, 2) // send three messages while the gated inbox is blocked - messages <- receivedMessage{data: []byte("first"), receivedAt: time.Now()} - messages <- receivedMessage{data: []byte("second"), receivedAt: time.Now()} - messages <- receivedMessage{data: []byte("third"), receivedAt: time.Now()} + messages <- ReceivedMessage{data: []byte("first"), receivedAt: time.Now()} + messages <- ReceivedMessage{data: []byte("second"), receivedAt: time.Now()} + messages <- ReceivedMessage{data: []byte("third"), receivedAt: time.Now()} // allow time for the first message to be dropped time.Sleep(20 * time.Millisecond) @@ -78,15 +78,15 @@ func TestRunForwarder(t *testing.T) { }) t.Run("exits on context cancellation", func(t *testing.T) { - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) inbox := make(chan InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{id: "wss://test"} + w := &DefaultWorker{Id: "wss://test"} done := make(chan struct{}) go func() { - w.runForwarder(ctx, messages, inbox, 0) + w.RunForwarder(ctx, messages, inbox, 0) close(done) }() diff --git a/initiatorpool/worker_keepalive_test.go b/initiatorpool/worker_keepalive_test.go index b6871a7..b5a069a 100644 --- a/initiatorpool/worker_keepalive_test.go +++ b/initiatorpool/worker_keepalive_test.go @@ -14,16 +14,16 @@ func TestRunKeepalive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{ - config: &WorkerConfig{KeepaliveTimeout: 100 * time.Millisecond}, - heartbeat: heartbeat, + w := &DefaultWorker{ + Config: &WorkerConfig{KeepaliveTimeout: 100 * time.Millisecond}, + Heartbeat: heartbeat, } - go w.runKeepalive(ctx, keepalive) + go w.RunKeepalive(ctx, keepalive) // send heartbeats faster than the timeout for i := 0; i < 5; i++ { time.Sleep(30 * time.Millisecond) - w.heartbeat <- struct{}{} + w.Heartbeat <- struct{}{} } // because the timer is being reset, keepalive signal should not be sent @@ -42,8 +42,8 @@ func TestRunKeepalive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{config: &WorkerConfig{KeepaliveTimeout: 20 * time.Millisecond}} - go w.runKeepalive(ctx, keepalive) + w := &DefaultWorker{Config: &WorkerConfig{KeepaliveTimeout: 20 * time.Millisecond}} + go w.RunKeepalive(ctx, keepalive) // send no heartbeats, wait for timeout and keepalive signal honeybeetest.Eventually(t, func() bool { @@ -60,10 +60,10 @@ func TestRunKeepalive(t *testing.T) { keepalive := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) - w := &Worker{config: &WorkerConfig{KeepaliveTimeout: 20 * time.Second}} + w := &DefaultWorker{Config: &WorkerConfig{KeepaliveTimeout: 20 * time.Second}} done := make(chan struct{}) go func() { - w.runKeepalive(ctx, keepalive) + w.RunKeepalive(ctx, keepalive) close(done) }() diff --git a/initiatorpool/worker_send_test.go b/initiatorpool/worker_send_test.go index 684e623..aded7da 100644 --- a/initiatorpool/worker_send_test.go +++ b/initiatorpool/worker_send_test.go @@ -19,14 +19,14 @@ func TestWorkerSend(t *testing.T) { heartbeat := make(chan struct{}) heartbeatCount := atomic.Int32{} - w := &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Heartbeat: heartbeat, } - w.conn.Store(conn) - defer w.cancel() + w.Conn.Store(conn) + defer w.Cancel() go func() { for range heartbeat { @@ -61,14 +61,14 @@ func TestWorkerSend(t *testing.T) { heartbeat := make(chan struct{}) heartbeatCount := atomic.Int32{} - w := &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Heartbeat: heartbeat, } - w.conn.Store(conn) - defer w.cancel() + w.Conn.Store(conn) + defer w.Cancel() go func() { for range heartbeat { @@ -92,13 +92,13 @@ func TestWorkerSend(t *testing.T) { heartbeat := make(chan struct{}) - w := &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Heartbeat: heartbeat, } - defer w.cancel() + defer w.Cancel() go func() { for range heartbeat { diff --git a/initiatorpool/worker_session_inner_test.go b/initiatorpool/worker_session_inner_test.go index 3002729..f86fea1 100644 --- a/initiatorpool/worker_session_inner_test.go +++ b/initiatorpool/worker_session_inner_test.go @@ -18,24 +18,24 @@ func TestRunReader(t *testing.T) { conn, _, incomingData, _ := setupWorkerTestConnection(t) defer conn.Close() - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) heartbeat := make(chan struct{}) sessionDone := make(chan struct{}) onStop := func() {} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Heartbeat: heartbeat, } go func() { for range heartbeat { } }() - go w.runReader(conn, messages, sessionDone, onStop) + go w.RunReader(conn, messages, sessionDone, onStop) before := time.Now() incomingData <- honeybeetest.MockIncomingData{ @@ -57,18 +57,18 @@ func TestRunReader(t *testing.T) { conn, _, incomingData, _ := setupWorkerTestConnection(t) defer conn.Close() - messages := make(chan receivedMessage, 10) + messages := make(chan ReceivedMessage, 10) heartbeat := make(chan struct{}) sessionDone := make(chan struct{}) onStop := func() {} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - w := &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Heartbeat: heartbeat, } received := atomic.Int32{} @@ -81,7 +81,7 @@ func TestRunReader(t *testing.T) { for range messages { } }() - go w.runReader(conn, messages, sessionDone, onStop) + go w.RunReader(conn, messages, sessionDone, onStop) const count = 3 for i := 0; i < count; i++ { @@ -99,17 +99,17 @@ func TestRunReader(t *testing.T) { t.Run("incoming channel close calls conn.Close and onStop", func(t *testing.T) { conn, _, incomingData, _ := setupWorkerTestConnection(t) - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) heartbeat := make(chan struct{}) sessionDone := make(chan struct{}) onStopCalled := atomic.Bool{} onStop := func() { onStopCalled.Store(true) } ctx := context.Background() - w := &Worker{ - ctx: ctx, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Id: "wss://test", + Heartbeat: heartbeat, } go func() { for range heartbeat { @@ -119,7 +119,7 @@ func TestRunReader(t *testing.T) { for range messages { } }() - go w.runReader(conn, messages, sessionDone, onStop) + go w.RunReader(conn, messages, sessionDone, onStop) // induce connection closure via reader incomingData <- honeybeetest.MockIncomingData{Err: io.EOF} @@ -139,19 +139,19 @@ func TestRunReader(t *testing.T) { t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) { conn, _, _, _ := setupWorkerTestConnection(t) - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) heartbeat := make(chan struct{}) sessionDone := make(chan struct{}) onStopCalled := atomic.Bool{} onStop := func() { onStopCalled.Store(true) } ctx := context.Background() - w := &Worker{ - ctx: ctx, - id: "wss://test", - heartbeat: heartbeat, + w := &DefaultWorker{ + Ctx: ctx, + Id: "wss://test", + Heartbeat: heartbeat, } - go w.runReader(conn, messages, sessionDone, onStop) + go w.RunReader(conn, messages, sessionDone, onStop) close(sessionDone) @@ -176,8 +176,8 @@ func TestRunStopMonitor(t *testing.T) { onStopCalled := atomic.Bool{} onStop := func() { onStopCalled.Store(true) } - w := &Worker{id: "wss://test"} - go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + w := &DefaultWorker{Id: "wss://test"} + go w.RunStopMonitor(ctx, conn, keepalive, sessionDone, onStop) keepalive <- struct{}{} @@ -199,8 +199,8 @@ func TestRunStopMonitor(t *testing.T) { onStopCalled := atomic.Bool{} onStop := func() { onStopCalled.Store(true) } - w := &Worker{id: "wss://test"} - go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + w := &DefaultWorker{Id: "wss://test"} + go w.RunStopMonitor(ctx, conn, keepalive, sessionDone, onStop) cancel() @@ -223,8 +223,8 @@ func TestRunStopMonitor(t *testing.T) { onStopCalled := atomic.Bool{} onStop := func() { onStopCalled.Store(true) } - w := &Worker{id: "wss://test"} - go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + w := &DefaultWorker{Id: "wss://test"} + go w.RunStopMonitor(ctx, conn, keepalive, sessionDone, onStop) close(sessionDone) diff --git a/initiatorpool/worker_session_test.go b/initiatorpool/worker_session_test.go index 3deab49..776c005 100644 --- a/initiatorpool/worker_session_test.go +++ b/initiatorpool/worker_session_test.go @@ -22,7 +22,7 @@ func drainEvent(t *testing.T, events <-chan PoolEvent, kind PoolEventKind) { func TestRunSessionDial(t *testing.T) { setup := func(t *testing.T) ( - w *Worker, + w *DefaultWorker, ctx context.Context, cancel context.CancelFunc, dial chan struct{}, @@ -31,12 +31,12 @@ func TestRunSessionDial(t *testing.T) { ) { t.Helper() ctx, cancel = context.WithCancel(context.Background()) - w = &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - config: GetDefaultWorkerConfig(), - heartbeat: make(chan struct{}), + w = &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Config: GetDefaultWorkerConfig(), + Heartbeat: make(chan struct{}), } dial = make(chan struct{}, 1) keepalive = make(chan struct{}, 1) @@ -60,10 +60,10 @@ func TestRunSessionDial(t *testing.T) { w, ctx, cancel, dial, keepalive, newConn := setup(t) defer cancel() - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) wctx := WorkerContext{Events: make(chan PoolEvent, 10)} - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) expectDial(t, dial) }) @@ -72,10 +72,10 @@ func TestRunSessionDial(t *testing.T) { w, ctx, cancel, dial, keepalive, newConn := setup(t) defer cancel() - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) wctx := WorkerContext{Events: make(chan PoolEvent, 10)} - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) // drain initial dial expectDial(t, dial) @@ -88,10 +88,10 @@ func TestRunSessionDial(t *testing.T) { w, ctx, cancel, dial, keepalive, newConn := setup(t) defer cancel() - messages := make(chan receivedMessage, 1) + messages := make(chan ReceivedMessage, 1) wctx := WorkerContext{Events: make(chan PoolEvent, 10)} - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) // drain initial dial expectDial(t, dial) @@ -105,27 +105,27 @@ func TestRunSessionDial(t *testing.T) { func TestRunSessionConnect(t *testing.T) { setup := func(t *testing.T) ( - w *Worker, + w *DefaultWorker, ctx context.Context, cancel context.CancelFunc, dial chan struct{}, keepalive chan struct{}, newConn chan *transport.Connection, - messages chan receivedMessage, + messages chan ReceivedMessage, ) { t.Helper() ctx, cancel = context.WithCancel(context.Background()) - w = &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - config: GetDefaultWorkerConfig(), - heartbeat: make(chan struct{}), + w = &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Config: GetDefaultWorkerConfig(), + Heartbeat: make(chan struct{}), } dial = make(chan struct{}, 1) keepalive = make(chan struct{}, 1) newConn = make(chan *transport.Connection, 1) - messages = make(chan receivedMessage, 256) + messages = make(chan ReceivedMessage, 256) return } @@ -135,12 +135,12 @@ func TestRunSessionConnect(t *testing.T) { defer cancel() conn, _, _, _ := setupWorkerTestConnection(t) - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) newConn <- conn honeybeetest.Eventually(t, func() bool { - return w.conn.Load() != nil + return w.Conn.Load() != nil }, "expected w.conn to be set") }) @@ -151,14 +151,14 @@ func TestRunSessionConnect(t *testing.T) { defer cancel() conn, _, _, _ := setupWorkerTestConnection(t) - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) newConn <- conn honeybeetest.Eventually(t, func() bool { select { case event := <-events: - return event.ID == w.id && event.Kind == EventConnected + return event.ID == w.Id && event.Kind == EventConnected default: return false } @@ -168,29 +168,29 @@ func TestRunSessionConnect(t *testing.T) { func TestRunSessionDisconnect(t *testing.T) { setup := func(t *testing.T) ( - w *Worker, + w *DefaultWorker, ctx context.Context, cancel context.CancelFunc, dial chan struct{}, keepalive chan struct{}, newConn chan *transport.Connection, - messages chan receivedMessage, + messages chan ReceivedMessage, conn *transport.Connection, incomingData chan honeybeetest.MockIncomingData, ) { t.Helper() ctx, cancel = context.WithCancel(context.Background()) - w = &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - config: GetDefaultWorkerConfig(), - heartbeat: make(chan struct{}), + w = &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Config: GetDefaultWorkerConfig(), + Heartbeat: make(chan struct{}), } dial = make(chan struct{}, 1) keepalive = make(chan struct{}, 1) newConn = make(chan *transport.Connection, 1) - messages = make(chan receivedMessage, 256) + messages = make(chan ReceivedMessage, 256) conn, _, incomingData, _ = setupWorkerTestConnection(t) return } @@ -201,7 +201,7 @@ func TestRunSessionDisconnect(t *testing.T) { wctx := WorkerContext{Events: events} defer cancel() - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) newConn <- conn drainEvent(t, events, EventConnected) @@ -216,7 +216,7 @@ func TestRunSessionDisconnect(t *testing.T) { wctx := WorkerContext{Events: events} defer cancel() - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) newConn <- conn drainEvent(t, events, EventConnected) @@ -224,7 +224,7 @@ func TestRunSessionDisconnect(t *testing.T) { drainEvent(t, events, EventDisconnected) honeybeetest.Eventually(t, func() bool { - return w.conn.Load() == nil + return w.Conn.Load() == nil }, "expected w.conn to be cleared") }) @@ -234,7 +234,7 @@ func TestRunSessionDisconnect(t *testing.T) { wctx := WorkerContext{Events: events} defer cancel() - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) newConn <- conn drainEvent(t, events, EventConnected) @@ -260,7 +260,7 @@ func TestRunSessionDisconnect(t *testing.T) { wctx := WorkerContext{Events: events} defer cancel() - go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) newConn <- conn drainEvent(t, events, EventConnected) @@ -276,27 +276,27 @@ func TestRunSessionDisconnect(t *testing.T) { func TestRunSessionCancellation(t *testing.T) { setup := func(t *testing.T) ( - w *Worker, + w *DefaultWorker, ctx context.Context, cancel context.CancelFunc, dial chan struct{}, keepalive chan struct{}, newConn chan *transport.Connection, - messages chan receivedMessage, + messages chan ReceivedMessage, ) { t.Helper() ctx, cancel = context.WithCancel(context.Background()) - w = &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - config: GetDefaultWorkerConfig(), - heartbeat: make(chan struct{}), + w = &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Config: GetDefaultWorkerConfig(), + Heartbeat: make(chan struct{}), } dial = make(chan struct{}, 1) keepalive = make(chan struct{}, 1) newConn = make(chan *transport.Connection, 1) - messages = make(chan receivedMessage, 256) + messages = make(chan ReceivedMessage, 256) return } @@ -308,7 +308,7 @@ func TestRunSessionCancellation(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) }() cancel() @@ -342,7 +342,7 @@ func TestRunSessionCancellation(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) }() newConn <- conn @@ -373,7 +373,7 @@ func TestRunSessionCancellation(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + w.RunSession(ctx, wctx, messages, dial, keepalive, newConn) }() newConn <- conn @@ -385,7 +385,7 @@ func TestRunSessionCancellation(t *testing.T) { drainEvent(t, events, EventDisconnected) honeybeetest.Eventually(t, func() bool { - return w.conn.Load() == nil + return w.Conn.Load() == nil }, "expected w.conn to clear") }) } diff --git a/initiatorpool/worker_start_test.go b/initiatorpool/worker_start_test.go index 4e78389..7a82f39 100644 --- a/initiatorpool/worker_start_test.go +++ b/initiatorpool/worker_start_test.go @@ -31,14 +31,14 @@ func makeWorkerContext(t *testing.T) ( return } -func makeWorker(t *testing.T, ctx context.Context, cancel context.CancelFunc) *Worker { +func makeWorker(t *testing.T, ctx context.Context, cancel context.CancelFunc) *DefaultWorker { t.Helper() - return &Worker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - config: GetDefaultWorkerConfig(), - heartbeat: make(chan struct{}), + return &DefaultWorker{ + Ctx: ctx, + Cancel: cancel, + Id: "wss://test", + Config: GetDefaultWorkerConfig(), + Heartbeat: make(chan struct{}), } } @@ -67,7 +67,7 @@ func TestWorkerStart(t *testing.T) { honeybeetest.Eventually(t, func() bool { select { case e := <-events: - return e.ID == w.id && e.Kind == EventConnected + return e.ID == w.Id && e.Kind == EventConnected default: return false } @@ -154,7 +154,7 @@ func TestWorkerStart(t *testing.T) { honeybeetest.Eventually(t, func() bool { select { case msg := <-inbox: - return msg.ID == w.id && string(msg.Data) == "hello" + return msg.ID == w.Id && string(msg.Data) == "hello" default: return false }