Перемещение данных из Cloud SQL в Elasti c Поиск с использованием Beam и DataFlow - PullRequest
1 голос
/ 11 февраля 2020

Я новичок в луче и потоке данных Google, я создал простой класс для переноса данных из облака sql в Elasti c Поиск с использованием пакетной обработки, написав этот класс ниже:

package com.abc;

class DataFlowTest{

    public static void main(String[] args) {

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject("staging");  options.setTempLocation("gs://csv_to_sql_staging/temp");    options.setStagingLocation("gs://csv_to_sql_staging/staging");  options.setRunner(DataflowRunner.class);    options.setGcpTempLocation("gs://csv_to_sql_staging/temp");
            Pipeline p = Pipeline.create(options);


      p.begin();

      p.apply(JdbcIO.read().withQuery("select * from user_table").withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://"+EnvironmentVariable.getDatabaseIp()+"/" + EnvironmentVariable.getDatabaseName()+ "&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user="+Credentials.getDatabaseUsername()+"&password="+Credentials.getDatabasePassword()+"&useSSL=false")));


  Write w = ElasticsearchIO.write().withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(new String [] {"host"}, "user-temp", "String").withUsername("elastic").withPassword("password")
            );
   p.apply(w);

p.run () waitUntilFini sh (). }}

and find below my dependnecies in pom.xml


<dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>2.19.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>2.19.0</version>

      <exclusions>
      <exclusion>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      </exclusion>

      <exclusion>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      </exclusion>

      <exclusion>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client-jackson2</artifactId>
      </exclusion>


      </exclusions>

    </dependency>

    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
    <version>2.19.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>2.19.0</version>
    </dependency>



    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>2.19.0</version>
      <exclusions>
      <exclusion>
      <groupId>io.grpc</groupId>
      <artifactId>grpc-core</artifactId>
      </exclusion>

      </exclusions>

    </dependency>

и теперь проблема заключается в ошибке компиляции:

Метод apply (PTransform) в типе Pipeline не применим для аргументов (ElasticsearchIO. Напишите) В этой строке: p.apply (w);

Кто-нибудь может помочь в этом PLZ? Я сделал некоторые исключения в файле pom, чтобы исправить некоторые конфликты зависимостей

1 Ответ

1 голос
/ 24 февраля 2020

Невозможно напрямую применить ElasticSearchIO.write к объекту конвейера. Сначала создайте PCollection, а затем примените ElasticsearchIO к PCollection. Пожалуйста, обратитесь к приведенному ниже коду.

PCollection<String> sqlResult1 = p.apply(
                JdbcIO.<String>read().withDataSourceConfiguration(config).withQuery("select * from test_table")
                        .withCoder(StringUtf8Coder.of()).withRowMapper(new JdbcIO.RowMapper<String>() {

                            private static final long serialVersionUID = 1L;

                            public String mapRow(ResultSet resultSet) throws Exception {
                                StringBuilder val = new StringBuilder();
                                return val.append(resultSet.getString(0)).append(resultSet.getString(1)).toString();
                                // return KV.of(resultSet.getString(1), resultSet.getString(2));
                            }
                        }));

        sqlResult1.apply(ElasticsearchIO.write().withConnectionConfiguration(ElasticsearchIO.ConnectionConfiguration
                .create(new String[] { "https://host:9243" }, "user-temp", "String").withUsername("").withPassword("")));

Я думаю, что приведенный выше фрагмент кода должен работать в вашем случае использования.

...