Так что я дал сделать это через JAVA. Я нашел способ сделать это с помощью терминала довольно просто:
Команда для выполнения
Сначала мы запускаем наш сервер Zookeeper:
bin/zookeeper-server-start.sh config/zokeeper.properties
Затем мы запускаем наш сервер kafka :
bin/kafka-server-start.sh config/server.properties
Нам нужно создать топи c:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Теперь нам нужно создать сообщения:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [Your message]
И теперь мы можем запустить наш рабочий, с 1 разъемом. Вы можете иметь их свойства в файле конфигурации.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.client.config.override.policy=All
connector.client.config.override.policy=All
Разрешить переопределение клиента с помощью коннектора.
Вот наш коннектор с опцией earliest
(Если есть смещение не сохраняется, начиная с первой записи)
name=local-file-earliest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.earliest.txt
topics=test
consumer.override.auto.offset.reset=earliest
value.converter=org.apache.kafka.connect.storage.StringConverter
sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties
Мы остановим это через несколько секунд (вы можете посмотреть tmp/test.sink.earliest.txt
).
На этот раз мы добавим новый соединитель:
name=local-file-latest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.latest.txt
topics=test
consumer.override.auto.offset.reset=latest
value.converter=org.apache.kafka.connect.storage.StringConverter
Мы можем запустить оба из них:
sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-early.properties config/connect-file-sink-latest.properties
Мы можем добавить новые сообщения и проверить, заполнен ли /tmp/test.sink.latest.txt
только этими сообщениями.
Объяснение
Основная идея заключается в том, чтобы иметь возможность по умолчанию реконфигурировать каждый разъем по-разному. Таким образом, чтобы сделать это, мы используем add Override Policy