OK, here you go. The setup:
val jdata = """[
{"id": 1, "group": "finance", "person": "scott", "item": "fan", "count": 100},
{"id": 2, "group": "hr", "person": "taylor", "item": "light", "count": 200},
{"id": 3, "group": "finance", "person": "troy", "item": "table", "count": 300},
{"id": 4, "group": "legal", "person": "willian", "item": "chair", "count": 400}
]"""
val data = spark.read.json(Seq(jdata).toDS)
data.createOrReplaceTempView("data")
val jtrans = """[
{"field": "person", "source": "scott", "targets": "taylor,willian"},
{"field": "item", "source": "light", "targets": "table,chair"},
{"field": "group", "source": "hr", "targets": "legal"}
]"""
val trans = spark.read.json(Seq(jtrans).toDS)
trans.createOrReplaceTempView("trans")
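Everything here was run in the spark shell. If you prefer a standalone app, the only extra boilerplate is a SparkSession and the implicits import; a minimal sketch (the app name and local master are just placeholders):

import org.apache.spark.sql.SparkSession

// minimal standalone setup; appName and master are arbitrary choices for local testing
val spark = SparkSession.builder()
  .appName("count-transfer")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._ // needed for Seq(...).toDS outside the shell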
Let's check:
scala> spark.sql("SELECT * FROM data").show
+-----+-------+---+-----+-------+
|count| group| id| item| person|
+-----+-------+---+-----+-------+
| 100|finance| 1| fan| scott|
| 200| hr| 2|light| taylor|
| 300|finance| 3|table| troy|
| 400| legal| 4|chair|willian|
+-----+-------+---+-----+-------+
scala> spark.sql("SELECT * FROM trans").show
+------+------+--------------+
| field|source| targets|
+------+------+--------------+
|person| scott|taylor,willian|
| item| light| table,chair|
| group| hr| legal|
+------+------+--------------+
And the SQL:
spark.sql("""
WITH
datac AS (
SELECT CONCAT('item_', item) itemc, CONCAT('person_', person) personc, count
FROM data
),
trans_person AS (
SELECT count, personc, itemc, source as source_person, targets as targets_person
FROM datac LEFT JOIN trans ON CONCAT(field, '_', source) = personc
),
trans_item AS (
SELECT count, personc, itemc, source_person, targets_person, source as source_item, targets as targets_item
FROM trans_person LEFT JOIN trans t2 ON CONCAT(t2.field, '_', t2.source) = itemc
),
trans_concat AS (
SELECT CASE WHEN source_person IS NOT NULL THEN 'person' WHEN source_item IS NOT NULL THEN 'item' END AS field,
CONCAT(COALESCE(source_person, ''), COALESCE(source_item, '')) as source,
CONCAT(COALESCE(targets_person, ''), COALESCE(targets_item, '')) AS targets,
count
FROM trans_item
),
trans_source AS (
SELECT field, source, count as cnt
FROM trans_concat
WHERE field IS NOT NULL
),
trans_target AS (
SELECT field, EXPLODE(SPLIT(targets, ',')) as target, count / SIZE(SPLIT(targets, ',')) as cnt
FROM trans_concat
WHERE field IS NOT NULL
)
SELECT count + COALESCE(t1.cnt, 0) + COALESCE(t2.cnt, 0) - COALESCE(t3.cnt, 0) - COALESCE(t4.cnt, 0) AS count,
group, id, item, person
FROM data
LEFT JOIN trans_target t1 ON CONCAT('person_', person) = CONCAT(t1.field, '_', t1.target)
LEFT JOIN trans_target t2 ON CONCAT('item_', item) = CONCAT(t2.field, '_', t2.target)
LEFT JOIN trans_source t3 ON CONCAT('person_', person) = CONCAT(t3.field, '_', t3.source)
LEFT JOIN trans_source t4 ON CONCAT('item_', item) = CONCAT(t4.field, '_', t4.source)
""").show()
The result:
+-----+-------+---+-----+-------+
|count| group| id| item| person|
+-----+-------+---+-----+-------+
| 50.0| hr| 2|light| taylor|
|550.0| legal| 4|chair|willian|
|400.0|finance| 3|table| troy|
| 0.0|finance| 1| fan| scott|
+-----+-------+---+-----+-------+
To check the arithmetic on one row: taylor/light starts at 200, gains 100 / 2 = 50 because taylor is one of the two targets of the scott rule, and loses 200 because light is the source of an item rule, giving 50.
For simplicity I only did this for 'item' and 'person', but you can easily extend it to 'id' and 'group'.
Also, if you want to apply only the first matching rule to a row, you would need a window function, which I also skipped for simplicity (see the sketch at the end).
The answer was tested in the Scala spark shell, but the main part is plain SQL, so it should also work in PySpark.
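Regarding the "first matching rule" remark: here is a rough, untested sketch of how a window function could be used to keep a single rule per data row. The ORDER BY t.field is arbitrary; in practice you would add a real priority column to trans and order by that:

spark.sql("""
WITH matches AS (
  SELECT d.id, t.field, t.source, t.targets,
         -- sketch: pick one rule per row; ORDER BY t.field is a placeholder for a real priority
         ROW_NUMBER() OVER (PARTITION BY d.id ORDER BY t.field) AS rn
  FROM data d
  JOIN trans t
    ON (t.field = 'person' AND t.source = d.person)
    OR (t.field = 'item' AND t.source = d.item)
)
SELECT * FROM matches WHERE rn = 1
""").show()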