diff --git a/write.go b/write.go index ca8e8ac..afebc73 100644 --- a/write.go +++ b/write.go @@ -16,7 +16,7 @@ type WriteOptions struct { BoltReadBatchSize int } -type EventFollower struct { +type EventTraveller struct { ID string JSON string Event roots.Event @@ -30,8 +30,8 @@ type WriteResult struct { } type WriteReport struct { - InvalidEvents []EventFollower - SkippedEvents []EventFollower + InvalidEvents []EventTraveller + SkippedEvents []EventTraveller CreatedEventCount int Neo4jResultSummaries []neo4j.ResultSummary Duration time.Duration @@ -58,42 +58,42 @@ func WriteEvents( var wg sync.WaitGroup - // Create Event Followers + // Create Event Travellers jsonChan := make(chan string) - eventChan := make(chan EventFollower) + eventChan := make(chan EventTraveller) wg.Add(1) - go createEventFollowers(&wg, jsonChan, eventChan) + go createEventTravellers(&wg, jsonChan, eventChan) // Parse Event JSON - parsedChan := make(chan EventFollower) - invalidChan := make(chan EventFollower) + parsedChan := make(chan EventTraveller) + invalidChan := make(chan EventTraveller) wg.Add(1) go parseEventJSON(&wg, eventChan, parsedChan, invalidChan) // Collect Invalid Events - collectedInvalidChan := make(chan []EventFollower) + collectedInvalidChan := make(chan []EventTraveller) wg.Add(1) - go collectEvents(&wg, invalidChan, collectedInvalidChan) + go collectTravellers(&wg, invalidChan, collectedInvalidChan) // Enforce Policy Rules - queuedChan := make(chan EventFollower) - skippedChan := make(chan EventFollower) + queuedChan := make(chan EventTraveller) + skippedChan := make(chan EventTraveller) wg.Add(1) go enforcePolicyRules(&wg, driver, boltdb, opts.BoltReadBatchSize, parsedChan, queuedChan, skippedChan) // Collect Skipped Events - collectedSkippedChan := make(chan []EventFollower) + collectedSkippedChan := make(chan []EventTraveller) wg.Add(1) - go collectEvents(&wg, skippedChan, collectedSkippedChan) + go collectTravellers(&wg, skippedChan, collectedSkippedChan) // Convert Events To Subgraphs - convertedChan := make(chan EventFollower) + convertedChan := make(chan EventTraveller) wg.Add(1) go convertEventsToSubgraphs(&wg, opts.Expanders, queuedChan, convertedChan) @@ -139,29 +139,29 @@ func setDefaultWriteOptions(opts *WriteOptions) { } } -func createEventFollowers(wg *sync.WaitGroup, jsonChan chan string, eventChan chan EventFollower) { +func createEventTravellers(wg *sync.WaitGroup, jsonChan chan string, eventChan chan EventTraveller) { defer wg.Done() for json := range jsonChan { - eventChan <- EventFollower{JSON: json} + eventChan <- EventTraveller{JSON: json} } close(eventChan) } -func parseEventJSON(wg *sync.WaitGroup, inChan, parsedChan, invalidChan chan EventFollower) { +func parseEventJSON(wg *sync.WaitGroup, inChan, parsedChan, invalidChan chan EventTraveller) { defer wg.Done() - for follower := range inChan { + for traveller := range inChan { var event roots.Event - jsonBytes := []byte(follower.JSON) + jsonBytes := []byte(traveller.JSON) err := json.Unmarshal(jsonBytes, &event) if err != nil { - follower.Error = err - invalidChan <- follower + traveller.Error = err + invalidChan <- traveller continue } - follower.ID = event.ID - follower.Event = event - parsedChan <- follower + traveller.ID = event.ID + traveller.Event = event + parsedChan <- traveller } close(parsedChan) @@ -172,17 +172,17 @@ func enforcePolicyRules( wg *sync.WaitGroup, driver neo4j.Driver, boltdb *bolt.DB, batchSize int, - inChan, queuedChan, skippedChan chan EventFollower, + inChan, queuedChan, skippedChan chan EventTraveller, ) { defer wg.Done() - var batch []EventFollower + var batch []EventTraveller - for follower := range inChan { - batch = append(batch, follower) + for traveller := range inChan { + batch = append(batch, traveller) if len(batch) >= batchSize { processPolicyRulesBatch(boltdb, batch, queuedChan, skippedChan) - batch = []EventFollower{} + batch = []EventTraveller{} } } @@ -196,35 +196,35 @@ func enforcePolicyRules( func processPolicyRulesBatch( boltdb *bolt.DB, - batch []EventFollower, - queuedChan, skippedChan chan EventFollower, + batch []EventTraveller, + queuedChan, skippedChan chan EventTraveller, ) { eventIDs := make([]string, 0, len(batch)) - for _, follower := range batch { - eventIDs = append(eventIDs, follower.ID) + for _, traveller := range batch { + eventIDs = append(eventIDs, traveller.ID) } existsMap := BatchCheckEventsExist(boltdb, eventIDs) - for _, follower := range batch { - if existsMap[follower.ID] { - skippedChan <- follower + for _, traveller := range batch { + if existsMap[traveller.ID] { + skippedChan <- traveller } else { - queuedChan <- follower + queuedChan <- traveller } } } func convertEventsToSubgraphs( wg *sync.WaitGroup, expanders ExpanderPipeline, - inChan, convertedChan chan EventFollower, + inChan, convertedChan chan EventTraveller, ) { defer wg.Done() - for follower := range inChan { - subgraph := EventToSubgraph(follower.Event, expanders) - follower.Subgraph = subgraph - convertedChan <- follower + for traveller := range inChan { + subgraph := EventToSubgraph(traveller.Event, expanders) + traveller.Subgraph = subgraph + convertedChan <- traveller } close(convertedChan) } @@ -232,14 +232,14 @@ func convertEventsToSubgraphs( func writeEventsToDatabases( wg *sync.WaitGroup, driver neo4j.Driver, boltdb *bolt.DB, - inChan chan EventFollower, + inChan chan EventTraveller, resultChan chan WriteResult, ) { defer wg.Done() var localWg sync.WaitGroup - boltEventChan := make(chan EventFollower) - graphEventChan := make(chan EventFollower) + boltEventChan := make(chan EventTraveller) + graphEventChan := make(chan EventTraveller) boltErrorChan := make(chan error) graphResultChan := make(chan WriteResult) @@ -249,9 +249,9 @@ func writeEventsToDatabases( go writeEventsToGraphDB(&localWg, driver, graphEventChan, boltErrorChan, graphResultChan) // Fan out events to both writers - for follower := range inChan { - boltEventChan <- follower - graphEventChan <- follower + for traveller := range inChan { + boltEventChan <- traveller + graphEventChan <- traveller } close(boltEventChan) close(graphEventChan) @@ -265,15 +265,15 @@ func writeEventsToDatabases( func writeEventsToBoltDB( wg *sync.WaitGroup, boltdb *bolt.DB, - inChan chan EventFollower, + inChan chan EventTraveller, errorChan chan error, ) { defer wg.Done() var events []EventBlob - for follower := range inChan { + for traveller := range inChan { events = append(events, - EventBlob{ID: follower.ID, JSON: follower.JSON}) + EventBlob{ID: traveller.ID, JSON: traveller.JSON}) } err := BatchWriteEvents(boltdb, events) @@ -285,7 +285,7 @@ func writeEventsToBoltDB( func writeEventsToGraphDB( wg *sync.WaitGroup, driver neo4j.Driver, - inChan chan EventFollower, + inChan chan EventTraveller, boltErrorChan chan error, resultChan chan WriteResult, ) { @@ -293,11 +293,11 @@ func writeEventsToGraphDB( matchKeys := NewSimpleMatchKeys() batch := NewBatchSubgraph(matchKeys) - for follower := range inChan { - for _, node := range follower.Subgraph.Nodes() { + for traveller := range inChan { + for _, node := range traveller.Subgraph.Nodes() { batch.AddNode(node) } - for _, rel := range follower.Subgraph.Rels() { + for _, rel := range traveller.Subgraph.Rels() { batch.AddRel(rel) } } @@ -320,11 +320,11 @@ func writeEventsToGraphDB( close(resultChan) } -func collectEvents(wg *sync.WaitGroup, inChan chan EventFollower, resultChan chan []EventFollower) { +func collectTravellers(wg *sync.WaitGroup, inChan chan EventTraveller, resultChan chan []EventTraveller) { defer wg.Done() - var collected []EventFollower - for follower := range inChan { - collected = append(collected, follower) + var collected []EventTraveller + for traveller := range inChan { + collected = append(collected, traveller) } resultChan <- collected close(resultChan) diff --git a/write_test.go b/write_test.go index 33b8efe..6dba39a 100644 --- a/write_test.go +++ b/write_test.go @@ -20,28 +20,28 @@ func invalidEventJSON() string { // Pipeline stage tests -func TestCreateEventFollowers(t *testing.T) { +func TestCreateEventTravellers(t *testing.T) { cases := []struct { name string input []string - expected []EventFollower + expected []EventTraveller }{ { name: "empty input", input: []string{}, - expected: []EventFollower{}, + expected: []EventTraveller{}, }, { name: "single json", input: []string{"test1"}, - expected: []EventFollower{ + expected: []EventTraveller{ {JSON: "test1"}, }, }, { name: "multiple jsons", input: []string{"test1", "test2", "test3"}, - expected: []EventFollower{ + expected: []EventTraveller{ {JSON: "test1"}, {JSON: "test2"}, {JSON: "test3"}, @@ -53,10 +53,10 @@ func TestCreateEventFollowers(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var wg sync.WaitGroup jsonChan := make(chan string) - eventChan := make(chan EventFollower) + eventChan := make(chan EventTraveller) wg.Add(1) - go createEventFollowers(&wg, jsonChan, eventChan) + go createEventTravellers(&wg, jsonChan, eventChan) go func() { for _, raw := range tc.input { @@ -65,9 +65,9 @@ func TestCreateEventFollowers(t *testing.T) { close(jsonChan) }() - var result []EventFollower - for follower := range eventChan { - result = append(result, follower) + var result []EventTraveller + for traveller := range eventChan { + result = append(result, traveller) } wg.Wait() @@ -84,7 +84,7 @@ func TestCreateEventFollowers(t *testing.T) { func TestParseEventJSON(t *testing.T) { cases := []struct { name string - input []EventFollower + input []EventTraveller wantParsed int wantInvalid int checkParsedID bool @@ -92,7 +92,7 @@ func TestParseEventJSON(t *testing.T) { }{ { name: "valid event", - input: []EventFollower{ + input: []EventTraveller{ {JSON: validEventJSON("abc123", "pubkey1")}, }, wantParsed: 1, @@ -102,7 +102,7 @@ func TestParseEventJSON(t *testing.T) { }, { name: "invalid json", - input: []EventFollower{ + input: []EventTraveller{ {JSON: invalidEventJSON()}, }, wantParsed: 0, @@ -110,7 +110,7 @@ func TestParseEventJSON(t *testing.T) { }, { name: "mixed batch", - input: []EventFollower{ + input: []EventTraveller{ {JSON: validEventJSON("abc123", "pubkey1")}, {JSON: invalidEventJSON()}, {JSON: validEventJSON("def456", "pubkey2")}, @@ -123,22 +123,22 @@ func TestParseEventJSON(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { var wg sync.WaitGroup - inChan := make(chan EventFollower) - parsedChan := make(chan EventFollower) - invalidChan := make(chan EventFollower) + inChan := make(chan EventTraveller) + parsedChan := make(chan EventTraveller) + invalidChan := make(chan EventTraveller) wg.Add(1) go parseEventJSON(&wg, inChan, parsedChan, invalidChan) go func() { - for _, follower := range tc.input { - inChan <- follower + for _, traveller := range tc.input { + inChan <- traveller } close(inChan) }() - var parsed []EventFollower - var invalid []EventFollower + var parsed []EventTraveller + var invalid []EventTraveller var collectWg sync.WaitGroup collectWg.Add(2) @@ -204,8 +204,8 @@ func TestConvertEventsToSubgraphs(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { var wg sync.WaitGroup - inChan := make(chan EventFollower) - convertedChan := make(chan EventFollower) + inChan := make(chan EventTraveller) + convertedChan := make(chan EventTraveller) expanders := NewExpanderPipeline(DefaultExpanders()...) @@ -213,11 +213,11 @@ func TestConvertEventsToSubgraphs(t *testing.T) { go convertEventsToSubgraphs(&wg, expanders, inChan, convertedChan) go func() { - inChan <- EventFollower{Event: tc.event} + inChan <- EventTraveller{Event: tc.event} close(inChan) }() - var result EventFollower + var result EventTraveller for f := range convertedChan { result = f } @@ -236,17 +236,17 @@ func TestConvertEventsToSubgraphs(t *testing.T) { func TestCollectEvents(t *testing.T) { cases := []struct { name string - input []EventFollower + input []EventTraveller expected int }{ { name: "empty channel", - input: []EventFollower{}, + input: []EventTraveller{}, expected: 0, }, { - name: "multiple followers", - input: []EventFollower{ + name: "multiple travellers", + input: []EventTraveller{ {ID: "id1"}, {ID: "id2"}, {ID: "id3"}, @@ -258,11 +258,11 @@ func TestCollectEvents(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { var wg sync.WaitGroup - inChan := make(chan EventFollower) - resultChan := make(chan []EventFollower) + inChan := make(chan EventTraveller) + resultChan := make(chan []EventTraveller) wg.Add(1) - go collectEvents(&wg, inChan, resultChan) + go collectTravellers(&wg, inChan, resultChan) go func() { for _, f := range tc.input {