Контрольная микро партия структурированного потока искр - PullRequest
1 голос
/ 04 июля 2019

Я читаю данные из раздела Kafka и помещаю их в Azure ADLS (как в HDFS) в режиме секционирования.

Мой код выглядит следующим образом:

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("failOnDataLoss", false)
      .load()
      .selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
      .partitionBy("year", "month", "day", "hour", "minute")
      .format("parquet")
      .option("path", outputDirectory)
      .option("checkpointLocation", checkpointDirectory)
      .outputMode("append")
      .start()
      .awaitTermination()

У меня около 2000 записей в секунду, и моя проблема в том, что Spark вставляет данные каждые 45 секунд, и я хочу, чтобы данные были вставлены немедленно.

Кто-нибудь знает, как контролировать размер микропакета?

1 Ответ

2 голосов
/ 04 июля 2019

Из версии Spark 2.3 доступен режим непрерывной обработки. В официальном док. вы можете прочитать, что для этого режима поддерживаются только три приемника, и только приемник Kafka готов к производству, и " сквозную обработку с малой задержкой лучше всего наблюдать при использовании Kafka в качестве источника и приемника"

df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/0")
.option("topic", "output0")
.trigger(Trigger.Continuous("0 seconds"))
.start()

Итак, похоже, что в данный момент вы не можете использовать HDFS в качестве приемника в режиме Continuous. В вашем случае, возможно, вы сможете протестировать Akka Streams и разъем Alpakka

...