Сейчас я читаю строки из файла и сохраняю в базе данных, используя следующий код:
String strQuery = "INSERT INTO public.alarm (id, name, marks) VALUES (?, ?, ?)";
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://localhost:5432/postgres?user=michel&password=polnareff")
.setQuery(strQuery)
.setSqlTypes(new int[] { Types.INTEGER, Types.VARCHAR, Types.INTEGER}) //set the types
.finish();
DataStream<Row> rows = FilterStream
.map((tuple)-> {
Row row = new Row(3);
row.setField(0, tuple.f0);
row.setField(1, tuple.f1);
row.setField(2, tuple.f2);
return row;
});
rows.writeUsingOutputFormat(jdbcOutput);
env.execute();
}
}
Вышеприведенный код работает нормально, он выбирает строки из файла и сохраняет их в базе данных.
Например:
Если файл содержит:
1, mark, 20
тогда запись в базе данных будет выглядеть так:
id name marks
------------------
1 mark 20
Теперь требование для каждой строки, я должен создать 2 разных строки, и это должно выглядеть так:
Например:
Если файл содержит:
1, mark, 20
тогда запись в базе данных должна выглядеть так:
id name marks
------------------
1 mark-1 20
1 mark-2 20
Теперь я должен вернуть List вместо строки, а переменная потока данных должна выглядеть как DataStream<List<Row>> rows
.
Что я должен изменить в переменной JDBCOutputFormat, чтобы добиться этого?