Я создаю фрейм данных, который я буду использовать в следующей части кода для вставки записей в таблицу кустов, вот код.
Я получаю следующее сообщение об ошибке с этим кодом
py4j.protocol.Py4JError: An error occurred while calling o51.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
===========
def fn_create_df_load_status():
data = sc.parallelize([
[ ('cust_id', custid) \
, ('sys_rec', 'source system') \
, ('load_start_ts', (from_unixtime(unix_timestamp(), "yyyy-MM-dd HH:mm:ss"))) \
, ('load_end_ts', (from_unixtime(unix_timestamp(), "yyyy-MM-dd HH:mm:ss"))) \
, ('status', 'STARTED')]
])
# Convert to tuple
data_converted = data.map(lambda x: (x[0][1]))
# Define schema
schema = StructType([
StructField("cust_id", StringType(), True),
StructField("sys_rec", StringType(), True),
StructField("load_start_ts", StringType(), True),
StructField("load_end_ts", StringType(), True),
StructField("status", StringType(), True)
])
# Create dataframe
DF = sqlContext.createDataFrame(data_converted, schema)
# Output
DF.show()