Pyspark: pyarrow.lib.ArrowTypeError: требуется целое число (полученный тип Timestamp) - PullRequest
1 голос
/ 21 января 2020

Я записываю фрейм данных в таблицу больших запросов. Это работало, но теперь я вызываю pandas udf перед записью данных в bigquery. По какой-то причине, когда я вызываю pandas udf перед записью фрейма данных spark в bigquery, я теперь вижу следующую ошибку:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
    batch = _create_batch(series, self._timezone)
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
    arrs = [create_array(s, t) for s, t in series]
  File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 240, in create_array
    return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
  File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp)

, которая из журналов исполнителей ниже выглядит так, как будто она вызвана неверная схема партера, где столбцы меток времени выводятся как целые числа?

20/01/20 22:45:38 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "id",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "firstname",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "status",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "entry_date",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "last_status_date",
    "type" : "timestamp",
    "nullable" : true,
    "metadata" : { }
  } ]
}
and corresponding Parquet message type:
message spark_schema {
  optional binary id (UTF8);
  optional binary firstname (UTF8);
  optional binary status (UTF8);
  optional int96 entry_date;
  optional int96 last_status_date;
}

Это сбивает с толку, потому что это не происходит, когда я запускаю свой код без применения pandas_udf. Udf никак не манипулирует столбцами даты ...

def main():
    # apply pandas udf 
    df.groupBy('firstname').apply(my_pandas_udf)

    # drop some columns
    cols_to_drop = ['firstname']

    # save to bigquery
    df \
        .drop(*cols_to_drop) \
        .write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "<TEMP_BUCKET_NAME>") \
        .option("project", "PROJECT_ID") \
        .option("credentialsFile","/path/to/my/credentials.json") \
        .option("parentProject", "PROJECT_ID") \
        .option("table", "PROJECT_ID:dataset.table") \
        .mode("overwrite") \
        .save()

def udf_schema():
    return StructType([
        StructField('id', StringType(), True),
        StructField('firstname', StringType(), True),
        StructField('status', StringType(), True),
        StructField('entry_date', TimestampType(), True),
        StructField('last_status_date', TimestampType(), True),
    ])

@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df

Что я делаю не так? Эта публикация stackoverflow , кажется, имеет аналогичную проблему, но по состоянию на 21.012020 не получила ответа.

Редактировать (1): Типы данных DataFrame до и после pandas_udf Ошибка возникает при возврате из pandas_udf, но вот типы данных для фрейма данных искры до его передачи в pandas_udf

==> BEFORE 

id string
firstname string
status string
entry_date timestamp
date_status_change timestamp
last_status_date timestamp
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...