Как читать AvroFile в класс Tuple с Java в Flink - PullRequest
0 голосов
/ 18 декабря 2018

Я пытаюсь прочитать файл Avro и выполнить некоторые операции с ним, все работает нормально, но функции агрегирования, когда я их использую, получают следующее исключение:

aggregating on field positions is only possible on tuple data types

, затем я меняю свойкласс для реализации Tuple4 (так как у меня есть 4 поля), но затем, когда я хочу собрать результаты, получаю AvroTypeException Unknown Тип: T0

Вот мои данные и классы заданий:

public class Nation{

public Integer N_NATIONKEY;
public String N_NAME;
public Integer N_REGIONKEY;
public String N_COMMENT;

public Integer getN_NATIONKEY() {
    return N_NATIONKEY;
}

public void setN_NATIONKEY(Integer n_NATIONKEY) {
    N_NATIONKEY = n_NATIONKEY;
}

public String getN_NAME() {
    return N_NAME;
}

public void setN_NAME(String n_NAME) {
    N_NAME = n_NAME;
}

public Integer getN_REGIONKEY() {
    return N_REGIONKEY;
}

public void setN_REGIONKEY(Integer n_REGIONKEY) {
    N_REGIONKEY = n_REGIONKEY;
}

public String getN_COMMENT() {
    return N_COMMENT;
}

public void setN_COMMENT(String n_COMMENT) {
    N_COMMENT = n_COMMENT;
}
public Nation() {
}


public static void main(String[] args) throws Exception {
    Configuration parameters = new Configuration();

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


    Path path2 = new Path("/Users/violet/Desktop/nation.avro");

    AvroInputFormat<Nation> format = new AvroInputFormat<Nation>(path2,Nation.class);
    format.configure(parameters);
    DataSet<Nation> nation = env.createInput(format);
    nation.aggregate(Aggregations.SUM,0);


    JobExecutionResult res = env.execute();
}

ивот класс кортежа и тот же код для задания, что и выше:

public class NationTuple extends Tuple4<Integer,String,Integer,String> {

Integer N_NATIONKEY(){ return this.f0;}
String N_NAME(){return this.f1;}
Integer N_REGIONKEY(){ return this.f2;}
String N_COMMENT(){ return this.f3;}

}

Я пытался с этим классом и получил TypeException (везде используется NationTuple вместо Nation)

1 Ответ

0 голосов
/ 18 декабря 2018

Я не думаю, что ваш класс, реализующий Tuple4, - правильный путь.Вместо этого вы должны добавить в свою топологию MapFunction, которая преобразует ваш NationTuple в Tuple4.

static Tuple4<Integer, String, Integer, String> toTuple(Nation nation) {
  return Tuple4.of(nation.N_NATIONKEY, ...);
}

А затем в вызове вашей топологии:

inputData.map(p -> toTuple(p)).returns(new TypeHint<Tuple4<Integer, String, Integer, String>(){});

Единственная тонкая часть - это то, что вам нужнопредоставьте подсказку типа, чтобы flink мог выяснить, какой тип кортежа возвращает ваша функция.

Другое решение заключается в использовании имен полей вместо индексов полей кортежей при выполнении агрегации.Например:

groupBy("N_NATIONKEY", "N_REGIONKEY")

Это все объясняется здесь: https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#specifying-keys

...