pyspark: dataframe выбирает строку по id в столбце другого dataframe - PullRequest
1 голос
/ 18 апреля 2019

Я хочу

  1. фильтр df1 по time_create==last_timestamp,
  2. фильтр df2 по выбранным store_product_id из df1

Здесь я использую только df1, например,

Выбор по time_create хорош:

df1[df1.time_create==last_timestamp].show()

enter image description here

Тем не менее, используйте выбранный store_product_id, фильтр исходного кадра данных df1 дал мне много строк.

df1[df1.store_product_id.isin(df1[df1.time_create==last_timestamp].store_product_id)].show()

enter image description here

Я также пытался собрать store_product_id список, который соответствует time_create==last_timestamp.

ids = df1[df1.time_create==last_timestamp].select('store_product_id').collect()
df1[df1.store_product_id.isin(ids)].show()

Но получил ошибку:

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [01e8f3c0-3ad5-4b69-b46d-f5feb3cadd5f]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at scala.util.Try.getOrElse(Try.scala:79)
    at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
    at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
    at org.apache.spark.sql.functions$.lit(functions.scala:110)
    at org.apache.spark.sql.functions.lit(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Какой правильный путь?

Ответы [ 2 ]

1 голос
/ 18 апреля 2019

Функция, которую вы ищете: join. Вот простой пример, основанный на ваших данных:

import pyspark as sp
from pyspark.sql import SparkSession

samples = [{'store_product_id':1,'time_create':2,'last_timestamp':3},{'store_product_id':2,'time_create':2,'last_timestamp':2},{'store_product_id':3,'time_create':4,'last_timestamp':4},{'store_product_id':4,'time_create':2,'last_timestamp':5}]

spark = SparkSession \
        .builder \
        .appName('test') \
        .getOrCreate()

df1 = spark.createDataFrame(samples)
df1.show()

Это дает:

+--------------+----------------+-----------+
|last_timestamp|store_product_id|time_create|
+--------------+----------------+-----------+
|             3|               1|          2|
|             2|               2|          2|
|             4|               3|          4|
|             5|               4|          2|
+--------------+----------------+-----------+

Давайте отфильтруем по времени и создадим еще один кадр данных из этого:

df2 = df1.filter(df1.time_create==df1.last_timestamp)
ids = df2.select('store_product_id').show()

+----------------+
|store_product_id|
+----------------+
|               2|
|               3|
+----------------+

И здесь мы объединяем оба набора данных в store_product_id :

df3 = df1.join(df2,'store_product_id','inner').show()

+----------------+--------------+-----------+--------------+-----------+
|store_product_id|last_timestamp|time_create|last_timestamp|time_create|
+----------------+--------------+-----------+--------------+-----------+
|               3|             4|          4|             4|          4|
|               2|             2|          2|             2|          2|
+----------------+--------------+-----------+--------------+-----------+

Внутреннее соединение дает пересечение df1 и df2 на основе store_product_id

0 голосов
/ 18 апреля 2019

Как сказал @ ags29,

Результатом df1[df1.time_create==last_timestamp].select(['store_product_id']).collect() является список строк:

[Row(store_product_id=u'01e8f3c0-3ad5-4b69-b46d-f5feb3cadd5f')]

Мне нужно преобразовать строку в строку, правильный путь:

ids = df1[df1.time_create==last_timestamp].select('store_product_id').collect()
ids = map(lambda x: x.store_product_id, ids)
df1[df1.store_product_id.isin(ids)].show()

Это совсем другое дело с пандами.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...