JdbcIO и неверные входы - PullRequest
       11

JdbcIO и неверные входы

0 голосов
/ 19 ноября 2018

Я новичок в Apache Beam и у меня есть несколько вопросов.

Я хочу реализовать простой кэш в моем конвейере.Мой конвейер будет запускаться каждый час для записи данных партиями (1000 элементов) в базу данных Cloud SQL и может иметь частичные ошибки в отношении согласованности данных, поскольку ограничения в базе данных будут препятствовать вставке недопустимых данных.В этом случае конвейер вставляет данные частично.Затем мне нужно оценить неправильные данные и повторно запустить конвейер вручную.В этом контексте при следующем запуске конвейера я хочу попытаться вставить только те данные, которые не удалось вставить.

У меня возникла идея написать (почасово) файл кэша в облачном хранилище.он содержит идентификаторы сущностей, которые были записаны в целевой базе данных с помощью JdbcIO, поэтому я могу прочитать этот файл при следующем запуске этого конвейера, чтобы избежать повторной перезаписи одних и тех же данных в базу данных снова и снова.Цель состоит в том, чтобы избежать запуска излишне длинных заданий в базе данных.

Я думал, что конвейер должен быть выполнен следующим образом:

1) Проверьте, есть ли файл кэша в облачном хранилище.Перейдите к шагу 2.1, если там есть файл.Если нет, перейдите к шагу 2.2.

2.1) Прочитайте файл кэша и создайте Map PCollectionView (MapView), чтобы позже использовать его в качестве SideInput.Переходите к шагу 3.

2.2) Обработайте каждый элемент источника.Переходите к шагу 4.

3) Обработайте каждый элемент источника и сравните его с MapView.Если MapView не содержит элемент, выведите его.Если он содержит, ничего не делать.Переходите к шагу 4.

4) Используйте JdbcIO write () для записи PCollection в базу данных.Переходите к шагу 5.

5) В конце конвейера:

5.1) Если конвейер успешно работал без использования файла кэша: удалите все файлы кэша в облачном хранилище.Мне не нужен кеш на этот час.

5.2) Если конвейер успешно работал и использовал файл кеша: ничего не делать.Мне не нужен кеш на этот час.

5.3) Если конвейер успешно работал с некоторыми нефатальными ошибками без использования файла кеша: создайте файл кеша с идентификаторами вставленных сущностей

5.4) Если конвейер успешно работал с некоторыми неустранимыми ошибками и использовал файл кэша: добавьте в файл кэша новые идентификаторы сущности.

5.5) Если выполнение конвейера завершилось неудачно:Ничего не делать.Я считаю, что поток данных или модель луча отката транзакции.Поправьте меня, если я ошибаюсь.

Я также подумал о другом решении, которое могло бы решить мою проблему.Вместо того, чтобы использовать «кэш», я бы попытался вывести неверные пакеты в выходной файл.Затем этот выходной файл будет оцениваться вручную, и при следующем запуске конвейера ему нужно будет только прочитать этот файл как ввод и попытаться вставить его в базу данных.

Проблема в том, что я неПолагайте, что есть способ получить PCollection из строк, правильно записанных в базу данных, используя метод write () JdbcIO.Для меня было бы крайне важно знать, какие идентификаторы сущностей были вставлены в базу данных.

У BigQuery есть несколько опций с WriteResult, но я не верю, что JdbcIO имеет.Могу ли я что-нибудь сделать в этом контексте?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...