поиск очень простого примера EsperIO Kafka - PullRequest
0 голосов
/ 07 ноября 2018

Я просто отчаянно ищу пример кода для адаптера Esper CEP Kafka. Я уже установил Kafka и записал данные в тему Kafka с помощью производителя, и теперь я хочу обработать его с помощью Esper CEP. К сожалению, документация Esper для Kafka Adapter не очень значима. У кого-нибудь есть очень простой пример?

Edit:

Пока что я добавил адаптер, и он, кажется, работает. Однако я не знаю, как читать адаптер и как связать шаблон CEP с этим адаптером. Это мой код:

config.addImport(KafkaOutputDefault.class);
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");

props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "test123");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());

Configuration config2 = new Configuration();
config2.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);

EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();

1 Ответ

0 голосов
/ 08 ноября 2018

Пример кода приведен ниже. Этот код предполагает, что в теме уже есть некоторые сообщения. Это не зацикливание и ожидание большего количества сообщений.

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
    KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
    ConsumerRecords<String, String> rows = consumer.poll(1000);
    Iterator<ConsumerRecord<String, String>> it = rows.iterator();
    while (it.hasNext()) {
        ConsumerRecord<String, String> row = it.next();
        MyEvent event = new MyEvent(row.value()); // transform string to event

        // process event
        runtime.sendEvent(event);
    }
...