Как я могу получить доступ к ключу в подклассе CombinerFn при комбинировании PC-набора пар KV? - PullRequest
1 голос
/ 05 июня 2019

Я реализую CombinePerKeyExample , используя подкласс CombineFn вместо использования SerializableFunction

package me.examples;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

import java.util.HashSet;
import java.util.Set;

public class ConcatWordsCombineFn extends CombineFn<String, ConcatWordsCombineFn.Accumulator, String> {
    @DefaultCoder(AvroCoder.class)
    public static class Accumulator{
        HashSet<String> plays;
    }

    @Override
    public Accumulator createAccumulator(){
        Accumulator accumulator = new Accumulator();
        accumulator.plays = new HashSet<>();
        return accumulator;
    }

    @Override
    public Accumulator addInput(Accumulator accumulator, String input){
        accumulator.plays.add(input);
        return accumulator;
    }

    @Override
    public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators){
        Accumulator mergeAccumulator = new Accumulator();
        mergeAccumulator.plays = new HashSet<>();

        for(Accumulator accumulator: accumulators){
            mergeAccumulator.plays.addAll(accumulator.plays);
        }

        return mergeAccumulator;
    }

    @Override
    public String extractOutput(Accumulator accumulator){
        //how to access the key here ? 
        return String.join(",", accumulator.plays);
    }
}

. Конвейер состоит из ReadFromBigQuery,ExtractAllPlaysOfWords (код ниже) и WriteToBigQuery

package me.examples;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class PlaysForWord extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {


    @Override
    public PCollection<TableRow> expand(PCollection<TableRow> input) {

            PCollection<KV<String, String>> largeWords = input.apply("ExtractLargeWords", ParDo.of(new ExtractLargeWordsFn()));
            PCollection<KV<String, String>> wordNPlays = largeWords.apply("CombinePlays",Combine.perKey(new ConcatWordsCombineFn()));
            wordNPlays.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
            PCollection<TableRow> rows = wordNPlays.apply("FormatToRow", ParDo.of(new FormatShakespeareOutputFn()));
            return rows;
    }
}

Я хотел бы получить доступ к ключу в ConcatWordsCombineFn, чтобы выполнить окончательное накопление на основе этого.Например, можно объединить слова с ,, если ключ начинается с a, или использовать ; в противном случае.

При просмотре руководства по программированию

Есливам нужно изменить стратегию объединения на основе ключа (например, MIN для некоторых пользователей и MAX для других пользователей), вы можете определить KeyedCombineFn для доступа к ключу в стратегии объединения.

Я не смог найти KeyedCombineFn в org.apache.beam.sdk.transforms.Combine Я использую Apache Beam 2.12.0 и Google Dataflow в качестве бегуна.

1 Ответ

3 голосов
/ 06 июня 2019

Я не думаю, что есть встроенный способ решить эту проблему.Простой обходной путь (я знаю, что он не идеален) - это обернуть вашу строку в другой KV: KV<String, KV<String, String>>, где обе клавиши одинаковы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...