Reactor Scheduler с анонимным потоком - PullRequest
0 голосов
/ 20 января 2019

Я тестирую, как работает реактор, создал такой код, который очень похож на то, что можно найти в документации реактора.

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@SpringBootTest
@RunWith(SpringRunner.class)
public class ReactorApplicationTests {

  @Test
  public void publishOnThreadTest() {
    Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

    final Mono<String> mono = Mono.just("Publish on test: \n")
            .map(msg -> msg + "before: " + Thread.currentThread() )
            .publishOn(s)
            .map(msg -> msg + "\nafter: " + Thread.currentThread());

    new Thread(() -> mono.subscribe(System.out::println)).start();
  }
}

И я не могу заставить его работать, что я делаю неправильно?Просто подписавшись, это работает, но я хотел посмотреть на используемый поток и немного поиграть с ним.

1 Ответ

0 голосов
/ 20 января 2019

Причина, по которой ваша тестовая программа ничего не печатает, заключается в том, что она выходит слишком рано. Следует подождать, пока не будет вызван метод суббрайбера:

@Test
public void publishOnThreadTest() throws InterruptedException {
    Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
    CountDownLatch latch = new CountDownLatch(1);

    final Mono<String> mono = Mono.just("Publish on test: \n")
            .map(msg -> msg + "before: " + Thread.currentThread() )
            .publishOn(s)
            .map(msg -> msg + "\nafter: " + Thread.currentThread());

    new Thread(() -> mono.subscribe((String str) ->{
        System.out.println(str);
        latch.countDown();
    })).start();

    latch.await();
}
...