Я записываю фрейм данных в таблицу больших запросов. Это работало, но теперь я вызываю pandas udf перед записью данных в bigquery. По какой-то причине, когда я вызываю pandas udf перед записью фрейма данных spark в bigquery, я теперь вижу следующую ошибку:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 240, in create_array
return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp)
, которая из журналов исполнителей ниже выглядит так, как будто она вызвана неверная схема партера, где столбцы меток времени выводятся как целые числа?
20/01/20 22:45:38 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "firstname",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "status",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "entry_date",
"type" : "timestamp",
"nullable" : true,
"metadata" : { }
}, {
"name" : "last_status_date",
"type" : "timestamp",
"nullable" : true,
"metadata" : { }
} ]
}
and corresponding Parquet message type:
message spark_schema {
optional binary id (UTF8);
optional binary firstname (UTF8);
optional binary status (UTF8);
optional int96 entry_date;
optional int96 last_status_date;
}
Это сбивает с толку, потому что это не происходит, когда я запускаю свой код без применения pandas_udf. Udf никак не манипулирует столбцами даты ...
def main():
# apply pandas udf
df.groupBy('firstname').apply(my_pandas_udf)
# drop some columns
cols_to_drop = ['firstname']
# save to bigquery
df \
.drop(*cols_to_drop) \
.write \
.format("bigquery") \
.option("temporaryGcsBucket", "<TEMP_BUCKET_NAME>") \
.option("project", "PROJECT_ID") \
.option("credentialsFile","/path/to/my/credentials.json") \
.option("parentProject", "PROJECT_ID") \
.option("table", "PROJECT_ID:dataset.table") \
.mode("overwrite") \
.save()
def udf_schema():
return StructType([
StructField('id', StringType(), True),
StructField('firstname', StringType(), True),
StructField('status', StringType(), True),
StructField('entry_date', TimestampType(), True),
StructField('last_status_date', TimestampType(), True),
])
@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
df = df.sort_values('entry_date', ascending=False)
oldest_date = df['entry_date'].iloc[0]
df = df[df['entry_date'] >= oldest_date]
df = df.copy()
return df
Что я делаю не так? Эта публикация stackoverflow , кажется, имеет аналогичную проблему, но по состоянию на 21.012020 не получила ответа.
Редактировать (1): Типы данных DataFrame до и после pandas_udf Ошибка возникает при возврате из pandas_udf, но вот типы данных для фрейма данных искры до его передачи в pandas_udf
==> BEFORE
id string
firstname string
status string
entry_date timestamp
date_status_change timestamp
last_status_date timestamp