Как устанавливается количество задач и разделов при использовании MemoryStream? - PullRequest
0 голосов
/ 09 сентября 2018

Я пытаюсь понять странное поведение, которое я наблюдал в своем потоковом приложении Spark, работающем в режиме local[*].

У меня 8 ядер на моих машинах. Хотя большинство моих Пакетов имеют 8 разделов, время от времени я получаю 16, 32 или 56 и так далее разделов / Задач. Я заметил, что это всегда кратно 8. Я заметил при открытии вкладки стадии, что, когда это происходит, это потому, что есть несколько LocalTableScan.

То есть, если у меня есть 2 LocalTableScan, то мини-пакетное задание будет иметь 16 задач / разделов и т. Д.

Я имею в виду, что он вполне может выполнить два сканирования, объединить две партии и передать их в мини-пакетную работу. Однако нет, это приводит к мини-пакетному заданию, в котором количество задач = количество ядер * количество проверок.

Вот как я настроил свой MemoryStream:

val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
val rdf = df.mapPartitions{ it => {.....}}(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))

У меня есть будущее, которое подпитывает мой поток памяти как таковой, сразу после:

Future {
    blocking {
      for (i <- 1 to 100000) {
        rows.addData(maps)
        Thread.sleep(3000)
      }
    }
  }

а затем мой запрос:

rdf.writeStream.
    trigger(Trigger.ProcessingTime("1 seconds"))
    .format("console").outputMode("append")
    .queryName("SourceConvertor1").start().awaitTermination()

Интересно, почему число задач варьируется? Как это должно быть определено Спарк?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...