Java 9 - как работает издатель и подписчик - PullRequest
0 голосов
/ 05 декабря 2018

Я пытаюсь понять, как Subscriber и Publisher работает в java 9 .

Здесь я создал один subscriber здесь и использую SubmissionPublisher для публикации элемента.

Я пытаюсь опубликовать 100 строк для subscriber.Если я не заставлю Client программу sleep ( см. Закомментированный код в MyReactiveApp), я не увижу, что все элементы опубликованы.

почему это не так?в ожидании всех строк, обработанных здесь:

strs.stream().forEach(i -> publisher.submit(i)); // what happens here? 

Если я заменю приведенный выше код на, я вижу, что все строки печатаются в консоли

strs.stream().forEach(System.out::println);

Клиентская программа, которая публикуется с использованием SubmissionPublisher.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);


        List<String> strs = getStrs();

        System.out.println("Publishing Items to Subscriber");
        strs.stream().forEach(i -> publisher.submit(i));

        /*while (strs.size() != subs.getCounter()) {
            Thread.sleep(10);
        }*/

        //publisher.close();

        System.out.println("Exiting the app");

    }

    private static List<String> getStrs(){

        return Stream.generate(new Supplier<String>() {
            int i =1;
            @Override
            public String get() {
                return "name "+ (i++);
            }
        }).limit(100).collect(Collectors.toList());
    }

}

Абонент

import java.util.concurrent.Flow.Subscription;

public class MySubscriber implements java.util.concurrent.Flow.Subscriber<String>{

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(100);

    }

    @Override
    public void onNext(String item) {
        System.out.println(this.getClass().getSimpleName()+" item "+item);
        //subscription.request(1);
        counter++;

    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(this.getClass().getName()+ " an error occured "+throwable);

    }

    @Override
    public void onComplete() {
        System.out.println("activity completed");

    }
    public int getCounter() {
        return counter;
    }

}

выход:

Publishing Items to Subscriber
MySubscriber item name 1
MySubscriber item name 2
MySubscriber item name 3
MySubscriber item name 4
MySubscriber item name 5
Exiting the app
MySubscriber item name 6
MySubscriber item name 7
MySubscriber item name 8
MySubscriber item name 9
MySubscriber item name 10
MySubscriber item name 11
MySubscriber item name 12

1 Ответ

0 голосов
/ 02 января 2019
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

Создает новый SubmissionPublisher, используя ForkJoinPool.commonPool () для асинхронной доставки подписчикам

см .: https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/SubmissionPublisher.html#SubmissionPublisher--

Так на самом деле

    strs.stream().forEach(i -> publisher.submit(i));

ставит все заявки в очередь и асинхронно доставляет их в другой поток.Но затем приложение прекращается.Это не зависит от прогресса рабочего потока.Это означает, что приложение завершается независимо от того, сколько элементов рабочий поток уже доставил.

Это может отличаться для каждого прогона.В худшем случае приложение может быть прекращено до того, как будет доставлен первый элемент.

Потоки

Если вы хотите убедиться, что основной метод MyReactiveApp и доставкав OnSext MySubscriber происходит на разных потоках, вы можете распечатать имена соответствующих потоков, например, в основной MyReactiveApp:

System.out.println(Thread.currentThread().getName()) 

будет выводить main в качестве имени потока.

Принимая во внимание, что метод onSext MySubscriber будет, например, выводить что-то вроде ForkJoinPool.commonPool-worker-1.

Потоки пользователя и Деймона

Почему приложение завершается, хотя у нас все еще есть работающий поток?

В Java существует два вида потоков:

  • пользовательские потоки
  • потоки демонов

Java-программа завершает свою работу, когда пользовательские потоки больше не выполняются, даже когда потоки deamon все еще работают.

Основной поток - это пользовательский поток.SubmissionPublisher использует здесь рабочие потоки из ForkJoinPool.commonPool ().Это потоки демонов.

Все рабочие потоки инициализируются с установленным значением Thread.isDaemon ().

https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ForkJoinPool.html

...