Я просто отчаянно ищу пример кода для адаптера Esper CEP Kafka. Я уже установил Kafka и записал данные в тему Kafka с помощью производителя, и теперь я хочу обработать его с помощью Esper CEP. К сожалению, документация Esper для Kafka Adapter не очень значима. У кого-нибудь есть очень простой пример?
Edit:
Пока что я добавил адаптер, и он, кажется, работает. Однако я не знаю, как читать адаптер и как связать шаблон CEP с этим адаптером. Это мой код:
config.addImport(KafkaOutputDefault.class);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");
props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "test123");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());
Configuration config2 = new Configuration();
config2.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);
EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();