Инкрементное и параллельное чтение из RDBMS в Spark с использованием JDBC - PullRequest
0 голосов
/ 18 февраля 2020

Я работаю над проектом, который включает чтение данных из RDBMS с использованием JDB C, и мне удалось прочитать данные. Это то, чем я буду заниматься постоянно, еженедельно. Поэтому я пытался придумать способ, чтобы после первоначального чтения последующие извлекали только обновленные записи, а не извлекали все данные из таблицы. Я могу сделать это с помощью инкрементального импорта sq oop, указав три параметра (--check-column, --incremental last-modified/append и --last-value). Однако я не хочу использовать sq oop для этого. Есть ли способ, чтобы я мог повторить то же самое в Spark с Scala?

Во-вторых, некоторые таблицы не имеют уникального столбца, который можно использовать как partitionColumn, поэтому я подумал об использовании номера строки функция, чтобы добавить уникальный столбец в эти таблицы и затем получить MIN и MAX уникального столбца как lowerBound и upperBound соответственно. Теперь моя задача состоит в том, чтобы динамически проанализировать эти значения в операторе read, как показано ниже:

val queryNum = "select a1.*, row_number() over (order by sales) as row_nums from (select * from schema.table) a1"

val df = spark.read.format("jdbc").
option("driver", driver).
option("url",url ).
option("partitionColumn",row_nums).
option("lowerBound", min(row_nums)).
option("upperBound", max(row_nums)).
option("numPartitions", some value).
option("fetchsize",some value).
option("dbtable", queryNum).
option("user", user).
option("password",password).
load()

Я знаю, что приведенный выше код неверен и может пропустить множество процессов, но я предполагаю, что он даст общий обзор того, чего я пытаюсь достичь здесь.

1 Ответ

0 голосов
/ 07 апреля 2020

Удивительно сложно обрабатывать инкрементные чтения JDB C в Spark. ИМХО, это сильно ограничивает простоту создания многих приложений и может не стоить ваших проблем, если Sq oop выполняет эту работу.

Однако это выполнимо. См. Эту ветку для примера использования опции dbtable:

Apache Spark выбирает все строки

Чтобы сохранить эту работу идемпотентной, вам нужно прочитать в Максимальная строка вашего предыдущего вывода либо непосредственно из загрузки всех файлов данных, либо через файл журнала, который вы записываете каждый раз. Если ваши файлы данных имеют большой размер, вам может понадобиться использовать файл журнала, если его меньше, вы можете потенциально его загрузить.

...