Spring Batch - шаги по повышению производительности - PullRequest
12 голосов
/ 17 ноября 2011

В настоящее время я занимаюсь разработкой загрузчиков данных. Чтение файла и запись в базу данных. Я использую обработчик разделов для обработки нескольких файлов, разделенных запятыми, в 30 потоках. Я хочу масштабировать и иметь пропускную способность. Ежедневно я получаю 15000 файлов (каждый из которых содержит 1 миллион записей), как мне масштабировать с помощью Spring Batch. Я хочу, чтобы работа завершила это в течение дня. может сделать это честно, или есть какие-то простые шаги точной настройки.

Пружинный загрузчик данных работает автономно. Там нет веб-контейнера. он работает на одной машине Solaris, имеющей 24 процессора. Данные записываются в одну базу данных. Обеспечивается изоляция по умолчанию и распространение. Конфигурация xml приведена ниже:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch" 
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <!--  IMPORT DB CONFIG -->
    <import resource="classpath:bom/bom/bomloader/job/DataSourcePoolConfig.xml" />

     <!-- USE ANNOTATIONS TO CONFIGURE SPRING BEANS -->
    <context:component-scan base-package="bom.bom.bom" />

    <!--  INJECT THE PROCESS PARAMS HASHMAP BEFORE CONTEXT IS INITIALISED -->
    <bean id="holder" class="bom.bom.bom.loader.util.PlaceHolderBean" >
        <property name="beanName" value="holder"/>
    </bean>

    <bean id="logger" class="bom.bom.bom.loader.util.PlaceHolderBean"  >
        <property name="beanName" value="logger"/>
    </bean>

    <bean id="dataMap" class="java.util.concurrent.ConcurrentHashMap" />

    <!--  JOB REPOSITORY - WE USE DATABASE REPOSITORY  -->

<!--      <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" >-->
<!--            <property name="transactionManager" ref="frdtransactionManager" />-->
<!--            <property name="dataSource" ref="frddataSource" />-->
<!--            <property name="databaseType" value="oracle" />-->
<!--            <property name="tablePrefix" value="batch_"/>           -->
<!--      </bean>-->

   <!--  JOB REPOSITORY - WE IN MEMORY REPOSITORY  -->

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
            <property name="transactionManager" ref="frdtransactionManager" />
    </bean>

<!--    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">-->
<!--        <property name="dataSource" ref="frddataSource" />-->
<!--        <property name="tablePrefix" value="batch_"/>  -->
<!--    </bean>-->

   <!--  LAUNCH JOBS FROM A REPOSITORY -->

      <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
         <property name="jobRepository" ref="jobRepository" />
           <property name="taskExecutor">
                <bean class="org.springframework.core.task.SyncTaskExecutor" />
           </property>      
      </bean>

   <!--  CONFIGURE SCHEDULING IN QUARTZ -->
<!--     <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">-->
<!--        <property name="jobClass" value="bom.bom.bom.assurance.core.JobLauncherDetails" />-->
<!--        <property name="group" value="quartz-batch" />-->
<!--        <property name="jobDataAsMap">-->
<!--            <map>-->
<!--                <entry key="jobName" value="${jobname}"/>-->
<!--                <entry key="jobLocator" value-ref="jobRegistry"/>-->
<!--                <entry key="jobLauncher" value-ref="jobLauncher"/>-->
<!--            </map>-->
<!--        </property>-->
<!--     </bean>-->

      <!--  RUN EVERY 2 HOURS -->
<!--    <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">-->
<!--      <property name="triggers">-->
<!--        <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">-->
<!--          <property name="jobDetail" ref="jobDetail" />-->
<!--          <property name="cronExpression" value="2/0 * * * * ?" />-->
<!--        </bean>-->
<!--      </property>-->
<!--    </bean>-->
<!--    -->

     <!--  RUN STANDALONE -->
    <bean id="jobRunner" class="bom.bom.bom.loader.core.DataLoaderJobRunner">
        <constructor-arg value="${LOADER_NAME}" />  
    </bean>

    <!-- Get all the files for the exchanges and feed as resource to the MultiResourcePartitioner -->

    <bean id="fileresource" class="bom.bom.bom.loader.util.FiltersFoldersResourceFactory" p:dataMap-ref="dataMap">
        <property name="filePath" value="${PARENT_PATH}" />
        <property name="acceptedFolders"  value="${EXCH_CODE}" />
        <property name="logger" ref="logger" />
    </bean>

    <!--  The network  Data Loading Configuration goes here  -->      

    <job id="CDR_network _PARALLEL"     xmlns="http://www.springframework.org/schema/batch" restartable="false" >
        <step id="PREPARE_CLEAN" >
            <flow parent="prepareCleanFlow" />
            <next on="COMPLETED" to="LOAD_EXCHANGE_DATA" />
            <fail on="FAILED"  exit-code="Failed on cleaning error records."/>                    
       </step>
        <step id="LOAD_EXCHANGE_DATA"  >
            <tasklet ref="businessData" transaction-manager="ratransactionManager"  />
            <next on="COMPLETED" to="LOAD_CDR_FILES" />
            <fail on="FAILED" exit-code="FAILED ON LOADING EXCHANGE INFORMATION FROM DB." />
        </step>
        <step id="LOAD_CDR_FILES"  >
            <tasklet ref="fileresource" transaction-manager="frdtransactionManager" />
            <next on="COMPLETED" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
            <fail on="FAILED" exit-code="FAILED ON LOADING CDR FILES." />
        </step>        
        <step id="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" next="limitDecision" >
            <partition step="filestep" partitioner="filepartitioner" >
                <handler grid-size="100" task-executor="executorWithCallerRunsPolicy" />
            </partition>
        </step>
        <decision id="limitDecision" decider="limitDecider"> 
            <next on="COMPLETED" to="MOVE_RECS_STAGING_TO_MAIN_TABLE" />
            <next on="CONTINUE" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
        </decision>     
        <step id="MOVE_RECS_STAGING_TO_MAIN_TABLE" >
            <tasklet ref="moveRecords" transaction-manager="ratransactionManager" >
                <transaction-attributes isolation="SERIALIZABLE"/>
            </tasklet>          
            <fail on="FAILED" exit-code="FAILED ON MOVING DATA TO THE MAIN TABLE." />           
            <next on="*"  to="PREPARE_ARCHIVE"/>                        
        </step>     
        <step id="PREPARE_ARCHIVE" >
            <flow parent="prepareArchiveFlow" />
            <fail on="FAILED" exit-code="FAILED ON Archiving files" />  
            <end on="*" />                    
       </step>
    </job>

    <flow id="prepareCleanFlow" xmlns="http://www.springframework.org/schema/batch">
        <step id="CLEAN_ERROR_RECORDS" next="archivefileExistsDecisionInFlow" >
            <tasklet ref="houseKeeping"  transaction-manager="ratransactionManager" />
        </step>   
        <decision id="archivefileExistsDecisionInFlow" decider="archivefileExistsDecider">
           <end on="NO_ARCHIVE_FILE" />
           <next on="ARCHIVE_FILE_EXISTS" to="runprepareArchiveFlow" />
        </decision>     
        <step id="runprepareArchiveFlow"  >
            <flow parent="prepareArchiveFlow" />    
        </step>  
    </flow>

    <flow id="prepareArchiveFlow" xmlns="http://www.springframework.org/schema/batch" >
          <step id="ARCHIVE_CDR_FILES" >
            <tasklet ref="archiveFiles" transaction-manager="frdtransactionManager" />          
         </step>
    </flow>

    <bean id="archivefileExistsDecider" class="bom.bom.bom.loader.util.ArchiveFileExistsDecider" >
        <property name="logger" ref="logger" />
        <property name="frdjdbcTemplate" ref="frdjdbcTemplate" />   
    </bean>

    <bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step" >
        <property name="resources" value="#{dataMap[processFiles]}"/>
    </bean>

    <task:executor id="executorWithCallerRunsPolicy"
                   pool-size="90-95"
                   queue-capacity="6"
                   rejection-policy="CALLER_RUNS"/>

<!-- <bean id="dynamicJobParameters" class="bom.bom.bom.assurance.core.DynamicJobParameters" />-->


    <bean id="houseKeeping" class="bom.bom.bom.loader.core.HousekeepingOperation">
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="rajdbcTemplate" />
        <property name="frdjdbcTemplate" ref="frdjdbcTemplate" />   
    </bean>

    <bean id="businessData" class="bom.bom.bom.loader.core.BusinessValidatorData">
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="NrajdbcTemplate" />  
        <property name="param" value="${EXCH_CODE}" />
        <property name="sql" value="${LOOKUP_QUERY}" /> 
    </bean>

    <step id="filestep" xmlns="http://www.springframework.org/schema/batch">
        <tasklet transaction-manager="ratransactionManager" allow-start-if-complete="true"  >
            <chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="500" retry-limit="2">
             <retryable-exception-classes>
                <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
             </retryable-exception-classes>
            </chunk>
            <listeners>
                <listener ref="customStepExecutionListener">
                </listener>
            </listeners>
        </tasklet>
    </step> 

    <bean id="moveRecords"  class="bom.bom.bom.loader.core.MoveDataFromStaging">
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="rajdbcTemplate" />
    </bean>

    <bean id="archiveFiles" class="bom.bom.bom.loader.core.ArchiveCDRFile" >
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="frdjdbcTemplate" />  
        <property name="archiveFlag" value="${ARCHIVE_FILE}" />
        <property name="archiveDir" value="${ARCHIVE_LOCATION}" />
    </bean>

    <bean id="limitDecider" class="bom.bom.bom.loader.util.LimitDecider" p:dataMap-ref="dataMap">
        <property name="logger" ref="logger" />
    </bean>

<!--    <bean id="multifileReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step" >-->
<!--        <property name="resources" value="#{stepExecutionContext[fileName]}" />-->
<!--        <property name="delegate" ref="fileItemReader" />-->
<!--    </bean>-->

    <!--  READ EACH FILE PARALLELY   -->    
    <bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
            <property name="resource" value="#{stepExecutionContext[fileName]}" />
            <property name="saveState" value="false" />         
    </bean>

    <!--  LISTEN AT THE END OF EACH FILE TO DO POST PROCESSING  -->
    <bean id="customStepExecutionListener" class="bom.bom.bom.loader.core.StagingStepExecutionListener" scope="step">
        <property name="logger" ref="logger" />
        <property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
        <property name="jdbcTemplate" ref="rajdbcTemplate" />   
        <property name="sql" value="${INSERT_IA_QUERY_COLUMNS}" />

    </bean>

    <!--  CONFIGURE THE ITEM PROCESSOR TO DO  BUSINESS LOGIC ON EACH ITEM -->
    <bean id="itemProcessor" class="bom.bom.bom.loader.core.StagingLogicProcessor"  scope="step">
        <property name="logger" ref="logger" />
        <property name="params" ref="businessData" />
    </bean>

    <!--  CONFIGURE THE JDBC ITEM WRITER TO WRITE IN TO DB -->
    <bean id="jdbcItenWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter"  scope="step">
        <property name="dataSource" ref="radataSource"/>
        <property name="sql"> 
            <value>
                <![CDATA[
                    ${SQL1A}
                ]]>
            </value>
        </property>
        <property name="itemSqlParameterSourceProvider"> 
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider">
            </bean>
        </property>
    </bean>

<!--    <bean id="itemWriter" class="bom.bom.bom.assurance.core.LoaderDBWriter" scope="step">-->
<!--        <property name="sQL" value="${loader.sql}" />-->
<!--        <property name="jdbcTemplate" ref="NrajdbcTemplate" />-->
<!--    </bean>-->


    <!--  CONFIGURE THE FLAT FILE ITEM READER TO READ INDIVIDUAL BATCH -->
    <bean id="itemReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" abstract="true">
        <property name="strict" value="false"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
                        <property name="names" value="${COLUMNS}" />
                        <property name="columns" value="${RANGE}" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="bom.bom.bom.loader.util.DataLoaderMapper">
                        <property name="params" value="${BEANPROPERTIES}"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Пробовал:

  • я мог видеть, что ThreadPoolExecutor зависает через 3 часа. Prstat в solaris говорит, что он обрабатывает, но не обрабатывает в журнале.

  • Пробовал с меньшим размером порции 500, из-за отпечатка памяти, без прогресса.

  • Так как он вставляется в одну базу данных (30 пулов соединений). Есть ли что-нибудь, что я могу сделать здесь.

Экземпляры из визуального vm

enter image description here

стек трассировки потока все заблокированы на уровне соединения

Full thread dump Java HotSpot(TM) Server VM (11.3-b02 mixed mode):

"Attach Listener" daemon prio=3 tid=0x00bbf800 nid=0x26 waiting on condition [0x00000000..0x00000000]
   java.lang.Thread.State: RUNNABLE

"executorWithCallerRunsPolicy-1" prio=3 tid=0x008a7000 nid=0x25 runnable [0xd5a7d000..0xd5a7fb70]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at oracle.net.ns.Packet.receive(Packet.java:240)
        at oracle.net.ns.DataPacket.receive(DataPacket.java:92)
        at oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:172)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:117)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:92)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:77)
        at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1034)
        at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1(T4CMAREngine.java:1010)
        at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:588)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:194)
        at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:953)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1222)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3387)
        at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3468)
        - locked <0xdbdafa30> (a oracle.jdbc.driver.T4CConnection)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1350)
        at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:818)
        at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:1)
        at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:587)
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:812)
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:868)
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:876)
        at 

Ответы [ 4 ]

3 голосов
/ 15 декабря 2011

Я бы посоветовал вам уменьшить размер куска до 50.

500 кажется слишком большим: вы слишком много ждете во время разговора с БД.

Одновременно уменьшите размер пула TaskExecutor или увеличьте размер пула БД. Вы можете выбрать, какой из них, наблюдая за хостом БД: если это ЦП, а ввод-вывод не максимален, увеличьте размер пула БД, чтобы увеличить загрузку БД. Если ваш ЦП БД уже на максимуме, уменьшите размер пула TaskExecutor. Цель состоит в том, чтобы иметь жидкий процесс.

Я думаю, что БД будет вашим основным ограничивающим элементом. Поэтому начните с настройки размера пула БД в соответствии с емкостью хоста БД. Когда это будет сделано, настройте размер пула TaskExecutor в соответствии с размером пула БД (размер пула TE = размер пула БД * 1,5), а также емкости хоста пакета (ЦП, память и операции ввода-вывода).

Разделение входящих файлов на несколько жестких дисков также может помочь (если это возможно).

0 голосов
/ 02 ноября 2016
  1. Ваша пакетная работа полагается на рефлексию (например, BeanPropertyRowMapper)?Это может снизить производительность.
  2. Если ваша база данных вызывает проблемы, вы можете ее профилировать.Здесь не так много конкретных предложений.
  3. Уже упоминалось, отбросьте размер куска.
0 голосов
/ 22 марта 2013

просто исправление в вашем выражении cron.Это верно для 2 часов:

0 0 0/2 1/1 *?*

0 голосов
/ 21 декабря 2012

Я думаю, что проблема здесь в миллионе записей в файле.Поскольку вы уже уменьшили размер чанка, вы должны обрабатывать меньшие записи.Ради тестирования уменьшите количество записей в каждом файле до 10 КБ.Я предполагаю, что вы создаете объекты, выполняете некоторую обработку, и вы делаете это для 1м записей в цикле.Каждый поток будет держать объект в памяти, пока обработка не будет завершена.Я предполагаю, что из-за объема данных в вашей памяти слишком много объектов, которые не являются сборщиком мусора.Если уменьшение размера помогает, тогда вы можете попытаться использовать легкие объекты в своем коде и попытаться установить для каждого объекта значение null в конце обработки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...