Spark Streaming от Kafka, обработка является неточной, когда зависит от записей, потребляемых в течение миллисекунд - PullRequest
0 голосов
/ 08 октября 2019

Spark Streaming от Kafka с чувствительной к последовательности обработкой: обработка неточна, когда зависит от записей, потребляемых в течение миллисекунд друг от друга.

Мы используем Spark (в частности, Spark Streaming DStream в приложении Scala) для получения отКафка, обработка, затем запись в HDFS, поверх которой строится таблица Hive. Обработка включает в себя создание uuid на основе логики, обращающейся к различным проверкам полей ввода. После создания uuid он добавляется в таблицы внешних ссылок (в Phoenix) в внешние ссылки со значениями из определенных полей. Дальнейшие записи, которые соответствуют определенным полям, должны назначаться существующему uuid, когда у них есть соответствующие поля в таблице внешних ссылок.

Проблема в том, что параллельная обработка, кажется, назначает несколько uuids, когда это не нужно. Если поступает запись с новым uuid, но другая запись, которая должна соответствовать этому uuid, поступает в течение миллисекунд, она будет обработана слишком быстро и получит новый uuid, а не тот, который был только что создан. Предположительно, это связано с тем, что записи обрабатываются в кластере в параллельной форме, поэтому они не обрабатываются последовательно. Большое количество обрабатываемых данных означает, что параллельная обработка является необходимостью.

Упрощенный пример:

Каждая запись имеет метки времени «A», «B» и «C». Мы присваиваем значениестолбец uuid, основанный на содержимом этих столбцов.

Записи поступают в такой последовательности, как (значения столбцов A и C в этом примере в основном не имеют значения. Кроме того, здесь указывается временная метка, чтобы представлять, когда запись используется,тогда как в действительности он показывает время создания в восходящем направлении и не имеет отношения к обработке):

1. 2019-10-08 10:00:00:00 A1 B1 --
2. 2019-10-08 10:00:00:01 A2 B1 C1
3. 2019-10-08 10:00:20:00 -- B1 C1

Скажем, логика для назначения uuid в этом случае основана на B (хотя на самом деле это несколько варьируется),и новый uuid создается для этого нового значения B в записи 1. Этот uuid и соответствующее значение B (плюс значения для A и C для дальнейших, более сложных проверок) добавляются в строку таблицы внешних ссылок. Вторая запись проверяет таблицу внешних ссылок с помощью B1, но поскольку она обрабатывается параллельно перед записью первой записи в внешние ссылки, она снова создает новый uuid и передает его в xref. Теперь у нас есть результирующие строки в таблице Hive:

A1 B1 -- uuid1
A2 B1 C1 uuid2

, когда они должны иметь одинаковый uuid1. Кроме того, таблица внешних ссылок показывает A2 B1 C1 uuid2, так как вторая запись перезаписана при загрузке, поскольку B1 является первичным ключом. Это не главная проблема, но это означает, что последующим записям, таким как 3, которые сопоставляются с xref, потому что они пришли после того, как прошло достаточно времени, дают второй uuid, и uuid1 никогда не будет назначен снова, оставляя первую строку сиротой.

Это сложная проблема для подведения итогов или поиска, тем более что я довольно новичок во всех концепциях и технологиях. Я очень ценю любую помощь.

...