Промежуток времени между двумя заданиями в Spark - PullRequest
0 голосов
/ 14 сентября 2018

Я вставляю данные в таблицу улья с итерациями в искре.

Например: скажем, 10 000 элементов, сначала эти элементы разделены на 5 списков, каждый список имеет 2000 элементов.После этого я делаю итерацию по этим 5 спискам.

В каждой итерации 2000 элементов отображаются на гораздо большее количество строк, поэтому в конце итерации 15M записей вставляются в таблицу кустов.Каждая итерация завершается за 40 минут.

Проблема возникает после каждой итерации.искра ждет запуска других 2000 К.Время ожидания составляет около 90 минут!В этот промежуток времени в спарк веб-интерфейсе нет активных задач ниже.enter image description here

Кстати, итерации начинаются непосредственно с искрового процесса.Никакого скала или java-кода не существует в начале или в конце итераций.

Есть идеи?

Спасибо

   val itemSeq = uniqueIDsDF.select("unique_id").map(r => r.getLong(0)).collect.toSeq // Get 10K items
  val itemList =  itemSeq.sliding(2000,2000).toList // Create 5 Lists


itemList.foreach(currItem => {

//starting code. (iteration start)
        val currListDF = currItem.toDF("unique_id")

        val currMetadataDF = hive_raw_metadata.join(broadcast(currListDF),Seq("unique_id"),"inner")
        currMetadataDF.registerTempTable("metaTable")
        // further logic here ....
   }

Ответы [ 2 ]

0 голосов
/ 15 сентября 2018

AFAIK, я понимаю, что вы пытаетесь разделить DataFrame и передавать данные в пакетном режиме и выполнять некоторую обработку в качестве псевдокода, что было не так ясно.

Как вы упомянули выше в своем ответе, когда когда-либо произойдет действие, потребуется некоторое время, чтобы вставить его в тонуть.sliding можно улучшить следующим образом ...


Исходя из этого предположения, у меня есть 2 варианта для вас.Вы можете выбрать наиболее подходящий ...

Вариант № 1: (foreachPartitionAsync: AsyncRDDActions ) Я бы предложил вам использоватьDataFrame возможности группировки итераторов

 df.repartition(numofpartitionsyouwant) // numPartitions
    df.rdd.foreachPartitionAsync  // since its partition wise processing to sink it would be faster than the approach you are adopting...
        { 
partitionIterator =>
          partitionIterator.grouped(2000).foreach {
        group => group.foreach {
        // do your insertions here or what ever you wanted to ....
        }
       }
      }

Примечание: СДР будет выполняться в фоновом режиме.Все эти исполнения будут отправлены в планировщик Spark и запущены одновременно.В зависимости от размера кластера Spark, некоторые задания могут ждать, пока исполнители не станут доступны для обработки.

Опция № 2:

Второй подходэто фрейм данных как randomSplit Я думаю, что вы можете использовать в этом случае для разделения фреймов данных одинакового размера.который вернет вам массив данных одинакового размера, если сумма их весов> 1. Примечание: веса (первый аргумент dataframe) для разбиений будут нормализованы, если они не равны 1.

DataFrame[] randomSplit (double [] weights) Произвольно разделяет этот DataFrame с предоставленными весами.

ссылается randomSplit code здесь

это будет похоже..

val equalsizeddfArray =  yourdf.randomSplit(Array(0.2,0.2,0.2,0.2,0.2,0.2, 0.3) // intentionally gave sum of weights > 1 (in your case 10000 records of dataframe to array of 5 dataframes of each 2000 records in it)

и затем ...

for (i <- 0 until equalsizeddfArray.length) {
  // your logic ....
}

Примечание : вышеуказанная логика является последовательной ... Если вы хотите выполнитьих параллельно (если они независимы) вы можете использовать

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Now wait for the tasks to finish before exiting the app Await.result(Future.sequence(Seq(yourtaskfuncOndf1(),yourtaskfuncOndf2()...,yourtaskfuncOndf10())), Duration(10, MINUTES))

Из вышеперечисленных 2 вариантов, я бы предпочел подход № 2, так как функция randomSplit будет приниматьзаботиться (путем нормализации весов) о делении равных размеров для их обработки

0 голосов
/ 14 сентября 2018

Я получил причину, даже если задача вставки кажется завершенной в пользовательском интерфейсе spark, в фоновом режиме процесс вставки все еще продолжается.После завершения записи в hdfs начинается новая итерация.Вот причина пробела в веб-интерфейсе

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...