Spark SQL присоединение в зависимости от приоритета - PullRequest
0 голосов
/ 15 февраля 2020

Я застрял в проблеме, когда мне нужно создать результирующий набор данных, соединив исходный набор данных с набором правил преобразования. Теперь для данной сущности может совпадать несколько правил, но для этого совпадения необходимо выбрать только первое совпадающее правило. Вот пример моего исходного набора данных

Id,Group,Person,Item,Count
1000000,Finance,Scott,Fan,100
1000000,HR,Taylor,Light,200
1000000,Finance,Troy,Table,100
1000000,Legal,Willian,Chair,100

Другой набор данных для объединения имеет ряд преобразований, которые выглядят следующим образом:

Field,Source,Targets
Person,Scott,[Taylor, William]
Item,Light,[Table, Chair]
GroupHR,Finance;[Legal]

Ниже приведен порядок обработки, для которого Я застрял в создании запроса SQL Join

Row1 -> Matches tansform1, Scott -100, Taylor +50, William +50
Row2 -> Matches tansform2,tansform3. Pick tansform2 only, Light -200, Table +100, Chair +100
Row3 -> Doesn't match any policy.
Row4 -> Doesn't match any policy.

Любые идеи о том, как этого можно достичь в Spark SQL? Как основной разработчик C# я мог бы сделать это с помощью foreach в строке, но разве это идеальный способ сделать это вычисление?

1 Ответ

1 голос
/ 19 февраля 2020

Хорошо, вот вы

val jdata = """[
{"id": 1, "group": "finance", "person": "scott", "item": "fan", "count": 100},
{"id": 2, "group": "hr", "person": "taylor", "item": "light", "count": 200},
{"id": 3, "group": "finance", "person": "troy", "item": "table", "count": 300},
{"id": 4, "group": "legal", "person": "willian", "item": "chair", "count": 400}
]"""

val data = spark.read.json(Seq(jdata).toDS)
data.registerTempTable("data")

val jtrans = """[
{"field": "person", "source": "scott", "targets": "taylor,willian"},
{"field": "item", "source": "light", "targets": "table,chair"},
{"field": "group", "source": "hr", "targets": "legal"}
]"""

val trans = spark.read.json(Seq(jtrans).toDS)
trans.registerTempTable("trans")

Давайте проверим

scala> spark.sql("SELECT * FROM data").show
+-----+-------+---+-----+-------+
|count|  group| id| item| person|
+-----+-------+---+-----+-------+
|  100|finance|  1|  fan|  scott|
|  200|     hr|  2|light| taylor|
|  300|finance|  3|table|   troy|
|  400|  legal|  4|chair|willian|
+-----+-------+---+-----+-------+


scala> spark.sql("SELECT * FROM trans").show
+------+------+--------------+
| field|source|       targets|
+------+------+--------------+
|person| scott|taylor,willian|
|  item| light|   table,chair|
| group|    hr|         legal|
+------+------+--------------+

И SQL

spark.sql("""
WITH
datac AS (
SELECT CONCAT('item_', item) itemc, CONCAT('person_', person) personc, count
FROM data
),
trans_person AS (
SELECT count, personc, itemc, source as source_person, targets as targets_person 
FROM datac LEFT JOIN trans ON CONCAT(field, '_', source) = personc
),
trans_item AS (
SELECT count, personc, itemc, source_person, targets_person, source as source_item, targets as targets_item 
FROM trans_person LEFT JOIN trans t2 ON CONCAT(t2.field, '_', t2.source) = itemc
),
trans_concat AS (
SELECT CASE WHEN source_person IS NOT NULL THEN 'person' WHEN source_item IS NOT NULL THEN 'item' END AS field,
CONCAT(COALESCE(source_person, ''), COALESCE(source_item, '')) as source,  
CONCAT(COALESCE(targets_person, ''), COALESCE(targets_item, '')) AS targets,
count
FROM trans_item
),
trans_source AS (
SELECT field, source, count as cnt
FROM trans_concat
WHERE field IS NOT NULL
),
trans_target AS (
SELECT field, EXPLODE(SPLIT(targets, ',')) as target, count / SIZE(SPLIT(targets, ',')) as cnt
FROM trans_concat
WHERE field IS NOT NULL
)
SELECT count + COALESCE(t1.cnt, 0) + COALESCE(t2.cnt, 0) - COALESCE(t3.cnt, 0) - COALESCE(t4.cnt, 0) AS count, 
group, id, item, person
FROM data 
LEFT JOIN trans_target t1 ON  CONCAT('person_', person) = CONCAT(t1.field, '_', t1.target)
LEFT JOIN trans_target t2 ON  CONCAT('item_', item) = CONCAT(t2.field, '_', t2.target)
LEFT JOIN trans_source t3 ON  CONCAT('person_', person) = CONCAT(t3.field, '_', t3.source)
LEFT JOIN trans_source t4 ON  CONCAT('item_', item) = CONCAT(t4.field, '_', t4.source)
""").show()

Результат

+-----+-------+---+-----+-------+
|count|  group| id| item| person|
+-----+-------+---+-----+-------+
| 50.0|     hr|  2|light| taylor|
|550.0|  legal|  4|chair|willian|
|400.0|finance|  3|table|   troy|
|  0.0|finance|  1|  fan|  scott|
+-----+-------+---+-----+-------+

Для простоты я сделал это только для 'item' и 'person', но вы можете легко расширить его для 'id' и 'group'.

Также, если вы хотите применить только первое правило, которое вы Я должен использовать оконную функцию, которую я также пропустил для простоты.

Ответ проверен в оболочке spark Scala, но основной код - простой SQL, и он должен работать в pyspark.

...