wrote embassy journal emissions
This commit is contained in:
+61
-6
@@ -154,11 +154,21 @@ func (e *Embassy) Dispatch(url string) error {
|
|||||||
subs := e.eventSubs
|
subs := e.eventSubs
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
at := time.Now()
|
||||||
|
if e.journals != nil {
|
||||||
|
c, _ := component.Get(e.ctx)
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
return fmt.Errorf("closing")
|
||||||
|
case e.journals <- NewPeerAddedJournal(url, c, PeerAddedData{At: at}):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, ch := range subs {
|
for _, ch := range subs {
|
||||||
select {
|
select {
|
||||||
case <-e.ctx.Done():
|
case <-e.ctx.Done():
|
||||||
return fmt.Errorf("closing")
|
return fmt.Errorf("closing")
|
||||||
case ch <- NewPoolEvent(url, EventAdded, time.Now()):
|
case ch <- NewPoolEvent(url, EventAdded, at):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,11 +190,21 @@ func (e *Embassy) Dismiss(url string) error {
|
|||||||
subs := e.eventSubs
|
subs := e.eventSubs
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
at := time.Now()
|
||||||
|
if e.journals != nil {
|
||||||
|
c, _ := component.Get(e.ctx)
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
return fmt.Errorf("closing")
|
||||||
|
case e.journals <- NewPeerRemovedJournal(url, c, PeerRemovedData{At: at}):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, ch := range subs {
|
for _, ch := range subs {
|
||||||
select {
|
select {
|
||||||
case <-e.ctx.Done():
|
case <-e.ctx.Done():
|
||||||
return fmt.Errorf("closing")
|
return fmt.Errorf("closing")
|
||||||
case ch <- NewPoolEvent(url, EventRemoved, time.Now()):
|
case ch <- NewPoolEvent(url, EventRemoved, at):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,7 +214,6 @@ func (e *Embassy) Dismiss(url string) error {
|
|||||||
func (e *Embassy) Close() {
|
func (e *Embassy) Close() {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
peers := e.peers
|
peers := e.peers
|
||||||
e.peers = make(map[string]bool)
|
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
|
|
||||||
// dismiss peers
|
// dismiss peers
|
||||||
@@ -206,6 +225,9 @@ func (e *Embassy) Close() {
|
|||||||
e.wg.Wait()
|
e.wg.Wait()
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
|
// reset peers after dismissal
|
||||||
|
e.peers = make(map[string]bool)
|
||||||
|
|
||||||
subs := e.eventSubs
|
subs := e.eventSubs
|
||||||
e.eventSubs = make([]chan PoolEvent, 0)
|
e.eventSubs = make([]chan PoolEvent, 0)
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
@@ -214,6 +236,10 @@ func (e *Embassy) Close() {
|
|||||||
for _, sub := range subs {
|
for _, sub := range subs {
|
||||||
close(sub)
|
close(sub)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if e.journals != nil {
|
||||||
|
close(e.journals)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Embassy) Peers() []string {
|
func (e *Embassy) Peers() []string {
|
||||||
@@ -281,22 +307,51 @@ func (e *Embassy) runEventRouter() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
url, err := honeybee.NormalizeURL(ev.ID)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !e.HasPeer(url) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
kind := convertPoolEvent[ev.Kind]
|
kind := convertPoolEvent[ev.Kind]
|
||||||
|
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
switch kind {
|
switch kind {
|
||||||
case EventConnected:
|
case EventConnected:
|
||||||
e.peers[ev.ID] = true
|
e.peers[url] = true
|
||||||
case EventDisconnected:
|
case EventDisconnected:
|
||||||
e.peers[ev.ID] = false
|
e.peers[url] = false
|
||||||
}
|
}
|
||||||
subs := e.eventSubs
|
subs := e.eventSubs
|
||||||
|
canJournal := e.journals != nil
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
|
|
||||||
|
if canJournal {
|
||||||
|
switch kind {
|
||||||
|
case EventConnected:
|
||||||
|
c, _ := component.Get(e.ctx)
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
case e.journals <- NewPeerConnectedJournal(
|
||||||
|
url, c, PeerConnectedData{At: ev.At}):
|
||||||
|
}
|
||||||
|
case EventDisconnected:
|
||||||
|
c, _ := component.Get(e.ctx)
|
||||||
|
select {
|
||||||
|
case <-e.ctx.Done():
|
||||||
|
case e.journals <- NewPeerDisconnectedJournal(
|
||||||
|
url, c, PeerDisconnectedData{At: ev.At}):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, ch := range subs {
|
for _, ch := range subs {
|
||||||
select {
|
select {
|
||||||
case <-e.ctx.Done():
|
case <-e.ctx.Done():
|
||||||
case ch <- NewPoolEvent(ev.ID, kind, ev.At):
|
case ch <- NewPoolEvent(url, kind, ev.At):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+102
@@ -50,6 +50,8 @@ func TestEmbassyPoolEvents(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("connected", func(t *testing.T) {
|
t.Run("connected", func(t *testing.T) {
|
||||||
|
e.Dispatch("wss://test")
|
||||||
|
|
||||||
eventsCh <- honeybee.OutboundPoolEvent{
|
eventsCh <- honeybee.OutboundPoolEvent{
|
||||||
ID: "wss://test",
|
ID: "wss://test",
|
||||||
Kind: honeybee.OutboundEventConnected,
|
Kind: honeybee.OutboundEventConnected,
|
||||||
@@ -67,6 +69,8 @@ func TestEmbassyPoolEvents(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("disconnected", func(t *testing.T) {
|
t.Run("disconnected", func(t *testing.T) {
|
||||||
|
e.Dispatch("wss://test")
|
||||||
|
|
||||||
eventsCh <- honeybee.OutboundPoolEvent{
|
eventsCh <- honeybee.OutboundPoolEvent{
|
||||||
ID: "wss://test",
|
ID: "wss://test",
|
||||||
Kind: honeybee.OutboundEventDisconnected,
|
Kind: honeybee.OutboundEventDisconnected,
|
||||||
@@ -243,3 +247,101 @@ func TestEmbassyClose(t *testing.T) {
|
|||||||
return !ok1 && !ok2
|
return !ok1 && !ok2
|
||||||
}, "subs should close")
|
}, "subs should close")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEmbassyJournals(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
jc := NewJournalCollector()
|
||||||
|
eventsCh := make(chan honeybee.OutboundPoolEvent, 1)
|
||||||
|
|
||||||
|
pool := EmbassyPlugin{
|
||||||
|
Connect: func(id string) error { return nil },
|
||||||
|
Remove: func(id string) error { return nil },
|
||||||
|
Send: func(id string, data []byte) error { return nil },
|
||||||
|
Events: eventsCh,
|
||||||
|
}
|
||||||
|
|
||||||
|
e := NewEmbassy(ctx, pool, jc, nil)
|
||||||
|
out := jc.Out()
|
||||||
|
peer := "wss://test"
|
||||||
|
|
||||||
|
// added
|
||||||
|
e.Dispatch(peer)
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case entry := <-out:
|
||||||
|
_, ok := entry.(PeerAddedJournal)
|
||||||
|
return ok
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected PeerAddedJournal")
|
||||||
|
|
||||||
|
// connected
|
||||||
|
eventsCh <- honeybee.OutboundPoolEvent{
|
||||||
|
ID: peer,
|
||||||
|
Kind: honeybee.OutboundEventConnected,
|
||||||
|
At: time.Now(),
|
||||||
|
}
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case entry := <-out:
|
||||||
|
e, ok := entry.(PeerConnectedJournal)
|
||||||
|
|
||||||
|
// ensure fields are correct
|
||||||
|
peerOk := e.PeerID() == "wss://test"
|
||||||
|
modOk := e.Component().Module() == "prism"
|
||||||
|
pathOk := e.Component().PathString() == "embassy"
|
||||||
|
|
||||||
|
return ok && peerOk && modOk && pathOk
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected PeerConnectedJournal")
|
||||||
|
|
||||||
|
// disconnected
|
||||||
|
eventsCh <- honeybee.OutboundPoolEvent{
|
||||||
|
ID: peer,
|
||||||
|
Kind: honeybee.OutboundEventDisconnected,
|
||||||
|
At: time.Now(),
|
||||||
|
}
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case entry := <-out:
|
||||||
|
_, ok := entry.(PeerDisconnectedJournal)
|
||||||
|
return ok
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected PeerDisconnectedJournal")
|
||||||
|
|
||||||
|
// removed
|
||||||
|
e.Dismiss(peer)
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case entry := <-out:
|
||||||
|
_, ok := entry.(PeerRemovedJournal)
|
||||||
|
return ok
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected PeerRemovedJournal")
|
||||||
|
|
||||||
|
// close embassy: closes journal channel
|
||||||
|
e.Close()
|
||||||
|
|
||||||
|
// Ensure jc can close now that embassy has closed its journal channel
|
||||||
|
jcClosed := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
jc.Close()
|
||||||
|
close(jcClosed)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-jcClosed:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "JournalCollector.Close() should return after Embassy.Close()")
|
||||||
|
}
|
||||||
|
|||||||
+2
-2
@@ -26,7 +26,7 @@ type JournalCollector struct {
|
|||||||
type JournalEntry interface {
|
type JournalEntry interface {
|
||||||
PeerID() string
|
PeerID() string
|
||||||
SealedAt() time.Time
|
SealedAt() time.Time
|
||||||
Author() component.Component
|
Component() component.Component
|
||||||
}
|
}
|
||||||
|
|
||||||
type entry struct {
|
type entry struct {
|
||||||
@@ -37,7 +37,7 @@ type entry struct {
|
|||||||
|
|
||||||
func (e *entry) PeerID() string { return e.peerID }
|
func (e *entry) PeerID() string { return e.peerID }
|
||||||
func (e *entry) SealedAt() time.Time { return e.sealedAt }
|
func (e *entry) SealedAt() time.Time { return e.sealedAt }
|
||||||
func (e *entry) Author() component.Component { return e.component }
|
func (e *entry) Component() component.Component { return e.component }
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
// Journal Collector
|
// Journal Collector
|
||||||
|
|||||||
+2
-2
@@ -158,8 +158,8 @@ func TestJournalCollector_ComponentIdentity(t *testing.T) {
|
|||||||
|
|
||||||
typed, ok := received.(*testJournalEntry)
|
typed, ok := received.(*testJournalEntry)
|
||||||
assert.True(t, ok, "should be correct concrete type")
|
assert.True(t, ok, "should be correct concrete type")
|
||||||
assert.Equal(t, mod, typed.Author().Module())
|
assert.Equal(t, mod, typed.Component().Module())
|
||||||
assert.Equal(t, path, typed.Author().PathString())
|
assert.Equal(t, path, typed.Component().PathString())
|
||||||
|
|
||||||
jc.Close()
|
jc.Close()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user