Идиоматический способ подачи многопоточного вывода в один поток в современной Java? - PullRequest
2 голосов
/ 07 апреля 2019

У меня есть этот код, который отлично работает. Этот вопрос сфокусирован на удобстве сопровождения кода и написании меньшего количества кода для достижения той же цели:

                Queue<IncomingItem[]> queue = new ConcurrentLinkedQueue<>();
                IncomingItem[] EOF = new IncomingItem[0];

                ForkJoinPool.commonPool().submit(() -> {
                    IncomingItem[] next;
                    while((next = queue.poll()) != EOF) {
                        if(next == null) {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                            continue;
                        }
                        dao.batchInsert(next);
                    }
                });
                ds.reload(queue::add);
                queue.add(EOF);
                ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.HOURS);

Цель состоит в том, чтобы вывод ds.reload, который является сильно многопоточным методом, передавался в метод dao.batchInsert, который не является потокобезопасным и не может использоваться совместно с потоками (например, это DAO на основе Hibernate). ), при этом никогда не блокируя метод ds.reload, как это было бы, если бы метод dao.batchInsert был сделан synchronized.

Этот код был совместим с Java 8. Есть ли что-нибудь, что появилось в более новых выпусках Java, которые позволили бы сделать более элегантное решение?

1 Ответ

1 голос
/ 07 апреля 2019

Использование пула потоков здесь является ненужным осложнением.Используйте выделенную тему.Вместо опроса и спящего режима используйте очередь блокировки.

            BlockingQueue<IncomingItem[]> queue = new LinkedBlockingQueue<>();
            IncomingItem[] EOF = new IncomingItem[0];
            Thread thread = new Thread(() -> {
                IncomingItem[] next;
                while((next = queue.take()) != EOF) {
                    dao.batchInsert(next);
                }
            });
            thread.start();
            ds.reload(queue::put);
            queue.add(EOF);
            thread.join();
...