Как передать динамический оператор SQL в коннектор JDBCIO в Apache Beam? - PullRequest
0 голосов
/ 06 июня 2019

Apache beam предоставляет разъем JDBCIO для подключения к CloudSql postgreSQL.Моя работа читает событие из pub / sub.Тело события выглядит следующим образом:

tableName,
list<value>

Мне нужно записать в таблицу на основе имени таблицы, которое я получаю из моего сообщения.

JDBCIO подготовил оператор, который позволитмне параметризировать значения в моем запросе вставки.Но мне нужно генерировать запрос вставки динамически на основе информации, представленной в событии.

 pipeline
   .apply(PubsubIO.readStrings().fromSubscription())
   .apply(convertToKV())
   .apply(JdbcIO.<List<String>>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        public void setParameters(KV<Integer, String> element, PreparedStatement query)
          throws SQLException {
i=0
for each element in list
          query.setInt(i, element.get(i);
i++;

        }
      })
    );

Я должен иметь возможность динамически создавать оператор SQL на основе события ввода из коллекции pcollection.Мой оператор выбора должен генерироваться динамически на основе значения списка и имени таблицы.Пожалуйста, дайте мне знать, можем ли мы сделать это или нет.

Обновление: -

Я пытаюсь вручную вызвать драйвер jdbc внутри функции parDo, но получаю сообщение об ошибке ниже.Не найден подходящий драйвер для jdbcURL.

Пожалуйста, дайте мне знать, если я что-то пропустил:

@Setup
public void doAnyRequiredSetup() throws SQLException
{
    LoggingContextUtil.installContext(loggingContext);


    connection=DriverManager.getConnection(JdbcUrl,user,password);
    statement=connection.createStatement();

    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("In doAnyRequiredSetup logging Context is now set and JDBC connection is .");
    }



}

@SuppressWarnings("unchecked")
@ProcessElement
public void processElement(ProcessContext context)
{
    JsonNode element=context.element();

    try {
        String query=formatQuery(baseQuery);

        boolean result=statement.execute(query);

        if(LOGGER.isDebugEnabled()) {
            LOGGER.debug("Executed query : "+query+" and the result is "+ result);
        }

    } catch (IllegalArgumentException | SQLException e) {

        ErrorMessage em = new ErrorMessage(element.toString(), "Insert Query Failed", e.getMessage());

        context.output(ValidateTagHelper.FAILURE_TAG,em);
    }



}

Ответы [ 2 ]

0 голосов
/ 14 июня 2019

Вы можете попробовать прочитать сообщения pubsub с атрибутами и в атрибутах, вы можете передать имя таблицы и значения в виде пары ключ-значение.

PCollection<PubsubMessage> pubsubMessage = pipeline
      .apply(PubsubIO.readMessagesWithAttributes().fromSubscription("")
0 голосов
/ 07 июня 2019

У вас не может быть динамических запросов в JdbcIO на основе элементов ввода. ParDo необходимо сбросить, как вам нужно, вы можете переписать ParDo, в котором вы бы вызвали драйвер JDBC вручную.

Если вы найдете этот другой обходной путь, вы можете разделить входную PColleciton на несколько выходов. Это сработает, если ваш вариант использования ограничен каким-то предопределенным набором запросов, которые вы можете выбрать на основе входных данных. Таким образом, вы разделяете входные данные на несколько PCollections и затем присоединяете по-разному настроенные IO к каждому.

https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

...