PySpark сортировка отсортировано - PullRequest
0 голосов
/ 10 декабря 2018

Пожалуйста, помогите новичку.Обычная база данных для размещения заказа, все в одной таблице.

Анализ данных в Apache Spark с использованием Python.Хотите написать запрос, который бы обрабатывал все транзакции клиентов, отсортированных по электронной почте, которые заказывали продукт, который больше не выпускается, и имел бы заказы, которые еще не были отправлены.В основном с "& item_in_list (F.lit (" NotShipped "), ShippedStatus)" это не работает.

%python
import pyspark.sql.functions as F
from pyspark.sql.types import *

list_len = F.udf(lambda x: len(x), IntegerType())
item_in_list = F.udf(lambda x, y: x in y, BooleanType())
df = spark.sql("select * from orderdb")
df1 = df.select("email", "OrderedProduct","ShippedStatus").groupBy("email")
df1 = df1.agg(F.collect_set("OrderedProduct"))\
       .withColumnRenamed("collect_set(OrderedProduct)", "OrderedProduct")
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) & 
               item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct) 
        &item_in_list(F.lit("NotShipped"), ShippedStatus)

df1 = df1.select("email")
df = df1.join(df, "email", "left_outer")
display(df)

Строка идентификатора null date DateTimestamp null OrderedProduct string null ShippedStatus boolean null

1 Ответ

0 голосов
/ 10 декабря 2018

Прежде всего, udf очень плохо работают в pyspark .Если вы хотите изменить типы, используйте что-то вроде этого:

from pyspark.sql.types import IntegerType

df = df.withColumn("column", df["column"].cast(IntegerType()))

При этом нам нужен воспроизводимый пример для остальных, но я думаю, что вы можете решить это с помощью предложения 'where'.

# Your code
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) & 
               item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct) 
        &item_in_list(F.lit("NotShipped"), ShippedStatus)

# My code
condition1 = F.col('OrderedProduct') > 1
condition2 = F.col('ShippedStatus') == F.lit('NotShipped')
condition3 = F.col('OrderedProduct') == F.lit('DiscontinuedProduct')

df1 = df1.where(condition 1 & condition2 & condition3)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...