rkafka.read () не возвращает сообщение (возвращает только двойные кавычки) - PullRequest
0 голосов
/ 12 сентября 2018

Попытка вернуть сообщение через библиотеку rkafka в R.

Следовал той же документации rkafka @ https://cran.r -project.org / web / packages / rkafka / vignettes / rkafka.pdf

Выход возвращает "" без фактического сообщения в нем. Инструмент Kafka подтверждает, что сообщение отправлено производителем.

КОД:

prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test")
print(rkafka.read(consumer1))

Выход:

[1] ""

Требуемый выход вернет "Testing once".

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

Обновление: этот код работает:

library(rkafka)

prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing twice")

rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test",groupId = "test-consumer- 
group",zookeeperConnectionTimeoutMs = "100000",autoCommitEnable = "NULL", 
autoCommitInterval = "NULL",autoOffsetReset = "NULL")


print(rkafka.read(consumer1))
print(rkafka.readPoll(consumer1))
rkafka.closeConsumer(consumer1)

Ключ должен перезапустить Kafka после удаления созданных им журналов.

0 голосов
/ 12 сентября 2018

Чтобы прочитать сообщения темы, уже записанные в тему (до запуска потребителя), необходимо установить минимальное значение смещения (эквивалентное --from-beginning).В соответствии с аргументом rkafka docs autoOffseetReset по умолчанию установлено значение largest

autoOffsetReset
самый маленький: автоматически сбрасывается смещение на самое маленькое смещение
наибольший: автоматически сбрасывает смещение на наибольшее смещение
все остальное: выбрасывает исключение для потребителя
Обязательно: необязательно
Тип: строка
по умолчанию: самое большое

Вчтобы иметь возможность принимать сообщения, вам нужно установить autoOffsetReset в "smallest".

consumer1=rkafka.createConsumer("127.0.0.1:2181","test", autoOffsetReset="smallest")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...