Я работаю над проектом, который включает чтение данных из RDBMS с использованием JDB C, и мне удалось прочитать данные. Это то, чем я буду заниматься постоянно, еженедельно. Поэтому я пытался придумать способ, чтобы после первоначального чтения последующие извлекали только обновленные записи, а не извлекали все данные из таблицы. Я могу сделать это с помощью инкрементального импорта sq oop, указав три параметра (--check-column, --incremental last-modified/append
и --last-value)
. Однако я не хочу использовать sq oop для этого. Есть ли способ, чтобы я мог повторить то же самое в Spark с Scala?
Во-вторых, некоторые таблицы не имеют уникального столбца, который можно использовать как partitionColumn
, поэтому я подумал об использовании номера строки функция, чтобы добавить уникальный столбец в эти таблицы и затем получить MIN
и MAX
уникального столбца как lowerBound
и upperBound
соответственно. Теперь моя задача состоит в том, чтобы динамически проанализировать эти значения в операторе read, как показано ниже:
val queryNum = "select a1.*, row_number() over (order by sales) as row_nums from (select * from schema.table) a1"
val df = spark.read.format("jdbc").
option("driver", driver).
option("url",url ).
option("partitionColumn",row_nums).
option("lowerBound", min(row_nums)).
option("upperBound", max(row_nums)).
option("numPartitions", some value).
option("fetchsize",some value).
option("dbtable", queryNum).
option("user", user).
option("password",password).
load()
Я знаю, что приведенный выше код неверен и может пропустить множество процессов, но я предполагаю, что он даст общий обзор того, чего я пытаюсь достичь здесь.