Я делаю процесс с многопоточностью Spring Batch. Мой процесс получает большой текстовый файл (> 100 тыс. Строк). Я хочу, чтобы каждый поток обрабатывал X строк файла и выполнял X с информацией, чтобы выиграть время.
Я использую класс FlatFileItemRead, передавая начальную и конечную строку каждому потоку. Я делаю тесты с 19k строк, 3 и 4 потоков, и я не могу понять, почему первый поток, если он начинается и останавливается в правильной строке, но остальные потоки начинаются в правильной строке, но не заканчиваются в правильной, прочитайте файл до конца.
Я прочитал много вопросов об этой теме и о классе FlatFileItemReader, он не является поточно-ориентированным. Я думаю, что моя проблема здесь, но я передаю saveState в false и указываю строки для каждого потока. (См. это ).
Это файлы конфигурации XML и файл RangePartition (шаги setTiempoInicial и stTiempoFinal печатают только начальное и общее время):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<batch:job id="superTxtTest">
<batch:step id="setTiempoInicial" next="validacionDePers">
<batch:tasklet transaction-manager="transactionManager"
start-limit="1">
<batch:chunk reader="tiempoInicialReader" writer="tiempoInicialWriter"
commit-interval="1" skip-limit="1">
<batch:skippable-exception-classes>
<batch:include
class="com.testpartitionfile.batch.PersException" />
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
<batch:listeners>
<batch:listener ref="promotionListener" />
</batch:listeners>
</batch:step>
<batch:step id="validacionDePers" next="setTiempoFinal">
<partition step="validacionDePersSlave" partitioner="rangePartitioner">
<handler grid-size="3" task-executor="taskExecutor" />
</partition>
</batch:step>
<batch:step id="setTiempoFinal">
<batch:tasklet transaction-manager="transactionManager"
start-limit="1">
<batch:chunk reader="tiempoFinalReader" writer="tiempoFinalWriter"
commit-interval="1" skip-limit="1">
<batch:skippable-exception-classes>
<batch:include
class="com.testpartitionfile.batch.PersException" />
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<batch:step id="validacionDePersSlave">
<batch:tasklet transaction-manager="transactionManager"
start-limit="1">
<batch:chunk reader="tratamientoPersReader" writer="validacionPersWriter"
commit-interval="1" skip-limit="1">
<batch:skippable-exception-classes>
<batch:include
class="com.testpartitionfile.batch.PersException" />
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
</batch:step>
<bean id="promotionListener"
class="org.springframework.batch.core.listener.ExecutionContextPromotionListener">
<property name="keys" value="tiempoInicial" />
</bean>
<bean id="validacionPersWriter"
class="com.testpartitionfile.batch.writer.ValidacionPersWriter"
scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<bean id="rangePartitioner"
class="com.testpartitionfile.batch.partitioner.RangePartitioner" />
<bean id="tratamientoPersReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource"
value="file:C:\Users\user\Desktop\testFile.txt" />
<property name="encoding" value="utf8" />
<property name="lineMapper" ref="ficheroPersMapper" />
<property name="linesToSkip" value="#{stepExecutionContext[fromId]}" />
<property name="maxItemCount" value="#{stepExecutionContext[toId]}" />
<property name="saveState" value="false" />
</bean>
<bean id="ficheroPersMapper"
class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="fieldSetMapper" ref="ficheroPersSetMapper" />
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value=";" />
<property name="names"
value="a,b,c,d,e,f,g,h,i,j" />
</bean>
</property>
</bean>
<bean id="ficheroPersSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="fichero" />
</bean>
<bean id="fichero" class="com.testpartitionfile.batch.dto.Fichero"
scope="prototype" />
</beans>
RangePartitioner:
public class RangePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int range = 1;
InputStream is = null;
try {
is = new BufferedInputStream(new FileInputStream("C:\\Users\\user\\Desktop\\testFile.txt"));
byte[] c = new byte[1024];
int count = 0;
int readChars = 0;
while ((readChars = is.read(c)) != -1) {
for (int i = 0; i < readChars; ++i) {
if (c[i] == '\n') {
++count;
}
}
}
if (count != 0) {
range = (count/gridSize);
}
} catch (Exception e){
try {
is.close();
} catch (Exception e1) {
}
}
int fromId = 1;
int toId = range;
for (int i = 1; i <= gridSize; i++) {
ExecutionContext value = new ExecutionContext();
System.out.println("\nHilo : " + i);
System.out.println("Index Inicial : " + fromId);
System.out.println("Index Final : " + toId + "\n");
value.putInt("fromId", fromId - 1);
value.putInt("toId", toId);
// give each thread a name, thread 1,2,3
value.putString("name", "Hilo " + i);
result.put("partition" + i, value);
fromId = toId + 1;
toId += range;
if (i == gridSize - 1) toId = toId + 1;
}
return result;
}
}
Исходный журнал с 3-мя потоками:
Thread: 1
Initial Index: 1
Final Index: 6333
Thread: 2
Index Initial: 6334
Final Index: 12666
Thread: 3
Index Initial: 12667
Final Index: 19000
Дело 1:
Поток 1 читает до 6333. Поток 2 читает до 19000 вместо чтения до 12666, а поток 3 читает до 19000.
Дело 2:
При 4-х потоках нити 2 и 3 читаются до 19000.
Почему поток 2 в случае 1 и потоки 2 и 3 в случае 2 читаются из начальной строки (правильно) до конца (неправильно)?