Spark Streaming: текстовый источник данных поддерживает только один столбец - PullRequest
0 голосов
/ 29 ноября 2018

Я потребляю Kafka данных и затем передаю данные в HDFS.

Данные, хранящиеся в Kafka теме trial, выглядят так:

hadoop
hive
hive
kafka
hive

ОднакоКогда я отправляю свои коды, он возвращает:

Исключение в теме "main"

org.apache.spark.sql.streaming.StreamingQueryException: Text data source supports only a single column, and you have 7 columns.;
=== Streaming Query ===
Identifier: [id = 2f3c7433-f511-49e6-bdcf-4275b1f1229a, runId = 9c0f7a35-118a-469c-990f-af00f55d95fb]
Current Committed Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":13}}}
Current Available Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":14}}}

Мой вопрос: как показано выше, данные, хранящиеся в Kafka, содержат только ОДИН столбецпочему программа говорит, что 7 columns?

Любая помощь приветствуется.


Мои spark-streaming коды:

def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder.master("local[4]")
  .appName("SpeedTester")
  .config("spark.driver.memory", "3g")
  .getOrCreate()

val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.95.20:9092")
  .option("subscribe", "trial")
  .option("startingOffsets" , "earliest")
  .load()
  .writeStream
  .format("text")
  .option("path", "hdfs://192.168.95.21:8022/tmp/streaming/fixed")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
  .awaitTermination()
 }

1 Ответ

0 голосов
/ 29 ноября 2018

Это объясняется в Руководство по интеграции структурированной потоковой передачи + Kafka :

Каждая строка в источнике имеет следующую схему:

Тип столбца

двоичный ключ

двоичное значение

строка темы

раздел int

длинное смещение

длинная метка времени

timestampType int

, что дает ровно семь столбцов.Если вы хотите записать только полезную нагрузку (значение), выберите ее и приведите к строке:

spark.readStream
   ...
  .load()
  .selectExpr("CAST(value as string)")
  .writeStream
  ...
  .awaitTermination()
...