Я пытаюсь понять странное поведение, которое я наблюдал в своем потоковом приложении Spark, работающем в режиме local[*]
.
У меня 8 ядер на моих машинах. Хотя большинство моих Пакетов имеют 8 разделов, время от времени я получаю 16, 32 или 56 и так далее разделов / Задач. Я заметил, что это всегда кратно 8. Я заметил при открытии вкладки стадии, что, когда это происходит, это потому, что есть несколько LocalTableScan.
То есть, если у меня есть 2 LocalTableScan, то мини-пакетное задание будет иметь 16 задач / разделов и т. Д.
Я имею в виду, что он вполне может выполнить два сканирования, объединить две партии и передать их в мини-пакетную работу. Однако нет, это приводит к мини-пакетному заданию, в котором количество задач = количество ядер * количество проверок.
Вот как я настроил свой MemoryStream:
val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
val rdf = df.mapPartitions{ it => {.....}}(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))
У меня есть будущее, которое подпитывает мой поток памяти как таковой, сразу после:
Future {
blocking {
for (i <- 1 to 100000) {
rows.addData(maps)
Thread.sleep(3000)
}
}
}
а затем мой запрос:
rdf.writeStream.
trigger(Trigger.ProcessingTime("1 seconds"))
.format("console").outputMode("append")
.queryName("SourceConvertor1").start().awaitTermination()
Интересно, почему число задач варьируется? Как это должно быть определено Спарк?