Publi sh FluxSink разных типов объектов - PullRequest
0 голосов
/ 27 февраля 2020

У меня есть конечная точка rsocket, которая отвечает потоком:

@MessageMapping("responses")
Flux<?> deal(@Payload String message) {
    return myService.generateResponses(message);
}

Ответами может быть любой из 3 различных типов объектов, созданных асинхронно с использованием следующего кода (если он работал):

public Flux<?> generateResponses(String request) {
  // Setup response sinks
  final FluxProcessor publish = EmitterProcessor.create().serialize();
  final FluxSink<Response1> sink1 = publish.sink();    
  final FluxSink<Response2> sink2 = publish.sink();
  final FluxSink<Response3> sink3 = publish.sink();

  // Get async responses: starts new thread to gather responses and update sinks
  new MyResponses(request, sink1, sink2, sink3)

  // Return the Flux
  Flux<?> output = Flux
    .from(publish
    .log());
}

Проблема заключается в том, что когда я заполняю приемники различными объектами, только первый приемник фактически публикует обратно подписчику.

public class MyResponses extends CacheListenerAdapter {
  private FluxSink<Response1> sink1;
  private FluxSink<Response2> sink2;
  private FluxSink<Response3> sink3;

  // Constructor is omitted for brevity

  @Override
  public void afterCreate(EntryEvent event) {       
    if (event.getNewValue() instanceof Response1) {  
        Response1 r1 = (Response1)event.getNewValue();
        sink1.next(r1);
    }
    if (event.getNewValue() instanceof Response2) {  
        Response2 r2 = (Response2)event.getNewValue();
        sink2.next(r2);
    }
    if (event.getNewValue() instanceof Response3) {  
        Response3 r3 = (Response3)event.getNewValue();
        sink3.next(r3);
    }
  }
}

Если я создаю приемники типа <?>, то возникает a .next ошибка:

The method next(capture#2-of ?) in the type FluxSink<capture#2-of ?> is not applicable for the arguments (Response1)

Есть ли лучший подход к этому требованию?

1 Ответ

0 голосов
/ 28 февраля 2020

Причина, по которой это не работало с другими объектами, заключалась в сериализации Spring Boot Data Geode базовых типов объектов. Чтобы заставить работать объект Flux, было использовать 1 приемник типа <Object>

public Flux<Object> generateResponses(String request) {
  // Setup the Flux
  EmitterProcessor<Object> emitter = EmitterProcessor.create();
  FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

  // Get async responses: starts new thread to gather responses and update sinks
  new MyResponses(request, sink)

  // Setup an output Flux to publish the input Flux
  Flux<Object> out = Flux
    .from(emitter
    .log(log.getName()));
}

Затем обработчик событий использовал 1 приемник

public class MyResponses extends CacheListenerAdapter {
  private FluxSink<Object> sink;

  // Constructor is omitted for brevity

  @Override
  public void afterCreate(EntryEvent event) {       
    if (event.getNewValue() instanceof Response1) {  
        Response1 r1 = (Response1)event.getNewValue();
        sink.next(r1);
    }
    if (event.getNewValue() instanceof Response2) {  
        Response2 r2 = (Response2)event.getNewValue();
        sink.next(r2);
    }
    if (event.getNewValue() instanceof Response3) {  
        Response3 r3 = (Response3)event.getNewValue();
        sink.next(r3);
    }
  }
}
...