Что означает перестановка в контексте обработки один раз в приемнике BigQuery? - PullRequest
0 голосов
/ 26 сентября 2018

Я читаю статью о точной однократной обработке, реализованной некоторыми источниками и приемниками потока данных, и у меня возникают проблемы с пониманием примера с приемником BigQuery.Из статьи

Генерация случайного UUID является недетерминированной операцией, поэтому мы должны добавить перестановку , прежде чем вставить ее в BigQuery .После этого при любых повторных попытках Cloud Dataflow всегда будет использоваться тот же UUID, который был перетасован.Повторные попытки вставки в BigQuery всегда будут иметь одинаковый идентификатор вставки, поэтому BigQuery может их фильтровать

// Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 });

Что означает rehuffle и как он может предотвратить генерациюдругой UUID для той же вставки при последующих попытках?

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Перестановка группирует данные другим способом.Однако здесь он используется для побочных эффектов: контрольная точка и дедупликация.

Без перестановок, если та же задача генерирует UUID и вставляет данные в BigQuery, существует риск, что работник перезапустит и новый работник сгенерируетновый UUID и отправляет другую строку в BigQuery, что приводит к дублированию строк.

Операция перестановки разбивает генерацию UUID и вставку BigQuery на два шага, а также вставляет контрольные точки и дедупликацию между ними.

  1. Сначала генерируется UUID и отправляется на перестановку.Если работник создания UUID перезапущен, то все в порядке, так как при перестановке выполняется дедупликация строк, исключая данные из сбойных / перезапущенных работников.
  2. Сгенерированные UUID проверяются с помощью операции случайного перемешивания.
  3. Работник вставки BigQuery используетUUID с контрольными точками, поэтому, даже если он перезапущен - он отправляет точно такие же данные в BigQuery.
  4. BigQuery дедуплицирует данные, используя эти UUID, поэтому дубликаты из перезапущенного работника вставки удаляются в BigQuery.
0 голосов
/ 27 сентября 2018

Я думаю, что статья дает хорошее объяснение того, почему «перестановка» помогает перейти от «хотя бы один раз» к «ровно один раз»:

В частности, окно может попытаться сработать с элементом e0, e1, e2, но рабочий аварийно завершает работу перед тем, как завершить обработку окна (но не раньше, чем эти элементы отправляются как побочный эффект).Когда работник перезапустится, окно снова будет запущено, но теперь появляется поздний элемент e3.Поскольку этот элемент обнаруживается до фиксации окна, он не считается поздними данными, поэтому DoFn снова вызывается с элементами e0, e1, e2, e3.Затем они отправляются на операцию побочного эффекта.Идемпотентность здесь не помогает, так как каждый раз отправлялись разные наборы логических записей.

Существуют и другие способы введения недетерминизма.Стандартный способ преодоления этого риска заключается в том, чтобы полагаться на тот факт, что облачный поток данных в настоящее время гарантирует, что только одна версия вывода DoFn может преодолеть границу тасования .

Вы также можете проверить документы Reshuffle:

Там есть примечание об устаревании этого класса, поэтому будущие реализации BigQueryIO может отличаться.

...