Kafka Connect дублирует сообщения в теме после установки sourceOffset, уникального ключа, сжатия журнала - PullRequest
2 голосов
/ 06 июля 2019

Я пытался сделать что-то вроде следующего:

public List<SourceRecord> poll() throws InterruptedException {
  List<SourceRecord> records = new ArrayList<>();

  JSONArray jsonRecords = getRecords(0, 3);

  for (Object jsonRecord: jsonRecords) {
   JSONObject j = new JSONObject(jsonRecord.toString());

   Map sourceOffset = Collections.singletonMap("block", j.get("block").toString());
   Object value = j.get("data").toString();

   records.add(new SourceRecord(
    Collections.singletonMap("samesourcepartition", "samesourcepartition"), // sourcePartition
    sourceOffset, // sourceOffset
    "mytopic", // topic
    Schema.STRING_SCHEMA, // keySchema
    j.get("block").toString, // key: "0", "1", "2", "3"
    Schema.STRING_SCHEMA, // valueSchema
    value // value
   ));

   log.info("added record for block: " + j.get("block"));
  }

  log.info("Returning {} records", records.size());

  return records;
}

Я не совсем понимаю, как использовать sourceOffset. (https://docs.confluent.io/current/connect/devguide.html#task-example-source-task)

Примером block может быть "3". Я ожидаю, что в этом случае, если Кафка уже прочитал это sourceOffset, он не должен читать это снова. Но, похоже, это полностью игнорируется, и offset продолжает расти дальше 3 и продолжает повторять те же данные 0-3 в бесконечном цикле. Например, если я посмотрю на панель мониторинга Confluent> Topics> Inspect, я ожидаю, что самые высокие записанные значения offset и key будут равны «3», но это более 100+ с дублированными ключами и значениями.

Должен ли мой poll () увеличивать 0-> 3, чтобы он знал, когда "остановиться"? Текущее поведение продолжает повторять 0-> 3, 0-> 3, ..., чтобы добавить new SourceRecord(), но я представляю, что с sourceOffset и уникальный key должен быть идемпотентным.

Я уверен, что что-то неправильно понимаю. Я также пытался включить log compaction, но все еще получал дубликаты даже с тем же ключом. Может ли кто-то показать правильное использование, чтобы иметь сообщение за sourceOffset / key?

...