Разделить Flowable на 2, обработать 2 потока, но один зависит от другого? - PullRequest
0 голосов
/ 21 марта 2020

У меня следующая ситуация: мне нужно обработать поток, который я получаю как Flowable. Каждый элемент в потоке имеет часть данных, только первый элемент в потоке содержит метаданные. Для функции, которая может обрабатывать поток данных, требуется информация в метаданных.

Что-то вроде:

// Stream items look like this
class StreamItem{
   Metadata meta;
   Data data;
}

// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);

Я получаю Flowable<StreamItem>. Я пытался сделать что-то вроде:

Flowable<StreamItem> input = ...

ConnectableFlowable<StreamItem> multi = input.publish;

Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);

Flowable<Data> streamData = multi.map(StreamItem::getData);

multi.connect();

Single<Result> result = streamMeta.flatMap(meta ->  processStream(meta,streamData));

После этого я просто возвращаю result.ignoreResult() (поскольку нам нужны побочные эффекты процесса, но не объекта), и от клиента (который является точка входа) мы просто отображаем это Completable в стандартный ответ на вызов. Не уверен, что эта последняя часть имеет отношение.

Я также попытался:

Flowable<Result> res = input.publish(
   flow -> {
     Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
     Flowable<Data> data = flow.map(StreamITem::getData);
     return meta.flatMap(met -> processStream(met,data)).toFlowable();
   });

и затем вернул res.ignoreElements() для того же процесса Completable, описанного выше.

Мне удалось обработать либо мета, либо заглушить мета и обработать поток данных, но как только я подключаю оба, как описано выше, кажется, что обработка не выполняется. Я думаю, что это может быть, что я обрабатываю один и тот же поток? Во всяком случае, я думаю, что я, вероятно, неправильно понимаю, как все это работает (я довольно новичок в Rx), поэтому, если у кого-то есть лучшая идея о том, как этого добиться, я хотел бы услышать это!

1 Ответ

1 голос
/ 23 марта 2020

Немного изменив некоторые вещи, я думаю, вы можете использовать Flowable::withLatestFrom( Flowable, BiFunction ).

// Stream items look like this
class StreamItem
{
    String meta;
    Integer data;

    public String getMeta()
    {
        return meta;
    }

    public Integer getData()
    {
        return data;
    }
}

// Processor looks like this
interface Processor
{
    String processStream( String meta, Integer data );
}

@Test
public void testFlowable()
{
    // Set up mock input:
    AtomicBoolean first = new AtomicBoolean( true );

    Flowable<StreamItem> input = Flowable.generate( emitter -> {

        StreamItem item = new StreamItem();
        item.data = (int)( Math.random() * 100 );

        if ( first.getAndSet( false )) {
            item.meta = UUID.randomUUID().toString();
        }

        emitter.onNext( item );
    } );

    // Mock processor:
    Processor processor = ( meta, data ) -> meta + " : " + data;

    // Set up rx pipeline:
    Flowable<StreamItem> multi = input.share();
    Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );

    Flowable<String> result = multi.map( StreamItem::getData )
        .withLatestFrom( streamMeta.toFlowable(),
                ( data, meta ) -> processor.processStream( meta, data ));

    // Subscribe:
    result.take( 5 ).blockingSubscribe( System.out::println );
}

Вывод:

3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 72
3fba00bd-027b-4802-8b7d-674497d72052 : 47
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 93

Обновление на основе обратной связи:

Похоже, сработает, если вам действительно нужны ваши данные Flowable рядом с конкретным объектом метаданных:

// Stream items look like this
class StreamItem
{
    String meta;
    Integer data;

    public String getMeta()
    {
        return meta;
    }

    public Integer getData()
    {
        return data;
    }
}

// Processor looks like this
interface Processor
{
    String processStream( String meta, Flowable<Integer> data );
}

@Test
public void testFlowable()
{
    // Set up mock input:
    AtomicBoolean first = new AtomicBoolean( true );

    Flowable<StreamItem> input = Flowable.generate( emitter -> {

        StreamItem item = new StreamItem();
        item.data = (int)( Math.random() * 100 );

        if ( first.getAndSet( false )) {
            item.meta = UUID.randomUUID().toString();
        }

        emitter.onNext( item );
    } );

    // Mock processor:
    Processor processor = ( meta, data ) -> {
        System.out.println( meta );
        data.subscribe( System.out::println );
        return meta;
    };

    // Set up rx pipeline:
    Flowable<StreamItem> multi = input.take( 5 ).share();
    Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );

    streamMeta.map( meta ->
        processor.processStream( meta, multi.map( StreamItem::getData )))
    .subscribe();
}

Вывод:

3421c5f6-8554-43ce-aa69-e6cef9c1ed89
47
46
74
59
57
...