Spring Batch: how to manage multi-threaded jobs connecting to the same datasource (reader datasource connection problem)
1 vote
/ February 3, 2020

I have multi-threaded jobs (6 jobs running at the same time), and these jobs use 3 datasources (2 jobs on the first datasource, 2 on the second, and the last 2 on the third).

/*
 **********************************
 *
 * DATASOURCES CONFIG
 *
 **********************************
 */

@Bean
@Primary
@ConfigurationProperties(prefix = "spring.datasource.first")
public DataSource datasource() {
    return DataSourceBuilder.create().build();
}

@Bean(name="datasourcereplicaone")
@ConfigurationProperties(prefix = "spring.datasource.replicaone")
public DataSource datasourceReplicaOne() {
    return DataSourceBuilder.create().build();
}

@Bean(name="datasourcereplicatwo")
@ConfigurationProperties(prefix = "spring.datasource.replicatwo")
public DataSource datasourceReplicaTwo() {
    return DataSourceBuilder.create().build();
}

The problem is with the itemReader:

@Bean(name = "readerfeedsurvcapacity", destroyMethod = "")
@StepScope
public JdbcCursorItemReader<Long> readerfeedsurvcapacity(@Value("#{jobParameters['exchangeIdSession']}")String exchangeIdSession,@Value("#{jobParameters['hostbase_param']}")String hostbase_param) {
    // retrieves the delay in seconds
    String sqlRequest = "My sql request that run in 0.4 sec";
    logger.debug("[BatchConfiguration][readerfeedsurvcapacity] Requete SQL du job feedsurvcapacity : " + sqlRequest);

    JdbcCursorItemReader<Long> dbReader = new JdbcCursorItemReader<>();
    switch (hostbase_param) {
    case "PRIMARY":
        dbReader.setDataSource(datasource());
        break;
    case "R1":
        dbReader.setDataSource(datasourceReplicaOne());
        break;
    default:
        dbReader.setDataSource(datasourceReplicaTwo());
        break;
    }

    dbReader.setSql(sqlRequest);
    dbReader.setRowMapper(new InstrumentSessionRowMapper());
    return dbReader;
}
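
The `switch` in the reader amounts to a lookup from the `hostbase_param` value to a datasource, with the second replica as the fallback. Below is a minimal sketch of that lookup in plain Java. `HostbaseRouter` is a hypothetical helper that is not part of the original configuration, and the strings passed in `main` are placeholders standing in for the three `DataSource` beans.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (an assumption for illustration, not from the original post).
// It is generic so the sketch runs without Spring or a database on the classpath.
final class HostbaseRouter<T> {
    private final Map<String, T> targets = new HashMap<>();
    private final T fallback;

    HostbaseRouter(T primary, T replicaOne, T replicaTwo) {
        targets.put("PRIMARY", primary);
        targets.put("R1", replicaOne);
        // Mirrors the default branch of the switch: anything else goes to replica two.
        this.fallback = replicaTwo;
    }

    T resolve(String hostbaseParam) {
        // Unlike a switch on a String, this lookup does not throw on null;
        // a null or unknown key simply yields the fallback.
        return targets.getOrDefault(hostbaseParam, fallback);
    }

    public static void main(String[] args) {
        // Placeholder strings stand in for the three DataSource beans.
        HostbaseRouter<String> router =
                new HostbaseRouter<>("first", "replicaone", "replicatwo");
        System.out.println(router.resolve("PRIMARY"));
        System.out.println(router.resolve("R1"));
        System.out.println(router.resolve("R2"));
    }
}
```

In a Spring configuration the type parameter would be `javax.sql.DataSource` and the three constructor arguments would be the beans declared above. That wiring is left out here so the sketch stays self-contained.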

Every minute, 2 of the 6 jobs use the same datasource at the same time. Because the SQL request runs in 0.4 sec, I think one of the jobs closes the datasource while the other has not had time to finish, which is why I get this error:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

Last packet sent to the server was 1001 ms ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1074)
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2873)
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2763)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3299)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1837)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:1961)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2543)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1737)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1888)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
    at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:127)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:428)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:150)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$88b6e64a.open(<generated>)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:113)
    at com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:160)
    at com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:188)
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2329)
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2774)
    ... 32 more
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

Last packet sent to the server was 1007 ms ago.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:406)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1074)
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2873)
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2763)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3299)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1837)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:1961)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2543)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1737)
    at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1888)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
    at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:127)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:428)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:150)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$88b6e64a.open(<generated>)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:113)
    at com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:160)
    at com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:188)
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2329)
    at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2774)
    ... 32 more
67850 [SimpleAsyncTaskExecutor-3] ERROR org.springframework.batch.core.step.AbstractStep - Encountered an error executing step feedsurvcapacitystep in job jobfeedsurvcapacity
org.springframework.batch.item.ItemStreamException: Failed to initialize the reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:153)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader$$FastClassBySpringCGLIB$$ebb633d0.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
    at org.springframework.batch.item.database.JdbcCursorItemReader$$EnhancerBySpringCGLIB$$88b6e64a.open(<generated>)
    at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
    at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.batch.item.ItemStreamException: Error while closing item reader
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.close(AbstractItemCountingItemStreamItemReader.java:142)
    at org.springframework.batch.item.database.JdbcCursorItemReader.openCursor(JdbcCursorItemReader.java:131)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doOpen(AbstractCursorItemReader.java:428)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.open(AbstractItemCountingItemStreamItemReader.java:150)
    ... 20 more
Caused by: java.sql.SQLException: Connection is closed
    at com.zaxxer.hikari.pool.ProxyConnection$ClosedConnection.lambda$getClosedConnection$0(ProxyConnection.java:466)
    at com.sun.proxy.$Proxy55.setAutoCommit(Unknown Source)
    at com.zaxxer.hikari.pool.ProxyConnection.setAutoCommit(ProxyConnection.java:380)
    at com.zaxxer.hikari.pool.HikariProxyConnection.setAutoCommit(HikariProxyConnection.java)
    at org.springframework.batch.item.database.AbstractCursorItemReader.doClose(AbstractCursorItemReader.java:402)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.close(AbstractItemCountingItemStreamItemReader.java:139)
    ... 23 more
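
The stack trace shows that the connections come from a HikariCP pool. To make the "two jobs share one datasource" hypothesis concrete, here is a toy model of connection pooling in plain Java. It is only a sketch: `ToyPool` and `ToyConnection` are hypothetical names, this is not how HikariCP is implemented, and it does not diagnose the error above. It only illustrates the general idea that two concurrent borrowers hold distinct connections and that releasing one returns it to the pool without touching the other.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model only (hypothetical names); a real pool such as HikariCP is far more involved.
final class ToyPool {
    static final class ToyConnection {
        final int id;
        ToyConnection(int id) { this.id = id; }
    }

    private final BlockingQueue<ToyConnection> idle;

    ToyPool(int size) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(new ToyConnection(i));
        }
    }

    // Hands out an idle connection; fails fast when the pool is exhausted.
    ToyConnection borrow() {
        ToyConnection c = idle.poll();
        if (c == null) {
            throw new IllegalStateException("pool exhausted");
        }
        return c;
    }

    // "Closing" a pooled connection in this model just puts it back in the idle queue.
    void release(ToyConnection c) {
        idle.add(c);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(2);
        ToyConnection a = pool.borrow(); // first job
        ToyConnection b = pool.borrow(); // second job, same pool
        System.out.println("distinct connections: " + (a != b));
        pool.release(a);                 // first job finishes
        System.out.println("idle after release: " + pool.idleCount());
        System.out.println("second job still holds connection " + b.id);
    }
}
```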

1 Answer

0 votes
/ February 3, 2020

This is how I launch my jobs. It is a scheduler that runs every minute:

for (int i = 0; i < quality.length; i++) {
    try {
        logger.debug("[ScheduledTasks][feedSurveillanceFluxCapacityEveryMinute] PRIMARY HOSTBASE");
        jobLauncher.run(jobfeedsurvcapacity,
                new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                        .addString("quality_param", quality[i])
                        .addString("hostbase_param", "PRIMARY")
                        .addLong("exchangeIdSession", exchangeIdSession[i])
                        .toJobParameters());

        logger.debug("[ScheduledTasks][feedSurveillanceFluxCapacityEveryMinute] REPLICA 1 HOSTBASE");
        jobLauncher.run(jobfeedsurvcapacity,
                new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                        .addString("quality_param", quality[i])
                        .addString("hostbase_param", "R1")
                        .addLong("exchangeIdSession", exchangeIdSession[i])
                        .toJobParameters());

        logger.debug("[ScheduledTasks][feedSurveillanceFluxCapacityEveryMinute] REPLICA 2 HOSTBASE");
        jobLauncher.run(jobfeedsurvcapacity,
                new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                        .addString("quality_param", quality[i])
                        .addString("hostbase_param", "R2")
                        .addLong("exchangeIdSession", exchangeIdSession[i])
                        .toJobParameters());
    } catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException
            | JobParametersInvalidException | JobRestartException e) {
        e.printStackTrace();
    }
}

quality.length = 2, so the 3 jobs are each launched twice (6 launches in total).
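
The count can be checked with a small sketch that enumerates the launches the loop produces on each scheduler tick. `LaunchPlan` is a hypothetical class, and the `quality` values `"Q1"` and `"Q2"` are placeholders, since the real values are not shown in the post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that only lists the parameter combinations; it launches nothing.
final class LaunchPlan {
    static List<String> plan(String[] quality, String[] hostbases) {
        List<String> launches = new ArrayList<>();
        for (String q : quality) {          // outer loop: one pass per quality value
            for (String h : hostbases) {    // three launches per pass, one per hostbase
                launches.add("quality_param=" + q + ", hostbase_param=" + h);
            }
        }
        return launches;
    }

    public static void main(String[] args) {
        String[] quality = {"Q1", "Q2"};              // placeholder values (assumption)
        String[] hostbases = {"PRIMARY", "R1", "R2"}; // the three values used in the loop
        List<String> launches = plan(quality, hostbases);
        launches.forEach(System.out::println);
        System.out.println(launches.size() + " launches per scheduler tick");
    }
}
```

With two quality values and three hostbases, each hostbase appears in two launches, which matches the "2 jobs per datasource" description in the question.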
