У меня есть таблица в Hive, например:
hive> create table if not exists stock_quote (TradeDay string, TradeTime string, OpenPrice string, HighPrice string, LowPrice String, ClosePrice String, volume string) partitioned by (tickerid string) row format delimited fields terminated by ',' stored as textfile;
Попытка вставить в таблицу следующий код:
sc = spark.sparkContext
lines = sc.textFile('file:///<File Name>')
rows = lines.map(lambda line : line.split(','))
rows_map = rows.map(lambda row : Row(tickerid = row[0], tradeday = row[1], tradetime = row[2],
openprice = row[3], highprice = row[4],
lowprice = row[5], closeprice = row[6],
volume = row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.format('hive').mode('append').partitionBy('tickerid').saveAsTable('stock_quote')
Получение следующей ошибки:
py4j.protocol.Py4JJavaError: An error occurred while calling o72.saveAsTable.
: org.apache.spark.SparkException: Requested partitioning does not match the stock_quote table:
Requested partitions:
Table partitions: tickerid
Пробовал со следующим:
stock_quote_table = namedtuple("stock_quote",
["tickerid", "tradeday", "tradetime", "openprice", "highprice", "lowprice", "closeprice", "volume"])
rows_map = rows.map(lambda row : stock_quote_table(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').partitionBy('tickerid').insertInto('default.stock_quote')
Получена следующая ошибка:
pyspark.sql.utils.AnalysisException: "insertInto() can't be used together with partitionBy(). Partition columns have already been defined for the table. It is not necessary to use partitionBy().;"
Так изменилась последняя строка:
rows_df.write.mode('append').insertInto('default.stock_quote')
Выше вставлены данные в таблицу, но он создал один подкаталог для каждой строки в файле в HDFS с tickerid = like / user / hive / warehouse / stock_quote / tickerid = 980 и, под этим, имя файла начинается с 'part ...
Подскажите, пожалуйста, что не так в коде.