Проблемы приведения потоков Kafka в строку с KTable при группировании и агрегировании - PullRequest
0 голосов
/ 22 мая 2018

У меня есть поток Kafka с входящими сообщениями, который выглядит как sensor_code: x, time: 1526978768, address: Y Я хочу создать таблицу KTable, в которой каждый уникальный адрес хранится в каждом коде датчика.

Таблица KT

KTable<String, Long> numCount = streams
            .map(kvm1)
            .groupByKey(Serialized.with(stringSerde, stringSerde))
            .count()
            .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

Где kvm1 и kvm2 мои KeyValueMappers.Моя идея состояла в том, чтобы заменить существующий ключ на sensor_code=x, address=y, выполнить groupByKey() и count().Затем другой groupBy(kvm2, Serialized.with(stringSerde, longSerde)), где kvm2 изменяет существующий key, чтобы он содержал sensor_code, и тогда значением будет его счет.Но так как он не работает, возможно, я делаю это неправильно ... Он пытается привести его к типу Long и выдает исключение, потому что ищет строку.Я хочу считать как Long, верно?

Вот первый KeyValueMapper, который я использую с соответствующей функцией справки:

    private static String getKeySensorIdAddress(String o) {
    String x = "sensor_id=\"x\", address=\"y\""; 
    try {
        WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
        x = x.replace("x", event.getSensor_code());
        x = x.replace("y", event.getAddress());
        return x;
    } catch(Exception ex) {
        System.out.println("Error... " + ex);
        return "Error";
    }
}
        //KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 = 
    new KeyValueMapper<String, String, KeyValue<String, String>>() {
         public KeyValue<String, String> apply(String key, String value) {
             return new KeyValue<>(getKeySensorIdAddress(value), value);
         }
    };

Вот второй KeyValueMapper и его функция справки.

    private static String getKeySensorId(String o) {
    int a = o.indexOf(",");
    return o.substring(0,a);
}

        //KeyValueMapper2 
    KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 = 
    new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
         public KeyValue<String, Long> apply(String key, Long value) {
             return new KeyValue<>(getKeySensorId(key), value);
         }
    };

Вот исключение и ошибка, которые возвращаются, когда я пытаюсь запустить код.

[2018-05-29 15: 28: 40,119] ОШИБКА потокового потока [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] Не удалось обработать потоковую задачу 2_0 из-заследующая ошибка: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks: 105) java.lang.ClassCastException: java.lang.Long нельзя привести к java.lang.String в org.apache.kafka.common.serialization.StringSerializer.serialize (StringSerializer.java:28) в org.apache.kafka.streams.state.StateSerdes.rawValue (StateSerdes.java:178) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $(MeteredKeyValueBytesStore.java:66) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore $ 1.innerValue (MeteredKeyValueBytesStore.java:57) в org.apache.kafka.streams.staueneree .Me.Java: 198) в org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put (MeteredKeyValueBytesStore.java:117) в org.apache.kafka.streams.kstream.internals.

Обратите внимание на ошибку java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String.

Любые идеи, почему я получаю эту ошибку и как я могу ее исправить, или советы, как я могу отредактировать код для достижения желаемого результата, как я уже упоминал?

Заранее большое спасибо!

РЕДАКТИРОВАТЬ: Произошла серьезная перестройка моего вопроса, так как я отказался от одного из подходов.

1 Ответ

0 голосов
/ 22 мая 2018

В первом случае, если вы хотите использовать HashMap в качестве типа значения, вам нужно определить для него пользовательский serde и передать его с помощью Materialized. withValueSerde .

Во втором случае я не могу сказать, не видя тип возвращаемого значения из ваших KeyValueMappers и точное сообщение об ошибке: он пытается привести String к Long или наоборот?

EDIT: Спасибо, что поделились дополнительной информацией.

Я думаю, что вам нужно во втором случае также указать значение serde во второй операции подсчета.Кажется, что существует несоответствие count () для KGroupedStream и KGroupedTable в том, что первый автоматически устанавливает значение serde в LongSerde:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L281-L283

, но KGroupedTable не:

https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L253

Кажется, он уже исправлен на транке, но еще не освобожден:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java#L158-L160

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...