Spark Dataframe для потоковой передачи Azure EventHubs через блоки данных - PullRequest
0 голосов
/ 22 апреля 2020

Я пытаюсь использовать приведенный ниже python код в Azure Блокнот данных для создания образца кадра данных двоичного потока avro:

from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro

data = [(2, Row(name='Krishna', age=2))]
df = spark.createDataFrame(data, ("key", "value"))
avroDf = df.select(to_avro(df.value).alias("body"))

Отображается: avroDf: pyspark. sql .dataframe. DataFrame = [body: binary]

Теперь я пытаюсь записать эти данные в Azure Eventhubs:

import pyspark
from pyspark.sql.functions import to_json, struct
from pyspark.sql.avro.functions import to_avro , from_avro

connectionString = "xxx;EntityPath=test2"


ehConf = {
  'eventhubs.connectionString' : connectionString,
}


ds = avroDf \
    .select("body") \
    .write \
    .format("eventhubs") \
    .options(**ehConf) \
    .save()

, когда я проверяю сообщения и входящий запрос, я вижу Eventhub получить это. Однако, если я хочу выполнить запрос на eventhub для просмотра данных в Avro, я получаю ошибки ниже: Моя цель - потоковая передача данных с помощью avro. У меня нет проблем с потоковой передачей json, и я могу запрашивать данные в Azure Концентраторах событий. Любое решение?

Source '<unknown_location>' had 1 occurrences of kind 'InputDeserializerError.InvalidData' between processing times '2020-04-22T07:19:04.6810451Z' and '2020-04-22T07:19:04.6810451Z'. Invalid Avro Format, drop invalid record.

спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...