Spark объединение датафреймов не дает рассчитывать? - PullRequest
0 голосов
/ 04 марта 2020

Я пытаюсь объединить эти кадры данных, я использовал G_ID, не равный NULL, или MCOM.T_ID, не равный NULL, и использовал обрезку, счетчик не подходит, он работает с 1 часа. Есть только 3 задачи из 300 задач. Пожалуйста, предложите, как я могу отладить это? не вызывает проблемы, как я могу отладить?

enter image description here

enter image description here

 val table1 = spark.sql(""" SELECT  trim(C_ID) AS PC_ID FROM ab.CIDS WHERE 
  _UPDT_TM >= '2020-02-01 15:14:39.527'  """)

val table2 = spark.sql(""" SELECT trim(C_ID) AS PC_ID   FROM ab.MIDS MCOM INNER
 JOIN ab.VD_MBR VDBR
  ON Trim(MCOM.T_ID) = Trim(VDBR.T_ID) AND Trim(MCOM.G_ID) = Trim(VDBR.G_ID)
 AND Trim(MCOM.C123M_CD) IN ('BBB', 'AAA') WHERE MCOM._UPDT_TM >= '2020-02-01 15:14:39.527'
 AND Trim(VDBR.BB_CD) IN ('BBC') """)

var abc=table1.select("PC_ID").union(table2.select("PC_ID"))

even tried this --> filtered = abc.filter(row => !row.anyNull);

Ответы [ 2 ]

1 голос
/ 04 марта 2020

Похоже, у вас проблема с перекосом данных. Глядя на «Сводные метрики», становится ясно, что (по крайней мере) три четверти ваших разделов пусты, поэтому вы устраняете большую часть потенциального распараллеливания, которое может предоставить вам искра.

Хотя это вызовет случайное перемешивание шаг (когда данные перемещаются по сети между различными исполнителями), .repartition() поможет сбалансировать данные по всем разделам и создать более допустимые единицы работы для распределения между доступными ядрами. Это, скорее всего, обеспечит ускорение вашего count().

. Как правило, вы, вероятно, захотите вызвать .repartition() с параметром, установленным как минимум в число ядер в вашем кластере. Установка этого значения приведет к более быстрому выполнению задач (интересно наблюдать за ходом выполнения), хотя добавляет некоторые накладные расходы на управление к общему времени, которое потребуется для выполнения задания. Если задачи слишком малы (т.е. недостаточно данных на раздел), то иногда планировщик запутывается и не будет использовать весь кластер. В целом, поиск правильного количества разделов - это балансирование.

0 голосов
/ 04 марта 2020
  1. Вы добавили псевдоним в столбец "C_ID" как "PC_ID". и после этого вы ищете "C_ID".

  2. И объединение может быть выполнено для одного и того же числа столбцов, ваши таблицы1 и таблица2 имеют разный размер столбцов.

    otherwise you will get:  org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns
    

Пожалуйста, примите сначала позаботимся об этих двух сценариях.

...