Flink: как сохранить список строк в базе данных - PullRequest
0 голосов
/ 19 марта 2019

Сейчас я читаю строки из файла и сохраняю в базе данных, используя следующий код:

String strQuery = "INSERT INTO public.alarm (id, name, marks) VALUES (?, ?, ?)";
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
 .setDrivername("org.postgresql.Driver")
 .setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
 .setQuery(strQuery)
 .setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR, Types.INTEGER}) //set the types
 .finish();
DataStream<Row> rows = FilterStream
            .map((tuple)-> {
               Row row = new Row(3);                 
               row.setField(0, tuple.f0);            
               row.setField(1, tuple.f1);             
               row.setField(2, tuple.f2);   
               return row;
            });
rows.writeUsingOutputFormat(jdbcOutput);
env.execute();
}
}

Вышеприведенный код работает нормально, он выбирает строки из файла и сохраняет их в базе данных.

Например:

Если файл содержит:

1, mark, 20

тогда запись в базе данных будет выглядеть так:

id   name   marks
------------------
1    mark   20

Теперь требование для каждой строки, я должен создать 2 разных строки, и это должно выглядеть так:

Например:

Если файл содержит:

1, mark, 20

тогда запись в базе данных должна выглядеть так:

id   name   marks
------------------
1    mark-1   20
1    mark-2   20

Теперь я должен вернуть List вместо строки, а переменная потока данных должна выглядеть как DataStream<List<Row>> rows.

Что я должен изменить в переменной JDBCOutputFormat, чтобы добиться этого?

...