Сброс 0 выдающихся сообщений для фиксации смещения - PullRequest
1 голос
/ 14 марта 2019

Я пишу свой собственный Mqtt-разъем для Kafka. Я знаю, что в Confluent есть Mqtt-коннектор, но он не с открытым исходным кодом, поэтому я решил написать свой собственный коннектор. У меня проблема с отправкой сообщений в Kafka. Я могу получать сообщения Mqtt, но не могу отправить их в Kafka. Обычно, когда приходит сообщение от запущенного метода Mqtt Broker messageArrived, я добавляю сообщение в BlockingQueue следующим образом:

private final BlockingQueue<SourceRecord> queue= new LinkedBlockingQueue<>();

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    log.info("Message arrived : " + message.toString());
    queue.add(new SourceRecord(null, null, kafkaTopic, null,
            Schema.STRING_SCHEMA, message.getId(),
            Schema.BYTES_SCHEMA, message.getPayload()));
    }

И это мой poll метод:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records = new ArrayList<SourceRecord>();
    records.add(queue.take());
    return records;
}

Итак, когда я отлаживаю этот код, вывод консоли выглядит следующим образом:

[2019-03-13 17:45:37,113] INFO WorkerSourceTask{id=mqtt-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-03-13 17:45:40,385] INFO Message arrived : {"timestamp":1552488339305,"values":[{"id":"Simulation.Simulator.Temperature","v":1,"q":true,"t":1552488336813}]} (deneme.MqttSourceTask)
[2019-03-13 17:45:45,386] INFO Message arrived : {"timestamp":1552488344305,"values":[{"id":"Simulation.Simulator.Temperature","v":2,"q":true,"t":1552488341809}]} (deneme.MqttSourceTask)
[2019-03-13 17:45:48,204] INFO WorkerSourceTask{id=mqtt-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)

Кажется, Mqtt-сообщения приходят, но не могут отправить Кафку. Эти два метода работают асинхронно несколькими потоками.

Когда я записываю queue.hashCode() в консоль в методах poll и messageArrived, хэш-код отличается. Он действует как отдельный объект очереди. Я не понимаю, почему это так. Что вызывает эту проблему?

Может кто-нибудь сказать мне, что не так?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...