Параллельный запуск одного и того же шаблона DF дает странные результаты - PullRequest
1 голос
/ 18 марта 2020

У меня есть задание потока данных, которое извлекает данные из облака SQL и загружает их в облачное хранилище. Мы настроили задание на прием параметров, чтобы мы могли использовать один и тот же код для извлечения нескольких таблиц. Задание потока данных компилируется как шаблон.

Когда мы создаем / запускаем экземпляры шаблона в последовательном режиме, мы получаем ожидаемые результаты. Однако если мы создаем / запускаем экземпляры параллельно, в облачном хранилище появляется только несколько файлов. В обоих случаях мы видим, что задания DF создаются и успешно завершаются.

Например, у нас есть 11 экземпляров, которые создают 11 выходных файлов. В последовательном порядке мы получаем все 11 файлов, параллельно мы получаем только около 3 файлов. Во время параллельного запуска все 11 экземпляров работали одновременно

Кто-нибудь может дать какой-нибудь совет относительно того, почему это происходит? Я предполагаю, что временные файлы, созданные шаблоном DF, как-то перезаписываются во время параллельного запуска?

Основная мотивация параллельного запуска - более быстрое извлечение данных.

Редактировать

Конвейер довольно прост:

        PCollection<String> results =  p
            .apply("Read from Cloud SQL", JdbcIO.<String>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                        .create(dsDriver, dsConnection)
                        .withUsername(options.getCloudSqlUsername())
                        .withPassword(options.getCloudSqlPassword())
                )
                .withQuery(options.getCloudSqlExtractSql())
                .withRowMapper(new JdbcIO.RowMapper<String>() {
                    @Override
                    public String mapRow(ResultSet resultSet) throws Exception {
                        return mapRowToJson(resultSet);
                    }
                })
                .withCoder(StringUtf8Coder.of()));

Когда я компилирую шаблон, я

mvn compile exec:java \
 -Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
 -Dexec.args="--project=myproject \
    --region=europe-west1 \
    --stagingLocation=gs://bucket/dataflow/staging/ \
    --cloudStorageLocation=gs://bucket/data/ \
    --cloudSqlInstanceId=yyyy \
    --cloudSqlSchema=dev \
    --runner=DataflowRunner \
    --templateLocation=gs://bucket/dataflow/template/BatchIngestion"

Когда я вызываю шаблон, я также предоставляю «tempLocation». Я вижу, как используются динамические c временные местоположения. Несмотря на это, я не вижу все выходные файлы при параллельной работе.

Спасибо

1 Ответ

2 голосов
/ 23 марта 2020

Решение

  1. Добавить уникальный tempLocation
  2. Добавить уникальный путь вывода и имя файла
  3. Переместить выходные файлы в конечный пункт назначения на CS после DF завершает свою обработку
...