Перезагрузка шага раздела для создания другого набора потоков - PullRequest
0 голосов
/ 19 июня 2019

У меня есть огромный список отчетов, загруженных в шаг раздела чанка.Каждый отчет будет дополнительно обработан для создания индивидуального отчета.Но если я загружаю 50 тыс. Отчетов на шаге раздела, который перегружает сервер, и он становится очень медленным.Вместо этого я бы предпочел, чтобы раздел шага загружал 3k списка отчетов, обрабатывал его и затем загружал еще 3k отчетов на шаге разделения. Продолжайте до тех пор, пока не будут обработаны 50k отчетов.

    <step id="genReport" next="fileTransfer">
        <chunk  item-count="1000">
            <reader ref="Reader" >
            </reader>
            <writer
                ref="Writer" >
            </writer>
        </chunk>
      <partition>
            <mapper ref="Mapper">
                <properties >
                    <property name="threadCount" value="#{jobProperties['threadCount']}"/>
                    <property name="threadNumber" value="#{partitionPlan['threadNumber']}"/>
                </properties>
            </mapper>
      </partition>
    </step> 
public PartitionPlan mapPartitions() {
        PartitionPlanImpl partitionPlan = new PartitionPlanImpl();
        int numberOfPartitions = //dao call to load the reports count
        partitionPlan.setThreads(getThreadCount());
        partitionPlan.setPartitions(numberOfPartitions); //This numberOfPartitions is comes from the database, huge size like 20k to 40k
        Properties[] props = new Properties[numberOfPartitions];

        for (int idx = 0; idx < numberOfPartitions; idx++) {
            Properties threadProperties = new Properties();
            threadProperties.setProperty("threadNumber", idx + "");
            GAHReportListData gahRptListData = gahReportListManager.getPageToProcess(); //Data pulled from PriorityBlockingQueue 
            String dynSqlId = gahRptListData.getDynSqlId(); 

            threadProperties.setProperty("sqlId", dynSqlId);
            threadProperties.setProperty("outFile", fileName);

            props[idx] = threadProperties;
        }
        partitionPlan.setPartitionProperties(props);
        return partitionPlan;
    }

После того, как 3k отчетов данных обрабатываются с помощью картографа разделов, Затем он должен проверить следующий доступный список.Если он доступен, раздел должен быть сброшен для обработки следующего набора 3k отчетов.

1 Ответ

1 голос
/ 19 июня 2019

Нет способа сбросить раздел. Когда все разделы, определенные для partitionMapper, выполнены, этот шаг завершен. У вас может быть второй разделенный шаг, который, как я думаю, будет таким же, как первый (и третий, и четвертый), пока вы не пройдете через все. Это грязно. И вы не можете вернуться в JSL и повторить тот же шаг снова.

Возможно, у вас есть разделение / поток, который одновременно запускает несколько шагов, но вы не можете динамически установить, сколько потоков. Это в JSL. И вы получите больше параллелизма, чем ваша среда, вероятно, может обработать.

Я предполагаю, что ваш блок чтения / обработки / записи выполняет итерацию результатов одного SQLid, назначенного разделу. Чтобы составить список sqlids, я думаю, вам нужен способ узнать, когда один из них был завершен, а следующий запущен в том же цикле чанка. Читатель, вероятно, мог бы управлять списком и знать, когда произошли переходы. Вам, вероятно, понадобится сигнал автору, что конец фрагмента - это конец одного отчета, и он должен перейти к следующему. Возможно, вам понадобится собственный алгоритм проверки, чтобы вы могли быть уверены, что это будет точка проверки в конце отчета, а не надеяться, что вы достигнете контрольной точки, когда в каждом sqlid закончились записи для обработки.

Я представляю это как ответ, а не как другой комментарий, потому что кажется, что ответ на вопрос, заданный здесь, «нет». Остальное - просто обсуждение возможных альтернативных подходов.

...