Я использую Kafka MongoDB Source Connector [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb] с объединенной платформой v5.4.1 и MongoDB v3.6 ReplicaSet. Исходный коннектор Kafka MongoDB был удален, и теперь, когда он снова создается через месяц, я получаю следующую ошибку.
com.mongodb.MongoQueryException: Query failed with error code 280 and error message 'resume of change stream was not possible, as the resume token was not found. {_data: BinData(0, "825F06E90400000004463C5F6964003C38316266623663632D326638612D343530662D396534652D31393936336362376130386500005A1004A486EE3E58984454ADD5BF58F364361E04")}' on server 40.118.122.226:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:29)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:267)
at com.mongodb.operation.QueryBatchCursor.tryHasNext(QueryBatchCursor.java:216)
at com.mongodb.operation.QueryBatchCursor.tryNext(QueryBatchCursor.java:200)
at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:86)
at com.mongodb.operation.ChangeStreamBatchCursor$3.apply(ChangeStreamBatchCursor.java:83)
at com.mongodb.operation.ChangeStreamBatchCursor.resumeableOperation(ChangeStreamBatchCursor.java:166)
at com.mongodb.operation.ChangeStreamBatchCursor.tryNext(ChangeStreamBatchCursor.java:83)
at com.mongodb.client.internal.MongoChangeStreamCursorImpl.tryNext(MongoChangeStreamCursorImpl.java:78)
at com.mongodb.kafka.connect.source.MongoSourceTask.getNextDocument(MongoSourceTask.java:338)
at com.mongodb.kafka.connect.source.MongoSourceTask.poll(MongoSourceTask.java:155)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-07-09 09:53:09,353] INFO Watching for collection changes on '<myDBName.myCollectionName>' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
После поиска причины этой ошибки я понимаю, что токен возобновления не найден в журнале операций, так как журнал операций ограничен по памяти / размеру, и старую информацию можно было бы удалить. Я также понимаю, что, чтобы свести к минимуму возникновение этих проблем, я должен увеличить размер журнала сообщений et c. Но я хочу знать, есть ли возможность исправить это со стороны Kafka / Confluent Platform? Например, если бы я мог удалить Kafka topi c, K SQL topi c, поскольку я создаю поток, используя topi c 'myDBName.myCollectionNamedata', данные, связанные с topi c, или в Kafka Connect, чтобы исходный коннектор MongoDB снова начал фиксировать изменения в коллекциях MongoDB с текущего времени, отбрасывая старую информацию?