Injected context cancellation for dial and retry cancellation.
This commit is contained in:
@@ -2,6 +2,7 @@ package initiatorpool
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"git.wisehodl.dev/jay/go-honeybee/transport"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -15,15 +16,16 @@ type receivedMessage struct {
|
||||
}
|
||||
|
||||
type Worker struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
id string
|
||||
stop <-chan struct{}
|
||||
config *WorkerConfig
|
||||
outbound chan []byte
|
||||
}
|
||||
|
||||
func NewWorker(
|
||||
ctx context.Context,
|
||||
id string,
|
||||
stop <-chan struct{},
|
||||
config *WorkerConfig,
|
||||
|
||||
) (*Worker, error) {
|
||||
@@ -36,9 +38,11 @@ func NewWorker(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wctx, cancel := context.WithCancel(ctx)
|
||||
w := &Worker{
|
||||
ctx: wctx,
|
||||
cancel: cancel,
|
||||
id: id,
|
||||
stop: stop,
|
||||
outbound: make(chan []byte, 64),
|
||||
config: config,
|
||||
}
|
||||
@@ -50,7 +54,7 @@ func (w *Worker) Send(data []byte) error {
|
||||
select {
|
||||
case w.outbound <- data:
|
||||
return nil
|
||||
case <-w.stop:
|
||||
case <-w.ctx.Done():
|
||||
return NewWorkerError(w.id, "worker is stopped")
|
||||
default:
|
||||
return NewWorkerError(w.id, "outbound queue full")
|
||||
@@ -63,7 +67,14 @@ func (w *Worker) Start(
|
||||
) {
|
||||
}
|
||||
|
||||
func (w *Worker) Stop() {
|
||||
w.cancel()
|
||||
}
|
||||
|
||||
func (w *Worker) runSession(
|
||||
ctx context.Context,
|
||||
wctx WorkerContext,
|
||||
|
||||
messages chan<- receivedMessage,
|
||||
heartbeat chan<- struct{},
|
||||
dial chan<- struct{},
|
||||
@@ -71,11 +82,6 @@ func (w *Worker) runSession(
|
||||
keepalive <-chan struct{},
|
||||
outbound <-chan []byte,
|
||||
newConn <-chan *transport.Connection,
|
||||
|
||||
ctx WorkerContext,
|
||||
|
||||
workerStop <-chan struct{},
|
||||
poolDone <-chan struct{},
|
||||
) {
|
||||
}
|
||||
|
||||
@@ -98,20 +104,18 @@ func (w *Worker) runWriter(
|
||||
}
|
||||
|
||||
func (w *Worker) runStopMonitor(
|
||||
ctx context.Context,
|
||||
conn *transport.Connection,
|
||||
keepalive <-chan struct{},
|
||||
workerStop <-chan struct{},
|
||||
poolDone <-chan struct{},
|
||||
sessionDone <-chan struct{},
|
||||
onStop func(),
|
||||
) {
|
||||
}
|
||||
|
||||
func (w *Worker) runForwarder(
|
||||
ctx context.Context,
|
||||
messages <-chan receivedMessage,
|
||||
inbox chan<- InboxMessage,
|
||||
stop <-chan struct{},
|
||||
poolDone <-chan struct{},
|
||||
maxQueueSize int,
|
||||
) {
|
||||
queue := list.New()
|
||||
@@ -129,9 +133,7 @@ func (w *Worker) runForwarder(
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-poolDone:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg := <-messages:
|
||||
// limit queue size if maximum is configured
|
||||
@@ -154,17 +156,15 @@ func (w *Worker) runForwarder(
|
||||
}
|
||||
|
||||
func (w *Worker) runKeepalive(
|
||||
ctx context.Context,
|
||||
heartbeat <-chan struct{},
|
||||
keepalive chan<- struct{},
|
||||
stop <-chan struct{},
|
||||
poolDone <-chan struct{},
|
||||
) {
|
||||
// disable keepalive timeout if not configured
|
||||
if w.config.KeepaliveTimeout <= 0 {
|
||||
// wait for stop signal and exit
|
||||
// wait for cancel and exit
|
||||
select {
|
||||
case <-stop:
|
||||
case <-poolDone:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -174,9 +174,7 @@ func (w *Worker) runKeepalive(
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-poolDone:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-heartbeat:
|
||||
// drain the timer channel and reset
|
||||
@@ -199,28 +197,29 @@ func (w *Worker) runKeepalive(
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) dial(ctx WorkerContext) (*transport.Connection, error) {
|
||||
conn, err := transport.NewConnection(w.id, ctx.ConnectionConfig, ctx.Logger)
|
||||
func (w *Worker) dial(
|
||||
ctx context.Context,
|
||||
wctx WorkerContext,
|
||||
) (*transport.Connection, error) {
|
||||
conn, err := transport.NewConnection(w.id, wctx.ConnectionConfig, wctx.Logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn.SetDialer(ctx.Dialer)
|
||||
return conn, conn.Connect()
|
||||
conn.SetDialer(wctx.Dialer)
|
||||
return conn, conn.Connect(ctx)
|
||||
}
|
||||
|
||||
func (w *Worker) runDialer(
|
||||
ctx context.Context,
|
||||
wctx WorkerContext,
|
||||
|
||||
dial <-chan struct{},
|
||||
newConn chan<- *transport.Connection,
|
||||
ctx WorkerContext,
|
||||
stop <-chan struct{},
|
||||
poolDone <-chan struct{},
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-poolDone:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-dial:
|
||||
// drain dial signals while connection is being established
|
||||
@@ -236,15 +235,14 @@ func (w *Worker) runDialer(
|
||||
}()
|
||||
|
||||
// dial a new connection
|
||||
conn, err := w.dial(ctx)
|
||||
conn, err := w.dial(ctx, wctx)
|
||||
close(done)
|
||||
|
||||
// send error if dial failed and continue
|
||||
if err != nil {
|
||||
select {
|
||||
case ctx.Errors <- err:
|
||||
case <-stop:
|
||||
case <-poolDone:
|
||||
case wctx.Errors <- err:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -252,10 +250,7 @@ func (w *Worker) runDialer(
|
||||
// send the new connection or close and exit
|
||||
select {
|
||||
case newConn <- conn:
|
||||
case <-stop:
|
||||
conn.Close()
|
||||
return
|
||||
case <-poolDone:
|
||||
case <-ctx.Done():
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user