Как добавить заголовок и загрузить мой CSV с RDD / dataframe вasticsearch? - PullRequest
0 голосов
/ 23 марта 2020

Я хочу добавить заголовок и загрузить свой CSV с RDD / dataframe вasticsearch (pyspark) У меня есть этот код

from pyspark import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
rdd=sc.textFile("data/DUMP_20200101.csv")
rdd.count()
rdd.collect()
mapprdd = rdd.map(lambda x:x.split(","))
mapprdd.collect()
es_write_conf = {
  "es.nodes" : '192.x.x.x',
  "es.port" : '9200',
  "es.resource" : 'day01-01/subs',
  "es.input.json" : "yes",
  "es.mapping.id": "doc_id"
}

=== > эта ошибка показывает

rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_write_conf
)

===> Ошибка при вызове z: org. apache .spark.api. python .PythonRDD.saveAsNewAPIHadoopFile. : org. apache .spark.SparkException: нельзя использовать элемент RDD типа java .util.ArrayList



Я искал, как добавить заголовок, я могу это сделать но он не будет объединен, так как мои данные и заголовок называются saperatly

header = ["Subscriber",., ...,., "Завершение"]

так как я могу добавить Заголовки с данными не отдельно?

и как я могу решить проблему загрузки моих данных вasticsearch?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...