Конвертировать набор данных Spark в JSON и записать его в Kafka Producer - PullRequest
0 голосов
/ 05 октября 2018

Я хочу прочитать таблицу из Hive и написать в Kafka Producer (пакетное задание).

В настоящее время я читаю таблицу как Dataset<Row> в своем классе Java и пытаюсь преобразовать ее в json, так чточто я могу написать как сообщение json, используя KafkaProducer.

Dataset<Row> data = spark.sql("select * from tablename limit 5");
List<Row> rows = data.collectAsList();
for(Row row: rows) {
        List<String> stringList = new ArrayList<String>(Arrays.asList(row.schema().fieldNames())); 
        Seq<String> row_seq = JavaConverters.asScalaIteratorConverter(stringList.iterator()).asScala().toSeq();
        Map map = (Map) row.getValuesMap(row_seq);
        JSONObject json = new JSONObject();
        json.putAll( map);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(SPARK_CONF.get("topic.name"), json.toString());
        producer.send(record);

Я получаю ClassCastException

1 Ответ

0 голосов
/ 05 октября 2018

Как только вы пишете collectAsList();, вы больше не используете Spark, просто необработанный Kafka Java API.

Я бы предложил использовать Spark Structured Streaming Integration Kafka , и вы можете сделать

Вот пример, и вам нужно сформировать DataFrame как минимум с двумя столбцамипотому что Кафка берет ключи и ценности.

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic_name")
  .save()

Что касается получения данных в JSON, опять же, collectToList() неверно.Не тяните данные в один узел.

Вы можете использовать data.map() для преобразования набора данных из одного формата в другой.

Например, вы должны отобразить строку в строку в формате JSON.

row -> "{\"f0\":" + row.get(0) + "}"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...