Я пытаюсь решить проблему с производительностью с помощью одного из заданий Spark и считаю, что у меня проблема с использованием функции "cogroup". Я пытаюсь объединить два фрейма данных (оба очень большие, и ни один из них не может быть транслирован), и простое объединение не будет работать, потому что мне нужно добавить много дополнительной логики обработки.
Вот пример двух DataFrames
Сделки:
+---------+-----------------+--------+
| CardNum | TransactionTime | Amount |
+---------+-----------------+--------+
| ABC | 20190101 | 10.0 |
| ABC | 20180501 | 25.0 |
| DEF | 20181201 | 30.0 |
| ghi | 20180101 | 20.0 |
+---------+-----------------+--------+
Lookups:
+---------+------------+-----------------+------------------+-------------+
| CardID | InternalId | RecordStartDate | RecordExpiryDate | AnotherCode |
+---------+------------+-----------------+------------------+-------------+
| abc | 10001 | 2018-01-01 | 2018-05-20 | A |
| def | 10002 | 2018-01-01 | 9999-12-31 | A |
| def | 10005 | 2018-01-01 | 9999-12-31 | B |
| ghi | 10003 | 2018-01-01 | 9999-12-31 | B |
| abc | 20001 | 2018-05-20 | 9999-12-31 | A |
+---------+------------+-----------------+------------------+-------------+
Ожидаемые результаты:
+---------+-----------------+--------+------------+--------------------------------------------------------------+
| CardNum | TransactionTime | Amount | InternalID | Additional Explanation |
+---------+-----------------+--------+------------+--------------------------------------------------------------+
| ABC | 20190101 | 10.0 | 2001 | For this txn time, this internal id matches |
| ABC | 20180501 | 25.0 | 1001 | For an older txn and same card as above, the older id matches |
| DEF | 20181201 | 30.0 | 1002 | If two results are valid, pick the internal id with code "A" |
| ghi | 20180101 | 20.0 | 1003 | Since only one match, keep the returned id |
+---------+-----------------+--------+------------+--------------------------------------------------------------+
Как я в настоящее время присоединяюсь к данным:
// Conversion to lowercase is needed because the grouping needs to ignore case
transactionsGroupedDF = transactionsDF.groupByKey(item => item.getAs[String]("CardNum").toLowerCase)
lookupGroupedDF = lookupDF.groupByKey(item => item.getAs[String]("CardID").toLowerCase)
val resultDF = transactionsGroupedDF.cogroup(lookupGroupedDF) {
case (key, iter1, iter2) =>
val txnDataList = iter1.toList
val lookupList = iter2.toList
txnData.map(item => resolveInternalId(item, lookupTables, key))
}RowEncoder(transactionDF.schema.add("internalID","String)
Мне нужно сделать cogroup здесь, я считаю, потому что мне нужны данные из обоих сценариев, особенно потому, что транзакция должна быть обогащена ее правильным внутренним идентификатором, учитывая: дату транзакции, бизнес-правила, вращающиеся вокруг "AnotherCode" и возможность записи значений исключений для случаев, когда внутренние идентификаторы не могут быть разрешены.
Я считаю, что код работает, как и ожидалось, однако я просто обеспокоен тем, что не выполняю это преобразование оптимальным образом. Меня беспокоит несколько вызовов groupByKey
, как и вызов cogroup
, так как я не на 100% знаком с ним.
Любая обратная связь будет принята с благодарностью. Спасибо!