Для чего нужен LocalTableScan в Spark Structure Streaming? - PullRequest
0 голосов
/ 09 сентября 2018

Кто-нибудь знает, что соответствует LocalTableScan в Spark Structured Streaming?

Я пытаюсь понять странное поведение, которое я наблюдал в своем приложении потоковой передачи структуры Spark, которое работает в локальном режиме [*].

У меня 8 ядер на моих машинах. Хотя большинство моих Пакетов имеют 8 разделов, время от времени я получаю 16, 32 или 56 и так далее разделов / Задач. Я заметил, что это всегда кратно 8. Я заметил, открывая вкладку стадии, что, когда это происходит, это потому, что есть несколько LocalTableScan.

То есть, если у меня есть 2 LocalTableScan, то мини-пакетное задание будет иметь 16 задач / разделов и так далее.

Чтобы дать немного контекста, потому что я подозреваю, что это может исходить из этого, я использую MemoryStream.

val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()
val rdf = df.mapPartitions{ it => {.....}}(RowEncoder.apply(StructType(List(StructField("blob", StringType, false)))))

У меня есть будущее, которое подпитывает мой поток памяти как таковой сразу после:

Future {
    blocking {
      for (i <- 1 to 100000) {
        rows.addData(maps)
        Thread.sleep(3000)
      }
    }
  }

а затем мой запрос:

rdf.writeStream.
    trigger(Trigger.ProcessingTime("1 seconds"))
    .format("console").outputMode("append")
    .queryName("SourceConvertor1").start().awaitTermination()

Пожалуйста, есть предложения? Советы?

1 Ответ

0 голосов
/ 09 сентября 2018

Это указывает на память в драйвере. Как показывает ваш код.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...