Пакет данных Spring и поток данных облака Spring - postgres - java .lang.IllegalArgumentException: Invalid TaskExecution, ID не найден - PullRequest
0 голосов
/ 29 мая 2020

Я получаю java .lang.IllegalArgumentException: Invalid TaskExecution, ID not found, когда я запускаю весеннюю пакетную задачу с использованием потока данных весеннего облака. Я прочитал, что ошибка выполнения недопустимой задачи связана с тем, что не используется один и тот же источник данных для весеннего пакета и потока данных весеннего облака. Но в моем случае я получаю эту ошибку даже после использования того же источника данных postgres.

Это мой класс DataSourceConfiguration

import java.sql.SQLException;

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.task.configuration.DefaultTaskConfigurer;
import org.springframework.cloud.task.configuration.TaskConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.SimpleDriverDataSource;
import org.springframework.jdbc.datasource.init.DatabasePopulator;
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;

import com.zaxxer.hikari.HikariDataSource;

public class DataSourceConfiguration {
    @Value("classpath:schema-postgres.sql")
    private Resource schemaScript;

    // This data source is for spring batch to create its table to track jobs
    // spring batch and spring cloud data flow will use this datasource
    @Bean
    @Primary
    public DataSource postgressDataSource() throws SQLException {
        final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
        dataSource.setDriver(new org.postgresql.Driver());
        dataSource.setUrl("jdbc:postgresql://***/***");
        dataSource.setUsername("***");
        dataSource.setPassword("***");
        DatabasePopulatorUtils.execute(databasePopulator(), dataSource);
        return dataSource;
    }

    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(schemaScript);
        return populator;
    }

    @Bean
    public JdbcTemplate postgressJdbcTemplate(final DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    // This data source is used for poc
    @Bean
    public DataSource h2DataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setDriverClassName("org.h2.Driver");
        // INIT in connection string will run data.sql script during in-memory h2
        // startup
        dataSource.setJdbcUrl("jdbc:h2:mem:testdb;INIT=RUNSCRIPT FROM 'classpath:data.sql'");
        dataSource.setUsername("***");
        dataSource.setPassword("***");
        return dataSource;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(@Qualifier("h2DataSource") final DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public TaskConfigurer taskConfigurer() {
        try {
            return new DefaultTaskConfigurer(postgressDataSource());
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }
}

Это моя пакетная конфигурация

import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.csx.batch.listener.JobCompletionNotificationListener;
import com.csx.batch.mapper.ReportInfoMapper;
import com.csx.batch.process.ReportInfo;
import com.csx.batch.process.ReportItemProcessor;
import com.csx.batch.process.ReportWrapper;

@Configuration
@EnableScheduling
@EnableTask
@EnableBatchProcessing
@Import({ DataSourceConfiguration.class })
public class BatchConfiguration {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // one reader
    @Bean
    public ItemReader<ReportInfo> reader(@Qualifier("h2DataSource") final DataSource dataSource) {
        JdbcCursorItemReader<ReportInfo> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("SELECT * FROM REPORTINFO");
        reader.setRowMapper(new ReportInfoMapper());

        // next call will made to app api and get report information
        return reader;
    }

    @Bean
    public ReportItemProcessor processor() {
        return new ReportItemProcessor();
    }

    // multiple writers
    @Bean
    public CompositeItemWriter<ReportWrapper> writer(@Qualifier("h2DataSource") final DataSource dataSource) {
        List<ItemWriter<? super ReportWrapper>> delegates = new ArrayList<>();
        // this writer step is optional
        delegates.add(reportWriter(dataSource));
        delegates.add(reportInfoWriter(dataSource));
        // add another writer to send report as email/NAS input

        return new CompositeItemWriterBuilder<ReportWrapper>().delegates(delegates).build();
    }

    // writing to report table
    @Bean
    public JdbcBatchItemWriter<ReportWrapper> reportWriter(@Qualifier("h2DataSource") final DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<ReportWrapper>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO REPORT (REPORT_NAME, REPORT_DATA) VALUES (:reportName, :reportData)")
                .dataSource(dataSource).build();
    }

    // updating reportinfo report_date column
    @Bean
    public JdbcBatchItemWriter<ReportWrapper> reportInfoWriter(@Qualifier("h2DataSource") final DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<ReportWrapper>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("UPDATE REPORTINFO SET REPORT_DATE = :nextReportDate WHERE ID = :reportId").dataSource(dataSource)
                .build();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1)
                .end().build();
    }

    @Bean
    public Step step1(ItemReader<ReportInfo> reader, ItemProcessor<ReportInfo, ReportWrapper> processor,
            CompositeItemWriter<ReportWrapper> writer) {
        return stepBuilderFactory.get("step1").<ReportInfo, ReportWrapper>chunk(1).reader(reader).processor(processor())
                .writer(writer).build();
    }
}

Выполнение вышеуказанного задания с использованием встроенного весеннего планировщика, и оно работает должным образом и добавляет задачи в таблицу task_execution, как ожидалось.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class BatchJobScheduler {
    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Scheduled(fixedDelay = 8000)
    public void runBatchJob() {
        JobParameters params = new JobParametersBuilder().addLong("jobId", System.currentTimeMillis())
                .toJobParameters();
        try {

            jobLauncher.run(job, params);

        } catch (JobExecutionAlreadyRunningException e) {
            e.printStackTrace();
        } catch (JobRestartException e) {
            e.printStackTrace();
        } catch (JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        } catch (JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }
}

На изображении ниже мы ясно видим, что task_execution_id 3 и 6 соответствуют ожиданиям от приложения планировщик. Где, как task_execution_id 4 и 5, взяты из потока данных весеннего облака, и это приводит к тому, что идентификатор выполнения задачи не найден. Несмотря на то, что он добавляет запись в эту же таблицу (как для весенней партии, так и для весеннего облачного потока данных есть один и тот же postgres источник данных).

enter image description here

Используемые зависимости для приложения Spring Cloud Data Flow

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.***</groupId>
    <artifactId>spring-cloud-data-flow-poc</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud-data-flow-poc</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-dataflow-server</artifactId>
            <version>2.2.0.RELEASE</version>
            <exclusions>
                <exclusion>
                <groupId>io.pivotal</groupId>
                <artifactId>pivotal-cloudfoundry-client-reactor</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-hateoas</artifactId>
          <version>2.2.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;
import org.springframework.cloud.deployer.spi.cloudfoundry.CloudFoundryDeployerAutoConfiguration;
import org.springframework.cloud.kubernetes.KubernetesAutoConfiguration;

@EnableDataFlowServer
@SpringBootApplication(
        exclude = {
                CloudFoundryDeployerAutoConfiguration.class,
                KubernetesAutoConfiguration.class}
)
public class SpringCloudDataFlowPocApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudDataFlowPocApplication.class, args);
    }

}

Свойства приложения Spring Cloud Dataflow имеют тот же источник данных, что и упомянутый выше в приложении Spring Boot.

spring.datasource.url=jdbc:postgresql://***:5432/***
spring.datasource.driverClassName=org.postgresql.Driver
spring.datasource.username=***
spring.datasource.password=***

spring.cloud.dataflow.features.streams-enabled=false

spring.cloud.dataflow.features.tasks-enabled=true

spring.cloud.dataflow.features.schedules-enabled=true
spring.batch.initialize-schema=always

Вот весеннее облако выполнение потока данных: enter image description here

Я получаю ошибку при запуске задачи весеннего пакетного задания из потока данных весеннего облака

2020-05-28 17:31:47.369  WARN 98668 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'taskLifecycleListener'; nested exception is java.lang.IllegalArgumentException: Invalid TaskExecution, ID 5 not found
2020-05-28 17:31:47.370  INFO 98668 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2020-05-28 17:31:47.371  INFO 98668 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans
2020-05-28 17:31:47.372 ERROR 98668 --- [           main] o.s.c.t.listener.TaskLifecycleListener   : An event to end a task has been received for a task that has not yet started.
2020-05-28 17:31:47.373  INFO 98668 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2020-05-28 17:31:47.376  INFO 98668 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
2020-05-28 17:31:47.378  INFO 98668 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]

Пожалуйста, помогите мне понять, почему я Я получаю этот идентификатор ошибки не найден, хотя я даю один и тот же источник данных postgress как для весенних пакетных, так и для весенних облачных приложений потока данных.

Другой следующий вопрос, который у меня возник: если я запускаю пакет Spring в кластерной среде, будет ли планировщик потоков данных Spring Cloud обрабатывать кластерную среду? Или весенний облачный планировщик запускает весеннюю партию с несколькими узлами?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...