PubSub to Spanner Streaming Pipeline - PullRequest
       26

PubSub to Spanner Streaming Pipeline

0 голосов
/ 08 октября 2019

Я пытаюсь передать сообщение PubSub типа JSON в базу данных ключей, и insert_update работает очень хорошо. Таблица ключей имеет составной первичный ключ, поэтому необходимо удалить существующие данные перед вставкой новых данных из PubSub (поэтому присутствуют только самые последние данные). В этом случае мутации замены или вставки / обновления гаечного ключа не работают. Я добавил конвейер


import org.apache.beam.* ;

public class PubSubToSpannerPipeline {

  // JSON to TableData Object
  public static class PubSubToTableDataFn extends DoFn<String, TableData> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      .
      .
      .
    }
  }

  public interface PubSubToSpannerOptions extends PipelineOptions, StreamingOptions {
    .
    .
    .
  }

  public static void main(String[] args) {
    PubSubToSpannerOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(PubSubToSpannerOptions.class);
    options.setStreaming(true);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getInstanceId())
        .withDatabaseId(options.getDatabaseId());

    Pipeline pipeLine = Pipeline.create(options);

    PCollection<TableData> tableDataMsgs = pipeLine.apply(PubsubIO.readStrings()
        .fromSubscription(options.getInputSubscription()))
        .apply("ParsePubSubMessage", ParDo.of(new PubSubToTableDataFn ()));

    // Window function
    PCollection<TableData> tableDataJson = tableDataMsgs
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<MutationGroup> upsertMutationGroup = tableDataJson.apply("TableDataMutation",
        MapElements.via(new SimpleFunction<TableData, MutationGroup>() {

          public MutationGroup apply(TableData input) {

            String object_id = input.objectId;

            pipeLine.apply("ReadExistingData", SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withQuery("SELECT object_id, mapped_object_id, mapped_object_name from TableName where object_id ='" + object_id + "'")
            .apply("MutationForExistingTableData", 
                    ParDo.of(new DoFn<Struct, Mutation>(){
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Struct str = c.element();
                        c.output(Mutation.delete("TableName", KeySet.newBuilder()
                            .addKey(Key.newBuilder()
                                .append(str.getString("object_id"))
                                .append(str.getString("mapped_object_id"))
                                .append(str.getString("mapped_object_name")).build()).build()));
                      }
                    } ))
            .apply("DeleteExistingTableData", SpannerIO.write().withSpannerConfig(spannerConfig));

              Mutation dataMutation = Mutation.newReplaceBuilder("TableName",
                  .
                  .
                  .

                  );
              List<Mutation> list = new ArrayList<Mutation>();


              List<Map<String, String>> mappingList = input.listOfObjectRows;

              for (Map<String, String> objectMap : mappingList ) {
                list.add(Mutation.newReplaceBuilder("TableName",
                    .
                    .
                    .);
              }     

              return MutationGroup.create(dataMutation, list);


          }
        } )));


        upsertMutationGroup.apply("WriteDataToSpanner", SpannerIO.write()
            .withSpannerConfig(spannerConfig)
            .grouped());

        // Run the pipeline.
        pipeLine.run().waitUntilFinish();
  }

}

class TableData implements Serializable {
  String objectId;
  List<Map<String, String>> listOfObjectRows;

}

Ожидается, что существующие данные сопоставления должны быть удалены из таблицы перед вставкой или обновлением данных.

1 Ответ

0 голосов
/ 08 октября 2019

Я не совсем уверен, что вы делаете, но похоже, что вы хотите:

  • Считать некоторые существующие данные с ключом (или частичным ключом), соответствующим сообщению pubsub
  • Удалить эти данные
  • Вставить новые данные из сообщения pubsub

Один из вариантов - создать DoFn, который выполняет чтение / удаление / вставку (или чтение /обновление) в транзакции чтения-записи. Это сохранит согласованность БД ...

Используйте SpannerIO.WriteFn в качестве модели - вам нужно установить SpannerAccessor как переходный процесс и создать / удалить его в @Setupи @Teardown обработчики событий

Обработчик @ProcessElement вашего DoFn создаст транзакцию чтения-записи , внутри которой вы будете читать строки для ключа, обновлять или удалятьих, а затем вставляет новый элемент (ы).

Недостатком этого метода является то, что для каждой транзакции Spanner будет обрабатываться только одно сообщение Pub / Sub (если на предыдущем шаге вы не сделаете что-то умное, например, группируете их), и это сложная транзакция чтения-записи. ,Если ваша скорость передачи сообщений / сек относительно низкая, это было бы хорошо, но если нет, то этот метод будет гораздо больше загружать вашу БД.

Второй вариант - использовать слепое удаление диапазона ключей. ,Это может работать только в том случае, если object_id является первой частью составного ключа (который, похоже, из вашего кода).

Вы должны создать MutationGroup, содержащую мутацию удаления, которая удаляет вслепую любые существующие строки, ключи которых начинаются с object_id, используя мутацию Delete с диапазоном клавиш, а затем вставляются мутации для замены удаленных строк.

MutationGroup.create(
    // Delete rows with key starting with object_id.
    Mutation.delete("TableName", KeySet.newBuilder()
        .addRange(
            KeyRange.closedClosed(
                Key.of(str.getString("object_id")),
                Key.of(str.getString("object_id"))))
        .build()),
    // Insert replacement rows.
    Mutation.newInsertBuilder("TableName")
        .set("column").to("value"),
        ...
        .build(),
    Mutation.newInsertBuilder("TableName")
        ...);

Затем он будет передан SpannerIO.write (). Grouped (), как и раньше, чтобы их можно было объединить для повышения эффективности.

...