Я пытаюсь понять дельту блоков данных и думаю сделать POC, используя Кафку.По сути, планируется использовать данные из Kafka и вставить их в дельта-таблицу блоков данных.
Вот шаги, которые я сделал:
1) Создание таблицы дельты для блоков данных.
%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'
2) Потреблениеданные из Кафки.
import org.apache.spark.sql.types._
val kafkaBrokers = "broker1:port,broker2:port,broker3:port"
val kafkaTopic = "kafkapoc"
val kafka2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 100)
.load()
.select($"value")
.withColumn("Value", $"value".cast(StringType))
.writeStream
.option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
.table("hazriq_delta_trial2")
Однако, когда я запрашиваю таблицу, она пуста.
Я могу подтвердить, что данные поступают.Я проверяю это, наблюдая всплеск на графике, когда создаю сообщение на тему Кафки.
![incoming data](https://i.stack.imgur.com/x7ZBZ.png)
Я что-то упустил?
Мне нужна помощь, как я могу вставить в таблицу данные, полученные от Кафки.
Заранее спасибо.