Pyspark объединяет фрейм данных с разделенными запятыми значениями в столбце - PullRequest
0 голосов
/ 13 января 2020

Итак, у меня есть два фрейма данных, к которым я хочу присоединиться. Уловка - вторая таблица, в которой хранятся разделенные запятыми значения, из которых одно совпадает со столбцом в Таблице А. Как мне это сделать в Pyspark. Ниже приведен пример

Таблица A имеет

+-------+--------------------+
|deal_id|           deal_name|
+-------+--------------------+
| 613760|ABCDEFGHI           |
| 613740|TEST123             |
| 598946|OMG                 |   

Таблица B содержит

+-------+---------------------------+--------------------+
|                            deal_id|           deal_type|                           
+-------+---------------------------+--------------------+
| 613760,613761,613762,613763       |Direct De           |
| 613740,613750,613770,613780,613790|Direct              |
| 598946                            |In                  |  

Ожидаемый результат - Соедините таблицы A и Таблица B, если есть совпадение с таблицей Идентификатор сделки A против значения, разделенного запятой в таблице B. Например, TableA.dealid - 613760 находится в 1-й строке таблицы B, я хочу, чтобы эта строка возвращалась.

+-------+--------------------+---------------+
|deal_id|           deal_name|      deal_type|
+-------+--------------------+---------------+
| 613760|ABCDEFGHI           |Direct De      |     
| 613740|TEST123             |Direct         |
| 598946|OMG                 |In             |

Любая помощь приветствуется. Мне нужно в pyspark.

Спасибо.

1 Ответ

1 голос
/ 15 января 2020

Пример данных

from pyspark.sql.types import IntegerType, LongType, StringType, StructField, StructType

tuples_a = [('613760', 'ABCDEFGHI'),
            ('613740', 'TEST123'),
            ('598946', 'OMG'),
           ]

schema_a = StructType([
         StructField('deal_id', StringType(), nullable=False),
         StructField('deal_name', StringType(), nullable=False)
        ])


tuples_b = [('613760,613761,613762,613763 ', 'Direct De'),
            ('613740,613750,613770,613780,613790', 'Direct'),
            ('598946', 'In'),
           ]

schema_b = StructType([
         StructField('deal_id', StringType(), nullable=False),
         StructField('deal_type', StringType(), nullable=False)
        ])        

df_a = spark_session.createDataFrame(data=tuples_a, schema=schema_a)
df_b = spark_session.createDataFrame(data=tuples_b, schema=schema_b) 

Вам нужно разбить столбец и разбить его, чтобы присоединиться.

from pyspark.sql.functions import split, col, explode

df_b = df_b.withColumn('split', split(col('deal_id'), ','))\
           .withColumn('exploded', explode(col('split')))\
           .drop('deal_id', 'split')\
           .withColumnRenamed('exploded', 'deal_id')


df_a.join(df_b, on = 'deal_id', how = 'left_outer')\
    .show(10, False)

и ожидаемый результат

+-------+---------+---------+
|deal_id|deal_name|deal_type|
+-------+---------+---------+
|613760 |ABCDEFGHI|Direct De|
|613740 |TEST123  |Direct   |
|598946 |OMG      |In       |
+-------+---------+---------+
...