Я пишу искровое задание, которое будет извлекать данные из Azure Deltalake и передавать их в Eventhubs (который является модулем Kafka в Azure). У меня нет спецификации реестра схемы во время написания, и я не знаю, действительно ли это мандат (Azure не имеет слитых метаданных реестра схемы, если для этого не вращается другая виртуальная машина) Перед записью данных я использую to_avro для преобразования моего Данные дельта-таблицы к одному столбцу тела (который будет иметь Topi c с каждой записью, имеющей все пары ключ-значение)
при условии, что мой экстракт дельта-таблицы находится на json в файле ниже: persondelta. json
> persondf = spark.read.format("avro").load("/tmp/persondelta.json")
>
> avroDf = persondf .select(to_avro(struct([persondf [x] for x in
> persondf .columns])).alias("body"))
Я пытаюсь записать эти данные в Eventhubs (с конечной точкой kafka в Azure), как показано ниже:
ds = avroDf \
.select("body") \
.write \
.format("eventhubs") \
.options(**ehConf) \
.save()
Этот код работает для получения данных в avro, stream это так, но моя проблема в том, что когда я использую эти данные в целевой конечной точке Kafka в базе данных Druid, они видят данные, как показано ниже:
столбец: "тело"
{ "firstname": "James" , "lastname" : "Smith" , "dob_year" : 2010 }
{ "firstname": "Michael" , "lastname" : "Rose" , "dob_year" : 2012 }
{ "firstname": "Mary" , "lastname" : "Brown" , "dob_year" : 1990 }
Мне нужно увидеть это данные в 3 различных столбцах в Druid, но я думаю, что это не проблема Druid, потому что данные AVRO, созданные в 1 столбце, форматируются как «тело».
Я пытаюсь выяснить, является ли это единственным способом получения и потребления данных через Kafka? и я не могу указать, какой столбец я хочу для потоковой передачи, и каждый topi c может нести несколько столбцов? очевидно, с этим кодом eventhubs нуждается в теле столбца, иначе он не сможет его передать.
Любая мысль и помощь будут оценены? Примечание: у меня нет схемы регистрации на стороне потребителя или производителя (я использовал встроенную схему в Druid spe c), и я переношу данные из облака в On-Prem, и у меня нет общего туннеля vNet для открытия IP-адресов и портов для сделать общий подход схемы.
спасибо