GroupBy сразу после ключей RxJava - PullRequest
0 голосов
/ 05 августа 2020

У меня простая проблема: я получаю кучу строк из базы данных SQL. Мне нужно свернуть эти строки, уменьшить их до нескольких json объектов. Используя rx Java, у меня есть:

Promise<Flowable<Data>> p = Promise.promise();
p.complete(observable
             .groupBy(array -> array.getValue(0))
             .flatMap(g -> extractData(g));

Дело в том, что когда данных больше определенного количества, этот код зависает. думаю Я понимаю, почему, из-за ограничения параллелизма flatMap и потому, что мои группы никогда не заканчивают свою работу и ждут завершения основного наблюдаемого, прежде чем вернуться (для уменьшения необходимо, чтобы все значения в группе работали). Дело в том, что строки, которые я обрабатываю, гарантированно упорядочиваются с помощью определенного ключа c, такого же, что и тот, который я группирую.

Я ищу способ закрыть предыдущую группу, когда новая группа создана. Есть ли способ добиться этого?

Я думал, что groupByUntil позволит это, но похоже, что теперь он объединен в метод groupBy (по крайней мере, в Rx Java), и я не могу справиться найти способ, используя takeUntil / takeWhile

EDIT:

Просто осознавая, что без содержимого extractData это довольно сложно понять, вот оно:

return group.reduce(new Data(), <business logic>).toFlowable()

1 Ответ

1 голос
/ 07 августа 2020

Кажется, мне удалось это сделать с помощью собственного оператора и метода lift. Это действительно далеко не пуленепробиваемое, но пока оно работает ... Довольно сложно следовать всем руководящим принципам руководству , и я надеюсь, что я не возился с противодавлением или чем-то еще:

private static final class ConvertSQLRowsToSchemaInstances implements FlowableOperator<SchemaInstance, JsonArray> {
  private final SQLRowStream rows;
  private final Loader loader;
  private final Table table;
  private final List<Map.Entry<String, TableField>> wanted;
  private final List<Map.Entry<String, TableField>> unwanted;
  private final TableField unicity;

  public ConvertSQLRowsToSchemaInstances(SQLRowStream rows, Loader loader, Table table,
                                         List<Map.Entry<String, TableField>> wanted,
                                         List<Map.Entry<String, TableField>> unwanted) {
    this.rows = rows;
    this.loader = loader;
    this.table = table;
    this.wanted = wanted;
    this.unwanted = unwanted;
    this.unicity = table.getUnicityFields().get(0);
  }

  @NonNull
  @Override
  public Subscriber<? super JsonArray> apply(@NonNull Subscriber<? super SchemaInstance> subscriber) throws Exception {
    return new Op(subscriber);
  }

  private final class Op implements FlowableSubscriber<JsonArray>, Subscription {
    final Subscriber<? super SchemaInstance> child;

    SchemaInstance si = null;
    Subscription s;

    public Op(Subscriber<? super SchemaInstance> child) {
      this.child = child;
    }

    @Override
    public void onSubscribe(Subscription s) {
      this.s = s;
      child.onSubscribe(this);
    }

    @Override
    public void onNext(JsonArray array) {
      try {
        if (si == null) si = loader.getEmptyInstance(table.getSchema());
        else if (!si.get(unicity.getName()).get().equals(array.getValue(rows.column(unicity.getName())))) {
          // New schema arrived
          child.onNext(si);
          si = loader.getEmptyInstance(table.getSchema());
        }
        extractData(si, array, rows, table, loader, wanted, unwanted);
        request(1);
      } catch (UnknownTypeException | IllegalAccessException | InstantiationException | NoSuchFieldException e) {
        onError(e);
      }
    }

    @Override
    public void onError(Throwable e) {
      child.onError(e);
    }

    @Override
    public void onComplete() {
      if (si != null) child.onNext(si);
      child.onComplete();
    }

    @Override
    public void cancel() {
      s.cancel();
    }

    @Override
    public void request(long n) {
      s.request(n);
    }
  }
}

Просто пытаюсь собрать несколько строк из SQL в один экземпляр класса. Любые советы приветствуются.

...