CppKafka consumer takes 5 seconds to initialize
0 votes
/ March 6, 2020

Description

I am trying to set up a consumer, and 5 seconds pass before initialization completes and messages start arriving. I am using CppKafka (a librdkafka wrapper), but I suspect the cause is a misconfiguration on my side rather than a problem in the library.

Previously I ran Kafka inside Docker, and the startup delay was 10 seconds. I am now using the quickstart from the Kafka website, and the startup delay is 5 seconds.

I have a ConsumeOne function that looks like this:

    // Use a fresh group id on every run, so each call joins a brand-new consumer group.
    const auto group = std::to_string(Clock::now().time_since_epoch().count());
    Configuration config = {
            {"metadata.broker.list", "127.0.0.1:9092"},
            {"group.id",             group},
            {"debug", "broker,protocol"}  // enables the librdkafka debug output shown in the logs
    };

    Consumer consumer(config);
    consumer.subscribe({topicName});

    // Give up once maxMillis have elapsed.
    const auto timeout = Clock::now() + milliseconds(maxMillis);

    std::cout << "Consuming on [" << topicName << "]" << std::endl;
    while(Clock::now() < timeout) {
        const auto msg = consumer.poll();
        if (msg) {
            // Return the payload of the first message received.
            std::cout << "Received a message" << std::endl;
            auto ss = std::stringstream{};
            ss << msg.get_payload();
            const auto data = ss.str();
            consumer.commit(msg);
            consumer.unsubscribe();
            return data;
        }
    }
    consumer.unsubscribe();

In another thread I print the elapsed time, which is what you see interleaved in the application logs:

    for (auto i = 0; i < 15; i++) {
        std::cout << "at " << i << " seconds " << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
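As a complement to the one-second ticker, the time to the first message can also be measured directly around the poll loop. The following is only a sketch: `time_first_message` and `FirstMessageTiming` are hypothetical names that are not part of cppkafka, and the poll function is passed in as a callable so the helper does not depend on any Kafka library.

```cpp
#include <chrono>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical helper (not part of cppkafka): result of waiting for one message.
struct FirstMessageTiming {
    std::optional<std::string> payload;  // empty if the deadline passed first
    std::chrono::milliseconds elapsed;   // time spent inside the polling loop
};

// Calls `poll` repeatedly until it yields a value or `max_wait` elapses,
// and reports how long that took.
FirstMessageTiming time_first_message(
        const std::function<std::optional<std::string>()>& poll,
        std::chrono::milliseconds max_wait) {
    using Clock = std::chrono::steady_clock;  // monotonic, suited to intervals
    const auto start = Clock::now();
    const auto deadline = start + max_wait;

    std::optional<std::string> payload;
    while (!payload && Clock::now() < deadline) {
        payload = poll();
    }

    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        Clock::now() - start);
    return {std::move(payload), elapsed};
}
```

In ConsumeOne, the callable would wrap `consumer.poll()` and return the payload once a message arrives; printing `elapsed.count()` then gives the startup delay in milliseconds instead of a one-second resolution.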

Application logs

%7|1582884541.326|BRKMAIN|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1582884541.326|BRKMAIN|rdkafka#consumer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1582884541.326|BROKER|rdkafka#consumer-2| [thrd:app]: 127.0.0.1:9092/bootstrap: Added new broker with NodeId -1
%7|1582884541.326|BRKMAIN|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Enter main broker thread
%7|1582884541.326|CONNECT|rdkafka#consumer-2| [thrd:main]: 127.0.0.1:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1582884541.326|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op
at 0 seconds 
%7|1582884541.326|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
Consuming on [test]
%7|1582884541.326|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1582884541.326|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1582884541.326|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 14
%7|1582884541.327|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connected to ipv4#127.0.0.1:9092
%7|1582884541.327|CONNECTED|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connected (#1)
%7|1582884541.327|FEATURE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1582884541.327|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1582884541.327|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1582884541.327|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received ApiVersionResponse (v0, 294 bytes, CorrId 1, rtt 0.47ms)
%7|1582884541.327|FEATURE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1582884541.327|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1582884541.327|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1582884541.328|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received MetadataResponse (v2, 64 bytes, CorrId 2, rtt 0.95ms)
%7|1582884541.328|BROKER|rdkafka#consumer-2| [thrd:main]: antergos-desktop:9092/0: Added new broker with NodeId 0
%7|1582884541.328|CLUSTERID|rdkafka#consumer-2| [thrd:main]: 127.0.0.1:9092/bootstrap: ClusterId update "" -> "1F2lvz61RX2IQDVYrXQSCg"
%7|1582884541.328|CONTROLLERID|rdkafka#consumer-2| [thrd:main]: 127.0.0.1:9092/bootstrap: ControllerId update -1 -> 0
%7|1582884541.328|BRKMAIN|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Enter main broker thread
%7|1582884542.326|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent FindCoordinatorRequest (v2, 43 bytes @ 0, CorrId 3)
%7|1582884542.327|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received FindCoordinatorResponse (v2, 38 bytes, CorrId 3, rtt 0.88ms)
%7|1582884542.327|NODENAME|rdkafka#consumer-2| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "antergos-desktop:9092"
at 1 seconds 
%7|1582884542.327|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1582884542.327|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1582884542.327|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent FindCoordinatorRequest (v2, 43 bytes @ 0, CorrId 4)
%7|1582884542.327|BROKERFAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1582884542.327|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1582884542.327|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1582884542.327|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1582884542.327|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1582884542.327|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: broker in state TRY_CONNECT connecting
%7|1582884542.327|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1582884542.328|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received FindCoordinatorResponse (v2, 38 bytes, CorrId 4, rtt 0.82ms)
%7|1582884542.329|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Connecting to ipv4#192.168.20.12:9092 (plaintext) with socket 19
%7|1582884542.329|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Connected to ipv4#192.168.20.12:9092
%7|1582884542.329|CONNECTED|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Connected (#1)
%7|1582884542.329|FEATURE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1582884542.329|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1582884542.329|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1582884542.330|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received ApiVersionResponse (v0, 294 bytes, CorrId 1, rtt 0.72ms)
%7|1582884542.330|FEATURE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1582884542.330|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP
%7|1582884542.330|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1582884542.331|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v2, 64 bytes, CorrId 2, rtt 0.62ms)
%7|1582884542.332|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent MetadataRequest (v2, 31 bytes @ 0, CorrId 5)
%7|1582884542.332|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received MetadataResponse (v2, 103 bytes, CorrId 5, rtt 0.77ms)
at 2 seconds 
%7|1582884544.326|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 127 bytes @ 0, CorrId 3)
at 3 seconds 
%7|1582884544.328|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 64 bytes, CorrId 3, rtt 1.24ms)
%7|1582884544.328|REQERR|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Group member needs a valid member ID: explicit actions Ignore
at 4 seconds 
at 5 seconds
%7|1582884546.326|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 171 bytes @ 0, CorrId 4)
%7|1582884546.333|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 181 bytes, CorrId 4, rtt 6.88ms)
%7|1582884546.334|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v2, 31 bytes @ 0, CorrId 5)
%7|1582884546.335|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v2, 103 bytes, CorrId 5, rtt 1.04ms)
%7|1582884546.335|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent SyncGroupRequest (v3, 172 bytes @ 0, CorrId 6)
%7|1582884546.337|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received SyncGroupResponse (v3, 34 bytes, CorrId 6, rtt 1.86ms)
%7|1582884546.337|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent HeartbeatRequest (v3, 94 bytes @ 0, CorrId 7)
%7|1582884546.337|TOPBRK|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Topic test [0]: joining broker (rktp 0x7f6624003ec0, 0 message(s) queued)
%7|1582884546.337|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state INIT -> TRY_CONNECT
%7|1582884546.337|CONNECT|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: broker in state TRY_CONNECT connecting
%7|1582884546.337|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state TRY_CONNECT -> CONNECT
%7|1582884546.337|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent OffsetFetchRequest (v1, 60 bytes @ 0, CorrId 8)
%7|1582884546.337|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v3, 6 bytes, CorrId 7, rtt 0.66ms)
%7|1582884546.338|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received OffsetFetchResponse (v1, 30 bytes, CorrId 8, rtt 1.03ms)
%7|1582884546.338|CONNECT|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Connecting to ipv6#[fe80::42:b2ff:fe2e:b9d4%br-8edf75baa9c3]:9092 (plaintext) with socket 20
%7|1582884546.338|CONNECT|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Connected to ipv6#[fe80::42:b2ff:fe2e:b9d4%br-8edf75baa9c3]:9092
%7|1582884546.338|CONNECTED|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Connected (#1)
%7|1582884546.338|FEATURE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1582884546.338|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1582884546.338|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent ApiVersionRequest (v0, 25 bytes @ 0, CorrId 1)
%7|1582884546.340|RECV|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Received ApiVersionResponse (v0, 294 bytes, CorrId 1, rtt 1.95ms)
%7|1582884546.340|FEATURE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,UnitTest
%7|1582884546.340|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state APIVERSION_QUERY -> UP
%7|1582884546.340|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent OffsetRequest (v0, 55 bytes @ 0, CorrId 2)
%7|1582884546.341|RECV|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Received OffsetResponse (v0, 32 bytes, CorrId 2, rtt 1.13ms)
%7|1582884546.342|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 3)
%7|1582884546.443|RECV|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Received FetchResponse (v11, 66 bytes, CorrId 3, rtt 101.10ms)
%7|1582884546.443|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 4)
%7|1582884546.544|RECV|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Received FetchResponse (v11, 66 bytes, CorrId 4, rtt 101.20ms)
%7|1582884546.544|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 5)
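
Reading the timestamps in the log above, the 5 seconds appear to be made up of three idle pauses rather than slow network calls: about 1 s before the first FindCoordinatorRequest, about 2 s before the first JoinGroupRequest, and about 2 s before the second JoinGroupRequest. A small sketch for finding such pauses automatically is shown below. It assumes the log lines have the `%7|<epoch seconds>|TAG|...` shape seen above; `parse_timestamp`, `find_gaps` and `Gap` are hypothetical names, not part of librdkafka or cppkafka.

```cpp
#include <exception>
#include <optional>
#include <string>
#include <vector>

// One pause between two consecutive timestamped log lines.
struct Gap {
    double seconds;      // length of the pause
    std::string before;  // last line logged before the pause
    std::string after;   // first line logged after the pause
};

// Extracts the epoch timestamp from a line shaped like
// "%7|1582884541.326|TAG|...". Returns nullopt for any other line,
// for example the "at N seconds" ticker output.
std::optional<double> parse_timestamp(const std::string& line) {
    if (line.empty() || line[0] != '%') return std::nullopt;
    const auto first = line.find('|');
    if (first == std::string::npos) return std::nullopt;
    const auto second = line.find('|', first + 1);
    if (second == std::string::npos) return std::nullopt;
    try {
        return std::stod(line.substr(first + 1, second - first - 1));
    } catch (const std::exception&) {
        return std::nullopt;  // field between the bars was not a number
    }
}

// Returns every pause of at least `min_gap_seconds` between consecutive
// timestamped lines, skipping lines that carry no timestamp.
std::vector<Gap> find_gaps(const std::vector<std::string>& lines,
                           double min_gap_seconds) {
    std::vector<Gap> gaps;
    std::optional<double> prev_ts;
    std::string prev_line;
    for (const auto& line : lines) {
        const auto ts = parse_timestamp(line);
        if (!ts) continue;
        if (prev_ts && *ts - *prev_ts >= min_gap_seconds) {
            gaps.push_back({*ts - *prev_ts, prev_line, line});
        }
        prev_ts = ts;
        prev_line = line;
    }
    return gaps;
}
```

Feeding the captured log lines to `find_gaps` with a threshold such as 0.5 seconds lists each pause together with the request that was sent immediately after it.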

Kafka logs

The broker logs show nothing until the 5-second mark, when they print:

[2020-02-28 21:12:29,784] INFO [GroupCoordinator 0]: Preparing to rebalance group 1582884744783912064 in state PreparingRebalance with old generation 0 (__consumer_offsets-15) (reason: Adding new member rdkafka-942aade1-ae1a-4c54-9419-116c2707e2a8 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2020-02-28 21:12:29,785] INFO [GroupCoordinator 0]: Stabilized group 1582884744783912064 generation 1 (__consumer_offsets-15) (kafka.coordinator.group.GroupCoordinator)
[2020-02-28 21:12:29,787] INFO [GroupCoordinator 0]: Assignment received from leader for group 1582884744783912064 for generation 1 (kafka.coordinator.group.GroupCoordinator)

This looks similar to this issue: https://github.com/edenhill/librdkafka/issues/1597, but I am using the latest broker version.

I ran the same test with a Python consumer/producer and saw no startup delay. Any help would be appreciated.

  • librdkafka version: librdkafka-1.3.0-1
  • Apache Kafka version: kafka_2.12-2.4.0
  • Operating system: Antergos / Arch

UPDATE

After grabbing the latest librdkafka (master - e2dee3ad047f6783e5363cdd75e7c009e0c99a8d), the startup time is now down to 3 seconds. Here is the new log:

%7|1583823340.013|BROKER|rdkafka#consumer-2| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1583823340.013|BRKMAIN|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1583823340.013|BRKMAIN|rdkafka#consumer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1583823340.013|BROKER|rdkafka#consumer-2| [thrd:app]: 127.0.0.1:9092/bootstrap: Added new broker with NodeId -1
at 0 seconds 
Consuming on [test]
%7|1583823340.013|INIT|rdkafka#consumer-2| [thrd:app]: librdkafka v1.4.0-RC3-2-ge2dee3 (0x1040005) rdkafka#consumer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, GCC GXX PKGCONFIG INSTALL GNULD LDS C11THREADS LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM LZ4_EXT RAPIDJSON SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x82)
%7|1583823340.013|CONNECT|rdkafka#consumer-2| [thrd:main]: 127.0.0.1:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1583823340.013|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op
%7|1583823340.013|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1583823340.013|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1583823340.013|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1583823340.014|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 19
%7|1583823340.014|CONNECT|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connected to ipv4#127.0.0.1:9092
%7|1583823340.014|CONNECTED|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Connected (#1)
%7|1583823340.014|FEATURE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1583823340.014|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1583823340.014|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent ApiVersionRequest (v3, 54 bytes @ 0, CorrId 1)
%7|1583823340.015|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 1.50ms)
%7|1583823340.015|FEATURE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1583823340.015|STATE|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1583823340.015|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1583823340.016|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received MetadataResponse (v2, 64 bytes, CorrId 2, rtt 0.83ms)
%7|1583823340.016|BROKER|rdkafka#consumer-2| [thrd:main]: antergos-desktop:9092/0: Added new broker with NodeId 0
%7|1583823340.016|CLUSTERID|rdkafka#consumer-2| [thrd:main]: 127.0.0.1:9092/bootstrap: ClusterId update "" -> "DxyV4QxbTaKh7nyXov9Y7Q"
%7|1583823340.016|CONTROLLERID|rdkafka#consumer-2| [thrd:main]: 127.0.0.1:9092/bootstrap: ControllerId update -1 -> 0
%7|1583823340.017|BRKMAIN|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Enter main broker thread
%7|1583823341.012|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent FindCoordinatorRequest (v2, 43 bytes @ 0, CorrId 3)
%7|1583823341.014|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received FindCoordinatorResponse (v2, 38 bytes, CorrId 3, rtt 1.29ms)
%7|1583823341.014|NODENAME|rdkafka#consumer-2| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "antergos-desktop:9092"
%7|1583823341.014|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1583823341.014|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1583823341.014|BROKERFAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker transport failure: (errno: Success)
%7|1583823341.014|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1583823341.014|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1583823341.014|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1583823341.014|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1583823341.014|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: broker in state TRY_CONNECT connecting
%7|1583823341.014|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1583823341.014|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent FindCoordinatorRequest (v2, 43 bytes @ 0, CorrId 4)
%7|1583823341.015|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Connecting to ipv4#172.17.0.1:9092 (plaintext) with socket 14
%7|1583823341.016|CONNECT|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Connected to ipv4#172.17.0.1:9092
%7|1583823341.016|CONNECTED|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Connected (#1)
%7|1583823341.016|FEATURE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1583823341.016|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1583823341.016|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent ApiVersionRequest (v3, 54 bytes @ 0, CorrId 1)
%7|1583823341.016|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received FindCoordinatorResponse (v2, 38 bytes, CorrId 4, rtt 1.82ms)
%7|1583823341.017|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 1.45ms)
%7|1583823341.017|FEATURE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1583823341.017|STATE|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP
%7|1583823341.017|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v2, 25 bytes @ 0, CorrId 2)
%7|1583823341.018|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v2, 64 bytes, CorrId 2, rtt 1.14ms)
%7|1583823341.019|SEND|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Sent MetadataRequest (v2, 31 bytes @ 0, CorrId 5)
%7|1583823341.020|RECV|rdkafka#consumer-2| [thrd:127.0.0.1:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received MetadataResponse (v2, 103 bytes, CorrId 5, rtt 0.94ms)
at 1 seconds 
at 2 seconds 
%7|1583823343.013|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 127 bytes @ 0, CorrId 3)
%7|1583823343.014|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 64 bytes, CorrId 3, rtt 1.54ms)
%7|1583823343.014|REQERR|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore
%7|1583823343.014|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 171 bytes @ 0, CorrId 4)
%7|1583823343.017|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 181 bytes, CorrId 4, rtt 3.06ms)
%7|1583823343.018|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent MetadataRequest (v2, 31 bytes @ 0, CorrId 5)
%7|1583823343.019|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received MetadataResponse (v2, 103 bytes, CorrId 5, rtt 1.20ms)
%7|1583823343.019|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent SyncGroupRequest (v3, 172 bytes @ 0, CorrId 6)
%7|1583823343.021|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received SyncGroupResponse (v3, 34 bytes, CorrId 6, rtt 2.43ms)
%7|1583823343.022|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent HeartbeatRequest (v3, 94 bytes @ 0, CorrId 7)
%7|1583823343.022|TOPBRK|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Topic test [0]: joining broker (rktp 0x7f1c74003ef0, 0 message(s) queued)
%7|1583823343.022|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state INIT -> TRY_CONNECT
%7|1583823343.022|CONNECT|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: broker in state TRY_CONNECT connecting
%7|1583823343.022|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state TRY_CONNECT -> CONNECT
%7|1583823343.022|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent OffsetFetchRequest (v1, 60 bytes @ 0, CorrId 8)
%7|1583823343.022|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v3, 6 bytes, CorrId 7, rtt 0.96ms)
%7|1583823343.023|CONNECT|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Connecting to ipv4#172.17.0.1:9092 (plaintext) with socket 15
%7|1583823343.023|CONNECT|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Connected to ipv4#172.17.0.1:9092
%7|1583823343.023|CONNECTED|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Connected (#1)
%7|1583823343.023|FEATURE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1583823343.023|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1583823343.023|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent ApiVersionRequest (v3, 54 bytes @ 0, CorrId 1)
%7|1583823343.024|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/0: Received OffsetFetchResponse (v1, 30 bytes, CorrId 8, rtt 1.83ms)
%7|1583823343.024|RECV|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Received ApiVersionResponse (v3, 344 bytes, CorrId 1, rtt 1.08ms)
%7|1583823343.024|FEATURE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1583823343.024|STATE|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Broker changed state APIVERSION_QUERY -> UP
%7|1583823343.025|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent OffsetRequest (v0, 55 bytes @ 0, CorrId 2)
%7|1583823343.026|RECV|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Received OffsetResponse (v0, 32 bytes, CorrId 2, rtt 1.01ms)
%7|1583823343.026|SEND|rdkafka#consumer-2| [thrd:antergos-desktop:9092/0]: antergos-desktop:9092/0: Sent FetchRequest (v11, 94 bytes @ 0, CorrId 3)
at 3 seconds 
Received a message

My producer also throws a "Local: Timed out" exception if it is given a timeout of less than 2 seconds.

1 Answer

0 votes
/ May 2, 2020

This is now fixed in master (https://github.com/edenhill/librdkafka/commit/c64b652689c20dc7f2ce71eaf7166d9806c25096)

The first message is now received within 80-100 ms. Thanks for looking into it!

...