как подключиться к облаку SQL из Google DataFlow - PullRequest
1 голос
/ 13 февраля 2020

Я пытаюсь создать задачу конвейера, используя луч java SDK и Google Dataflow для перемещения данных из облака SQL в Elasti c поиск

Я создал следующий метод main класса :

 public static void main(String[] args) throws Exception{
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("staging");      
    options.setTempLocation("gs://csv_to_sql_staging/temp"); 
    options.setRunner(DataflowRunner.class);        
    options.setGcpTempLocation("gs://csv_to_sql_staging/temp");         options.setUsePublicIps(false);         
    options.setJobName("tamer-new");        '
    options.setSubnetwork("regions/us-central1/subnetworks/new-network");
    final List<String> SCOPES = Arrays.asList(
      "https://www.googleapis.com/auth/cloud-platform",
      "https://www.googleapis.com/auth/devstorage.full_control",
      "https://www.googleapis.com/auth/userinfo.email",
      "https://www.googleapis.com/auth/datastore",
      "https://www.googleapis.com/auth/sqlservice.admin",
      "https://www.googleapis.com/auth/pubsub");
    options.setGcpCredential(ServiceAccountCredentials.fromStream(new ElasticSearchIO().getClass().getResourceAsStream("/staging-b648da5d2b9b.json")).createScoped(SCOPES));            options.setServiceAccount("data-flow@staging.iam.gserviceaccount.com");

    Pipeline p = Pipeline.create(options);

    p.begin();

    PCollection < List < String >> rows = p.apply(JdbcIO. < List < String >> read().withQuery("select u.id, u.name from user_table").withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://google/nameDB_new?cloudSqlInstance=staging:europe-west1:sql-staging-instance&socketFactory=com.google.cloud.sql.mysql.SocketFactory&useUnicode=true&characterEncoding=UTF-8&user=user&password=password&useSSL=false")).withRowMapper(new RowMapper < List < String >> () {

        @Override public List < String > mapRow(ResultSet resultSet) throws Exception {
            List < String > addRow = new ArrayList < String > ();


            for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                addRow.add(i - 1, String.valueOf(resultSet.getObject(i)));
            }

            //LOG.info(String.join(",", addRow));

            return addRow;

        }
    })

    .withCoder(ListCoder.of(StringUtf8Coder. < Object > of ()))

    );

    Write w = ElasticsearchIO.write().withConnectionConfiguration(
        ElasticsearchIO.ConnectionConfiguration.create(new String[] {
            "https://host:9243"
        }, "user-temp", "String").withUsername("elastic").withPassword("password")
    );

    rows.apply(w.compose(new SerializableFunction() {

       @Override public Object apply(Object input) {
         // TODO Auto-generated method stub
         return input;
        }
    }));


    p.run().waitUntilFinish();

}

и ниже - pom. xml file:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.harmonica.dataflow</groupId>
  <artifactId>com-harmonica-dataflow</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
    <beam.version>2.19.0</beam.version>
  </properties>

  <repositories>
    <repository>
      <id>ossrh.snapshots</id>
      <name>Sonatype OSS Repository Hosting</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${exec-maven-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
<!--  Beam Lib -->
<dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>        
    </dependency>

    <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
    <version>${beam.version}</version>  
    </dependency>

    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-jdbc</artifactId>
        <version>${beam.version}</version>
    </dependency>



    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>

    </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.19</version>           
        </dependency>

<dependency>
    <groupId>com.google.cloud.sql</groupId>
    <artifactId>mysql-socket-factory-connector-j-8</artifactId>
    <version>1.0.15</version>
</dependency>


    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>

, и когда я выполняю эту команду:

man exec mvn compile exec:java -Dexec.mainClass=com.dataflow.ElasticSearchIO 

Рабочий успешно запущен но потом не могу подключиться к облаку SQL: даже подумал, что я выполнил текущие действия:

  • Я создал служебную учетную запись с доступом владельца к проекту и передал ее опциям бегуна
  • Я создал сеть VP C с именем new-network с диапазоном IP-адресов 190.10.0.0/16 и назначил его для параметров конвейера, а затем внес в белый список этот диапазон в облаке SQL

и тем не менее я все еще получаю эту ошибку:

Сообщение об ошибке от работника: java .lang.RuntimeException: org. apache .beam.sdk .util.UserCodeException: java. sql .SQLException: Невозможно создать PoolableConnectionFacto ry (Ошибка канала связи. Последний пакет, успешно отправленный на сервер, был 0 миллисекунд в go. Драйвер не получил никаких пакетов с сервера.) Org. apache .beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory $ 1.typedApply (IntrinsicMapTaskExecutorFactory. java: 194) org. apache .bedflow.runners .worker.IntrinsicMapTaskExecutorFactory $ 1.typedApply (IntrinsicMapTaskExecutorFactory. java: 165) org. apache .beam.runners.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply (63: 10) или 63 (10). * .beam.runners.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply (Networks. java: 50) org. apache .beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes (Сети. java: 87) org. apache .beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create (IntrinsicMapTaskExecutorFactory. java: 125) org. apache .beam.runners.dataflow.worker.Berd java: 352) org. apache .beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork (BatchDataflowWorker. java: 305) org. apache .beam.runners.dataflow.worker.Dataf lowBatchWorkerHarness $ WorkerThread. runners.dataflow.worker.DataflowBatchWorkerHarness $ WorkerThread.call (DataflowBatchWorkerHarness. java: 107) java .util.concurrent.FutureTask.run (FutureTask. java: 266) java .util.concurrent. runWorker (ThreadPoolExecutor. java: 1149) java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) java .lang.Thread.run (поток. java: 748) Вызывается: org. apache .beam.sdk.util.UserCodeException: java. sql .SQLException: Невозможно создать PoolableConnectionFactory (сбой канала связи Последний пакет, успешно отправленный на сервер, был 0 миллисекунд в go. Драйвер не получил никаких пакетов с сервера.) Org. apache .beam.sdk.util.UserCodeException.wrap (UserCodeException. java: 34) org. apache .beam.sdk.io.jdb c .JdbcIO $ ReadFn $ DoFnInvoker.invokeSetup (Неизвестный источник) org. apache .beam.runners.dataflow.worker.DoFnInstanceManagers $ ConcurrentQueueInstanceManager.deserializeCopy (DoFnInstanceManager: 10 * 78. 1079). runners.dataflow.worker. * .beam.runners.dataflow.worker.DefaultParDoFnFactory.create (DefaultParDoFnFactory. java: 75) org. apache .beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory. apache .beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access $ 000 (IntrinsicMapTaskExecutorFactory. java: 86) org. apache .beam. runners.dataflow.worker.IntrinsicMapTaskExecutorFactory $ 1.typedApply (IntrinsicMapTaskExecutorFactory. java: 183) ... еще 14 Причина: java. sql .SQLException: не удается создать последний успешно отправленный пакет PoolableConnectionFactory (ошибка связи) сервер был 0 миллисекунд go. Драйвер не получил пакетов от сервера.) org. apache .commons.dbcp2.BasicDataSource.createPoolableConnectionFactory (BasicDataSource. java: 735) org. apache. commons.dbcp2.BasicDataSource.createDataSource (BasicDataSource. java: 605) org. apache .commons.dbcp2.BasicDataSource.getConnection (BasicDataSource. java: 809) org. apache .beam.sdkio. jdb c .JdbcIO $ ReadFn.setup (JdbcIO. java: 881) Причина: com. mysql .cj.jdb c .exceptions.CommunicationsException: сбой канала связи Последний пакет успешно отправлен на сервер было 0 миллисекунд go. Драйвер не получил никаких пакетов с сервера. com. mysql .cj.jdb c .exceptions.SQLError.createCommunicationsException (SQLError. java: 174) com. mysql .cj.jdb c .exceptions.SQLExceptionsMapping.translateException (SQLExceptions 11 *: 64) com. mysql .cj.jdb c .ConnectionImpl.createNewIO (ConnectionImpl. java: 836) com. mysql .cj.jdb c .ConnectionImpl. (ConnectionImpl. java : 456) com. mysql .cj.jdb c .ConnectionImpl.getInstance (ConnectionImpl. java: 246) com. mysql .cj.jdb c .NonRegisteringDriver.connect (NonRegisteringDriver. java : 197) org. apache .commons.dbcp2.DriverConnectionFactory.createConnection (DriverConnectionFactory. java: 53) org. apache .commons.dbcp2.PoolableConnectionFactory.makeObject (PoolableConnectionFactory. * 11) .27 или 27 1128 * .commons.dbcp2.BasicDataSource.validateConnectionFactory (BasicDataSource. java: 116) org. apache .commons.dbcp2.BasicDataSource.createPoolableConnectionFactory (BasicDataSource. java: 11b). .BasicDataSource.createDataSource (BasicDataSource. java: 605) org. * 1 134 * .commons.dbcp2.BasicDataSource.getConnection (BasicDataSource. java: 809) org. apache .beam.sdk.io.jdb c .JdbcIO $ ReadFn.setup (JdbcIO. java: 881) org. apache .beam.sdk.io.jdb c .JdbcIO $ ReadFn $ DoFnInvoker.invokeSetup (Неизвестный источник) org. apache .beam.runners.dataflow.worker.DoFnInstanceManagers $ ConcurrentQueueInstanceFananceManager. java: 80) org. apache .beam.runners.dataflow.worker.DoFnInstanceManagers $ ConcurrentQueueInstanceManager.peek (DoFnInstanceManagers. java: 62) org. apache .beam.runners.dataflow.worknactF. create (UserParDoFnFactory. java: 95) org. apache .beam.runners.dataflow.worker.DefaultParDoFnFactory.create (DefaultParDoFnFactory. java: 75) org. apache .beam.runners.dataflow.worker. IntrinsicMapTaskExecutorFactory.createParDoOperation (IntrinsicMapTaskExecutorFactory. java: 264) org. apache .beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.actcess .W orker.IntrinsicMapTaskExecutorFactory $ 1.typedApply (IntrinsicMapTaskExecutorFactory. java: 183) org. apache .beam.runners.dataflow.worker. runners.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply (Networks. java: 63) org. apache .beam.runners.dataflow.worker.graph.Networks $ TypeSafeNodeFunction.apply (Networks. java: 50) org. apache .beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes (Networks. java: 87) org. apache .beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create (Intrins. Intr. java: 125) org. apache .beam.runners.dataflow.worker.BatchDataflowWorker.doWork (BatchDataflowWorker. java: 352) org. apache .beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork ( BatchDataflowWorker. java: 305) org. apache .beam.runners.dataflow.worker.DataflowBatchWorkerHarness $ WorkerThread. doWork (DataflowBatchWorkerHarness. java: 140) org. apache .beam.runners.dataflow.worker.DataflowBatchWorkerHarness $ WorkerThread.call (DataflowBatchWorkerHarness. java: 120) org. apache .beam.runners. worker.DataflowBatchWorkerHarness $ WorkerThread.call (DataflowBatchWorkerHarness. java: 107) java .util.concurrent.FutureTask.run (FutureTask. java: 266) java .util.concurrent.ThreadPoolExecutor java: 1149) java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) java .lang.Thread.run (Тема. java: 748) Причина: com . mysql .cj.exceptions.CJCommunicationsException: сбой линии связи Последний пакет, успешно отправленный на сервер, был 0 миллисекунд go. Драйвер не получил никаких пакетов с сервера. sun.reflect.NativeConstructorAccessorImpl.newInstance0 (родной метод) sun.reflect.NativeConstructorAccessorImpl.newInstance (NativeConstructorAccessorImpl. java: 62) sun.reflect.DelegatingConstructorAccessor *ImplImp_Impl * .Constructor.newInstance (Constructor. java: 423) com. mysql .cj.exceptions.ExceptionFactory.createException (ExceptionFactory. java: 61) com. mysql .cj.exceptions.ExceptionFactory.createException (ExceptionFactory . java: 105) com. mysql .cj.exceptions.ExceptionFactory.createException (ExceptionFactory. java: 151) com. mysql .cj.exceptions.ExceptionFactory.createCommunicationsException (ExceptionFactory. java: 167 ) com. mysql .cj.protocol.a.NativeSocketConnection.connect (NativeSocketConnection. java: 91) com. mysql .cj.NativeSession.connect (NativeSession. java: 144) com. mysql .cj.jdb c .ConnectionImpl.connectOneTryOnly (ConnectionImpl. java: 956) com. mysql .cj.jdb c .ConnectionImpl.createNewIO (ConnectionImpl. java: 826) ... еще 32 Причины: java. net .ConnectException: Время ожидания соединения истекло (Время ожидания соединения) java. net .PlainSocketImpl.socketConnect (собственный метод) java. net .AbstractPlainSocketImpl.doConnect (AbstractPlainSocketImpl. java: 350) java. net .AbstractPlainSocketImpl.connectToAddress (AbstractPlainSocketImpl. java: 206) java. java: 188) java. net .SocksSocketImpl.connect (SocksSocketImpl. java: 392) java. net .Socket.connect (Socket. java: 589) sun.security. ssl.SSLSocketImpl.connect (SSLSocketImpl. java: 673) sun.security.ssl.BaseSSLSocketImpl.connect (BaseSSLSocketImpl. java: 173) com.google.cloud. sql .core.CoreSocketFactory.oreS .SoF. java: 233) com.google.cloud. sql .core.CoreSocketFactory.connect (CoreSocketFactory. java: 185) com.google.cloud. sql. mysql .SocketFactory.connect (SocketFactory. java: 48) com.google.cloud. sql. mysql .SocketFactory.connect (SocketFactory . java: 38) com. mysql .cj.protocol.a.NativeSocketConnection.connect (NativeSocketConnection. java: 65) ... еще 35

Плз любая помощь будет высоко ценится! Заранее спасибо

Ответы [ 2 ]

1 голос
/ 24 февраля 2020

Вы можете использовать приведенный ниже фрагмент кода для установления sh соединения:

Pipeline p = Pipeline.create (options);

    //Increase pool size based on your records

    ComboPooledDataSource dataSource = new ComboPooledDataSource();

    dataSource.setDriverClass("com.mysql.jdbc.Driver");
    dataSource.setJdbcUrl(
            "jdbc:mysql://google/test?cloudSqlInstance=dataflowtest-:us-central1:sql-test&socketFactory=com.google.cloud.sql.mysql.SocketFactory");
    dataSource.setUser("root");
    dataSource.setPassword("root");
    dataSource.setMaxPoolSize(10);
    dataSource.setInitialPoolSize(6);

    JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);

    // ADD rewriteBatchedStatements=true to improve write speed"

    PCollection<KV<String, String>> sqlResult = p.apply(JdbcIO.<KV<String, String>>read()
            .withDataSourceConfiguration(config)
            .withQuery("select * from test_table").withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
            .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {

                private static final long serialVersionUID = 1L;

                public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getString(1), resultSet.getString(2));
                }
            }));

Добавить указанную ниже зависимость в pom. xml

    <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-jdbc</artifactId>
            <version>2.17.0</version>
        </dependency>
<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.25</version>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.sql</groupId>
            <artifactId>mysql-socket-factory</artifactId>
            <version>1.0.0</version>
        </dependency>

Это должно работать ..

0 голосов
/ 13 февраля 2020

Если возможно, попробуйте следующий код для соединения sql:

    connection = connectToCloudSql(map.get(LiteralConstant.URL.toString()),
            map.get(LiteralConstant.USERNAME.toString()), map.get(LiteralConstant.PASSWORD.toString()));

Затем используйте следующий фрагмент кода для получения результата от соединения sql:

statement = connection.prepareCall("query"); 
statement.execute();
resultSet = statement.getResultSet();
ResultSetMetaData rsmd = resultSet.getMetaData();

int count = rsmd.getColumnCount();
if(!resultSet.next() || count < 1)
    throw new ConnectionFailureException("Failed to connect to Cloud SQL");

for (int k = 1; k <= count; k++) {
    row.set(rsmd.getColumnName(k), resultSet.getString(k));
}

Получить приведенный выше результат в PCollection Note: Не забудьте включить Cloud sql api и Cloud sql admin api.

Зависимость Maven:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.25</version>
</dependency>
<dependency>
    <groupId>com.google.cloud.sql</groupId>
    <artifactId>mysql-socket-factory</artifactId> <!-- mysql-socket-factory-connector-j-6 if using 6.x.x -->
    <version>1.0.0</version>
</dependency>

Этот фрагмент кода работал в моем случае. Дайте мне знать, если это решение подойдет вам.

...