Перезапускаем Spring Batch ItemReader - PullRequest
0 голосов
/ 17 января 2019

Программа Spring Batch, над которой я работаю, читает данные из таблицы. Он использует org.springframework.batch.item.database.JdbcCursorItemReader ’itemReader. Ранее планировалось изменить таблицу, добавить флаг PROCESSED_INDICATOR и предварительно заполнить его статусом «В ОЖИДАНИИ». Как только запись будет обработана, и средство записи обновит статус флага PROCESSED_INDICATOR на «Обработано». Это для поддержки перезапуска. Например, если пакет забирает 1 миллион записей и умирает в ½ миллиона записей, тогда, когда я перезапускаю пакет; это должно начаться там, где я остановился. Но, к сожалению, руководство не одобрило это решение. Я копаюсь в способах сделать itemreader перезапускаемым Согласно документации Spring «Большинство ItemReaders имеют гораздо более сложную логику перезапуска. Например, JdbcCursorItemReader хранит идентификатор последней обработанной строки в Курсоре. ”

У кого-нибудь есть пример такого пользовательского ридера, который реализует JdbcCursorItemReader и сохраняет последнюю обработанную строку в курсоре. https://docs.spring.io/spring-batch/trunk/reference/html/readersAndWriters.html

== ПОЛНАЯ КОНФИГУРАЦИЯ XML ==

<import resource="classpath:/batch/utility/skip/batch_skip.xml" />
<import resource="classpath:/batch/config/context-postgres.xml" />
<import resource="classpath:/batch/config/oracle-database.xml" />

<context:property-placeholder
    location="classpath:/batch/jobs/TPF-1001-DD-01/TPF-1001-DD-01.properties" />
<bean id="gridSizePartitioner"
    class="com.tpf.partitioner.GridSizePartitioner" />

      <task:executor id="taskExecutor" pool-size="${pool.size}" />
<batch:job id="XYZJob" job-repository="jobRepository"
    restartable="true">

    <batch:step id="XYZSTEP">
        <batch:description>Convert TIF files to PDF</batch:description>
        <batch:partition partitioner="gridSizePartitioner">

            <batch:handler task-executor="taskExecutor"
                grid-size="${pool.size}" />
            <batch:step>
                <batch:tasklet allow-start-if-complete="true">
                    <batch:chunk commit-interval="${commit.interval}"
                        skip-limit="${job.skip.limit}">

                        <batch:reader>
                            <bean id="timeReader"
                                class="org.springframework.batch.item.database.JdbcCursorItemReader"
                                scope="step">
                                <property name="dataSource" ref="oracledataSource" />
                                <property name="sql">
                                    <value>                                     
                                    select TIME_ID as timesheetId,count(*),max(CREATION_DATETIME) as creationDateTime , ILN_NUMBER as ilnNumber
                                    from TS_FAKE_NAME
                                    where creation_datetime  >= '#{jobParameters['creation_start_date1']} 12.00.00.000000000 AM' 
                                    and creation_datetime &lt;  '#{jobParameters['creation_start_date2']} 11.59.59.999999999 PM' 
                                    and mod(time_id,${pool.size})=#{stepExecutionContext['partition.id']} 
                                    group by  time_id ,ILN_NUMBER                                   

                                    </value>
                                </property>
                                <property name="rowMapper">
                                    <bean
                                        class="org.springframework.jdbc.core.BeanPropertyRowMapper">
                                        <property name="mappedClass"
                                            value="com.tpf.model.Time" />
                                    </bean>
                                </property>
                            </bean>
                        </batch:reader>
                        <batch:processor>
                            <bean id="compositeItemProcessor"
                                class="org.springframework.batch.item.support.CompositeItemProcessor">
                                <property name="delegates">
                                    <list>
                                        <ref bean="timeProcessor" />
                                    </list>
                                </property>

                            </bean>
                        </batch:processor>


                        <batch:writer>
                            <bean id="compositeItemWriter"
                                class="org.springframework.batch.item.support.CompositeItemWriter">
                                <property name="delegates">
                                    <list>
                                        <ref bean="timeWriter" />
                                    </list>
                                </property>
                            </bean>
                        </batch:writer>
                        <batch:skippable-exception-classes>
                            <batch:include
                                class="com.utility.skip.BatchSkipException" />
                        </batch:skippable-exception-classes>
                        <batch:listeners>
                            <batch:listener ref="batchSkipListener" />
                        </batch:listeners>
                    </batch:chunk>
                </batch:tasklet>
            </batch:step>
        </batch:partition>
    </batch:step>
    <batch:validator>
        <bean
            class="org.springframework.batch.core.job.DefaultJobParametersValidator">
            <property name="requiredKeys">
                <list>
                    <value>batchRunNumber</value>
                    <value>creation_start_date1</value>
                    <value>creation_start_date2</value>
                </list>
            </property>
        </bean>
    </batch:validator>
</batch:job>



<bean id="timesheetWriter" class="com.tpf.writer.TimeWriter"
    scope="step">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="timeProcessor"
    class="com.tpf.processor.TimeProcessor" scope="step">
    <property name="dataSource" ref="oracledataSource" />
</bean> 

1 Ответ

0 голосов
/ 17 января 2019

У кого-нибудь есть пример такого пользовательского ридера, который реализует JdbcCursorItemReader и сохраняет последнюю обработанную строку в курсоре

JdbcCursorItemReader делает это, см. Javadoc , вот выдержка:

ExecutionContext: The current row is returned as restart data,
and when restored from that same data, the cursor is opened and the current row
set to the value within the restart data.

Так что вам не нужен специальный ридер.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...