Этот скрипт - метод подписки на событие от Kafka.
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
static void Main(string[] args)
{
string brokerList = "broker";
var topics = new List<string>() { "topicName" };
var config = new Dictionary<string, object>
{
{ "group.id", "ConsumerGroup" },
{ "bootstrap.servers", brokerList },
{ "auto.offset.reset", "earliest" },
{ "enable.auto.commit", false }
};
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.OnMessage += (obj, msg) =>
{
...
};
consumer.Subscribe(topics);
while (true)
{
consumer.Poll(TimeSpan.FromMilliseconds(1000));
}
}
}
Когда я отслеживаю код в режиме отладки, порядок подписки на событие:
consumer.Subscribe(topics)
consumer.Poll(TimeSpan.FromMilliseconds(1000));
consumer.OnMessage += (obj, msg) =>
Прежде чем получить новое событие (перейдите к consumer.OnMessage
), он потратил немного времени на опрос (вconsumer.Poll
) и распечатайте некоторую информацию в окне консоли.
Как следует:
4|2018-12-12 10:41:53.381|rdkafka#consumer-1|REQTMOUT| [thrd:broker/bootstrap]: broker/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
В моих первоначальных мыслях он использует consumer.Subscribe(topics)
для подключения к брокеру и consumer.Poll
для потребленияновое событие.
Но похоже, что consumer.Poll
включает в себя подключение к брокеру и использование нового события.
Мои вопросы:
- Какая функция можетподключиться к брокеру?
consumer.Subscribe
или consumer.Poll
или? - Почему
consumer.Poll
печатает информацию в окне консоли?И кажется, что есть какая-то ошибка ( Тайм-аут 1 в полете ).