pyspark 2.3.2: датафрейм -> сдвиг строк с 1, по столбцу -> по столбцу с датами - PullRequest
0 голосов
/ 30 ноября 2018

Best

В данный момент я экспериментирую с pyspark 2.3.2.И я хотел бы сдвинуть столбец на основе определенного столбца (сгруппировать по.)используйте pandas_udf.Поэтому я запомнил этот кусок кода

мой код:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(data_shift_prep.schema, PandasUDFType.GROUPED_MAP)
def test(pdf):
    pdf["timestamp_prev"] = pdf['timestamp'].shift(1)
    return pdf

data_shift = data_shift_prep.groupby('id').apply(test)

со следующим ожидаемым результатом:

Ожидаетсярезультат:

    id  timestamp                logintype  start_sessie    timestamp_prev
0   3   2016-02-09 09:36:57.217     INTERN  True            None
1   3   2016-02-09 09:51:40.899     INTERN  False           2016-02-09 09:36:57.217
2   3   2016-02-10 10:11:22.131     INTERN  True            2016-02-09 09:51:40.899
3   3   2016-02-10 10:17:16.345     INTERN  False           2016-02-10 10:11:22.131
4   4   2017-08-10 10:18:12.412     INTERN  True            None
5   4   2017-08-10 10:21:11.788     INTERN  False           2017-08-10 10:18:12.412
6   4   2017-08-11 14:17:33.119     INTERN  True            2017-08-10 10:21:11.788
7   4   2017-08-11 14:11:51.173     INTERN  False           2017-08-11 14:17:33.119 
8   4   2017-08-16 11:43:16.609     INTERN  True            2017-08-11 14:11:51.173
9   4   2017-08-16 11:13:35.421     INTERN  False           2017-08-16 11:43:16.609

Но, к сожалению, я всегда получаю сообщение об ошибке.pyarrow.lib.ArrowInvalid: Error converting from Python objects to Int64: Got Python object of type Timestamp but can only handle these types: integer И я не знаю, как с этим справиться.Сначала я подумал, что это из-за NaT , которые созданы функцией сдвига.Но я не уверен в этом (у меня все еще есть тот же тип ошибки после замены NaT значением Нет )

У кого-то есть еще опыт?(и вы можете решить эту ошибку типа)

С уважением

- Добавить дополнительные: схема -

StructType(List(StructField(id,IntegerType,true),
                StructField(timestamp,TimestampType,true),
                StructField(logintype,StringType,true),
                StructField(start_sessie,BooleanType,true),
                StructField(timestamp_prev,TimestampType,true)))
...