Как спроектировать задание Springbatch, которое извлекает записи из БД в пакетном режиме и параллельно запускает несколько процессоров и пишущих - PullRequest
1 голос
/ 04 апреля 2020

Сценарий: Чтение записей из БД и создание из нее 4 различных выходных файлов.

Технический стек:

  • Springboot 2 .x
  • springBatch 4.2.x
  • ArangoDB 3.6.x

Текущий подход: Задание SpringBatch, которое имеет следующие шаги в последовательности:

jobBuilderFactory.get("alljobs")
      .start(step("readAllData")) //reads all records from db, stores it in Obj1 (R1)
      .next(step("processData1")) //(P1)
      .next(step("writer1"))      // writes it to file1(W1)
      .next(step("reader2"))      // reads the same obj1(R2)
      .next(step("processor2"))   // processes it (P2)
      .next(step("writer2"))      // writes it to file1(W2)
      .next(step("reader3"))      // reads the same obj1 (R3)
      .next(step("processor3"))   // processes it (P3)
      .next(step("writer3"))      // writes it to file1(W3)
      .next(step("reader4"))      // reads the same obj1(R4)
      .next(step("processor4"))   // processes it (P4)
      .next(step("writer4"))      // writes it to file1 (W4)
      .build()

Проблема: Поскольку объем данных, поступающих из БД, ОГРОМ,> 200 000 записей, следовательно, теперь мы выбираем записи с помощью курсора в пакете из 10 000 записей.

Целевое состояние задания: Задание чтения, которое извлекает записи из БД с помощью курсора в пакете из 1000 записей:

  1. Для каждой партии из 1000 записей I должны запускать процессор и записывающее устройство для одного и того же.
  2. Кроме того, поскольку для всех остальных трех процессоров и записывающих устройств набор данных будет одинаковым (Obj1, который будет выбран из курсора), вызывая их параллельно.
Reader1() {
    while(cursor.hasNext()) {
          Obj1 = cursor.next();

          a) P1(Obj1);  |    c) R2(Obj1); |    c) R3(Obj1);  |   c) R4(Obj1);   || 
          b) W1(Obj1);  |    d) P2(Obj1); |    d) P3(Obj1);  |   d) P4(Obj1);   || All these running in parallel.
                        |    e) W2(Obj1); |    e) W3(Obj1);  |   e) W4(Obj1);   ||
    }
}

Ниже приведены подходы, которые возникли у меня в голове:

* 10 39 * Вызовите задание внутри самого курсора и выполните все шаги P1....W4 внутри итерации курсора за итерацией. Вызовите задание, которое имеет первый шаг как Reader1, а затем внутри курсора, вызовите еще один subJob, который имеет все эти P1....W4 параллельно, поскольку мы не можем go из курсора.

Пожалуйста, предложите лучший способ реализации.

Заранее спасибо.

Обновление:

Я пытался выполнить шаги (P1 .... W4) внутри шага My Reader1 в al oop, но Я застрял с реализацией, поскольку все здесь написано как Шаг, и я не уверен, как вызвать несколько шагов внутри шага R1 в al oop. Я попытался использовать Decider, поместив P1 ... W4 в поток (flow):

   flowbuilder.start(step("R1"))
         .next(decider())
         .on(COMPLETED).end()
         .from(decider())
         .on(CONTINUE)
         .flow(flow)

job.start(flow)
   .next(flow).on("CONTINUE").to(endJob()).on("FINISHED").end()
   .end()
   .build()

Но я не могу go вернуться к следующим итерациям курсора, так как итерация курсора есть только в шаге R1. Я также попытался поместить все этапы R1 ... W4 (включая Reader1) в один и тот же поток, но в итоге поток выдал циклическую c ошибку потока.

Просьба предложить лучший способ реализации это? Как сделать так, чтобы все остальные шаги, вызываемые параллельно внутри курсора, повторялись в шаге R1.

Ответы [ 2 ]

1 голос
/ 06 апреля 2020

Я считаю, что использование 4-х параллельных шагов является хорошим вариантом для вас. Даже если у вас будет 4 потока, читающих из одних и тех же данных, вы должны выиграть от параллельных шагов на этапах обработки / записи. Это должно определенно выполнить лучше, чем 4 шага в последовательности. Кстати, 200 тыс. Записей не так уж много (конечно, это зависит от размера записи и от того, как она отображается, но я думаю, что это должно быть нормально, чтение данных никогда не является узким местом).

Это всегда о торговле offs .. Здесь я торгую небольшим дублированием чтения для лучшей общей пропускной способности благодаря параллельным шагам. Я бы не стал убивать себя, чтобы убедиться, что элементы читаются только один раз и усложняют ситуацию.

Хорошей аналогией такого компромисса в мире баз данных является принятие некоторого дублирования данных в пользу более быстрых запросов (подумайте о Нет SQL дизайна, где иногда рекомендуется дублировать некоторые данные, чтобы избежать дорогостоящих объединений.

0 голосов
/ 26 апреля 2020

Вот как я окончательно спроектировал решение:

Итак, я перестроил весь поток от подхода, основанного на тасклетах, до подхода, основанного на оркестрированном чанке.

Задание будет иметь 1 шаг named - fetchProcessAndWriteData.

jobBuilderFactory.get("allChunkJob")
      .start(step("fetchProcessAndWriteData"))
      .next(step("updatePostJobRunDetails")) 
      .build()

fetchProcessAndWriteData: будет иметь ридер, masterProcessor и masterWriter с размером фрагмента 10000.

steps
        .get("fetchProcessAndWriteData")
        .chunk(BATCHSIZE)
        .reader(chunkReader)
        .processecor(masterProcessor)
        .writer(masterWriter)
        .listener(listener())
        .build()

chunkReader - считывать данные в фрагменты из курсора базы данных и передать их masterProcessor.

masterProcessor - принимает данные по одной и передает записи всем другим процессорам - P1, P2, P3, P4 и сохраняет обработанные данные в CompositeResultBean.

CompositeResultBean состоит из держателей данных для всех 4 типов записей.

List<Record> recordType1.
    List<Record> recordType2.
    List<Record> recordType3.
    List<Record> recordType4.

Этот компонент затем возвращается из метода процесса masterProcessor.

public Object process(Object item){
     .. 
     bean.setRecordType1(P1.process(item));
     bean.setRecordType2(P2.process(item));
     bean.setRecordType3(P3.process(item));
     bean.setRecordType4(P4.process(item));
     return bean;
    }

masterWriter - на этом шаге принимается список записей, т. Е. Список составных элементов здесь. Выполните итерацию в списке bean-компонента и вызовите соответствующий метод записи W1, W2, W3, W4 writer () с данными, содержащимися в каждом из композитных атрибутов композитного текста.

publi c void write (List list) {

list.forEach(record -> {
    W1.write(isInitialBatch,list.getRecordType1());
    W2.write(isInitialBatch,list.getRecordType2());
    W3.write(isInitialBatch,list.getRecordType3());
    W4.write(isInitialBatch,list.getRecordType4());
    }); 
  }

Все эти шаги выполняются в виде пакета из 10 тыс. Записей и записывают данные в файл.

Еще одна проблема, с которой я столкнулся при записи файла, заключалась в том, что мне пришлось бы заменить Уже существующий файл в первый раз записи записывается, но для более поздних необходимо добавить в тот же файл. Я решил эту проблему, переопределив chunkListener в masterWriter - где я извлек пакет # и установил флаг stati c isInitialBatch , по умолчанию равный TRUE. Эта переменная устанавливается внутри

beforeChunk() 
if chunkContext.getStepContext().getStepExecution().getCommitCount()==0 as TRUE , else FALSE .

То же логическое значение передается в FileWriter, который открывает файл в режиме добавления - TRUE или FALSE.

W1.write(isInitialBatch,list.getRecordType1());
...