Spring Batch - получение исключения DeadlockLoserDataAccessException при попытке чтения / записи в одну и ту же таблицу - PullRequest
0 голосов
/ 20 ноября 2018

Я работаю над приложением Spring Batch, которое будет считывать необработанные данные из таблицы A, обрабатывать данные, вставлять обработанные данные в таблицу B, а затем обновлять строку в таблице A до PROCESSED.Однако, хотя вставка данных в таблицу B работает нормально, я продолжаю получать исключение DeadlockLoserDataAccessException каждый раз, когда пытаюсь обновить таблицу A. Я считаю, что это связано с тем, что Curser из JDBCCursorItemReader, который использовался для чтения таблицы A, мешаетОбновление таблицы.Как мне исправить это?

Я использую JDBCCursorItemReader и CompositeItemWriter в Spring Batch.Размер куска 1.

Ответы [ 3 ]

0 голосов
/ 21 ноября 2018

Я бы посоветовал вам переработать логику транзакции, чтобы "заблокировать" необходимые строки TABLEA, помечающие их как "ОБРАБОТАННЫЕ" в самом начале вашей транзакции, а не обновлять их еще раз в конце транзакции.Смотрите пример ниже.

-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.

create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys; 
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;

-- Every session starts its transaction with locking its own set of rows (only one in the example), 
-- which becomes invisible for the same statement issued by other concurrent transactions 
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
0 голосов
/ 21 ноября 2018

Я полагаю, что это связано с тем, что Curser из JDBCCursorItemReader, который использовался для чтения таблицы A, мешает обновлению таблицы.Как мне исправить это?

Это не должно вызывать проблем, если и чтение, вставка и обновление находятся в одной транзакции (что имеет место при использовании шага, ориентированного на чанк).

Я использую JDBCCursorItemReader и CompositeItemWriter в Spring Batch.Размер чанка равен 1.

Вот быстрый (автономный) пример с той же конфигурацией, как вы упомянули:

import java.util.Arrays;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public JdbcCursorItemReader<Person> itemReader() {
        return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person where processed = false")
                .beanRowMapper(Person.class)
                .saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
                .build();
    }

    @Bean
    public ItemProcessor<Person, Person> itemProcessor() {
        return item -> new Person(item.getId(), item.getName().toUpperCase());
    }

    @Bean
    public CompositeItemWriter<Person> itemWriter() {
        return new CompositeItemWriterBuilder<Person>()
                .delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
                .ignoreItemStream(true)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> peopleItemWriter() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("insert into people (name) values (:name)")
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> personItemUpdater() {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("update person set processed = true where id = :id")
                .build();
    }

    @Bean
    public Step step() {
        return steps.get("step")
                .<Person, Person>chunk(1)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobs.get("job")
                .start(step())
                .build();
    }

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    public static void main(String[] args) throws Exception {

        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
        jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
        jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
        jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
        jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");

        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());

        Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
        Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
        System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
        System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
        Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
        System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
    }

    public static class Person {

        private int id;

        private String name;

        public Person() {
        }

        public Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

}

Он читает людей из Persontable (TableA в вашем случае), в верхнем регистре их имя и записывает результат в таблицу People (TableB в вашем случае).Затем он обновляет флаг processed в таблице Person.

Если вы запустите пример, вы должны увидеть:

nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2

без исключения мертвой блокировки.

Надеюсь, это поможет.

0 голосов
/ 20 ноября 2018

Архитектура ETL подобна чтению данных из источника, их обработке и записи в цель.Я стараюсь избегать этой логики обновления в моих процессах, так как это приводит к большим накладным расходам и проблемам, которые вы описали.Поэтому, возможно, вы могли бы переосмыслить архитектуру ...

Если нет, я действительно рекомендую иметь соответствующий индекс для обновления - в зависимости от условий поиска, которые вы используете.Это сделает обновление не только более дешевым, но и SQL потребуется только для доступа к одной строке, что позволит избежать дополнительных сканирований таблиц для обновления.

...