Я читаю данные из раздела Kafka и помещаю их в Azure ADLS (как в HDFS) в режиме секционирования.
Мой код выглядит следующим образом:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("failOnDataLoss", false)
.load()
.selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
.partitionBy("year", "month", "day", "hour", "minute")
.format("parquet")
.option("path", outputDirectory)
.option("checkpointLocation", checkpointDirectory)
.outputMode("append")
.start()
.awaitTermination()
У меня около 2000 записей в секунду, и моя проблема в том, что Spark вставляет данные каждые 45 секунд, и я хочу, чтобы данные были вставлены немедленно.
Кто-нибудь знает, как контролировать размер микропакета?