pipeline
    /*
     * Step 1: Read records via JDBC and convert to TableRow
     *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
     */
    .apply(
        "Read from JdbcIO",
        DynamicJdbcIO.<TableRow>read()
            .withDataSourceConfiguration(
                DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                        options.getDriverClassName(),
                        maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                    .withUsername(
                        maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                    .withPassword(
                        maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                    .withDriverJars(options.getDriverJars())
                    .withConnectionProperties(options.getConnectionProperties()))
            .withQuery(options.getQuery())
            .withCoder(TableRowJsonCoder.of())
            .withRowMapper(JdbcConverters.getResultSetToTableRow()))
    /*
     * Step 2: Append TableRow records to an existing BigQuery table
     */
    .apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
            .to(options.getOutputTable()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}
I want to create an Apache Beam pipeline in Java that iterates over the rows and converts each row, whatever its data type, into a String, using ParDo and DoFn.
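
Below is a minimal, self-contained sketch of such a pipeline, assuming Beam 2.x. It is not part of the template above; the class name AnyTypeToStringPipeline and the generic ToStringFn are illustrative. String.valueOf(...) is used so that null elements and arbitrary object types are handled uniformly.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class AnyTypeToStringPipeline {

  /** A DoFn that turns each element of any input type T into its String form. */
  static class ToStringFn<T> extends DoFn<T, String> {
    @ProcessElement
    public void processElement(@Element T element, OutputReceiver<String> out) {
      // String.valueOf handles nulls as well as arbitrary object types.
      out.output(String.valueOf(element));
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Example input: Integers here, but the same DoFn works for TableRow
    // or any other element type.
    PCollection<Integer> numbers = pipeline.apply("Create input", Create.of(1, 2, 3));

    // ParDo applies the DoFn to every element, yielding a PCollection<String>.
    PCollection<String> strings =
        numbers.apply("Convert to String", ParDo.of(new ToStringFn<Integer>()));

    pipeline.run().waitUntilFinish();
  }
}

If no custom formatting is needed, Beam also ships a ready-made transform for this, org.apache.beam.sdk.transforms.ToString.elements(), which can replace the hand-written DoFn; the ParDo/DoFn version above is the one to extend when each row needs type-specific conversion logic.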