Структура исполнителя для обработки 1 миллиона записей - PullRequest
1 голос
/ 24 сентября 2019

У меня было требование, когда мне нужно было обработать файл, содержащий 1 миллион записей и сохранить его в кэше Redis.Я должен был использовать конвейер Redis, но не получил никакой информации об этом.Вот мой вопрос: Вопрос

Поэтому я решил использовать среду многопоточности-исполнителя.Я новичок в многопоточности Вот мой код:

@Async
    public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {

        ExecutorService executorService = Executors.newFixedThreadPool(8);
        Collection<Callable<String>> callables = new ArrayList<>();


        List<Subscriber> cache = new ArrayList<>();
        int batchSize = defaultBatchSize.intValue();

        while ((line = br.readLine()) != null) {
            try {
                Subscriber subscriber = createSubscriber(subscription, line);
                cache.add(subscriber);
                if (cache.size() >= batchSize) {
                    IntStream.rangeClosed(1, 8).forEach(i -> {
                    callables.add(createCallable(cache, subscription.getSubscriptionId()));});
                }
            } catch (InvalidSubscriberDataException e) {
                invalidRows.add(line + ":" + e.getMessage());
                invalidCount++;
            }
        }
        List<Future<String>> taskFutureList = executorService.invokeAll(callables);
        for (Future<String> future : taskFutureList) {
            String value = future.get(4, TimeUnit.SECONDS);
            System.out.println(String.format("TaskFuture returned value %s", value));
        }
    }

    private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {

        return new Callable<String>() {

            public String call() throws Exception {

                System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
                processSubscribers(cache,subscriptionId);
                System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
                return "Finish Thread:" + Thread.currentThread().getName();
            }
        };
    }

    private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
        subscriberRedisRepository.saveAll(cache);
        cache.clear();
    }

Идея в том, что я хочу разделить файл на пакет и сохранить этот пакет, используя поток.Я создал пул из 8 потоков.

Это правильный способ реализации среды исполнения?Если нет, не могли бы вы помочь мне в этом?Ценю помощь.

1 Ответ

0 голосов
/ 24 сентября 2019

Быстрые изменения в вашем текущем коде для получения запроса:

В цикле while, когда текущий кэш превышает размер пакета, создайте вызываемую передачу в текущем кеше. Сброс списка кеша, создание нового списка и назначение его в качестве кеша.

Вы создаете список вызываемых элементов, чтобы отправить их в виде пакета, почему бы не отправить свои вызываемые элементы сразу после их создания?Это начнет запись уже прочитанных записей в redis, в то время как ваш основной поток продолжит чтение из файла.

 List<Future<String>> taskFutureList = new LinkedList<Future<String>>();
 while ((line = br.readLine()) != null) {
    try {
        Subscriber subscriber = createSubscriber(subscription, line);
        cache.add(subscriber);
        if (cache.size() >= batchSize) {
                    taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
            List<Subscriber> cache = new ArrayList<>();
        }
     } catch (InvalidSubscriberDataException e) {
        invalidRows.add(line + ":" + e.getMessage());
        invalidCount++;
    }
}
//submit last batch that could be < batchSize
if(!cache.isEmpty()){ 
           taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
}

Вам не нужно хранить отдельный список вызываемых объектов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...