Как запустить динамический второй запрос в облачном потоке данных Google? - PullRequest
0 голосов
/ 11 декабря 2018

Я пытаюсь выполнить операцию, в которой я получаю список идентификаторов с помощью запроса, преобразую их в строку, разделенную запятыми (то есть «1,2,3»), а затем использую ее во вторичном запросе.При попытке выполнить второй запрос мне выдается синтаксическая ошибка:

«Целевым типом лямбда-преобразования должен быть интерфейс»

String query = "SELECT DISTINCT campaignId FROM `" + options.getEligibilityInputTable() + "` ";

    Pipeline p = Pipeline.create(options);
    p.apply("GetCampaignIds", BigQueryIO.readTableRows().withTemplateCompatibility().fromQuery(query).usingStandardSql())
      .apply("TransformCampaignIds",
        MapElements.into(TypeDescriptors.strings())
        .via((TableRow row) -> (String)row.get("campaignId")))
      .apply(Combine.globally(new StringToCsvCombineFn()))
      .apply("GetAllCampaigns", campaignIds -> BigQueryIO.readTableRows().withTemplateCompatibility().fromQuery("SELECT id AS campaignId, dataQuery FROM `{projectid}.mysql_standard.campaigns` WHERE campaignId IN (" + campaignIds + ")").usingStandardSql())
....

Как объединить запросы в цепочку?

1 Ответ

0 голосов
/ 11 декабря 2018

К сожалению, вы не можете сделать это с существующими источниками.Здесь возможны два варианта:

  • Вы вручную вызываете API BQ из ParDo.
  • Вы пишете сложный SQL-запрос, который делает это за вас.

Второй вариант выглядит примерно так:

String query = "SELECT id AS campaignId, dataQuery \
               FROM `{projectid}.mysql_standard.campaigns` \
               WHERE campaignId IN ( \
                   SELECT DISTINCT campaignId \
                   FROM `" + options.getEligibilityInputTable() 
                   + "`)";

Pipeline p = Pipeline.create(options);
p.apply("GetAllCampaigns", BigQueryIO.readTableRows()
                                     .withTemplateCompatibility()
                                     .fromQuery(query)
                                     .usingStandardSql());
...