Весенняя загрузка кафки сообщений. Как упростить отображение dto для обработчиков? - PullRequest
0 голосов
/ 05 ноября 2018

Я настроил свой весенний загрузочный проект, используя Kafka. Я могу получать и публиковать любые строковые сообщения.

Строковое сообщение не лучший способ обработки. Было бы гораздо полезнее иметь функциональность для преобразования по умолчанию сообщения из строки в объект.

Реализация этой функции Мне нужно переместить почти все мои конфигурации Kafka с yml на java (используя свойство). ... пример производителя

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AccountSerializer.class);
    return props;
}

@Bean
public ProducerFactory<String, Account> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Account> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Код работает, но я принял упрощение. В лучшем случае я хотел бы иметь элегантно настроенный yml, возможно, будут некоторые java-изменения. Но, делая это напрямую, я получу дополнительные 3 бина для настройки каждого kafkaTemplate и listenerFactory.

Возможно, это упростит конфигурации на будущее (мне понадобится больше дополнительных Serializer `Deserializer`)? Как?

приписка

Я хотел бы настроить yml аналогично в этом примере :

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

но мне непонятно, как настроить здесь потребителей \ производителей с разными (De) Serializer с, сопоставлять их по указанной теме ...

Ответы [ 2 ]

0 голосов
/ 08 ноября 2018

Кажется, у меня нет шансов настроить для одного и того же слушателя разные SERIALIZER | DESERIALIZER s.

Но идентификатор не означает, что у моей проблемы нет решения.

Я использовал наследование для всех своих объектов и предоставил абстракцию AbstractEvent. AbstractEvent бесполезен в общем, но он используется в моем решении как точка входа для указанного SERIALIZER | DESERIALIZER. Чтобы узнать, какой объект находится в контексте, я использовал пользовательские заголовки. org.apache.kafka.common.serialization.Deserializer не имеет параметров заголовка, но я реализовал DESERIALIZER на основе ExtendedDeserializer. Этот способ дает мне доступ к заголовкам

via public T deserialize(String topic, Headers headers, byte[] data)

пример моего десериализатора :

@Slf4j
public class AbstractEventDeserializer<T extends AbstractEvent> implements ExtendedDeserializer<T> {

    private Map<String, Class<T>> mappers = new HashMap<>();

    // default behavior
    @Override
    public T deserialize(String arg0, byte[] devBytes) {
        ObjectMapper mapper = new ObjectMapper();
        T bar = null;
        try {
            bar = (T) mapper.readValue(devBytes, Bar.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return bar;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        log.info("handling...");
        headers.forEach(header -> log.info("   {}: {}", header.key(), getHeaderValueAsString(header)));
        Optional<String> classTypeFromHeader = getClassTypeFromHeader(headers);
        if (classTypeFromHeader.isPresent()) {
            return parseFromJson(data, mappers.get(classTypeFromHeader.get()));
        }
        return deserialize(topic, data);
    }

    private Optional<String> getClassTypeFromHeader(Headers headers) {
        return StreamSupport.stream(headers.headers("X-CLASS-TYPE").spliterator(), false)
                .map(Header::value)
                .map(String::new)
                .findFirst();
    }

    private String getHeaderValueAsString(Header header) {
        return Optional.ofNullable(header.value())
                .map(String::new)
                .orElse(null);
    }

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        log.info("configuring deserialiser");
        if (arg0.containsKey("mappers")) {
            this.mappers = (Map<String, Class<T>>) arg0.get("mappers");
        }
        arg0.keySet().forEach(key -> log.info("   {}:{}", key, arg0.get(key)));
    }

}

Если вы хотите попробовать рабочее решение, пожалуйста, проверьте экспериментальный пример .

0 голосов
/ 05 ноября 2018

Облачные сервисы Spring обеспечивают лучшую конфигурацию для потребителей, параллелизм, десериализацию и меньше кода.

   <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Образец раковины

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

public static void main(String[] args) {
    SpringApplication.run(LoggingConsumerApplication.class, args);
}

@StreamListener(Sink.INPUT)
public void handle(Person person) {
    System.out.println("Received: " + person);
}

public static class Person {
    private String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String toString() {
        return this.name;
    }
}
}

Пример конфигурации:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: <your topic>
          group: <your consumer group>
          consumer:
            headerMode: raw
            partitioned: true
            concurrency: 10
      kafka:
        binder:
          brokers: <Comma seperated list of kafka brokers>

Более подробная информация доступна здесь https://cloud.spring.io/spring-cloud-stream/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...