Как прочитать поток структурированных данных и записать в таблицу Hive - PullRequest
0 голосов
/ 22 мая 2019

Необходимо прочитать поток структурированных данных из потока Кафки и записать его в уже существующую таблицу Hive.После анализа выясняется, что один из вариантов заключается в том, чтобы выполнить readStream источника Kafka, а затем выполнить writeStream в приемник файлов по пути к файлу HDFS.

Мой вопрос здесь - можно ли напрямую записать таблицу Hive?Или существует ли обходной подход, который можно использовать для этого варианта использования?

EDIT1:

.foreachBatch - кажется, работает, но имеет проблему, упомянутую ниже

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SaveMode
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType};
//subscribe to kafka topic
val csvDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "xxxxxx912:9092").option("subscribe", "testtest").load()
val abcd = csvDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","CAST(topic AS STRING)","CAST(offset AS STRING)","CAST(partition AS STRING)","CAST(timestamp AS STRING)").as[(String, String, String, String, String, String)]
val query = abcd.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => {batchDs.write.mode(SaveMode.Append).insertInto("default.6columns");}).option("quote", "\u0000").start()

hive> select * from 6columns;
OK
0       A3,L1,G1,P1,O1,101,TXN1     testtest        122     0       2019-05-23 12:38:49.515
0       A3,L1,G1,P1,O1,102,TXN2     testtest        123     0       2019-05-23 12:38:49.524
0       A1,L1,G1,P1,O1,100,TXN3     testtest        124     0       2019-05-23 12:38:49.524
0       A2,L2,G1,P1,O2,100,TXN4     testtest        125     0       2019-05-23 12:38:49.524
0       A3,L1,G1,P1,O1,103,TXN5     testtest        126     0       2019-05-23 12:38:54.525
0       A3,L1,G1,P1,O1,104,TXN6     testtest        127     0       2019-05-23 12:38:55.525
0       A4,L1,G1,P1,O1,100,TXN7     testtest        128     0       2019-05-23 12:38:56.526
0       A1,L1,G1,P1,O1,500,TXNID8   testtest        129     0       2019-05-23 12:38:57.526
0       A6,L2,G2,P1,O1,500,TXNID9   testtest        130     0       2019-05-23 12:38:57.526

То, что я ищу, - это разделить атрибут значения сообщения Kafka так, чтобы данные напоминали таблицу Hive, из которой они становятся таблицей из 12 столбцов (A3, L1, G1, P1, O1,101, TXN1 - разделяютсяна 7 атрибутов).Нужна дополнительная трансформация, похожая на .option ("quote", "\ u0000"), которую я сделал при написании фрейма данных.Но, похоже, не работает.

1 Ответ

1 голос
/ 22 мая 2019

Как только ваш поток настроен и потребляется из kafka, вы можете использовать функцию forEachBatch следующим образом.

val yourStream = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .load()

val query = yourStream.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => {
  batchDs
      .write
      .mode(SaveMode.Append)
      .insertInto("your_db.your_table");
}).start()

query.awaitTermination()

чтобы разделить строку на , на отдельные столбцы, вы можете использовать функцию split, чтобы получить все элементы, разделенные ,, в массив, а затем вы можете выбрать элементы индивидуально по индексу, например, "SPLIT(CAST(value AS STRING), ',')[0]" чтобы получить первый элемент.

Так замените

val abcd = csvDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","CAST(topic AS STRING)","CAST(offset AS STRING)","CAST(partition AS STRING)","CAST(timestamp AS STRING)").as[(String, String, String, String, String, String)]

с

val abcd = csvDF.selectExpr("CAST(key AS STRING)", "SPLIT(CAST(value AS STRING), ',')[0]", "SPLIT(CAST(value AS STRING), ',')[1]", "SPLIT(CAST(value AS STRING), ',')[2]", "SPLIT(CAST(value AS STRING), ',')[3]", "SPLIT(CAST(value AS STRING), ',')[4]", "SPLIT(CAST(value AS STRING), ',')[5]", "SPLIT(CAST(value AS STRING), ',')[6]", "CAST(topic AS STRING)", "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)").as[(String, String, String, String, String, String, String, String, String, String, String)]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...