Иерархия поиска Spark graphframe - PullRequest
0 голосов
/ 18 июня 2020

Я пытаюсь сделать довольно простой вариант использования. У меня есть два фрейма данных -

>>> g.vertices.show(20,False)
+------------------------+
|id                      |
+------------------------+
|Router_UPDATE_INSERT    |
|Seq_Unique_Key          |
|Target_New_Insert       |
|Target_Existing_Update  |
|Target_Existing_Insert  |
|SAMPLE_CUSTOMER         |
|SAMPLE_CUSTOMER_MASTER  |
|Sorter_SAMPLE_CUSTOMER  |
|Sorter_CUSTOMER_MASTER  |
|Join_Source_Target      |
|Exp_DetectChanges       |
|Filter_Unchanged_Records|

Детали ребер -

>>> g.edges.show(20,False)
+------------------------+----------+------------------------+----------+
|src                     |From Type |dst                     |To Type   |
+------------------------+----------+------------------------+----------+
|Sorter_SAMPLE_CUSTOMER  |sorter    |Join_Source_Target      |joiner    |
|Sorter_CUSTOMER_MASTER  |sorter    |Join_Source_Target      |joiner    |
|Join_Source_Target      |joiner    |Exp_DetectChanges       |expression|
|SAMPLE_CUSTOMER         |source    |Sorter_SAMPLE_CUSTOMER  |sorter    |
|Router_UPDATE_INSERT    |router    |Target_Existing_Update  |target    |
|Seq_Unique_Key          |sequencetx|Target_Existing_Insert  |target    |
|Filter_Unchanged_Records|filter    |Router_UPDATE_INSERT    |router    |
|Exp_DetectChanges       |expression|Filter_Unchanged_Records|filter    |
|Seq_Unique_Key          |sequencetx|Target_New_Insert       |target    |
|Router_UPDATE_INSERT    |router    |Seq_Unique_Key          |sequencetx|
|SAMPLE_CUSTOMER_MASTER  |source    |Sorter_CUSTOMER_MASTER  |sorter    |
+------------------------+----------+------------------------+----------+

g = GraphFrame(vertices, edges)

Теперь я могу найти две разные линии -

>>> filteredPaths = g.bfs(
...   fromExpr = "id = 'SAMPLE_CUSTOMER_MASTER'",
...   toExpr = "id = 'Router_UPDATE_INSERT'",
...   edgeFilter = "src != 'joiner1'",
...   maxPathLength = 10)

Вторая линия -

>>> filteredPaths = g.bfs(
...   fromExpr = "id = 'SAMPLE_CUSTOMER'",
...   toExpr = "id = 'Router_UPDATE_INSERT'",
...   edgeFilter = "src != 'joiner1'",
...   maxPathLength = 10)

Два источника объединяются и разделяются позже, все, что мне нужно, это отдельные значения с сохранением заказов -

SAMPLE_CUSTOMER
Sorter_SAMPLE_CUSTOMER
SAMPLE_CUSTOMER_MASTER
Sorter_CUSTOMER_MASTER
Join_Source_Target
Exp_DetectChanges
Filter_Unchanged_Records
Router_UPDATE_INSERT
Seq_Unique_Key
Target_New_Insert
Target_Existing_Insert
Target_Existing_Update
...