Я новичок в Apache Beam и у меня есть несколько вопросов.
Я хочу реализовать простой кэш в моем конвейере.Мой конвейер будет запускаться каждый час для записи данных партиями (1000 элементов) в базу данных Cloud SQL и может иметь частичные ошибки в отношении согласованности данных, поскольку ограничения в базе данных будут препятствовать вставке недопустимых данных.В этом случае конвейер вставляет данные частично.Затем мне нужно оценить неправильные данные и повторно запустить конвейер вручную.В этом контексте при следующем запуске конвейера я хочу попытаться вставить только те данные, которые не удалось вставить.
У меня возникла идея написать (почасово) файл кэша в облачном хранилище.он содержит идентификаторы сущностей, которые были записаны в целевой базе данных с помощью JdbcIO, поэтому я могу прочитать этот файл при следующем запуске этого конвейера, чтобы избежать повторной перезаписи одних и тех же данных в базу данных снова и снова.Цель состоит в том, чтобы избежать запуска излишне длинных заданий в базе данных.
Я думал, что конвейер должен быть выполнен следующим образом:
1) Проверьте, есть ли файл кэша в облачном хранилище.Перейдите к шагу 2.1, если там есть файл.Если нет, перейдите к шагу 2.2.
2.1) Прочитайте файл кэша и создайте Map PCollectionView (MapView), чтобы позже использовать его в качестве SideInput.Переходите к шагу 3.
2.2) Обработайте каждый элемент источника.Переходите к шагу 4.
3) Обработайте каждый элемент источника и сравните его с MapView.Если MapView не содержит элемент, выведите его.Если он содержит, ничего не делать.Переходите к шагу 4.
4) Используйте JdbcIO write () для записи PCollection в базу данных.Переходите к шагу 5.
5) В конце конвейера:
5.1) Если конвейер успешно работал без использования файла кэша: удалите все файлы кэша в облачном хранилище.Мне не нужен кеш на этот час.
5.2) Если конвейер успешно работал и использовал файл кеша: ничего не делать.Мне не нужен кеш на этот час.
5.3) Если конвейер успешно работал с некоторыми нефатальными ошибками без использования файла кеша: создайте файл кеша с идентификаторами вставленных сущностей
5.4) Если конвейер успешно работал с некоторыми неустранимыми ошибками и использовал файл кэша: добавьте в файл кэша новые идентификаторы сущности.
5.5) Если выполнение конвейера завершилось неудачно:Ничего не делать.Я считаю, что поток данных или модель луча отката транзакции.Поправьте меня, если я ошибаюсь.
Я также подумал о другом решении, которое могло бы решить мою проблему.Вместо того, чтобы использовать «кэш», я бы попытался вывести неверные пакеты в выходной файл.Затем этот выходной файл будет оцениваться вручную, и при следующем запуске конвейера ему нужно будет только прочитать этот файл как ввод и попытаться вставить его в базу данных.
Проблема в том, что я неПолагайте, что есть способ получить PCollection из строк, правильно записанных в базу данных, используя метод write () JdbcIO.Для меня было бы крайне важно знать, какие идентификаторы сущностей были вставлены в базу данных.
У BigQuery есть несколько опций с WriteResult, но я не верю, что JdbcIO имеет.Могу ли я что-нибудь сделать в этом контексте?