From 9ff8ee0c9eee5ef6f76af88d987b4cf620541a26 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 22 May 2026 16:51:09 -0400 Subject: [PATCH] Phase 7: remove EventTraveller; introduce preparedWrite and excludedEvent --- write.go | 68 +++++++++++++++++++++++++++------------------------ write_test.go | 53 +++++++++++++++++---------------------- 2 files changed, 58 insertions(+), 63 deletions(-) diff --git a/write.go b/write.go index 74e5ea5..be18d3e 100644 --- a/write.go +++ b/write.go @@ -16,11 +16,14 @@ type WriteOptions struct { BoltReadBatchSize int } -type EventTraveller struct { - ID string +type preparedWrite struct { Event roots.ValidatedEvent Subgraph *EventSubgraph - Error error +} + +type excludedEvent struct { + Event roots.ValidatedEvent + Reason error } type WriteResult struct { @@ -29,7 +32,7 @@ type WriteResult struct { } type WriteReport struct { - ExcludedEvents []EventTraveller + ExcludedEvents []excludedEvent CreatedEventCount int Neo4jResultSummaries []neo4j.ResultSummary Duration time.Duration @@ -56,12 +59,7 @@ func WriteEvents( return WriteReport{Error: fmt.Errorf("error setting up bolt db: %w", err)} } - travellers := make([]EventTraveller, 0, len(events)) - for _, e := range events { - travellers = append(travellers, EventTraveller{ID: e.ID(), Event: e}) - } - - queued, excluded := enforcePolicyRules(travellers, boltdb, opts.BoltReadBatchSize) + queued, excluded := enforcePolicyRules(events, boltdb, opts.BoltReadBatchSize) converted := convertEventsToSubgraphs(queued, opts.Expanders) writeResult := writeEventsToDatabases(driver, boltdb, converted) @@ -83,7 +81,7 @@ func setDefaultWriteOptions(opts *WriteOptions) { } } -func enforcePolicyRules(in []EventTraveller, boltdb *bolt.DB, batchSize int) (queued []EventTraveller, excluded []EventTraveller) { +func enforcePolicyRules(in []roots.ValidatedEvent, boltdb *bolt.DB, batchSize int) (queued []roots.ValidatedEvent, excluded []excludedEvent) { for i := 0; i < len(in); i += batchSize { end := i + batchSize if end > len(in) { @@ -92,32 +90,38 @@ func enforcePolicyRules(in []EventTraveller, boltdb *bolt.DB, batchSize int) (qu batch := in[i:end] eventIDs := make([]string, 0, len(batch)) - for _, traveller := range batch { - eventIDs = append(eventIDs, traveller.ID) + for _, e := range batch { + eventIDs = append(eventIDs, e.ID()) } existsMap := BatchCheckEventsExist(boltdb, eventIDs) - for _, traveller := range batch { - if existsMap[traveller.ID] { - traveller.Error = fmt.Errorf("skipped: %w", ErrDuplicate) - excluded = append(excluded, traveller) + for _, e := range batch { + if existsMap[e.ID()] { + excluded = append(excluded, excludedEvent{ + Event: e, + Reason: fmt.Errorf("skipped: %w", ErrDuplicate), + }) } else { - queued = append(queued, traveller) + queued = append(queued, e) } } } return queued, excluded } -func convertEventsToSubgraphs(in []EventTraveller, expanders ExpanderPipeline) []EventTraveller { - for i, traveller := range in { - in[i].Subgraph = EventToSubgraph(traveller.Event, expanders) +func convertEventsToSubgraphs(in []roots.ValidatedEvent, expanders ExpanderPipeline) []preparedWrite { + out := make([]preparedWrite, 0, len(in)) + for _, e := range in { + out = append(out, preparedWrite{ + Event: e, + Subgraph: EventToSubgraph(e, expanders), + }) } - return in + return out } -func writeEventsToDatabases(driver neo4j.Driver, boltdb *bolt.DB, travellers []EventTraveller) WriteResult { +func writeEventsToDatabases(driver neo4j.Driver, boltdb *bolt.DB, travellers []preparedWrite) WriteResult { boltErr := writeEventsToBoltDB(boltdb, travellers) if boltErr != nil { return WriteResult{ @@ -132,27 +136,27 @@ func writeEventsToDatabases(driver neo4j.Driver, boltdb *bolt.DB, travellers []E } } -func writeEventsToBoltDB(boltdb *bolt.DB, travellers []EventTraveller) error { +func writeEventsToBoltDB(boltdb *bolt.DB, travellers []preparedWrite) error { var events []EventBlob - for _, traveller := range travellers { - j, err := json.Marshal(traveller.Event) + for _, pw := range travellers { + j, err := json.Marshal(pw.Event) if err != nil { - return fmt.Errorf("failed to serialize event %s: %w", traveller.ID, err) + return fmt.Errorf("failed to serialize event %s: %w", pw.Event.ID(), err) } - events = append(events, EventBlob{ID: []byte(traveller.ID), JSON: j}) + events = append(events, EventBlob{ID: []byte(pw.Event.ID()), JSON: j}) } return BatchWriteEvents(boltdb, events) } -func writeEventsToGraphDB(driver neo4j.Driver, travellers []EventTraveller) ([]neo4j.ResultSummary, error) { +func writeEventsToGraphDB(driver neo4j.Driver, travellers []preparedWrite) ([]neo4j.ResultSummary, error) { matchKeys := NewSimpleMatchKeys() batch := NewBatchSubgraph(matchKeys) - for _, traveller := range travellers { - for _, node := range traveller.Subgraph.Nodes() { + for _, pw := range travellers { + for _, node := range pw.Subgraph.Nodes() { batch.AddNode(node) } - for _, rel := range traveller.Subgraph.Rels() { + for _, rel := range pw.Subgraph.Rels() { batch.AddRel(rel) } } diff --git a/write_test.go b/write_test.go index 2685aca..4b0735d 100644 --- a/write_test.go +++ b/write_test.go @@ -1,6 +1,7 @@ package heartwood import ( + roots "git.wisehodl.dev/jay/go-roots/events" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "testing" @@ -13,57 +14,47 @@ func TestEnforcePolicyRules(t *testing.T) { require.NoError(t, SetupBoltDB(db)) fx := LoadFixtures(t) - // Pre-write bare and generic_tag as existing events - bareJSON, _ := fx.ValidatedEvent(t, "bare").MarshalJSON() - genericJSON, _ := fx.ValidatedEvent(t, "generic_tag").MarshalJSON() - bareID := fx.ValidatedEvent(t, "bare").ID() - genericID := fx.ValidatedEvent(t, "generic_tag").ID() + bareEvent := fx.ValidatedEvent(t, "bare") + genericEvent := fx.ValidatedEvent(t, "generic_tag") + e_tag_event := fx.ValidatedEvent(t, "e_tag_valid") + p_tag_event := fx.ValidatedEvent(t, "p_tag_valid") + // Pre-write bare and generic_tag as existing events + bareJSON, _ := bareEvent.MarshalJSON() + genericJSON, _ := genericEvent.MarshalJSON() err := BatchWriteEvents(db, []EventBlob{ - {ID: []byte(bareID), JSON: bareJSON}, - {ID: []byte(genericID), JSON: genericJSON}, + {ID: []byte(bareEvent.ID()), JSON: bareJSON}, + {ID: []byte(genericEvent.ID()), JSON: genericJSON}, }) assert.NoError(t, err) - e_tag_id := fx.ValidatedEvent(t, "e_tag_valid").ID() - p_tag_id := fx.ValidatedEvent(t, "p_tag_valid").ID() - cases := []struct { name string - input []EventTraveller + input []roots.ValidatedEvent wantQueued int wantExcluded int }{ { name: "empty input", - input: []EventTraveller{}, + input: []roots.ValidatedEvent{}, wantQueued: 0, wantExcluded: 0, }, { - name: "no duplicates", - input: []EventTraveller{ - {ID: e_tag_id}, - {ID: p_tag_id}, - }, + name: "no duplicates", + input: []roots.ValidatedEvent{e_tag_event, p_tag_event}, wantQueued: 2, wantExcluded: 0, }, { - name: "some duplicates", - input: []EventTraveller{ - {ID: bareID}, - {ID: e_tag_id}, - }, + name: "some duplicates", + input: []roots.ValidatedEvent{bareEvent, e_tag_event}, wantQueued: 1, wantExcluded: 1, }, { - name: "all duplicates", - input: []EventTraveller{ - {ID: bareID}, - {ID: genericID}, - }, + name: "all duplicates", + input: []roots.ValidatedEvent{bareEvent, genericEvent}, wantQueued: 0, wantExcluded: 2, }, @@ -76,7 +67,7 @@ func TestEnforcePolicyRules(t *testing.T) { assert.Equal(t, tc.wantQueued, len(queued)) assert.Equal(t, tc.wantExcluded, len(excluded)) for _, ex := range excluded { - assert.ErrorIs(t, ex.Error, ErrDuplicate) + assert.ErrorIs(t, ex.Reason, ErrDuplicate) } }) } @@ -87,13 +78,13 @@ func TestConvertEventsToSubgraphs(t *testing.T) { cases := []struct { name string - traveller EventTraveller + event roots.ValidatedEvent wantNodeCount int wantRelCount int }{ { name: "event with no tags", - traveller: EventTraveller{Event: fx.ValidatedEvent(t, "bare")}, + event: fx.ValidatedEvent(t, "bare"), wantNodeCount: 2, // event + user wantRelCount: 1, // signed }, @@ -102,7 +93,7 @@ func TestConvertEventsToSubgraphs(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { expanders := NewExpanderPipeline(DefaultExpanders()...) - results := convertEventsToSubgraphs([]EventTraveller{tc.traveller}, expanders) + results := convertEventsToSubgraphs([]roots.ValidatedEvent{tc.event}, expanders) assert.Len(t, results, 1) assert.NotNil(t, results[0].Subgraph)