Записать в облако sql используя поток данных JdbcIO API - PullRequest
0 голосов
/ 04 июля 2018

У меня есть требование, когда мне нужно записать PCollection String в Cloud SQL с использованием Cloud Dataflow API.

pipeline.apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
    .apply(JdbcIO.write()
    .withDataSourceConfiguration(DataSourceConfiguration
    .create("org.postgresql.Driver", "jdbc:postgresql://***:5432/test")
    .withUsername("**").withPassword("password10"))
    .withStatement("insert into person values(?,?)")
    .withPreparedStatementSetter(
 new JdbcIO.PreparedStatementSetter < Object > () {
  /**
   * 
   */
  private static final long serialVersionUID = 1 L;

  @Override
  public void setParameters(Object arg0, PreparedStatement query)
  throws Exception {
   // TODO Auto-generated method stub
   query.setString(1, "Hello");
   query.setString(1, "Hi");
  }
 }));

Это пример кода, который я пытаюсь. Очень простая версия того, что я хочу сделать.

Кроме того, возможно ли выполнять запись в Cloud SQL из Dataflow, используя parDo и писать простые операторы вставки?

1 Ответ

0 голосов
/ 05 июля 2018

Предыдущее преобразование выводит PCollection<String>, поэтому необходимо указать, что это был тип ввода для JdbcIO<T>.write()

Примерно так:

    pipeline
        .apply(TextIO.read().from("gs://***/sampleBigtable.csv"))
        .apply(JdbcIO.<String>write().withDataSourceConfiguration(
            DataSourceConfiguration.create("org.postgresql.Driver","jdbc:postgresql://***:5432/test")
                .withUsername("**")
                .withPassword("password10"))
                .withStatement("insert into person values(?,?)")
                    .withPreparedStatementSetter((element, query) -> {
                        query.setInt(1, 1);
                        query.setString(2, "Hello");
                    })
        );
...