Rewrote connect method to start worker.
This commit is contained in:
@@ -147,58 +147,44 @@ func (p *Pool) Connect(id string) error {
|
|||||||
|
|
||||||
// Check for existing connection in pool
|
// Check for existing connection in pool
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
if p.closed {
|
if p.closed {
|
||||||
p.mu.Unlock()
|
|
||||||
return NewPoolError("pool is closed")
|
return NewPoolError("pool is closed")
|
||||||
}
|
}
|
||||||
_, exists := p.peers[id]
|
_, exists := p.peers[id]
|
||||||
p.mu.Unlock()
|
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
return NewPoolError("connection already exists")
|
return NewPoolError("connection already exists")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create new connection
|
// Create new worker
|
||||||
|
stop := make(chan struct{})
|
||||||
|
|
||||||
|
worker, err := p.config.WorkerFactory(id, stop)
|
||||||
|
if err != nil {
|
||||||
|
close(stop)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
var logger *slog.Logger
|
var logger *slog.Logger
|
||||||
if p.logger != nil {
|
if p.logger != nil {
|
||||||
logger = p.logger.With("id", id)
|
logger = p.logger.With("id", id)
|
||||||
}
|
}
|
||||||
conn, err := transport.NewConnection(id, p.config.ConnectionConfig, logger)
|
ctx := WorkerContext{
|
||||||
if err != nil {
|
Inbox: p.inbox,
|
||||||
return err
|
Events: p.events,
|
||||||
}
|
Errors: p.errors,
|
||||||
conn.SetDialer(p.dialer)
|
PoolDone: p.done,
|
||||||
|
Logger: logger,
|
||||||
// Attempt to connect
|
Dialer: p.dialer,
|
||||||
if err := conn.Connect(); err != nil {
|
ConnectionConfig: p.config.ConnectionConfig,
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
p.mu.Lock()
|
p.wg.Add(1)
|
||||||
if p.closed {
|
go worker.Start(ctx, &p.wg)
|
||||||
// The pool closed while this connection was established.
|
|
||||||
p.mu.Unlock()
|
|
||||||
conn.Close()
|
|
||||||
return NewPoolError("pool is closed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add connection to pool
|
p.peers[id] = &Peer{id: id, worker: worker, stop: stop}
|
||||||
stop := make(chan struct{})
|
|
||||||
if _, exists := p.peers[id]; exists {
|
|
||||||
// Another process connected to this url while this one was connecting
|
|
||||||
// Discard this connection and retain the existing one
|
|
||||||
p.mu.Unlock()
|
|
||||||
conn.Close()
|
|
||||||
return NewPoolError("connection already exists")
|
|
||||||
}
|
|
||||||
p.peers[id] = &Peer{id: id, worker: nil, stop: stop} // TODO: create worker
|
|
||||||
p.mu.Unlock()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case p.events <- PoolEvent{ID: id, Kind: EventConnected}:
|
|
||||||
case <-p.done:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user