Сериализация HashMap в потоках Кафки - PullRequest
0 голосов
/ 18 сентября 2018

Я пытаюсь сериализовать HashMap<String, String>, которые представляют собой некоторые свойства DTO, которые в конечном итоге попадают в индекс ElasticSearch.

У меня есть такой класс:

class MyDto {
    HashMap<String, String> properties;
}

Я хочу, чтобы всесвойства заканчиваются прямо под _source в ES:

_source: {
    "firstPropKey": "firstValue",
    "secondPropKey": "secondVal"
    ...
}

, а НЕ вот этим:

_source: {
    properties: {
       "firstPropKey": "firstValue",
       "secondPropKey": "secondVal"
       ...
    }
}

Я пробовал различные реализации, но не могу заставить его работать.Вот один из подходов:

final StreamsBuilder builder = new StreamsBuilder();

builder.stream(dataTopicName, Consumed.with(Serdes.String(), dtoSerde))
    .map((key, myDTO) -> new KeyValue<>(id, myDTO.properties))
    .to(elasticIndexTopicName, Produced.with(Serdes.String(), dtoPropsSerde));

Проблема в том, что dto.properties HashMap параметризован с помощью <String, String>, а Jackson2Serde не разрешает параметризованные классы, поэтому мне нужно сделать так:

private final Jackson2Serde<HashMap> dtoPropsSerde = new Jackson2Serde<>(mapper, HashMap.class);

Но тогда я получаю ошибку типа, более конкретно: Cannot resolve method to(String, Produced<K,V>)

Если я создаю dto.properties старый непараметризированный HashMap, он все равно не работает.Не могу найти ничего полезного в моих журналах.Чего мне не хватает?

РЕДАКТИРОВАТЬ: Стоит отметить, что я не получаю ничего в теме elasticIndexTopicName, которую читает подключаемый модуль ElasticSearch.

...