Выполнение различных вычислений, обусловленных значением столбца в кадре данных искры - PullRequest
0 голосов
/ 21 ноября 2019

У меня есть фрейм данных pyspark с 2 столбцами, A и B. Мне нужно, чтобы строки B обрабатывались по-разному, в зависимости от значений столбца A. В простых пандах я мог бы сделать это:

import pandas as pd
funcDict = {}
funcDict['f1'] = (lambda x:x+1000)
funcDict['f2'] = (lambda x:x*x)
df = pd.DataFrame([['a',1],['b',2],['b',3],['a',4]], columns=['A','B'])
df['newCol'] = df.apply(lambda x: funcDict['f1'](x['B']) if x['A']=='a' else funcDict['f2']
(x['B']), axis=1)

Самый простой способ сделать это в (py) spark - это

Использовать файлы

  • считывает данные в фрейм данных
  • , секционируя их по столбцу A, и записывает в отдельные файлы (write.partitionBy)
  • , читает в каждом файле и затем обрабатывает их отдельно

или еще

использовать expr

  • прочитать данные в фрейм данных
  • записать громоздкий expr (изперспективность чтения / обслуживания), чтобы условно сделать что-то по-другому на основе значения столбца
  • , это не будет выглядеть нигде столь же "чистым", как приведенный выше код панд

Есть личто-нибудь еще, что является подходящим способом справиться с этим требованием? С точки зрения эффективности, я ожидаю, что первый подход будет чище, но будет иметь больше времени выполнения благодаря чтению раздела-записи, а второй подход не так хорош с точки зрения кода, и его сложнее расширять и поддерживать.

Более того, вы бы предпочли использовать что-то совершенно другое (например, очереди сообщений) вместо этого (несмотря на относительную разницу в задержке)?

РЕДАКТИРОВАТЬ 1

Исходя из моего ограниченного знания pyspark, решение, предложенное пользователем pissall (https://stackoverflow.com/users/8805315/pissall), работает, пока обработка не очень сложна. Если это произойдет, я не знаю, как это сделатьне прибегая к UDF, которые имеют свои недостатки. Рассмотрим простой пример ниже

# create a 2-column data frame
# where I wish to extract the city 
# in column B differently based on
# the type given in column A
# This requires taking a different 
# substring (prefix or suffix) from column B
df = sparkSession.createDataFrame([
  (1, "NewYork_NY"),
  (2, "FL_Miami"),
  (1, "LA_CA"),
  (1, "Chicago_IL"),
  (2,"PA_Kutztown")
], ["A", "B"])

# create UDFs to get left and right substrings
# I do not know how to avoid creating UDFs
# for this type of processing
getCityLeft = udf(lambda x:x[0:-3],StringType())
getCityRight = udf(lambda x:x[3:],StringType())

#apply UDFs
df = df.withColumn("city", F.when(F.col("A") == 1, getCityLeft(F.col("B"))) \
                            .otherwise(getCityRight(F.col("B"))))

Есть ли способ сделать это более простым способом, не прибегая к UDF? Если я использую expr, яможет сделать это, но, как я упоминал ранее, это не выглядит элегантно.

1 Ответ

0 голосов
/ 21 ноября 2019

Как насчет использования when?

import pyspark.sql.functions as F

df = df.withColumn("transformed_B", F.when(F.col("A") == "a", F.col("B") + 1000).otherwise(F.col("B") * F.col("B")))

РЕДАКТИРОВАТЬ после большей ясности в вопросе :

Вы можете использовать split в _ ивозьмите первую или вторую часть в зависимости от вашего состояния.

Это ожидаемый результат?

df.withColumn("city", F.when(F.col("A") == 1, F.split("B", "_")[0]).otherwise(F.split("B", "_")[1])).show()

+---+-----------+--------+
|  A|          B|    city|
+---+-----------+--------+
|  1| NewYork_NY| NewYork|
|  2|   FL_Miami|   Miami|
|  1|      LA_CA|      LA|
|  1| Chicago_IL| Chicago|
|  2|PA_Kutztown|Kutztown|
+---+-----------+--------+

Подход UDF:

def sub_string(ref_col, city_col):
    # ref_col is the reference column (A) and city_col is the string we want to sub (B)
    if ref_col == 1:
        return city_col[0:-3]
    return city_col[3:]

sub_str_udf = F.udf(sub_string, StringType())
df = df.withColumn("city", sub_str_udf(F.col("A"), F.col("B")))

Также, пожалуйста,посмотрите: удалите последние несколько символов в столбце данных PySpark

...