Облачный поток данных GlobalWindow триггер игнорируется - PullRequest
0 голосов
/ 09 января 2020

Использование триггера AfterPane.elementCountAtLeast не работает при запуске с использованием обработчика потока данных, но работает правильно при локальном запуске. При запуске в потоке данных он создает только одну панель.

Целью является извлечение данных из облака SQL, преобразование и запись в облачное хранилище. Однако в памяти слишком много данных, поэтому их нужно разделить и записать в облачное хранилище по частям. Это то, на что я надеялся.

Полный код:

      val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
        .applyTransform(ParDo.of(new Translator()))
        .map(row => row.mkString("|"))
        // produce one global window with one pane per ~500 records
        .withGlobalWindow(WindowOptions(
          trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(500)),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        ))

      val out = TextIO
        .write()
        .to("gs://test-bucket/staging")
        .withSuffix(".txt")
        .withNumShards(1)
        .withShardNameTemplate("-P-S")
        .withWindowedWrites() // gets us one file per window & pane
      pipe.saveAsCustomOutput("writer",out)

Я думаю, root проблемы может заключаться в том, что класс JdbcIO реализован как PTransform<PBegin,PCollection> и один вызов processElement выводит весь SQL результат запроса:

    public void processElement(ProcessContext context) throws Exception {
      try (PreparedStatement statement =
          connection.prepareStatement(
              query.get(), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        statement.setFetchSize(fetchSize);
        parameterSetter.setParameters(context.element(), statement);
        try (ResultSet resultSet = statement.executeQuery()) {
          while (resultSet.next()) {
            context.output(rowMapper.mapRow(resultSet));
          }
        }
      }
    }

Ответы [ 2 ]

1 голос
/ 15 января 2020

В конце концов мне пришлось решить две проблемы: 1. Процессу не хватило бы памяти, и 2. Данные были записаны в один файл.

Нет способа обойти проблему. 1 с Beam JdbcIO и Cloud SQL из-за способа использования драйвера MySQL. Сам драйвер загружает весь результат за один вызов в executeStatement. Есть способ заставить драйвер передавать результаты, но для этого мне пришлось реализовать собственный код. В частности, я реализовал BoundedSource для JDB C.

Для второй проблемы я использовал номер строки, чтобы установить метку времени каждого элемента. Это позволяет мне явно контролировать, сколько строк в каждом окне, используя FixedWindows.

0 голосов
/ 09 января 2020

elementCountAtLeast является нижней границей, поэтому для бегуна допустимо использовать только одну панель.

При выполнении этого для пакетного конвейера у вас есть несколько вариантов:

  1. Позвольте бегуну решать, какой размер файла и сколько осколков записано:
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
        .applyTransform(ParDo.of(new Translator()))
        .map(row => row.mkString("|"))

      val out = TextIO
        .write()
        .to("gs://test-bucket/staging")
        .withSuffix(".txt")
      pipe.saveAsCustomOutput("writer",out)

Обычно это самый быстрый вариант, когда TextIO имеет GroupByKey или источник, который поддерживает расщепление, которое предшествует ему. Насколько мне известно, JDB C не поддерживает разбиение, поэтому лучше всего добавить Reshuffle после jdbcSelect , что позволит распараллеливать обработку после чтения данных из базы данных.

Вручную сгруппировать в пакеты, используя преобразование GroupIntoBatches .
val pipe = sc.jdbcSelect(getReadOptions(connOptions, stmt))
        .applyTransform(ParDo.of(new Translator()))
        .map(row => row.mkString("|"))
        .apply(GroupIntoBatches.ofSize(500))

      val out = TextIO
        .write()
        .to("gs://test-bucket/staging")
        .withSuffix(".txt")
        .withNumShards(1)
      pipe.saveAsCustomOutput("writer",out)

В общем, это будет медленнее, чем опция # 1, но она позволит вам выбрать, сколько записи записываются для каждого файла.

Есть несколько других способов сделать это со своими плюсами и минусами, но вышеупомянутые два, вероятно, наиболее близки к тому, что вы хотите. Если вы добавите более подробную информацию к своему вопросу, я могу еще раз пересмотреть этот вопрос.

...