I am working on a Spring Boot and Spring Batch example, based on: https://mkyong.com/spring-batch/spring-batch-partitioning-example/.
In that example I converted the code to annotation-based configuration, but it is not working as expected. It only writes records up to id 40 to the CSV output files, in chunks of 10. Why are the remaining records not being written?
Logs:
2020-05-06 01:31:39.541 INFO 24180 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
Starting : Thread1, fromId : 1, toId : 10
Starting : Thread2, fromId : 11, toId : 20
Starting : Thread3, fromId : 21, toId : 30
Starting : Thread4, fromId : 31, toId : 40
Starting : Thread5, fromId : 41, toId : 50
Starting : Thread6, fromId : 51, toId : 60
Starting : Thread7, fromId : 61, toId : 70
Starting : Thread8, fromId : 71, toId : 80
Starting : Thread9, fromId : 81, toId : 90
Starting : Thread10, fromId : 91, toId : 100
2020-05-06 01:31:39.676 WARN 24180 --- [ main] o.s.b.c.c.a.DefaultBatchConfigurer : No transaction manager was provided, using a DataSourceTransactionManager
2020-05-06 01:31:39.770 INFO 24180 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2020-05-06 01:31:39.927 INFO 24180 --- [ main] .e.SpringBatchPartitionMkyongApplication : Started SpringBatchPartitionMkyongApplication in 2.078 seconds (JVM running for 3.324)
2020-05-06 01:31:39.928 INFO 24180 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: [--spring.output.ansi.enabled=always]
2020-05-06 01:31:39.975 INFO 24180 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionJob]] launched with the following parameters: [{date=1588702900000, time=1588702899832, run.id=1, -spring.output.ansi.enabled=always}]
2020-05-06 01:31:40.007 INFO 24180 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
Starting : Thread1, fromId : 1, toId : 10
Starting : Thread2, fromId : 11, toId : 20
Starting : Thread3, fromId : 21, toId : 30
Starting : Thread4, fromId : 31, toId : 40
2020-05-06 01:31:40.142 INFO 24180 --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition4] executed in 120ms
2020-05-06 01:31:40.142 INFO 24180 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition2] executed in 119ms
2020-05-06 01:31:40.142 INFO 24180 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition3] executed in 120ms
2020-05-06 01:31:40.142 INFO 24180 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition1] executed in 119ms
2020-05-06 01:31:40.146 INFO 24180 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 139ms
2020-05-06 01:31:40.150 INFO 24180 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionJob]] completed with the following parameters: [{date=1588702900000, time=1588702899832, run.id=1, -spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 151ms
2020-05-06 01:31:40.152 INFO 24180 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionJob]] launched with the following parameters: [{date=1588708900151, time=1588708900151}]
2020-05-06 01:31:40.156 INFO 24180 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
Starting : Thread1, fromId : 1, toId : 10
Starting : Thread2, fromId : 11, toId : 20
Starting : Thread3, fromId : 21, toId : 30
Starting : Thread4, fromId : 31, toId : 40
2020-05-06 01:31:40.203 INFO 24180 --- [cTaskExecutor-7] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition4] executed in 39ms
2020-05-06 01:31:40.203 INFO 24180 --- [cTaskExecutor-5] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition1] executed in 40ms
2020-05-06 01:31:40.203 INFO 24180 --- [cTaskExecutor-8] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition3] executed in 39ms
2020-05-06 01:31:40.209 INFO 24180 --- [cTaskExecutor-6] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition2] executed in 46ms
2020-05-06 01:31:40.211 INFO 24180 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 55ms
2020-05-06 01:31:40.213 INFO 24180 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=partitionJob]] completed with the following parameters: [{date=1588708900151, time=1588708900151}] and the following status: [COMPLETED] in 60ms
STATUS :: COMPLETED
2020-05-06 01:31:40.215 INFO 24180 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2020-05-06 01:31:40.219 INFO 24180 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
JobConfig.java

@Configuration
public class JobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    @StepScope
    public JdbcPagingItemReader<User> userJdbcPagingItemReader(
            @Value("#{stepExecutionContext[fromId]}") Integer fromId,
            @Value("#{stepExecutionContext[toId]}") Integer toId) {
        // reading database records using JDBC in a paging fashion.
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(10);
        reader.setRowMapper(new UserRowMapper());
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("select id, username, password, age");
        queryProvider.setFromClause("from user");
        queryProvider.setWhereClause("where id >= :fromId and id <= :toId");
        queryProvider.setSortKeys(sortKeys);
        // Parameter Values
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("fromId", fromId);
        parameterValues.put("toId", toId);
        reader.setQueryProvider(queryProvider);
        reader.setParameterValues(parameterValues);
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<User> fileItemWriter(
            @Value("#{stepExecutionContext[fromId]}") Integer fromId,
            @Value("#{stepExecutionContext[toId]}") Integer toId) throws Exception {
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("csv/outputs/users.processed" + fromId + "-" + toId));
        writer.setAppendAllowed(false);
        writer.setLineAggregator(new UserLineAggregator());
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public Partitioner partioner() {
        Partitioner partitioner = new RangePartitioner();
        partitioner.partition(10);
        return partitioner;
    }

    // Master
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // slave step
    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<User, User>chunk(10)
                .reader(userJdbcPagingItemReader(null, null))
                .writer(fileItemWriter(null, null))
                .build();
    }

    @Bean(name="partitionJob")
    public Job job() throws Exception {
        return jobBuilderFactory.get("partitionJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}
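The User, UserRowMapper and UserLineAggregator classes are not shown above. For context, here is a minimal sketch of what they are assumed to look like (a plain POJO matching the select clause, a simple row mapper, and comma-separated output); the actual classes in the project may differ:

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.batch.item.file.transform.LineAggregator;
import org.springframework.jdbc.core.RowMapper;

// Assumed domain class: a plain POJO matching the select clause (id, username, password, age).
public class User {
    private int id;
    private String username;
    private String password;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}

// Assumed row mapper: maps one row of the "user" table to a User instance.
class UserRowMapper implements RowMapper<User> {
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setId(rs.getInt("id"));
        user.setUsername(rs.getString("username"));
        user.setPassword(rs.getString("password"));
        user.setAge(rs.getInt("age"));
        return user;
    }
}

// Assumed line aggregator: writes one User per line as comma-separated values.
class UserLineAggregator implements LineAggregator<User> {
    @Override
    public String aggregate(User item) {
        return item.getId() + "," + item.getUsername() + "," + item.getPassword() + "," + item.getAge();
    }
}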
RangePartitioner.java

@Component
public class RangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        int range = 10;
        int fromId = 1;
        int toId = range;
        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();
            System.out.println("\nStarting : Thread" + i + ", fromId : " + fromId + ", toId : " + toId);
            value.putInt("fromId", fromId);
            value.putInt("toId", toId);
            // give each thread a name
            value.putString("name", "Thread" + i);
            result.put("partition" + i, value);
            fromId = toId + 1;
            toId += range;
        }
        return result;
    }
}
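For reference, here is a minimal standalone sketch (a hypothetical class, not part of the project) that calls the partitioner directly and prints the id ranges it produces for a given grid size:

import java.util.Map;

import org.springframework.batch.item.ExecutionContext;

// Hypothetical check class, only to illustrate the ranges produced by RangePartitioner.
public class PartitionRangeCheck {
    public static void main(String[] args) {
        // step1 is configured with gridSize(4), so the framework asks for 4 partitions.
        Map<String, ExecutionContext> partitions = new RangePartitioner().partition(4);
        partitions.forEach((name, ctx) ->
                System.out.println(name + " -> fromId=" + ctx.getInt("fromId")
                        + ", toId=" + ctx.getInt("toId")));
        // With range fixed at 10 inside the partitioner, this prints
        // partition1..partition4 covering ids 1 to 40.
    }
}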