implement pool Send method.
This commit is contained in:
25
pool.go
25
pool.go
@@ -124,6 +124,22 @@ func (p *pool) removePeer(id string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *pool) send(id string, data []byte) error {
|
||||||
|
p.mu.RLock()
|
||||||
|
defer p.mu.RUnlock()
|
||||||
|
|
||||||
|
if p.closed {
|
||||||
|
return errors.NewPoolError("pool is closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
peer, exists := p.peers[id]
|
||||||
|
if !exists {
|
||||||
|
return errors.NewPoolError("connection not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return peer.conn.Send(data)
|
||||||
|
}
|
||||||
|
|
||||||
// Outbound Pool
|
// Outbound Pool
|
||||||
|
|
||||||
type OutboundPool struct {
|
type OutboundPool struct {
|
||||||
@@ -251,6 +267,15 @@ func (p *OutboundPool) Remove(url string) error {
|
|||||||
return p.removePeer(url)
|
return p.removePeer(url)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *OutboundPool) Send(url string, data []byte) error {
|
||||||
|
url, err := NormalizeURL(url)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.send(url, data)
|
||||||
|
}
|
||||||
|
|
||||||
// Inbound Pool
|
// Inbound Pool
|
||||||
|
|
||||||
type InboundPool struct {
|
type InboundPool struct {
|
||||||
|
|||||||
30
pool_test.go
30
pool_test.go
@@ -2,6 +2,7 @@ package honeybee
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -169,6 +170,35 @@ func TestPoolRemove(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPoolSend(t *testing.T) {
|
||||||
|
mockSocket := NewMockSocket()
|
||||||
|
outgoingData := make(chan mockOutgoingData, 10)
|
||||||
|
mockSocket.WriteMessageFunc = func(msgType int, data []byte) error {
|
||||||
|
outgoingData <- mockOutgoingData{msgType: msgType, data: data}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
mockDialer := &MockDialer{
|
||||||
|
DialFunc: func(string, http.Header) (Socket, *http.Response, error) {
|
||||||
|
return mockSocket, nil, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pool, err := NewOutboundPool(nil, nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
pool.dialer = mockDialer
|
||||||
|
|
||||||
|
err = pool.Connect("wss://test")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
expectEvent(t, pool.events, "wss://test", EventConnected)
|
||||||
|
|
||||||
|
err = pool.Send("wss://test", []byte("hello"))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
expectWrite(t, outgoingData, websocket.TextMessage, []byte("hello"))
|
||||||
|
|
||||||
|
pool.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func expectEvent(
|
func expectEvent(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
events chan PoolEvent,
|
events chan PoolEvent,
|
||||||
|
|||||||
Reference in New Issue
Block a user