Параметризация условия соединения в pyspark - PullRequest
0 голосов
/ 12 апреля 2019

У меня есть список имен столбцов, который меняется каждый раз. Имена столбцов хранятся в списке. Итак, мне нужно передать имена столбцов из списка (в приведенном ниже примере их id и programid) для сравнения между исходным и целевым фреймами данных. В приведенном ниже примере я хочу проверить, если src_id == id и src_programid == programid.

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import col, when

srccolumns = ['src_id','src_programid']
tgtcolumns = ['id','programid']

joinSrcTgtAction =  joinSrcTgt.withColumn(
    'action', 
    when(
        (
            (col(src_id) == col(id)) & 
            (col(src_programid) == col(programid)) & 
            (joinSrcTgt.src_checksum != joinSrcTgt.checksum)
        ),
        'upsert'
    )
)

1 Ответ

0 голосов
/ 13 апреля 2019

Предполагая, что списки имеют одинаковый размер и гарантируют порядок имен столбцов, поэтому мы можем сопоставить пару по индексу, вы можете сделать что-то вроде следующего:

src_columns = ['src_id','src_programid']
tgt_columns = ['id','programid']

condition = True
for i in range(0, len(src_columns)):
    condition &= (join_src_tgt[src_columns[i]] == join_src_tgt[dest_columns[i]])

join_src_tgt_action =  join_src_tgt.withColumn(
    'action', 
    when(condition & (join_src_tgt.src_checksum != join_src_tgt.checksum), 'upsert')
)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...