Источник данных CSV не поддерживает двоичный тип данных - PullRequest
0 голосов
/ 26 апреля 2019

Я пытаюсь запустить потоковое приложение, которое считывает данные из потока kafka и обрабатывает их. Я запускаю следующее.

val schema = new StructType()
      .add("InvoiceNo", LongType)
      .add("StockCode", LongType)
      .add("Description", StringType)
      .add("Quantity", ShortType)
      .add("InvoiceDate", StringType)
      .add("UnitPrice", DoubleType)
      .add("CustomerID", IntegerType)
      .add("Country", StringType)


    val df = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", conf.get("spark.kafka_bootstrap_servers")).
      option("subscribe", "webserver").
      option("kafka.security.protocol", "SASL_SSL").
      option("kafka.sasl.mechanism", "PLAIN").
      option("kafka.ssl.protocol", "TLSv1.2").
      option("kafka.ssl.enabled.protocols", "TLSv1.2").
      option("failOnDataLoss", "false").
      load()

Я получаю следующую ошибку.

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: CSV data source does not support binary data type.

CSV, который я даю в потоке,

536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,01/12/10 8:26,2.55,17850,United Kingdom

Что может быть причиной этой ошибки?

1 Ответ

0 голосов
/ 26 апреля 2019

spark.readStream.format("kafka") всегда читает данные в двоичном виде, а не в виде строки.

Значения всегда десериализуются как байтовые массивы с помощью ByteArrayDeserializer. Используйте операции DataFrame для явной десериализации значений
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Непонятно, где вы использовали переменную .csv() или schema.

В документах вы можете видеть, как он преобразует ключи и значения в строки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...