Кто-нибудь знает, что соответствует LocalTableScan в Spark Structured Streaming?
Я пытаюсь понять странное поведение, которое я наблюдал в своем приложении потоковой передачи структуры Spark, которое работает в локальном режиме [*].
У меня 8 ядер на моих машинах. Хотя большинство моих Пакетов имеют 8 разделов, время от времени я получаю 16, 32 или 56 и так далее разделов / Задач. Я заметил, что это всегда кратно 8. Я заметил, открывая вкладку стадии, что, когда это происходит, это потому, что есть несколько LocalTableScan.
То есть, если у меня есть 2 LocalTableScan, то мини-пакетное задание будет иметь 16 задач / разделов и так далее.
Чтобы дать немного контекста, потому что я подозреваю, что это может исходить из этого, я использую MemoryStream.
val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
val rdf = df.mapPartitions{ it => {.....}}(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))
У меня есть будущее, которое подпитывает мой поток памяти как таковой сразу после:
Future {
blocking {
for (i <- 1 to 100000) {
rows.addData(maps)
Thread.sleep(3000)
}
}
}
а затем мой запрос:
rdf.writeStream.
trigger(Trigger.ProcessingTime("1 seconds"))
.format("console").outputMode("append")
.queryName("SourceConvertor1").start().awaitTermination()
Пожалуйста, есть предложения? Советы?