Spark присоединяется к проблеме производительности - PullRequest
0 голосов
/ 13 февраля 2020

Я пытаюсь объединить исторические и инкрементные данные. Как часть дополнительных данных, я получаю удаления. Ниже приведен случай.

  • исторические данные - 100 записей (20 столбцов, id - ключевой столбец)
  • инкрементные данные - 10 записей (20 столбцов, id - ключевой столбец)

Из 10 записей в инкрементальных данных только 5 будут соответствовать историческим данным.

Теперь мне нужно 100 записей в конечном кадре данных, из которых 95 записей принадлежат историческим данным и 5 записей принадлежат к инкрементным данным (там, где столбец идентификатора совпадает).

Поле обновления времени доступно как в хронологических, так и в инкрементных данных.

Ниже приведен подход, который я пробовал.

    DF1 - Historical  Data
    DF2 - Incremental Delete Dataset
    DF3 = DF1 LEFTANTIJOIN DF2
    DF4 = DF2 INNERJOIN DF1
    DF5 = DF3 UNION DF4

Однако я заметил, что у него много проблем с производительностью, так как я запускаю это объединение на миллиардах записей. Есть ли лучший способ сделать это?

Ответы [ 3 ]

0 голосов
/ 15 февраля 2020

Проблемы с производительностью связаны не только с размером ваших данных. Это зависит от многих других параметров, таких как ключи, которые вы использовали для раздела, размеры ваших разделенных файлов и конфигурация кластера, на котором вы выполняете свою работу. Я бы порекомендовал вам go через официальную документацию по настройке ваших искровых заданий и внести необходимые изменения. https://spark.apache.org/docs/latest/tuning.html

0 голосов
/ 16 февраля 2020

Ниже представлен подход, который я сделал.

historical_data.as("a").join(
incr_data.as("b"),
$"a.id" === $"b.id", "full")
.select(historical_data.columns.map(f => expr(s"""case when a.id=b.id then b.${f} else a.${f} end as $f""")): _*)
0 голосов
/ 14 февраля 2020

вы можете использовать оператор cogroup в сочетании с определенной пользователем функцией для создания различных вариантов объединения. Предположим, у нас есть два этих RDD в качестве примера:

visits = sc.parallelize([("h", "1.2.3.4"), ("a", "3.4.5.6"), ("h","1.3.3.1")] )
pageNames = sc.parallelize([("h", "Home"), ("a", "About"), ("o", "Other")])
cg = visits.cogroup(pageNames).map(lambda x :(x[0], ( list(x[1][0]), list(x[1][1]))))

Вы можете реализовать внутреннее объединение следующим образом:

innerjoin = cg.flatMap(lambda x: J(x))

Где J определен как таковой:

def J(x):
    j=[]
    k=x[0]
    if x[1][0]!=[] and x[1][1]!=[]:
        for l in x[1][0]:
            for r in x[1][1]:
                j.append((k,(l,r)))
    return j

Для правого внешнего соединения, например, вам просто нужно изменить функцию J на ​​функцию roJ, определенную так:

def roJ(x):
    j=[]
    k=x[0]
    if x[1][0]!=[] and x[1][1]!=[]:
        for l in x[1][0]:
            for r in x[1][1]:
                j.append((k,(l,r)))
    elif x[1][1]!=[] :
        for r in x[1][1]:
            j.append((k, (None, r)))
    return j

И вызвать ее так:

rightouterjoin = cg.flatMap(lambda x: roJ(x))

И так для других типов объединения, вы бы sh реализовали

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...