spring batch - шаг разбиения для отката всех предыдущих коммитов чанка, когда чанк не удается - PullRequest
2 голосов
/ 02 ноября 2011

Я использую Spring Batch для обработки нескольких файлов с использованием MultiResourcePartitioner, и все средства чтения и записи элементов находятся в области действия шага. Каждый шаг запускает отдельные файлы и фиксирует их в базе данных с интервалом 1000. При возникновении ошибки во время текущей обработки все предыдущие коммиты должны быть откатаны назад, и шаг завершится неудачей. Таким образом, содержимое файла не добавляется в базу данных.

Какой самый лучший способ среди них:

  • Использование распространения транзакции в качестве NESTED.

  • Установка интервала фиксации в чанке с помощью Integer.MAXVALUE, это не будет работать как В файле есть большие элементы, и он не работает с кучей места.

  • любой другой способ выполнения транзакции на уровне шага.

У меня есть образец XML-файла, показанный ниже:

<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
    <property name="resources" value="classpath:${filepath}" />
</bean>

<bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
        <property name="resource" value="#{stepExecutionContext[fileName]}" />
</bean>

<step id="step1" xmlns="http://www.springframework.org/schema/batch">
    <tasklet transaction-manager="ratransactionManager"   >
        <chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="800" retry-limit="3">
         <retryable-exception-classes>
        <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
     </retryable-exception-classes>
    </chunk>
    <listeners>
        <listener ref="customStepExecutionListener">
        </listener>
    </listeners>
    </tasklet>
    <fail on="FAILED"/>
</step>

ОБНОВЛЕНИЕ:

Кажется, что основная таблица (где происходит прямая вставка) ссылается на другие таблицы и материализованные представления. если я удаляю данные в этой таблице, чтобы удалить устаревшие записи, используя индикатор обработанных столбцов, данные, помещенные в очередь с использованием MV, покажут старые данные. я думаю, что постановочный стол необходим для моего требования.

Для реализации таблицы промежуточных данных для этого требования

  • Создайте еще один параллельный шаг для опроса базы данных и записи данных, значение обработанного столбца которых равно Y.

  • Передача данных в конце каждого успешного завершения файла с помощью пошагового прослушивателя (метод afterStep).

или любые другие предложения.

Ответы [ 3 ]

4 голосов
/ 02 ноября 2011

В целом я согласен с подходом @MichaelLange.Но, возможно, отдельная таблица - это слишком много ... В вашей таблице импорта может быть дополнительный столбец completed, который, если задано значение "ложь", то запись принадлежит файлу, который обрабатывается сейчас (или не удалось обработать).После обработки файла вы запускаете простое обновление для этой таблицы (не должно произойти сбой, поскольку у вас нет никаких ограничений для этого столбца):

update import_table set completed = true where file_name = "file001_chunk1.txt"

Перед обработкойВ файле вы должны удалить «устаревшие» записи:

delete from import_table where file_name = "file001_chunk1.txt"

Это решение будет быстрее и проще для реализации, чем вложенные транзакции.Возможно, при таком подходе вы столкнетесь с блокировками таблиц, но при соответствующем выборе уровня изоляции это можно минимизировать.При желании вы можете создать представление для этой таблицы, чтобы отфильтровать незавершенные записи (включить индекс в столбце completed):

create view import_view as select a, b, c from import_table where completed = true

В общем, я думаю, что вложенные транзакциив этом случае это невозможно, поскольку чанки могут обрабатываться в параллельных потоках, каждый из которых содержит свой собственный контекст транзакции.Менеджер транзакций не сможет запустить вложенную транзакцию в новом потоке, даже если вам каким-то образом удастся создать «основную транзакцию» в «верхнем» потоке работ.


Еще один подход - продолжение"временной таблицы".Процесс импорта должен создать таблицы импорта и присвоить им имена, например, по дате:

import_table_2011_10_01
import_table_2011_10_02
import_table_2011_10_05
...
etc

и «super-veiw», который объединяет все эти таблицы:

create view import_table as
select * from import_table_2011_10_01
union
select * from import_table_2011_10_02
union
select * from import_table_2011_10_05

После успешного импорта «суперпредставление» должно быть воссоздано.

При таком подходе у вас возникнут трудности с внешними ключами для таблицы импорта.


Еще один подход -использовать отдельную базу данных для импорта, а затем передать импортированные данные из базы данных импорта на главную (например, передать двоичные данные).

1 голос
/ 02 ноября 2011

Не можете ли вы попробовать это с помощью стратегии компенсации?

некоторые примеры

  • использовать временную или дополнительную таблицу для данных, перемещать данные в бизнес-таблицу, только если задание выполнено успешно
  • используйте условный поток для вызова шага "deleteAllWrittenItems" в случае возникновения проблемы
0 голосов
/ 15 августа 2015

В проекте с открытым исходным кодом oVirt Майк Колесник, Эли Месика и я внедрили полноценный механизм компенсации.
Вы можете клонировать проект и посмотреть на классы, которые относятся к CompensationContext.
Я экспериментировал с Spring-Batch в последние дни, впервые в своей жизни, и похоже на то, что для пакетной операции, которая во всем состоит из одного и того же типа операции CRUD - например, пакетная вставка с вспомогательным столбцом может Помогите.
Я пытаюсь понять, можем ли мы каким-то образом перехватить идентификатор задания и сохранить его в таблице, содержащей вставленные данные (например, с помощью столбца job_id), или, например, сохранить пару job_id и entity_id в Отдельная таблица, а затем компенсация в случае неудачи задания будет состоять в удалении всех записей за одно задание.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...