Как использовать Camel-AVRO-потребитель и производитель? - PullRequest
1 голос
/ 15 марта 2019

Я не вижу пример того, как использовать компонент camel-avro для создания и потребления сообщений kafka avro?В настоящее время мой верблюжий маршрут это.что нужно изменить, чтобы работать со схемой-реестром и другими подобными подпорками, используя camel-kafka-avro для потребителя и производителя.

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);              
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

public void configure() {
        PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); 
        pc.setLocation("classpath:application.properties");

        log.info("About to start route: Kafka Server -> Log ");

        from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&consumersCount={{consumer.consumersCount}}"
                + "&seekTo={{consumer.seekTo}}"
                + "&groupId={{consumer.group}}"
                +"&valueDeserializer="+KafkaAvroDeserializer.class
                +"&keyDeserializer="+StringDeserializer.class
                )
                .routeId("FromKafka")
            .log("${body}");

1 Ответ

2 голосов
/ 19 марта 2019

Я отвечаю на свой вопрос, потому что сидел над этой проблемой пару дней. Я надеюсь, что этот ответ будет полезен для других.

Я попытался использовать десериализатор io.confluent.kafka.serializers.KafkaAvroDeserializer и получил исключение kafka. поэтому мне пришлось написать собственный десериализатор, чтобы делать следующие вещи:

  1. установить схему реестра
  2. использовать определенный читатель avro (что означает не stringDeserializer по умолчанию)

тогда мы должны получить доступ к "schemaRegistry", "useSpecificAvroReader" и установить эти поля AbstractKafkaAvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer)

Вот решение ...

CAMEL-KAFKA-AVRO-ROUTE-СТРОИТЕЛЬ

public static void main(String[] args) throws Exception {
    LOG.info("About to run Kafka-camel integration...");
    CamelContext camelContext = new DefaultCamelContext();
    // Add route to send messages to Kafka
    camelContext.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {                
            PropertiesComponent pc = getContext().getComponent("properties", 
                                      PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&keyDeserializer="+ StringDeserializer.class.getName() 
                    + "&valueDeserializer="+CustomKafkaAvroDeserializer.class.getName()
                    )
                    .routeId("FromKafka")
                .log("${body}");

        }
    });
    camelContext.start();
    // let it run for 5 minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    camelContext.stop();
}

DESERIALIZER CLASSS - задает schema.registry.url & use.specific.avro.reader на абстрактном уровне AbstractKafkaAvroDeserializer. Если я не установлю это, я получу kafka-config-exception.

package com.example.camel.kafka.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;


import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;



public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Object> {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    @Override
    public void configure(KafkaAvroDeserializerConfig config) {

     try {
          final List<String> schemas = 
                              Collections.singletonList(SCHEMA_REGISTRY_URL);
          this.schemaRegistry = new CachedSchemaRegistryClient(schemas, 
                                  Integer.MAX_VALUE);
          this.useSpecificAvroReader = true;

       } catch (ConfigException e) {
              throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
     }
   }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(null);
  }

  @Override
  public Object deserialize(String s, byte[] bytes) {
    return deserialize(bytes);
  }

  @Override
  public void close() {
  }
}
...