Apache Beam: как решить «ParDo требует детерминированного c ключевого кодировщика для использования состояния и таймеров» при использовании функции дедупликации - PullRequest
0 голосов
/ 02 августа 2020

Я пытаюсь дедуплицировать входящие сообщения из Google Cloud Pubsub, используя функцию дедупликации Apache луча. Однако я столкнулся с ошибкой после создания пары KV<String, MyModel> и передачи ее в преобразование Deduplicate.

Ошибка:

ParDo requires a deterministic key coder in order to use state and timers

Код:

PCollection<KV<String, MyModel>> deduplicatedEvents =
    messages
        .apply(
            "CreateKVPairs",
            ParDo.of(
                new DoFn<MyModel, KV<String, MyModel>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().getUniqueKey(),c.element()));
                  }
                }))
        .apply(
            "Deduplicate",
            Deduplicate.<KV<String, MyModel>>values());

Как мне создать детерминированный c кодер, который может кодировать / декодировать строку в качестве ключа, чтобы это работало?

Любой ввод будет действительно полезен.

1 Ответ

1 голос
/ 11 августа 2020

Преобразование Deduplicate работает, помещая весь элемент в ключ, а затем выполняет операцию группирования ключей (в данном случае ParDo с отслеживанием состояния). Поскольку Beam не зависит от языка, группировка по ключу выполняется с использованием закодированной формы элементов. Два элемента, которые кодируют одни и те же байты, «равны», а два элемента, которые кодируют разные байты, являются «неравными». язык (например, Java) относится к равенству лучей. Это означает, что если два объекта Java равны согласно Java equals(), то они должны иметь одинаковые закодированные байты. Для простых данных, таких как строки, числа, массивы, это легко. Полезно подумать о том, что делает кодер не -детерминированным c. Например, при кодировании двух экземпляров Map они могут быть equals() на уровне Java, но пары ключ-значение кодируются в другом порядке, что делает их неравными для Beam.

Если у вас есть недетерминированный c кодер для MyModel, тогда Deduplicate не будет работать правильно, и вы получите дубликаты, потому что Beam считает, что объекты с разным кодированием не равны.

Вероятно, самый простой способ автоматически получить высококачественный детерминированный c кодер должен использовать вывод схемы Beam: https://beam.apache.org/documentation/programming-guide/#schemas -for-pl-types . Вам нужно будет убедиться, что все поля также могут быть закодированы детерминированно.

...