Кафка-проверяемый-производитель и проблема потребителя - PullRequest
0 голосов
/ 09 февраля 2020

Я занимаюсь экспериментированием с кафкой. Я уже запускаю

kafka-console-producer and 
kafka-console-consumer. 

Я отправляю сообщения с kafka-продюсером и успешно получаю на kafka-console-consumer. Теперь я хочу производить и потреблять около 5000 сообщений одновременно. Я изучаю документацию и узнаю, что есть две команды.

kafka-verifiable-producer.sh

kafka-verifiable-consumer.sh

Я пытался использовать эти команды.

 kafka-verifiable-producer.sh --broker-list localhost:9092 --max-messages 5000 --topic data-sending

kafka-verifiable-consumer.sh  --group-instance-id 1 --group-id data-world --topic data-sending --broker-list localhost:9092

Результат выглядит следующим образом

"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4996","offset":44630,"topic":"try_1","partition":0}
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4997","offset":44631,"topic":"try_1","partition":0}
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4998","offset":44632,"topic":"try_1","partition":0}
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4999","offset":44633,"topic":"try_1","partition":0}

{"timestamp":1581268289769,"name":"shutdown_complete"}
 {"timestamp":1581268289771,"name":"tool_data","sent":5000,"acked":5000,"target_throughput":-1,"avg_throughput":5285.412262156448}

На консоли пользователя результат выглядит следующим образом

{"timestamp":1581268089357,"name":"records_consumed","count":352,"partitions":[{"topic":"try_1","partition":0,"count":352,"minOffset":32777,"maxOffset":33128}]}
{"timestamp":1581268089359,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":33129}],"success":true}
{"timestamp":1581268089384,"name":"records_consumed","count":500,"partitions":[{"topic":"try_1","partition":0,"count":500,"minOffset":33129,"maxOffset":33628}]}
 {"timestamp":1581268089391,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":33629}],"success":true}
 {"timestamp":1581268089392,"name":"records_consumed","count":270,"partitions":[{"topic":"try_1","partition":0,"count":270,"minOffset":33629,"maxOffset":33898}]}
 {"timestamp":1581268089394,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":33899}],"success":true}
 {"timestamp":1581268089415,"name":"records_consumed","count":500,"partitions":[{"topic":"try_1","partition":0,"count":500,"minOffset":33899,"maxOffset":34398}]}
 {"timestamp":1581268089416,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":34399}],"success":true}
 {"timestamp":1581268089417,"name":"records_consumed","count":235,"partitions":[{"topic":"try_1","partition":0,"count":235,"minOffset":34399,"maxOffset":34633}]}
{"timestamp":1581268089419,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":34634}],"success":true}

В приведенных выше результатах ключ является нулевым. Как я могу отправить большую часть сообщений с этой командой? Я пытался найти один пример, как их использовать, но не нашел ни одного. Это производит целочисленное число как значения, но где я могу вставить сообщения? Есть ли способ, которым я могу использовать эту команду для создания массовых сообщений? Также возможно ли реализовать такие команды в windows или это только для linux? Любая ссылка на примеры будет принята с благодарностью.

1 Ответ

1 голос
/ 09 февраля 2020

Сценарий kafka-verifiable-producer.sh выполняет класс org.apache.kafka.tools.VerifiableProducer. (https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java)

Его программные аргументы --throughput, --repeating-keys и --value-prefix могут удовлетворить ваши потребности.

Например, следующее генерирует сообщения с значение префикса 111 и с инкрементным ключом, который вращается для каждых 5 сообщений. Вы также можете настроить пропускную способность сообщений с помощью опции --throughput. В этом примере он генерирует в среднем 5 сообщений в секунду.

./kafka-verifiable-producer.sh --broker-list localhost:9092 --max-messages 10 --repeating-keys 5 --value-prefix 100 --throughput 5 --topic test
{"timestamp":1581271492652,"name":"startup_complete"}
{"timestamp":1581271492860,"name":"producer_send_success","key":"0","value":"100.0","offset":45,"topic":"test","partition":0}
{"timestamp":1581271492862,"name":"producer_send_success","key":"1","value":"100.1","offset":46,"topic":"test","partition":0}
{"timestamp":1581271493048,"name":"producer_send_success","key":"2","value":"100.2","offset":47,"topic":"test","partition":0}
{"timestamp":1581271493254,"name":"producer_send_success","key":"3","value":"100.3","offset":48,"topic":"test","partition":0}
{"timestamp":1581271493256,"name":"producer_send_success","key":"4","value":"100.4","offset":49,"topic":"test","partition":0}
{"timestamp":1581271493457,"name":"producer_send_success","key":"0","value":"100.5","offset":50,"topic":"test","partition":0}
{"timestamp":1581271493659,"name":"producer_send_success","key":"1","value":"100.6","offset":51,"topic":"test","partition":0}
{"timestamp":1581271493860,"name":"producer_send_success","key":"2","value":"100.7","offset":52,"topic":"test","partition":0}
{"timestamp":1581271494063,"name":"producer_send_success","key":"3","value":"100.8","offset":53,"topic":"test","partition":0}
{"timestamp":1581271494268,"name":"producer_send_success","key":"4","value":"100.9","offset":54,"topic":"test","partition":0}
{"timestamp":1581271494483,"name":"shutdown_complete"}
{"timestamp":1581271494484,"name":"tool_data","sent":10,"acked":10,"target_throughput":5,"avg_throughput":5.452562704471101}

Самым простым является изменение / расширение указанного выше класса, если вам нужны более настраиваемые ключи и значения сообщений.

...