Я написал CombineFn, который имеет ввод KV<String, TableRow>
и вывод KV<String, Iterable<TableRow>>
.Я хотел бы использовать Combine.GroupedValues (или Combine.PerKey), и источник мог бы предположить, что это возможно, но я получаю следующую ошибку:
Incorrect number of type arguments for generic method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) of type Combine; it cannot be parameterized with arguments <String, TableRow, Iterable<TableRow>>
Мыиспользуя Beam v2.10.Контекст здесь заключается в том, что мы применяем окно сеанса к PCollection KV<String, TableRow>
, затем используем GroupByKey
для создания PCollection KV<String, Iterable<TableRow>>
.После этого шага наш CombineFn уменьшает каждую группу до KV<String, Iterable<TableRow>>
, то есть Iterable, содержащий TableRows, созданный на основе содержимого ввода.
Шаги преобразования:
public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
// group by step
PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
"Group by Key",
GroupByKey.<String, TableRow>Create()
);
// combine step
PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
"Generate New Rows",
// errors here
// Incorrect number of type arguments for generic
// method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>)
// of type Combine; it cannot be parameterized with arguments
// <String, TableRow, Iterable<TableRow>>
Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
);
return combinedValues;
}
Функция объединения:
private static class CreateEvents extends CombineFn<KV<String, TableRow>, CreateEvents.Accum, KV<String, Iterable<TableRow>>> {
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
Double startTime = 0.0;
Double endTime = 0.0;
}
@Override
public Accum createAccumulator() {
return new Accum();
}
@Override
public Accum addInput(Accum accumulator, KV<String, TableRow> input) {
// the earliest and latest times in the set of table rows is set on the accumulator
return accumulator;
}
@Override
public Accum mergeAccumulators(Iterable<Accum> accumulators) {
Accum merged = createAccumulator();
for (Accum accumulator : accumulators) {
// merge steps happen here to find the earliest and latest times
}
return merged;
}
@Override
public KV<String, Iterable<TableRow>> extractOutput(Accum accumulator) {
// this step will create two rows based on the start and end times found in this function
}
}
Я ожидаю, что CombineFn будет совместим с Combine.GroupedValues, как может показаться в документации.Однако, это не так.Combine.PerKey - еще один вариант, но мы не нашли способ использовать его также с CombineFn.
Соответствующие ссылки:
Документация - Combine.GroupedValues
Документация - Combine.PerKey Документация - Combine.CombineFn
Источник - Combine.GroupedValues
Источник - Combine.PerKey Источник - Combine.CombineFn