py4j.protocol.Py4JError: Произошла ошибка при вызове o51 .__ getnewargs__. Трассировка: py4j.Py4JException: метод __getnewargs __ ([]) не существует - PullRequest
0 голосов
/ 02 мая 2018

Я создаю фрейм данных, который я буду использовать в следующей части кода для вставки записей в таблицу кустов, вот код.

Я получаю следующее сообщение об ошибке с этим кодом

py4j.protocol.Py4JError: An error occurred while calling o51.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)

===========

def fn_create_df_load_status():
   data = sc.parallelize([
    [ ('cust_id', custid)     \
    , ('sys_rec', 'source system')   \
    , ('load_start_ts', (from_unixtime(unix_timestamp(), "yyyy-MM-dd HH:mm:ss"))) \
    , ('load_end_ts',   (from_unixtime(unix_timestamp(), "yyyy-MM-dd HH:mm:ss"))) \
    , ('status', 'STARTED')]
    ])
# Convert to tuple
   data_converted = data.map(lambda x: (x[0][1]))

# Define schema
   schema = StructType([
       StructField("cust_id", StringType(), True),
       StructField("sys_rec", StringType(), True),
       StructField("load_start_ts", StringType(), True),
       StructField("load_end_ts", StringType(), True),
       StructField("status", StringType(), True)
    ])

# Create dataframe
   DF = sqlContext.createDataFrame(data_converted, schema)

# Output
   DF.show() 

1 Ответ

0 голосов
/ 02 мая 2018

Вот обновленный код

def fn_create_df_load_status():
   from time import gmtime, strftime

   load_start_ts = strftime("%Y-%m-%d %H:%M:%S", gmtime())
   load_end_ts = strftime("%Y-%m-%d %H:%M:%S", gmtime())

   print ("load_start_ts : ", load_start_ts)
   print ("load_end_ts : ", load_end_ts)

   data = sc.parallelize([
    ( ('cust_id', custid)     \
    , ('sys_rec', 'source system') \
    , ('load_start_ts', load_start_ts) \
    , ('load_end_ts',  load_end_ts) \
    , ('status', 'STARTED'))
    ])
# Convert to tuple
   data_converted = data.map(lambda x: (x[0][1], x[1][1], x[2][1], x[3][1], x[4][1]))

# Define schema
   schema = StructType([
       StructField("cust_id", StringType(), True),
       StructField("sys_rec", StringType(), True),
       StructField("load_start_ts", StringType(), True),
       StructField("load_end_ts", StringType(), True),
       StructField("status", StringType(), True)
    ])

# Create dataframe
   DF = spark.createDataFrame(data_converted, schema)

# Output
   DF.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...