RxJava группа отсортированных данных - PullRequest
0 голосов
/ 22 февраля 2019

У меня очень большая коллекция данных, и необработанные данные уже отсортированы по тому, что будет ключевым.Например, у меня есть файл CSV, и первый столбец будет ключом для группировки.

A,x,x,x
A,x,y,x
A,z,y,y
C,x,s,d
C,t,d,s
B,a,s,a
E,x,x,x
E,t,r,y

Эти строки преобразуются в объекты, помещаются в список и передаются в потоковом режиме с использованием RxJava Flowable.Поскольку этот CSV будет огромным (настолько огромным, что может привести к сбою приложения), есть хороший способ преобразовать эти объекты в элемент карты, который будет выглядеть следующим образом:

{ 'A': [[x,x,x],[x,y,x],[z,y,y]] }

Ответы [ 2 ]

0 голосов
/ 26 февраля 2019

Я использую FlowableTransformers.partialCollect сейчас.Примером будет

Flowable.fromPublisher(FlowableTransformers.partialCollect(
        (Consumer<PartialCollectEmitter<LineData, Integer, 
        ListBuilder, ListDataModel>>) emitter -> {
            // Get or initialize collecting object
            ListBuilder lb = emitter.getAccumulator();
            if (lb == null) {
                lb = new ListBuilder();
                emitter.setAccumulator(lb);
            }

            if (emitter.demand() != 0) {
                boolean d = emitter.isComplete();
                if (emitter.size() != 0 && !d) {
                    LineData data = emitter.getItem(0);
                    emitter.dropItems(1);

                    // add returns the finished model if the prefix changes
                    ListDataModel model = lb.add(data);

                    if (model != null) {
                        emitter.next(model);
                    }
                } else if (d) {
                    if (!lb.isEmpty()) {
                        // clear returns the last model
                        emitter.next(lb.clear());
                    }
                    emitter.complete();
                    return;
                }
            }
            emitter.setIndex(0);
        }, Functions.emptyConsumer(), settings.getReadBufferSize() + 1).apply(
                Flowable.fromIterable(file.getFileNameList())
                        .concatMap(
                                fileName -> reader
                                        .getLineData(fileName)
                                        .buffer(settings.getReadBufferSize()))
                        .flatMap(Flowable::fromIterable)))
0 голосов
/ 23 февраля 2019

Используйте collectWhile из rxjava2-extras и укажите фабрику сбора для создания специального ключевого объекта:


class Keyed {
    final K key;
    final List<Value> list;
    ...
}

K key(Value value) {
 ...
}

source.compose(
  Transformers.
    collectWhile(
      // factory
      () -> new Keyed(),
      // add
      (keyed, x) -> { 
          keyed.list.add(x);
          return keyed; },
      // condition
      (keyed, x) -> 
         keyed.list.isEmpty() ||
         key(x).equals(keyed.key)));
...