Не удалось найти класс io.confluent.connect.avro.ConnectDefault - PullRequest
0 голосов
/ 10 марта 2019

Я столкнулся с аналогичной проблемой, упомянутой в https://github.com/confluentinc/kafka-streams-examples/issues/22. Я изменил конфигурацию источника JDBC в соответствии с рекомендацией в комментарии, но все еще столкнулся с той же проблемой.Можете ли вы помочь мне, что мне не хватает?Ниже вы найдете мою конфигурацию и мою Kafka Stream

JDBC-источник конфигурации
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=changeDateTime
incrementing.column.name=id
connection.password=xxxx
tasks.max=2
table.types=TABLE
mode=timestamp+incrementing
topic.prefix=Order-Topic
poll.interval.ms=5000
validate.non.null=true
query=select id, type, status, reservationId, processType, changeDateTime from order
key.converter.schemas.enable=false
value.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=SetSchemaName
transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetSchemaName.schema.name=com.raj.reservation.api.Order
transforms.SetSchemaNameschema.version=3
connection.user=testuser
connection.url=<my msql jdbc server>
Kafka Stream App
       Properties property = new Properties();
        property.put(StreamsConfig.APPLICATION_ID_CONFIG, "Order_App_ID");
        property.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        property.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        property.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        property.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        property.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

       StreamsConfig streamconfig = new StreamsConfig(property);

       Serde<String> stringSerde = Serdes.String();
       final Serde<T> valueSpecificAvroSerde = new SpecificAvroSerde();
       final Map<String, String> serdeConfig = Collections
                .singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
       valueSpecificAvroSerde.configure(serdeConfig, false); // `false` for record values


        StreamsBuilder streamBuilder = new StreamsBuilder();
        KStream<String, T> kafkaEventStream = streamBuilder
                .stream(SRC_TOPIC, Consumed.with(stringSerde, valueSpecificAvroSerde))
                .peek((k, v) -> {
                    Order order = (Order) v;
                    LOG.info(
                        "============> ***************************************** Reservation Record {}, {}", k,
                        order.getReservationId(), order.getStatus(),
                        order.getChangeDateTime()); });

        LOG.info("Started Service {} with the following config:" + "\nInput topic    : {}" + "\nStream configs : {}",
                "KafkaStreamAppTest", SRC_TOPIC, printConfigs(property));
        KafkaStreams kafkaStream = new KafkaStreams(streamBuilder.build(), streamconfig);
        kafkaStream.start();
        kafkaStream.setUncaughtExceptionHandler((t, e) -> {
            LOG.error("Thread {} had a fatal error {}", t, e, e);
        });
        try {
            Thread.sleep(35000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("Shutting down the Yelling APP now");
        kafkaStream.close();
...