Как я могу использовать пользовательские CombineFn с Combine.GroupedValues? - PullRequest
2 голосов
/ 24 апреля 2019

Я написал CombineFn, который имеет ввод KV<String, TableRow> и вывод KV<String, Iterable<TableRow>>.Я хотел бы использовать Combine.GroupedValues ​​(или Combine.PerKey), и источник мог бы предположить, что это возможно, но я получаю следующую ошибку:

Incorrect number of type arguments for generic method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) of type Combine; it cannot be parameterized with arguments <String, TableRow, Iterable<TableRow>>

Мыиспользуя Beam v2.10.Контекст здесь заключается в том, что мы применяем окно сеанса к PCollection KV<String, TableRow>, затем используем GroupByKey для создания PCollection KV<String, Iterable<TableRow>>.После этого шага наш CombineFn уменьшает каждую группу до KV<String, Iterable<TableRow>>, то есть Iterable, содержащий TableRows, созданный на основе содержимого ввода.

Шаги преобразования:

public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {

  // group by step
  PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
    "Group by Key",
    GroupByKey.<String, TableRow>Create()
  );

    // combine step
  PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
    "Generate New Rows",
    // errors here
    // Incorrect number of type arguments for generic 
    // method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) 
    // of type Combine; it cannot be parameterized with arguments 
    // <String, TableRow, Iterable<TableRow>>
    Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
  );


  return combinedValues;
}

Функция объединения:

private static class CreateEvents extends CombineFn<KV<String, TableRow>, CreateEvents.Accum, KV<String, Iterable<TableRow>>> {

  @DefaultCoder(AvroCoder.class)
  public static class Accum implements Serializable {
    Double startTime = 0.0;
    Double endTime = 0.0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accumulator, KV<String, TableRow> input) {
    // the earliest and latest times in the set of table rows is set on the accumulator

    return accumulator;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = createAccumulator();
    for (Accum accumulator : accumulators) {
      // merge steps happen here to find the earliest and latest times
    }

    return merged;
  }

  @Override
  public KV<String, Iterable<TableRow>> extractOutput(Accum accumulator) {
    // this step will create two rows based on the start and end times found in this function
  }
}

Я ожидаю, что CombineFn будет совместим с Combine.GroupedValues, как может показаться в документации.Однако, это не так.Combine.PerKey - еще один вариант, но мы не нашли способ использовать его также с CombineFn.

Соответствующие ссылки:
Документация - Combine.GroupedValues ​​
Документация - Combine.PerKey Документация - Combine.CombineFn
Источник - Combine.GroupedValues ​​
Источник - Combine.PerKey Источник - Combine.CombineFn

1 Ответ

0 голосов
/ 26 апреля 2019

Подпись CreateEvents выглядит немного не так. оно должно быть private static class CreateEvents extends CombineFn<TableRow, Accum, Iterable<TableRow>> для использования вместе с GroupBy. Здесь ввод TableRow и вывод комбинации Iterable<TableRow>

Вот полный код

public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {

    // group by step
    PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
        "Group by Key",
        GroupByKey.<String, TableRow>create()
    );

    // combine step
    PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
        "Generate New Rows",
        // errors here
        // Incorrect number of type arguments for generic
        // method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>)
        // of type Combine; it cannot be parameterized with arguments
        // <String, TableRow, Iterable<TableRow>>
        Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
    );


    return combinedValues;
  }

  private static class CreateEvents extends CombineFn<TableRow, Accum, Iterable<TableRow>> {

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
      Double startTime = 0.0;
      Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
      return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, TableRow input) {
      // the earliest and latest times in the set of table rows is set on the accumulator

      return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
      Accum merged = createAccumulator();
      for (Accum accumulator : accumulators) {
        // merge steps happen here to find the earliest and latest times
      }

      return merged;
    }

    @Override
    public Iterable<TableRow> extractOutput(Accum accumulator) {
      // this step will create two rows based on the start and end times found in this function
      return null;
    }
  }

Кроме того, вы также можете использовать более краткий способ группировки и объединения, используя Combine.perKey


  public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // combine step
    return rows.apply(Combine.perKey(new CreateEvents()));
  }

  private static class CreateEvents extends CombineFn<TableRow, Accum, Iterable<TableRow>> {

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
      Double startTime = 0.0;
      Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
      return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, TableRow input) {
      // the earliest and latest times in the set of table rows is set on the accumulator

      return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
      Accum merged = createAccumulator();
      for (Accum accumulator : accumulators) {
        // merge steps happen here to find the earliest and latest times
      }

      return merged;
    }

    @Override
    public Iterable<TableRow> extractOutput(Accum accumulator) {
      // this step will create two rows based on the start and end times found in this function
      return null;
    }
  }
...