Я пытаюсь использовать приведенный ниже python код в Azure Блокнот данных для создания образца кадра данных двоичного потока avro:
from pyspark.sql import Row
from pyspark.sql.avro.functions import from_avro, to_avro
data = [(2, Row(name='Krishna', age=2))]
df = spark.createDataFrame(data, ("key", "value"))
avroDf = df.select(to_avro(df.value).alias("body"))
Отображается: avroDf: pyspark. sql .dataframe. DataFrame = [body: binary]
Теперь я пытаюсь записать эти данные в Azure Eventhubs:
import pyspark
from pyspark.sql.functions import to_json, struct
from pyspark.sql.avro.functions import to_avro , from_avro
connectionString = "xxx;EntityPath=test2"
ehConf = {
'eventhubs.connectionString' : connectionString,
}
ds = avroDf \
.select("body") \
.write \
.format("eventhubs") \
.options(**ehConf) \
.save()
, когда я проверяю сообщения и входящий запрос, я вижу Eventhub получить это. Однако, если я хочу выполнить запрос на eventhub для просмотра данных в Avro, я получаю ошибки ниже: Моя цель - потоковая передача данных с помощью avro. У меня нет проблем с потоковой передачей json, и я могу запрашивать данные в Azure Концентраторах событий. Любое решение?
Source '<unknown_location>' had 1 occurrences of kind 'InputDeserializerError.InvalidData' between processing times '2020-04-22T07:19:04.6810451Z' and '2020-04-22T07:19:04.6810451Z'. Invalid Avro Format, drop invalid record.
спасибо