Не удается переопределить успешно в Kafka Connect 2.4 мои разъемы - PullRequest
0 голосов
/ 09 апреля 2020

Привет! Я собираюсь использовать новую политику переопределения, выпущенную в версии 2.3, через код java.

И я хочу создать такой пример:

  • Создание топи c с 10 сообщениями

  • Создание получателя, который использует сообщения, и затем отправляет их на FileSink по умолчанию

  • Создание переопределенного приемника, который не должен принимать данные от потребителя (он настроен как самый ранний)

  • Создание сообщения, которое потребляет и принимает два приемника!

    Вот конфигурации моих коннекторов SINK (файл) (по умолчанию):

        taskOut = new FileStreamSinkTask();
        Map<String, String> sinkProperties = new HashMap<>();
        sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_LATEST);
        sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
        sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        connectorOut.start(sinkProperties);
        taskOut.initialize(createMock(SinkTaskContext.class));
        taskOut.start(connectorOut.taskConfigs(1).get(0));

А вот самое раннее (только то, что меняется):

     sinkProperties.put(FileStreamSinkConnector.TOPICS_CONFIG, new ConstantSettingsBehavior().SINGLE_TOPIC);
        sinkProperties.put(FileStreamSinkConnector.FILE_CONFIG, new ConstantSettingsBehavior().FILE_OUT_EARLY);
        sinkProperties.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
        sinkProperties.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Далее я собираюсь создать потребителя, который будет принимать сообщения от Topi c как
List

Я передаю этот список Задачи каждого разъема:

        myLatestOne.getTaskOut().put(data);
        myEarlyOne.getTaskOut().put(data);

Но похоже, что я поступаю неправильно! Потому что все сообщения принимаются каждым разъемом

Здесь код Запрос запроса кода Переопределите код, который я использую.

Если я что-то пропустил, не делайте стесняйтесь сказать мне! (первый вопрос).

Спасибо

Ответы [ 2 ]

0 голосов
/ 16 апреля 2020

Так что я дал сделать это через JAVA. Я нашел способ сделать это с помощью терминала довольно просто:

Команда для выполнения

Сначала мы запускаем наш сервер Zookeeper:

 bin/zookeeper-server-start.sh config/zokeeper.properties

Затем мы запускаем наш сервер kafka :

bin/kafka-server-start.sh config/server.properties

Нам нужно создать топи c:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

Теперь нам нужно создать сообщения:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [Your message]

И теперь мы можем запустить наш рабочий, с 1 разъемом. Вы можете иметь их свойства в файле конфигурации.

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
connector.client.config.override.policy=All 

connector.client.config.override.policy=All Разрешить переопределение клиента с помощью коннектора.

Вот наш коннектор с опцией earliest (Если есть смещение не сохраняется, начиная с первой записи)

name=local-file-earliest-sink 
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.earliest.txt
topics=test
consumer.override.auto.offset.reset=earliest
value.converter=org.apache.kafka.connect.storage.StringConverter
sudo ./bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-sink-early.properties

Мы остановим это через несколько секунд (вы можете посмотреть tmp/test.sink.earliest.txt).

На этот раз мы добавим новый соединитель:

name=local-file-latest-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.latest.txt
topics=test
consumer.override.auto.offset.reset=latest
value.converter=org.apache.kafka.connect.storage.StringConverter

Мы можем запустить оба из них:

sudo ./bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-sink-early.properties config/connect-file-sink-latest.properties 

Мы можем добавить новые сообщения и проверить, заполнен ли /tmp/test.sink.latest.txt только этими сообщениями.

Объяснение

Основная идея заключается в том, чтобы иметь возможность по умолчанию реконфигурировать каждый разъем по-разному. Таким образом, чтобы сделать это, мы используем add Override Policy

0 голосов
/ 10 апреля 2020

Каждый соединитель создаст новый идентификатор группы потребителей. Если они оба читают из одних и тех же тем, они оба получат все сообщения

Кроме того, переопределения потребителей и производителей уже возможны на рабочем уровне, и я не видел, чтобы кто-нибудь писал свой собственный соединитель, подобный этому так как вы можете просто использовать connect-standalone

...