Пользовательский ключ для группировки в потоке данных - PullRequest
1 голос
/ 29 марта 2019

Я хочу использовать группировку с помощью пользовательского ключа, но пока моя попытка

Мы использовали пользовательские классы для ключей объектов KV, потому что мы хотим группировать с более сложным условием, а не простым сопоставлением ключей с использованием String и т. Д.

```

    PCollection<KV<Multikey, Iterable<SomeObject>> pc2 = 
    pc.apply(GroupByKey.<Multikey, SomeObject>create());

```

Выразите условие соответствия методом equals.

```
    class Multikey implements Serializable{

        List<String> keys = new ArrayList<>();  //multiple key

        ......
        @Override
        public boolean equals(Object k){
            ...join conditions
        }
    }

```

Но я получаю ошибку.

java.lang.IllegalStateException: the keyCoder of a GroupByKey must be deterministic
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:193)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:107)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:357)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.coders.Coder$NonDeterministicException: org.apache.beam.sdk.coders.SerializableCoder@18b411b5 is not deterministic because:
    Java Serialization may be non-deterministic.
    at org.apache.beam.sdk.coders.SerializableCoder.verifyDeterministic(SerializableCoder.java:205)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:191)

Кажется, что порядок, в котором «ключи» сериализуются, неверен, поэтому я реализовал собственный сериализатор или пробовал различные кодеры, но не работал.

Ответы [ 2 ]

1 голос
/ 29 марта 2019

Обратите внимание на документацию GroupByKey :

Два ключа типа K сравниваются на равенство не обычными Java Object.equals (java.lang.Object), а вместо этого, сначала кодируя каждый из ключей с использованием Coder ключей входной PCollection, а затем сравнивая закодированные байты. Это допускает эффективную параллельную оценку. Обратите внимание, что для этого необходимо, чтобы кодер ключей был детерминированным (см. Coder.verifyDeterministic ()). Если ключевой кодер не является детерминированным, возникает исключение во время строительства трубопровода.

Один из возможных подходов: использовать пардо, которое выводит KV, выводить значение с уникальным ключом в виде строки на основе сложной условной логики, которую вы упомянули.

Другой подход заключается в использовании пользовательского типа объекта для вашего ключа вместо строки, как вы пытались до сих пор. Вам нужно будет реализовать CustomCoder , который является байтовым эквивалентом для двух ваших объектов, представляющих один и тот же ключ.

Вот документация Apache Beam по с указанием кодеров .

Вот сообщение в блоге с несколькими пользовательскими кодировщиками примеров .

Кроме того, Re: Само исключение. См. VerifyDeterministic документы, чтобы увидеть описание детерминированного кодера . Вероятно, вы нарушили этот констранит.

0 голосов
/ 01 апреля 2019

Спасибо.Я прочитал документ.

Я объясню, почему я хочу использовать пользовательские ключи.

Это потому, что мы хотим выразить дизъюнкцию как "или", а не как обычную комбинацию.

    class Multikey implements Serializable
        List <String> keys = new ArrayList <> ();

........

        @Override
        public boolean equals (Object k) {
            if (k instanceof Multikey) {
                List <String> ky = new ArrayList <String> (((Multikey) k) .keys);
                // Representation of disjunction
                ky.retainAll (keys);
                return! ky.isEmpty ();

            } else {
                return false;
            }
    }

Я прочитал документ, но похоже, что ключ GroupBy должен быть одним детерминированным значением.Сложно ли выразить дизъюнкцию в группировке?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...