PySpark - Использование списков внутри оператора LIKE - PullRequest
1 голос
/ 24 апреля 2019

Я хотел бы использовать список внутри оператора LIKE в pyspark для создания столбца.

У меня есть следующий ввод df:

input_df :

+------+--------------------+-------+
|    ID|           customers|country|
+------+--------------------+-------+
|161   |xyz Limited         |U.K.   |
|262   |ABC  Limited        |U.K.   |
|165   |Sons & Sons         |U.K.   |
|361   |TÜV GmbH            |Germany|
|462   |Mueller GmbH        |Germany|
|369   |Schneider AG        |Germany|
|467   |Sahm UG             |Austria|
+------+--------------------+-------+

Я хотел бы добавить столбец CAT_ID.CAT_ID принимает значение 1, если «ID» содержит «16» или «26».CAT_ID принимает значение 2, если «ID» содержит «36» или «46».
Итак, я хочу, чтобы мой вывод df выглядел так -

The desired output_df :

+------+--------------------+-------+-------+
|    ID|           customers|country|Cat_ID |
+------+--------------------+-------+-------+
|161   |xyz Limited         |U.K.   |1      |
|262   |ABC  Limited        |U.K.   |1      |
|165   |Sons & Sons         |U.K.   |1      |
|361   |TÜV GmbH            |Germany|2      |
|462   |Mueller GmbH        |Germany|2      |
|369   |Schneider AG        |Germany|2      |
|467   |Sahm UG             |Austria|2      |
+------+--------------------+-------+-------+

Мне интересно узнать, как это можно сделать с помощью оператора LIKE и списков.

Я знаю, как реализовать его без списка, который прекрасно работает:

from pyspark.sql import functions as F

def add_CAT_ID(df):
    return df.withColumn(
        'CAT_ID', 
        F.when( ( (F.col('ID').like('16%')) | (F.col('ID').like('26%'))  ) , "1") \
         .when( ( (F.col('ID').like('36%')) | (F.col('ID').like('46%'))  ) , "2") \
         .otherwise('999')
    )


    output_df = add_CAT_ID(input_df)

Тем не менее, я хотел бы использовать список и иметь что-то вроде:

list1 =['16', '26']
list2 =['36', '46']


def add_CAT_ID(df):
    return df.withColumn(
        'CAT_ID', 
        F.when( ( (F.col('ID').like(list1 %))  ) , "1") \
         .when( ( (F.col('ID').like('list2 %'))  ) , "2") \
         .otherwise('999')
    )


    output_df = add_CAT_ID(input_df)

Заранее большое спасибо,

Ответы [ 2 ]

0 голосов
/ 25 апреля 2019

С Spark 2.4 и выше вы можете использовать функции более высокого порядка в spark-sql.

Попробуйте следующее, решение sql одинаково для обоих scala / python

val df = Seq(
  ("161","xyz Limited","U.K."),
  ("262","ABC  Limited","U.K."),
  ("165","Sons & Sons","U.K."),
  ("361","TÜV GmbH","Germany"),
  ("462","Mueller GmbH","Germany"),
  ("369","Schneider AG","Germany"),
  ("467","Sahm UG","Germany")
).toDF("ID","customers","country")

df.show(false)
df.createOrReplaceTempView("secil")
spark.sql(
  """ with t1 ( select id, customers, country, array('16','26') as a1, array('36','46') as a2 from secil),
     t2 (select id, customers, country,  filter(a1, x -> id like x||'%') a1f,  filter(a2, x -> id like x||'%') a2f from t1),
     t3 (select id, customers, country, a1f, a2f,
               case when size(a1f) > 0 then 1 else 0 end a1r,
               case when size(a2f) > 0 then 2 else 0 end a2r
               from t2)
     select id, customers, country, a1f, a2f, a1r, a2r, a1r+a2r as Cat_ID from t3
  """).show(false)

Результаты:

+---+------------+-------+
|ID |customers   |country|
+---+------------+-------+
|161|xyz Limited |U.K.   |
|262|ABC  Limited|U.K.   |
|165|Sons & Sons |U.K.   |
|361|TÜV GmbH    |Germany|
|462|Mueller GmbH|Germany|
|369|Schneider AG|Germany|
|467|Sahm UG     |Germany|
+---+------------+-------+

+---+------------+-------+----+----+---+---+------+
|id |customers   |country|a1f |a2f |a1r|a2r|Cat_ID|
+---+------------+-------+----+----+---+---+------+
|161|xyz Limited |U.K.   |[16]|[]  |1  |0  |1     |
|262|ABC  Limited|U.K.   |[26]|[]  |1  |0  |1     |
|165|Sons & Sons |U.K.   |[16]|[]  |1  |0  |1     |
|361|TÜV GmbH    |Germany|[]  |[36]|0  |2  |2     |
|462|Mueller GmbH|Germany|[]  |[46]|0  |2  |2     |
|369|Schneider AG|Germany|[]  |[36]|0  |2  |2     |
|467|Sahm UG     |Germany|[]  |[46]|0  |2  |2     |
+---+------------+-------+----+----+---+---+------+
0 голосов
/ 24 апреля 2019

Подстановочные знаки SQL не поддерживают предложения "или".Хотя есть несколько способов справиться с этим.

1.Регулярные выражения

Вы можете использовать rlike с регулярным выражением:

import pyspark.sql.functions as psf

list1 =['16', '26'] 
list2 =['36', '46']
df.withColumn(
        'CAT_ID', 
        psf.when(psf.col('ID').rlike('({})\d'.format('|'.join(list1))), '1') \
            .when(psf.col('ID').rlike('({})\d'.format('|'.join(list2))), '2') \
            .otherwise('999')) \
    .show()

        +---+------------+-------+------+
        | ID|   customers|country|CAT_ID|
        +---+------------+-------+------+
        |161| xyz Limited|   U.K.|     1|
        |262|ABC  Limited|   U.K.|     1|
        |165| Sons & Sons|   U.K.|     1|
        |361|    TÜV GmbH|Germany|     2|
        |462|Mueller GmbH|Germany|     2|
        |369|Schneider AG|Germany|     2|
        |467|     Sahm UG|Austria|     2|
        +---+------------+-------+------+

Здесь мы получаем для list1 регулярное выражение (16|26)\d, соответствующее 16 или 26 с последующимцелым числом (\d эквивалентно [0-9]).

2.Динамическое построение предложения SQL

Если вы хотите сохранить sql как, вы можете использовать selectExpr и объединить значения с помощью ' OR ':

df.selectExpr(
        '*', 
        "CASE WHEN ({}) THEN '1' WHEN ({}) THEN '2' ELSE '999' END AS CAT_ID"
            .format(*[' OR '.join(["ID LIKE '{}%'".format(x) for x in l]) for l in [list1, list2]]))

3.Динамическое построение выражения Python

Вы также можете использовать eval, если не хотите писать SQL:

df.withColumn(
        'CAT_ID', 
        psf.when(eval(" | ".join(["psf.col('ID').like('{}%')".format(x) for x in list1])), '1')
            .when(eval(" | ".join(["psf.col('ID').like('{}%')".format(x) for x in list2])), '2')
            .otherwise('999'))
...