Я пытался сделать что-то вроде следующего:
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();
JSONArray jsonRecords = getRecords(0, 3);
for (Object jsonRecord: jsonRecords) {
JSONObject j = new JSONObject(jsonRecord.toString());
Map sourceOffset = Collections.singletonMap("block", j.get("block").toString());
Object value = j.get("data").toString();
records.add(new SourceRecord(
Collections.singletonMap("samesourcepartition", "samesourcepartition"), // sourcePartition
sourceOffset, // sourceOffset
"mytopic", // topic
Schema.STRING_SCHEMA, // keySchema
j.get("block").toString, // key: "0", "1", "2", "3"
Schema.STRING_SCHEMA, // valueSchema
value // value
));
log.info("added record for block: " + j.get("block"));
}
log.info("Returning {} records", records.size());
return records;
}
Я не совсем понимаю, как использовать sourceOffset
. (https://docs.confluent.io/current/connect/devguide.html#task-example-source-task)
Примером block
может быть "3"
. Я ожидаю, что в этом случае, если Кафка уже прочитал это sourceOffset
, он не должен читать это снова. Но, похоже, это полностью игнорируется, и offset
продолжает расти дальше 3 и продолжает повторять те же данные 0-3 в бесконечном цикле. Например, если я посмотрю на панель мониторинга Confluent> Topics> Inspect, я ожидаю, что самые высокие записанные значения offset
и key
будут равны «3», но это более 100+ с дублированными ключами и значениями.
Должен ли мой poll () увеличивать 0-> 3, чтобы он знал, когда "остановиться"? Текущее поведение продолжает повторять 0-> 3, 0-> 3, ..., чтобы добавить new SourceRecord()
, но я представляю, что с sourceOffset
и уникальный key
должен быть идемпотентным.
Я уверен, что что-то неправильно понимаю. Я также пытался включить log compaction
, но все еще получал дубликаты даже с тем же ключом. Может ли кто-то показать правильное использование, чтобы иметь сообщение за sourceOffset
/ key
?