Мне нужна функциональность, которая позволяла бы асинхронно отправлять сообщения на мой PublishSubject
и обрабатывать их в определенном темпе (фактически по одному) через ConnectableObservable
. К сожалению, кажется, что вызов onNext
из PublishSubject
не прекращается, пока базовый Subscriber
не обработает сообщение.
Требуется несколько секунд для обработки каждого сообщения, и в режиме отладки я вижу, что оно выполняется до того, как вызов метода, который отправляет сообщение в PublishSubject, удаляется из стека - "After push..."
всегда появляется в консоли после внутренних журналов внутри Subscriber
...
Итак, у меня есть эта RestEndpoint:
@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Message metadata = processor.apply(extId);
log.info("Before push...");
dataImporter.pushData(metadata);
log.info("After push...");
} catch (Exception e) {
e.printStackTrace();
}
}
});
return Response.ok("Request received successfully").build();
}
Вот конструктор DataImporter:
public DataImporter(final String configFile) {
dataToImportSubject = PublishSubject.create();
dataToImportObservable = dataToImportSubject.publish();
dataToImportObservable.connect();
dataToImportObservable
.onBackpressureBuffer(1, new Action0() {
@Override
public void call() {
logger.debug("Buffer full...");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Message>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable e) {
logger.error("Error importing "+e.getMessage());
}
@Override
public void onNext(Message value) {
request(1);
importResult(configFile, value);
}
@Override
public void onStart() {
request(1);
}
});
}
Тогда pushData
DataImporter просто нажимает на PublishSubject
onNext
метод ..:
public void pushData(Message metadata) {
dataToImportSubject.onNext(metadata);
}
А вот и декларация PublishSubject
и ConnectableObservable
:
public class DataImporter implements ImporterProxy{
private final PublishSubject<Message> dataToImportSubject;
private final ConnectableObservable<Message> dataToImportObservable;