У меня следующая ситуация: мне нужно обработать поток, который я получаю как Flowable. Каждый элемент в потоке имеет часть данных, только первый элемент в потоке содержит метаданные. Для функции, которая может обрабатывать поток данных, требуется информация в метаданных.
Что-то вроде:
// Stream items look like this
class StreamItem{
Metadata meta;
Data data;
}
// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);
Я получаю Flowable<StreamItem>
. Я пытался сделать что-то вроде:
Flowable<StreamItem> input = ...
ConnectableFlowable<StreamItem> multi = input.publish;
Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);
Flowable<Data> streamData = multi.map(StreamItem::getData);
multi.connect();
Single<Result> result = streamMeta.flatMap(meta -> processStream(meta,streamData));
После этого я просто возвращаю result.ignoreResult()
(поскольку нам нужны побочные эффекты процесса, но не объекта), и от клиента (который является точка входа) мы просто отображаем это Completable
в стандартный ответ на вызов. Не уверен, что эта последняя часть имеет отношение.
Я также попытался:
Flowable<Result> res = input.publish(
flow -> {
Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
Flowable<Data> data = flow.map(StreamITem::getData);
return meta.flatMap(met -> processStream(met,data)).toFlowable();
});
и затем вернул res.ignoreElements()
для того же процесса Completable
, описанного выше.
Мне удалось обработать либо мета, либо заглушить мета и обработать поток данных, но как только я подключаю оба, как описано выше, кажется, что обработка не выполняется. Я думаю, что это может быть, что я обрабатываю один и тот же поток? Во всяком случае, я думаю, что я, вероятно, неправильно понимаю, как все это работает (я довольно новичок в Rx), поэтому, если у кого-то есть лучшая идея о том, как этого добиться, я хотел бы услышать это!