Я получаю java .lang.IllegalArgumentException: Invalid TaskExecution, ID not found, когда я запускаю весеннюю пакетную задачу с использованием потока данных весеннего облака. Я прочитал, что ошибка выполнения недопустимой задачи связана с тем, что не используется один и тот же источник данных для весеннего пакета и потока данных весеннего облака. Но в моем случае я получаю эту ошибку даже после использования того же источника данных postgres.
Это мой класс DataSourceConfiguration
import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.cloud.task.configuration.TaskConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.SimpleDriverDataSource;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import com.zaxxer.hikari.HikariDataSource;
public class DataSourceConfiguration {
@Value("classpath:schema-postgres.sql")
private Resource schemaScript;
// This data source is for spring batch to create its table to track jobs
// spring batch and spring cloud data flow will use this datasource
@Bean
@Primary
public DataSource postgressDataSource() throws SQLException {
final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
dataSource.setDriver(new org.postgresql.Driver());
dataSource.setUrl("jdbc:postgresql://***/***");
dataSource.setUsername("***");
dataSource.setPassword("***");
DatabasePopulatorUtils.execute(databasePopulator(), dataSource);
return dataSource;
}
private DatabasePopulator databasePopulator() {
final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(schemaScript);
return populator;
}
@Bean
public JdbcTemplate postgressJdbcTemplate(final DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
// This data source is used for poc
@Bean
public DataSource h2DataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName("org.h2.Driver");
// INIT in connection string will run data.sql script during in-memory h2
// startup
dataSource.setJdbcUrl("jdbc:h2:mem:testdb;INIT=RUNSCRIPT FROM 'classpath:data.sql'");
dataSource.setUsername("***");
dataSource.setPassword("***");
return dataSource;
}
@Bean
public JdbcTemplate jdbcTemplate(@Qualifier("h2DataSource") final DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public TaskConfigurer taskConfigurer() {
try {
return new DefaultTaskConfigurer(postgressDataSource());
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
}
Это моя пакетная конфигурация
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.csx.batch.listener.JobCompletionNotificationListener;
import com.csx.batch.mapper.ReportInfoMapper;
import com.csx.batch.process.ReportInfo;
import com.csx.batch.process.ReportItemProcessor;
import com.csx.batch.process.ReportWrapper;
@Configuration
@EnableScheduling
@EnableTask
@EnableBatchProcessing
@Import({ DataSourceConfiguration.class })
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
// one reader
@Bean
public ItemReader<ReportInfo> reader(@Qualifier("h2DataSource") final DataSource dataSource) {
JdbcCursorItemReader<ReportInfo> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("SELECT * FROM REPORTINFO");
reader.setRowMapper(new ReportInfoMapper());
// next call will made to app api and get report information
return reader;
}
@Bean
public ReportItemProcessor processor() {
return new ReportItemProcessor();
}
// multiple writers
@Bean
public CompositeItemWriter<ReportWrapper> writer(@Qualifier("h2DataSource") final DataSource dataSource) {
List<ItemWriter<? super ReportWrapper>> delegates = new ArrayList<>();
// this writer step is optional
delegates.add(reportWriter(dataSource));
delegates.add(reportInfoWriter(dataSource));
// add another writer to send report as email/NAS input
return new CompositeItemWriterBuilder<ReportWrapper>().delegates(delegates).build();
}
// writing to report table
@Bean
public JdbcBatchItemWriter<ReportWrapper> reportWriter(@Qualifier("h2DataSource") final DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<ReportWrapper>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO REPORT (REPORT_NAME, REPORT_DATA) VALUES (:reportName, :reportData)")
.dataSource(dataSource).build();
}
// updating reportinfo report_date column
@Bean
public JdbcBatchItemWriter<ReportWrapper> reportInfoWriter(@Qualifier("h2DataSource") final DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<ReportWrapper>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("UPDATE REPORTINFO SET REPORT_DATE = :nextReportDate WHERE ID = :reportId").dataSource(dataSource)
.build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1)
.end().build();
}
@Bean
public Step step1(ItemReader<ReportInfo> reader, ItemProcessor<ReportInfo, ReportWrapper> processor,
CompositeItemWriter<ReportWrapper> writer) {
return stepBuilderFactory.get("step1").<ReportInfo, ReportWrapper>chunk(1).reader(reader).processor(processor())
.writer(writer).build();
}
}
Выполнение вышеуказанного задания с использованием встроенного весеннего планировщика, и оно работает должным образом и добавляет задачи в таблицу task_execution, как ожидалось.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class BatchJobScheduler {
@Autowired
private Job job;
@Autowired
private JobLauncher jobLauncher;
@Scheduled(fixedDelay = 8000)
public void runBatchJob() {
JobParameters params = new JobParametersBuilder().addLong("jobId", System.currentTimeMillis())
.toJobParameters();
try {
jobLauncher.run(job, params);
} catch (JobExecutionAlreadyRunningException e) {
e.printStackTrace();
} catch (JobRestartException e) {
e.printStackTrace();
} catch (JobInstanceAlreadyCompleteException e) {
e.printStackTrace();
} catch (JobParametersInvalidException e) {
e.printStackTrace();
}
}
}
На изображении ниже мы ясно видим, что task_execution_id 3 и 6 соответствуют ожиданиям от приложения планировщик. Где, как task_execution_id 4 и 5, взяты из потока данных весеннего облака, и это приводит к тому, что идентификатор выполнения задачи не найден. Несмотря на то, что он добавляет запись в эту же таблицу (как для весенней партии, так и для весеннего облачного потока данных есть один и тот же postgres источник данных).
Используемые зависимости для приложения Spring Cloud Data Flow
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.***</groupId>
<artifactId>spring-cloud-data-flow-poc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-data-flow-poc</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR4</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server</artifactId>
<version>2.2.0.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.pivotal</groupId>
<artifactId>pivotal-cloudfoundry-client-reactor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;
import org.springframework.cloud.deployer.spi.cloudfoundry.CloudFoundryDeployerAutoConfiguration;
import org.springframework.cloud.kubernetes.KubernetesAutoConfiguration;
@EnableDataFlowServer
@SpringBootApplication(
exclude = {
CloudFoundryDeployerAutoConfiguration.class,
KubernetesAutoConfiguration.class}
)
public class SpringCloudDataFlowPocApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudDataFlowPocApplication.class, args);
}
}
Свойства приложения Spring Cloud Dataflow имеют тот же источник данных, что и упомянутый выше в приложении Spring Boot.
spring.datasource.url=jdbc:postgresql://***:5432/***
spring.datasource.driverClassName=org.postgresql.Driver
spring.datasource.username=***
spring.datasource.password=***
spring.cloud.dataflow.features.streams-enabled=false
spring.cloud.dataflow.features.tasks-enabled=true
spring.cloud.dataflow.features.schedules-enabled=true
spring.batch.initialize-schema=always
Вот весеннее облако выполнение потока данных:
Я получаю ошибку при запуске задачи весеннего пакетного задания из потока данных весеннего облака
2020-05-28 17:31:47.369 WARN 98668 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'taskLifecycleListener'; nested exception is java.lang.IllegalArgumentException: Invalid TaskExecution, ID 5 not found
2020-05-28 17:31:47.370 INFO 98668 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2020-05-28 17:31:47.371 INFO 98668 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans
2020-05-28 17:31:47.372 ERROR 98668 --- [ main] o.s.c.t.listener.TaskLifecycleListener : An event to end a task has been received for a task that has not yet started.
2020-05-28 17:31:47.373 INFO 98668 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2020-05-28 17:31:47.376 INFO 98668 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
2020-05-28 17:31:47.378 INFO 98668 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
Пожалуйста, помогите мне понять, почему я Я получаю этот идентификатор ошибки не найден, хотя я даю один и тот же источник данных postgress как для весенних пакетных, так и для весенних облачных приложений потока данных.
Другой следующий вопрос, который у меня возник: если я запускаю пакет Spring в кластерной среде, будет ли планировщик потоков данных Spring Cloud обрабатывать кластерную среду? Или весенний облачный планировщик запускает весеннюю партию с несколькими узлами?