Блоки данных Azure Отсутствуют записи при потоковой передаче в хранилище данных SQL - PullRequest
0 голосов
/ 27 октября 2018

Сначала у меня были следующие инструкции, и когда я загружал 20 000 файлов, я получал 20 000 записей в БД (каждый файл содержит только 1 запись).

aTracking = sqlContext.read.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")

aTracking.write \
    .option('user', dwUser) \
    .option('password', dwPass) \
    .jdbc('jdbc:sqlserver://' + dwServer + ':' + dwJdbcPort + ';database=' + dwDatabase, 'stg_tr_energy_xmlin.csv_in', mode = 'append' )

Затем, в целях скорости, я подумал, что было бы лучшепоток с Polybase ... закодирован как ... но там я получил только + - 17.000 записей.

aTracking = spark.readStream.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")

aTracking.writeStream \
         .format("com.databricks.spark.sqldw") \
         .option("url", sqlDwUrl) \
         .option("tempDir", "wasbs://uploaddw@" + blobStorage + ".blob.core.windows.net/stream") \
         .option("forwardSparkAzureStorageCredentials", "true") \
         .option("dbTable", "stg_tr_energy_xmlin.csv_in") \
         .option("checkpointLocation", "/checkpoint") \
         .start()

Есть предложения, что может вызвать это?

1 Ответ

0 голосов
/ 29 октября 2018

Состояние вашего запроса структурированной потоковой передачи отслеживается в местоположении контрольной точки.«Предполагается, что каждый источник потоковой передачи имеет смещения (аналогичные смещениям Кафки (...)) для отслеживания позиции чтения в потоке. Движок использует контрольные точки и журналы записи вперед для записи диапазона смещения данных, обрабатываемых в каждом триггере.».См. Документация Spark (поиск контрольных точек) для получения более подробной информации.

Поэтому, если вы хотите повторно обработать все ваши файлы, удалите каталог местоположения контрольной точки (или определите новый), определенный в:

.option("checkpointLocation", "/checkpoint"). 

Дополнительно проверяется файл _spark_metadata в целевом каталоге, поэтому если вы хотите, чтобы все данные записывались снова, вам также следует очистить целевой каталог (с помощью хранилища данных SQL Azure временный каталог).

...