Я хочу использовать Apache Kafka Connect, чтобы получать твиты из Twitter и помещать их в мой экземпляр Kafka. Я столкнулся со странной проблемой, а именно, я запускаю скрипт:
connect-standalone.bat connect-standalone.properties twitter.properties
и получаю список ошибок в журнале консоли, например:
[2019-10-07 11:48:02,758] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2019-10-07 11:48:12,759] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Flush of offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:472)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:101)
at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:105)
at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:99)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
... 3 more
Caused by: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
at java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:231)
at java.base/java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:478)
at java.base/java.nio.file.Files.newOutputStream(Files.java:219)
at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:92)
... 6 more
[2019-10-07 11:48:12,763] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2019-10-07 11:48:22,765] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Flush of offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:472)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:101)
at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:105)
at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:99)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
... 3 more
Caused by: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
at java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:231)
at java.base/java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:478)
at java.base/java.nio.file.Files.newOutputStream(Files.java:219)
at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:92)
... 6 more
[2019-10-07 11:48:22,770] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
[2019-10-07 11:48:32,772] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Flush of offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSourceTask)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:472)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:101)
at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:105)
at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:99)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
... 3 more
Caused by: java.nio.file.NoSuchFileException: \tmp\connect.offsets
at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:85)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108)
at java.base/sun.nio.fs.WindowsFileSystemProvider.newByteChannel(WindowsFileSystemProvider.java:231)
at java.base/java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:478)
at java.base/java.nio.file.Files.newOutputStream(Files.java:219)
at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:92)
... 6 more
[2019-10-07 11:48:32,776] ERROR WorkerSourceTask{id=TwitterSourceDemo-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
По-видимому, это не должно работатьно мой Kafka-Consumer, который слушает Kafka-Cluster, получает твиты, несмотря на эти ошибки, возникающие при запуске Kafka.
Мой connect-standalone.properties выглядит следующим образом:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=connectors
twitter.properties выглядит следующим образом:
name=TwitterSourceDemo
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector
twitter.oauth.accessTokenSecret=accesTokenSecret
process.deletes=false
filter.keywords=bitcoin
kafka.status.topic=twitter_status_connect
kafka.delete.topic=twitter_deletes_connect
twitter.oauth.consumerSecret=consumerSecret
twitter.oauth.accessToken=accessToken
twitter.oauth.consumerKey=consumerKey
Визуальное дерево моего проекта:
![enter image description here](https://i.stack.imgur.com/C4s4t.png)
Я ищу проблему в строке:
offset.storage.file.filename=/tmp/connect.offsets
, потому что я использую Windows 10, у которой нет этого пути.
Я хочу знать, возможно ли исправить мою конфигурацию, чтобы избавиться от этих ошибок при запуске Kafka.
Буду благодарен за предложение, как решить эту проблему.