Я пишу свой собственный Mqtt-разъем для Kafka. Я знаю, что в Confluent есть Mqtt-коннектор, но он не с открытым исходным кодом, поэтому я решил написать свой собственный коннектор. У меня проблема с отправкой сообщений в Kafka. Я могу получать сообщения Mqtt, но не могу отправить их в Kafka. Обычно, когда приходит сообщение от запущенного метода Mqtt Broker messageArrived
, я добавляю сообщение в BlockingQueue
следующим образом:
private final BlockingQueue<SourceRecord> queue= new LinkedBlockingQueue<>();
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("Message arrived : " + message.toString());
queue.add(new SourceRecord(null, null, kafkaTopic, null,
Schema.STRING_SCHEMA, message.getId(),
Schema.BYTES_SCHEMA, message.getPayload()));
}
И это мой poll
метод:
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<SourceRecord>();
records.add(queue.take());
return records;
}
Итак, когда я отлаживаю этот код, вывод консоли выглядит следующим образом:
[2019-03-13 17:45:37,113] INFO WorkerSourceTask{id=mqtt-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-03-13 17:45:40,385] INFO Message arrived : {"timestamp":1552488339305,"values":[{"id":"Simulation.Simulator.Temperature","v":1,"q":true,"t":1552488336813}]} (deneme.MqttSourceTask)
[2019-03-13 17:45:45,386] INFO Message arrived : {"timestamp":1552488344305,"values":[{"id":"Simulation.Simulator.Temperature","v":2,"q":true,"t":1552488341809}]} (deneme.MqttSourceTask)
[2019-03-13 17:45:48,204] INFO WorkerSourceTask{id=mqtt-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
Кажется, Mqtt-сообщения приходят, но не могут отправить Кафку. Эти два метода работают асинхронно несколькими потоками.
Когда я записываю queue.hashCode()
в консоль в методах poll
и messageArrived
, хэш-код отличается. Он действует как отдельный объект очереди. Я не понимаю, почему это так. Что вызывает эту проблему?
Может кто-нибудь сказать мне, что не так?