Настройка производительности PySpark Dataframe - PullRequest
0 голосов
/ 23 апреля 2020

Я пытаюсь объединить некоторые сценарии; чтобы дать нам одно чтение БД, а не каждый скрипт, читающий одни и те же данные из Hive. Так что переходим к чтению один раз; обработать многие модели.

Я сохранил кадры данных и переделил выходные данные после каждой агрегации; но мне нужно, чтобы это было быстрее, во всяком случае, эти вещи замедлили это. У нас есть 20 ТБ + данных в день, поэтому я предполагал, что сохранение данных, если они будут прочитаны много раз, ускорит процесс, но это не так.

Кроме того, у меня много заданий это происходит из тех же данных, как показано ниже. Можем ли мы запустить их параллельно. Может ли определение и вывод DF2 происходить одновременно с определением DF3, чтобы ускорить его?

df = definedf....persist()
df2 = df.groupby....
df3 = df.groupby....
....

Можно ли определить глобально кэшированный фрейм данных, к которому могут обращаться другие сценарии?

Большое спасибо!

1 Ответ

0 голосов
/ 23 апреля 2020

В scala мы можем сделать, как показано ниже. Может быть, этот код поможет вам конвертировать или думать те же логи c в python.


scala> :paste
// Entering paste mode (ctrl-D to finish)

// Define all your parallel logics inside some classes like below

trait Common extends Product with Serializable {
    def process: DataFrame
}
case class A(df: DataFrame) extends Common{
  def process = {
      Thread.sleep(4000) // To show you, I have added sleep method
      println("Inside A case class")
      df.filter(col("id") <= 2)
  }
}

case class B(df: DataFrame) extends Common {
  def process = {
      Thread.sleep(1000) // To show you, I have added sleep method
      println("Inside B case class")
      df.filter(col("id") > 5 && col("id") <= 7)
  }
}

case class C(df: DataFrame) extends Common {
  def process = {
      Thread.sleep(3000) // To show you, I have added sleep method
      println("Inside C case class")
      df.filter(col("id") > 9 && col("id") <= 12)
  }
}

// Exiting paste mode, now interpreting.

defined trait Common
defined class A
defined class B
defined class C

scala> val df = (0 to 100).toDF("id").cache // Create & cache your DF.
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int]

scala> Seq(A(df),B(df),C(df)).par.map(_.process).reduce(_ union _).show(false) // Create All object in list which you want to invoke parallel

Inside B case class
Inside C case class
Inside A case class
+---+
|id |
+---+
|0  |
|1  |
|2  |
|6  |
|7  |
|10 |
|11 |
|12 |
+---+


scala>

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...