У меня есть задание потока данных, которое извлекает данные из облака SQL и загружает их в облачное хранилище. Мы настроили задание на прием параметров, чтобы мы могли использовать один и тот же код для извлечения нескольких таблиц. Задание потока данных компилируется как шаблон.
Когда мы создаем / запускаем экземпляры шаблона в последовательном режиме, мы получаем ожидаемые результаты. Однако если мы создаем / запускаем экземпляры параллельно, в облачном хранилище появляется только несколько файлов. В обоих случаях мы видим, что задания DF создаются и успешно завершаются.
Например, у нас есть 11 экземпляров, которые создают 11 выходных файлов. В последовательном порядке мы получаем все 11 файлов, параллельно мы получаем только около 3 файлов. Во время параллельного запуска все 11 экземпляров работали одновременно
Кто-нибудь может дать какой-нибудь совет относительно того, почему это происходит? Я предполагаю, что временные файлы, созданные шаблоном DF, как-то перезаписываются во время параллельного запуска?
Основная мотивация параллельного запуска - более быстрое извлечение данных.
Редактировать
Конвейер довольно прост:
PCollection<String> results = p
.apply("Read from Cloud SQL", JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create(dsDriver, dsConnection)
.withUsername(options.getCloudSqlUsername())
.withPassword(options.getCloudSqlPassword())
)
.withQuery(options.getCloudSqlExtractSql())
.withRowMapper(new JdbcIO.RowMapper<String>() {
@Override
public String mapRow(ResultSet resultSet) throws Exception {
return mapRowToJson(resultSet);
}
})
.withCoder(StringUtf8Coder.of()));
Когда я компилирую шаблон, я
mvn compile exec:java \
-Dexec.mainClass=com.xxxx.batch_ingestion.LoadCloudSql \
-Dexec.args="--project=myproject \
--region=europe-west1 \
--stagingLocation=gs://bucket/dataflow/staging/ \
--cloudStorageLocation=gs://bucket/data/ \
--cloudSqlInstanceId=yyyy \
--cloudSqlSchema=dev \
--runner=DataflowRunner \
--templateLocation=gs://bucket/dataflow/template/BatchIngestion"
Когда я вызываю шаблон, я также предоставляю «tempLocation». Я вижу, как используются динамические c временные местоположения. Несмотря на это, я не вижу все выходные файлы при параллельной работе.
Спасибо