У меня очень странная проблема с потоковой структурой искры. Потоковая структура Spark создает две искровые работы для каждой микро-партии. В результате прочитайте данные из Кафки дважды. Вот простой фрагмент кода.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object CheckHowSparkReadFromKafka {
def main(args: Array[String]): Unit = {
val session = SparkSession.builder()
.config(new SparkConf()
.setAppName(s"simple read from kafka with repartition")
.setMaster("local[*]")
.set("spark.driver.host", "localhost"))
.getOrCreate()
val testPath = "/tmp/spark-test"
FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
import session.implicits._
val stream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
.option("subscribe", "topic")
.option("maxOffsetsPerTrigger", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", "latest")
.load()
.repartitionByRange( $"offset")
.writeStream
.option("path", testPath + "/data")
.option("checkpointLocation", testPath + "/checkpoint")
.format("parquet")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
stream.processAllAvailable()
Это происходит потому, что если .repartitionByRange( $"offset")
, если я уберу эту строку, все хорошо. Но с помощью spark создайте два задания, одно с 1-й стадией, только что прочитанное с Кафки, второе с 3-й стадией: чтение -> перемешивание -> запись Таким образом, результат первого задания никогда не использовался.
Это существенно влияет на производительность. В некоторых моих темах по Кафке 1550 разделов, поэтому читать их дважды - большое дело. Если я добавлю кеш, дела пойдут лучше, но это не для меня. В локальном режиме первое задание в пакетном режиме занимает менее 0,1 мс, кроме пакета с индексом 0. Но в кластере YARN и Messos оба задания полностью ожидаемы и по моим темам занимают около 1,2 мин.
Почему это происходит? Как я могу избежать этого? Похоже, ошибка?
PS Я использую свечи 2.4.3.