У меня было требование, когда мне нужно было обработать файл, содержащий 1 миллион записей и сохранить его в кэше Redis.Я должен был использовать конвейер Redis, но не получил никакой информации об этом.Вот мой вопрос: Вопрос
Поэтому я решил использовать среду многопоточности-исполнителя.Я новичок в многопоточности Вот мой код:
@Async
public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {
ExecutorService executorService = Executors.newFixedThreadPool(8);
Collection<Callable<String>> callables = new ArrayList<>();
List<Subscriber> cache = new ArrayList<>();
int batchSize = defaultBatchSize.intValue();
while ((line = br.readLine()) != null) {
try {
Subscriber subscriber = createSubscriber(subscription, line);
cache.add(subscriber);
if (cache.size() >= batchSize) {
IntStream.rangeClosed(1, 8).forEach(i -> {
callables.add(createCallable(cache, subscription.getSubscriptionId()));});
}
} catch (InvalidSubscriberDataException e) {
invalidRows.add(line + ":" + e.getMessage());
invalidCount++;
}
}
List<Future<String>> taskFutureList = executorService.invokeAll(callables);
for (Future<String> future : taskFutureList) {
String value = future.get(4, TimeUnit.SECONDS);
System.out.println(String.format("TaskFuture returned value %s", value));
}
}
private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {
return new Callable<String>() {
public String call() throws Exception {
System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
processSubscribers(cache,subscriptionId);
System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
return "Finish Thread:" + Thread.currentThread().getName();
}
};
}
private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
subscriberRedisRepository.saveAll(cache);
cache.clear();
}
Идея в том, что я хочу разделить файл на пакет и сохранить этот пакет, используя поток.Я создал пул из 8 потоков.
Это правильный способ реализации среды исполнения?Если нет, не могли бы вы помочь мне в этом?Ценю помощь.