Вставить данные из Кафки в дельту Databricks - PullRequest
0 голосов
/ 28 февраля 2019

Я пытаюсь понять дельту блоков данных и думаю сделать POC, используя Кафку.По сути, планируется использовать данные из Kafka и вставить их в дельта-таблицу блоков данных.

Вот шаги, которые я сделал:

1) Создание таблицы дельты для блоков данных.

%sql
CREATE TABLE hazriq_delta_trial2 (
value STRING
)
USING delta
LOCATION '/delta/hazriq_delta_trial2'

2) Потреблениеданные из Кафки.

import org.apache.spark.sql.types._

val kafkaBrokers = "broker1:port,broker2:port,broker3:port"
val kafkaTopic = "kafkapoc"

val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 100)
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .table("hazriq_delta_trial2")

Однако, когда я запрашиваю таблицу, она пуста.

Я могу подтвердить, что данные поступают.Я проверяю это, наблюдая всплеск на графике, когда создаю сообщение на тему Кафки.

incoming data

Я что-то упустил?

Мне нужна помощь, как я могу вставить в таблицу данные, полученные от Кафки.

Заранее спасибо.

1 Ответ

0 голосов
/ 17 мая 2019

1) Попробуйте проверить, есть ли у вас доступ к Kafka из вашего кластера Spark, иногда вам нужно разрешить доступ с некоторых ips в Kafka.

2) Попробуйте изменить этот .option("startingOffsets", "earliest") на этот.option("startingOffsets", "latest")

3) Попробуйте также

val kafka2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "earliest")
  .load()
  .select($"value")
  .withColumn("Value", $"value".cast(StringType))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/hazriq_delta_trial2/_checkpoints/test")
  .start("hazriq_delta_trial2")
...