Ни один из предоставленных шаблонов DataFlow не соответствует тому, что мне нужно сделать, поэтому я пытаюсь написать свой собственный.Мне удалось без проблем запустить пример кода, например пример подсчета слов, поэтому я попытался собрать воедино отдельные части примеров, которые читаются из BigQuery и пишут в Spanner, но в исходном коде так много вещей, которые я не понимаю и не могу приспособиться к ним.Моя собственная проблема.
Я ДЕЙСТВИТЕЛЬНО потерялся в этом, и любая помощь очень ценится!
Цель состоит в том, чтобы использовать DataFlow и Apache Beam SDK для чтения из таблицы BigQuery с 3 строковыми полями.и 1 целочисленное поле, затем объединить содержимое 3 строковых полей в одну строку и поместить эту новую строку в новое поле с именем «ключ», затем я хочу записать поле ключа и целочисленное поле (которое не изменилось) вТаблица гаечного ключа, которая уже существует, в идеале добавляет строки с новым ключом и обновляет целочисленное поле строк с ключом, который уже существует.
Я пытаюсь сделать это в Java, потому что нет соединителя ввода / выводадля Python.Любые советы по работе с Python приветствуются.
Пока я был бы очень рад, если бы я мог просто читать таблицу из BigQuery и записывать все, что получаю из этой таблицы, в таблицу в Spanner, но я могуэтого даже не произошло.
Проблемы:
- Я использую Maven и не знаю, какие зависимости мне нужно поместить в файл pom
- Я не знаю, какой пакет и импорт мне нужен в начале моего Java-файла
- Я не знаю, следует ли мне использовать readTableRows () или read (SerializableFunction) для чтенияот BigQuery
- Я понятия не имею, как получить доступ к строковым полям в PCollection, чтобы объединить их или как сделать новую PCollection только с полем ключа и целого числа
- Мне как-то нужно превратить PCollection в мутацию для записи в Spanner
- Я хочу использовать запрос INSERT UPDATE для записи в таблицу Spanner, которая, по-видимому, не подходит для Spanner.разъем ввода / вывода.
Честно говоря, я слишком смущен, чтобы даже показать тот код, который я пытаюсь запустить.
public class SimpleTransfer {
public static void main(String[] args) {
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For Cloud execution, set the Cloud Platform project, staging location, and specify DataflowRunner.
options.setProject("myproject");
options.setStagingLocation("gs://mybucket");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
String tableSpec = "database.mytable";
// read whole table from bigquery
rowsFromBigQuery =
p.apply(
BigQueryIO.readTableRows()
.from(tableSpec);
// Hopefully some day add a transform
// Somehow make a Mutation
PCollection<Mutation> mutation = rowsFromBigQuery;
// Only way I found to write to Spanner, not even sure if that works.
SpannerWriteResult result = mutation.apply(
SpannerIO.write().withInstanceId("myinstance").withDatabaseId("mydatabase").grouped());
p.run().waitUntilFinish();
}
}