Пы искра присоединиться к колонне трубопровода - PullRequest
0 голосов
/ 03 апреля 2020

У меня есть два фрейма данных, к которым я хочу присоединиться. Улов - одна из таблиц, у которых есть разделенная конвейером строка, к которой я хочу присоединить одно из значений. Как мне это в Писпарке. Ниже приведен пример ТАБЛИЦЫ A с

+-------+--------------------+
|id     |      name          |
+-------+--------------------+
| 613760|123|test|test2      |
| 613740|456|ABC             |
| 598946|OMG|567             | 

ТАБЛИЦА B с

+-------+--------------------+
|join_id|           prod_type|                           
+-------+--------------------+
| 123   |Direct De           |
| 456   |Direct              |
| 567   |In                  | 

Ожидаемый результат - Соедините таблицы A и Таблицу B, если есть совпадение с идентификатором, разделенным конвейером таблицы A, и таблицей Ценность Б Например, TableA.id - 613760, имя имеет 123 | test, и я хочу присоединиться к идентификатору соединения таблицы B 123, аналогично 456 и 567.

Результирующая таблица

+--------------------+-------+
|      name          |join_Id|
+-------+------------+-------+
|123|test|test2      |123    |
|456|ABC             |456    |
|OMG|567             |567    |

Может ли кто-нибудь помочь мне решить это. Я относительно новичок в pyspark, и я учусь

1 Ответ

0 голосов
/ 03 апреля 2020

Чтобы решить вашу проблему, вам необходимо:

См. Код ниже:

import pyspark.sql.functions as f

#First create the dataframes to test solution
table_A = spark.createDataFrame([(613760, '123|test|test2' ), (613740, '456|ABC'), (598946, 'OMG|567' )], ["id", "name"])
# +-------+--------------------+
# |id     |      name          |
# +-------+--------------------+
# | 613760|123|test|test2      |
# | 613740|456|ABC             |
# | 598946|OMG|567             | 

table_B = spark.createDataFrame([('123', 'Direct De' ), ('456', 'Direct'), ('567', 'In' )], ["join_id", "prod_type"])
# +-------+--------------------+
# |join_id|           prod_type|                           
# +-------+--------------------+
# | 123   |Direct De           |
# | 456   |Direct              |
# | 567   |In                  | 

result = table_A \
    .select(
        'name',
        f.posexplode(f.split(f.col('name'),'\|')).alias('pos', 'join_id')) \
    .join(table_B, on='join_id', how='inner') \
    .select('name', 'join_id')

result.show(10, False)
# +--------------+-------+
# |name          |join_id|
# +--------------+-------+
# |123|test|test2|123    |
# |456|ABC       |456    |
# |OMG|567       |567    |
# +--------------+-------+

Надеюсь, что это работает. Как вы продолжаете поправляться в Pyspark. Я бы порекомендовал вам go через функции в pyspark.sql.functions, и это подняло бы ваши навыки на следующий уровень.

...