Spark DataFrames - Альтернативы для использования cogroup для полного внешнего объединения - PullRequest
0 голосов
/ 09 мая 2019

Я пытаюсь решить проблему с производительностью с помощью одного из заданий Spark и считаю, что у меня проблема с использованием функции "cogroup". Я пытаюсь объединить два фрейма данных (оба очень большие, и ни один из них не может быть транслирован), и простое объединение не будет работать, потому что мне нужно добавить много дополнительной логики обработки.

Вот пример двух DataFrames

Сделки:

+---------+-----------------+--------+
| CardNum | TransactionTime | Amount |
+---------+-----------------+--------+
| ABC     |        20190101 |   10.0 |
| ABC     |        20180501 |   25.0 |
| DEF     |        20181201 |   30.0 |
| ghi     |        20180101 |   20.0 |
+---------+-----------------+--------+

Lookups:

+---------+------------+-----------------+------------------+-------------+
| CardID  | InternalId | RecordStartDate | RecordExpiryDate | AnotherCode |
+---------+------------+-----------------+------------------+-------------+
| abc     |      10001 | 2018-01-01      | 2018-05-20       | A           |
| def     |      10002 | 2018-01-01      | 9999-12-31       | A           |
| def     |      10005 | 2018-01-01      | 9999-12-31       | B           |
| ghi     |      10003 | 2018-01-01      | 9999-12-31       | B           |
| abc     |      20001 | 2018-05-20      | 9999-12-31       | A           |
+---------+------------+-----------------+------------------+-------------+

Ожидаемые результаты:

+---------+-----------------+--------+------------+--------------------------------------------------------------+
| CardNum | TransactionTime | Amount | InternalID |                    Additional Explanation                    |
+---------+-----------------+--------+------------+--------------------------------------------------------------+
| ABC     |        20190101 | 10.0   |       2001 | For this txn time, this internal id matches                  |
| ABC     |        20180501 | 25.0   |       1001 | For an older txn and same card as above, the older id matches         |
| DEF     |        20181201 | 30.0   |       1002 | If two results are valid, pick the internal id with code "A" |
| ghi     |        20180101 | 20.0   |       1003 | Since only one match, keep the returned id                   |
+---------+-----------------+--------+------------+--------------------------------------------------------------+

Как я в настоящее время присоединяюсь к данным:

// Conversion to lowercase is needed because the grouping needs to ignore case
transactionsGroupedDF = transactionsDF.groupByKey(item => item.getAs[String]("CardNum").toLowerCase)
lookupGroupedDF = lookupDF.groupByKey(item => item.getAs[String]("CardID").toLowerCase)


val resultDF = transactionsGroupedDF.cogroup(lookupGroupedDF) {
   case (key, iter1, iter2) =>
     val txnDataList = iter1.toList
     val lookupList = iter2.toList

     txnData.map(item => resolveInternalId(item, lookupTables, key))
}RowEncoder(transactionDF.schema.add("internalID","String)

Мне нужно сделать cogroup здесь, я считаю, потому что мне нужны данные из обоих сценариев, особенно потому, что транзакция должна быть обогащена ее правильным внутренним идентификатором, учитывая: дату транзакции, бизнес-правила, вращающиеся вокруг "AnotherCode" и возможность записи значений исключений для случаев, когда внутренние идентификаторы не могут быть разрешены.

Я считаю, что код работает, как и ожидалось, однако я просто обеспокоен тем, что не выполняю это преобразование оптимальным образом. Меня беспокоит несколько вызовов groupByKey, как и вызов cogroup, так как я не на 100% знаком с ним. Любая обратная связь будет принята с благодарностью. Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...