confluent_kafka hangs on publish
0 votes
/ July 4, 2019

When I try to run the hello world (published here) for confluent-kafka==1.0.1, my process hangs on p.flush(). The debug output contains the following error:

 mytopic [0] 1 message(s) queued but broker not up

This confuses me greatly, because the application is clearly communicating with the broker. Before trying to publish, it successfully creates the topic, which could not happen without talking to the broker. I have included the script, the environment it runs in, and the full log below.

I have no idea what could be going wrong here, so any advice would be appreciated.

Test script:

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': f"{flask_application.config['KAFKA_HOST']}:9092",
              "debug": "topic,msg,broker"})


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


for data in ['imadata']:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
    print('produce called')

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
print('calling flush')
p.flush()
print('flush called')

Docker Compose file for Kafka / ZooKeeper:

version: "3.0"
services:
  zookeeper:
    image: 'bitnami/zookeeper:3.5.5-r11'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:2.2.1-r14'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper

Error log:

%7|1562193919.214|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1562193919.214|BROKER|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1562193919.214|CONNECT|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1562193919.214|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.0.1 (0x10001ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd, GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM CRC32C_HW, debug 0x46)
%7|1562193919.214|BRKMAIN|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1562193919.214|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1562193919.214|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1562193919.214|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1562193919.214|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1562193919.214|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: mytopic
%7|1562193919.214|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW mytopic [-1] 0x7fdc0b7988e0 (at rd_kafka_topic_new0:393)
%7|1562193919.216|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to ipv6#[::1]:9092 (plaintext) with socket 10
%7|1562193919.216|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected to ipv6#[::1]:9092
%7|1562193919.216|CONNECTED|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected (#1)
%7|1562193919.216|FEATURE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1562193919.216|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1562193919.220|FEATURE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD
%7|1562193919.220|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1562193919.221|BROKER|rdkafka#producer-1| [thrd:main]: 455b444cbea7:9092/1001: Added new broker with NodeId 1001
%7|1562193919.221|STATE|rdkafka#producer-1| [thrd:main]: Topic mytopic changed state unknown -> exists
%7|1562193919.221|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic mytopic partition count changed from 0 to 1
%7|1562193919.221|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW mytopic [0] 0x7fdc0b4d8280 (at rd_kafka_topic_partition_cnt_update:620)
%7|1562193919.221|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mytopic partition 0 Leader 1001
%7|1562193919.221|BRKDELGT|rdkafka#producer-1| [thrd:main]: mytopic [0]: delegate to broker 455b444cbea7:9092/1001 (rktp 0x7fdc0b4d8280, term 0, ref 2, remove 0)
%7|1562193919.221|BRKDELGT|rdkafka#producer-1| [thrd:main]: mytopic [0]: broker 455b444cbea7:9092/1001 is now leader for partition with 0 messages (0 bytes) queued
%7|1562193919.221|BRKMAIN|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Enter main broker thread
%7|1562193919.221|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic mytopic [0] 0x7fdc0b4d8280 from (none) to 455b444cbea7:9092/1001 (sending PARTITION_JOIN to 455b444cbea7:9092/1001)
%7|1562193919.221|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 1 unassigned messages in topic mytopic to 1 partitions
%7|1562193919.221|TOPBRK|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Topic mytopic [0]: joining broker (rktp 0x7fdc0b4d8280, 0 message(s) queued)
%7|1562193919.221|FETCHADD|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Added mytopic [0] to active list (1 entries, opv 0, 0 messages queued)
%7|1562193919.221|UAS|rdkafka#producer-1| [thrd:main]: 1/1 messages were partitioned in topic mytopic
%7|1562193919.221|TOPPAR|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: mytopic [0] 1 message(s) queued but broker not up
%7|1562193919.221|STATE|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Broker changed state INIT -> TRY_CONNECT
%7|1562193919.221|METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: 1/1 requested topic(s) seen in metadata
%7|1562193919.221|CLUSTERID|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId update "" -> "aOByDyDMTpuVFWCzenRfVw"
%7|1562193919.221|CONTROLLERID|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: ControllerId update -1 -> 1001
%7|1562193919.221|CONNECT|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: broker in state TRY_CONNECT connecting
%7|1562193919.221|STATE|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Broker changed state TRY_CONNECT -> CONNECT
%7|1562193919.222|BROKERFAIL|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1562193919.222|STATE|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Broker changed state CONNECT -> DOWN
%7|1562193919.223|TOPPAR|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: mytopic [0] 1 message(s) queued but broker not up
%7|1562193920.217|QRYLEADER|rdkafka#producer-1| [thrd:main]: Topic mytopic [0]: leader is down: re-query
%7|1562193920.219|METADATA|rdkafka#producer-1| [thrd:main]:   Topic mytopic partition 0 Leader 1001
%7|1562193920.219|METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/bootstrap: 1/1 requested topic(s) seen in metadata
%7|1562193920.223|STATE|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Broker changed state DOWN -> INIT
%7|1562193920.223|STATE|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Broker changed state INIT -> TRY_CONNECT
%7|1562193920.223|CONNECT|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: broker in state TRY_CONNECT connecting
%7|1562193920.223|STATE|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: Broker changed state TRY_CONNECT -> CONNECT
%7|1562193920.224|BROKERFAIL|rdkafka#producer-1| [thrd:455b444cbea7:9092/1001]: 455b444cbea7:9092/1001: failed: err: Local: Host resolution failure: (errno: Bad address)

UPDATE

After modifying the Docker Compose file to add the Kafka listeners, the log now looks like this:

%7|1562359630.985|BROKER|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1562359630.985|BRKMAIN|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1562359630.985|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1562359630.985|CONNECT|rdkafka#producer-1| [thrd:app]: localhost:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1562359630.985|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v1.0.1 (0x10001ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd, GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM CRC32C_HW, debug 0x46)
%7|1562359630.985|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1562359630.985|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1562359630.985|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1562359630.985|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1562359630.985|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: anothertopic
%7|1562359630.985|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW anothertopic [-1] 0x7f9fe2e8c350 (at rd_kafka_topic_new0:393)
produce called
calling flush
%7|1562359630.987|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to ipv6#[::1]:9092 (plaintext) with socket 10
%7|1562359630.987|CONNECT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected to ipv6#[::1]:9092
%7|1562359630.987|CONNECTED|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected (#1)
%7|1562359630.987|FEATURE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1562359630.987|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1562359630.994|FEATURE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD
%7|1562359630.994|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1562359630.999|STATE|rdkafka#producer-1| [thrd:main]: Topic anothertopic changed state unknown -> exists
%7|1562359630.999|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic anothertopic partition count changed from 0 to 1
%7|1562359630.999|TOPPARNEW|rdkafka#producer-1| [thrd:main]: NEW anothertopic [0] 0x7f9fe2e8d420 (at rd_kafka_topic_partition_cnt_update:620)
%7|1562359630.999|METADATA|rdkafka#producer-1| [thrd:main]:   Topic anothertopic partition 0 Leader 1001
%7|1562359630.999|BRKDELGT|rdkafka#producer-1| [thrd:main]: anothertopic [0]: delegate to broker (none) (rktp 0x7f9fe2e8d420, term 0, ref 2, remove 0)
%7|1562359630.999|BRKDELGT|rdkafka#producer-1| [thrd:main]: anothertopic [0]: broker :0/internal is now leader for partition with 0 messages (0 bytes) queued
%7|1562359630.999|BRKMIGR|rdkafka#producer-1| [thrd:main]: Migrating topic anothertopic [0] 0x7f9fe2e8d420 from (none) to :0/internal (sending PARTITION_JOIN to :0/internal)
%7|1562359630.999|PARTCNT|rdkafka#producer-1| [thrd:main]: Partitioning 1 unassigned messages in topic anothertopic to 1 partitions
%7|1562359630.999|TOPBRK|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Topic anothertopic [0]: joining broker (rktp 0x7f9fe2e8d420, 0 message(s) queued)
%7|1562359630.999|FETCHADD|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Added anothertopic [0] to active list (1 entries, opv 0, 0 messages queued)
%7|1562359630.999|UAS|rdkafka#producer-1| [thrd:main]: 1/1 messages were partitioned in topic anothertopic
%7|1562359630.999|UPDATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: NodeId changed from -1 to 1001
%7|1562359630.999|UPDATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Name changed from localhost:9092/bootstrap to localhost:9092/1001
%7|1562359630.999|METADATA|rdkafka#producer-1| [thrd:main]: localhost:9092/1001: 1/1 requested topic(s) seen in metadata
%7|1562359630.999|TOPICUPD|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: Topic anothertopic [0] migrated from broker -1 to 1001
%7|1562359630.999|BRKDELGT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: anothertopic [0]: delegate to broker localhost:9092/1001 (rktp 0x7f9fe2e8d420, term 0, ref 2, remove 0)
%7|1562359630.999|BRKDELGT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: anothertopic [0]: broker :0/internal no longer leader
%7|1562359630.999|BRKDELGT|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: anothertopic [0]: broker localhost:9092/1001 is now leader for partition with 1 messages (7 bytes) queued
%7|1562359630.999|BRKMIGR|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: Migrating topic anothertopic [0] 0x7f9fe2e8d420 from :0/internal to localhost:9092/1001 (sending PARTITION_LEAVE to :0/internal)
%7|1562359630.999|LEADER|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Mapped 1 partition(s) to broker
%7|1562359630.999|TOPBRK|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Topic anothertopic [0]: leaving broker (0 messages in xmitq, next leader localhost:9092/1001, rktp 0x7f9fe2e8d420)
%7|1562359630.999|FETCHADD|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Removed anothertopic [0] from active list (0 entries, opv 0)
%7|1562359630.999|CLUSTERID|rdkafka#producer-1| [thrd:main]: localhost:9092/1001: ClusterId update "" -> "81Cu9QJLRwerPnUGz-DzjA"
%7|1562359630.999|CONTROLLERID|rdkafka#producer-1| [thrd:main]: localhost:9092/1001: ControllerId update -1 -> 1001
%7|1562359630.999|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Broker changed state UP -> UPDATE
%7|1562359630.999|TOPBRK|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Topic anothertopic [0]: joining broker (rktp 0x7f9fe2e8d420, 1 message(s) queued)
%7|1562359630.999|FETCHADD|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Added anothertopic [0] to active list (1 entries, opv 0, 1 messages queued)
%7|1562359630.999|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Broker changed state UPDATE -> UP
%7|1562359630.999|PRODUCE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: anothertopic [0]: Produce MessageSet with 1 message(s) (75 bytes, ApiVersion 7, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid})
Message delivered to anothertopic [0]
flush called
%7|1562359631.003|MSGSET|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: anothertopic [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) delivered
%7|1562359631.056|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1562359631.056|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1562359631.056|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1562359631.056|PARTCNT|rdkafka#producer-1| [thrd:main]: Topic anothertopic partition count changed from 1 to 0
%7|1562359631.056|REMOVE|rdkafka#producer-1| [thrd:main]: anothertopic [0] no longer reported in metadata
%7|1562359631.056|BRKMIGR|rdkafka#producer-1| [thrd:main]: anothertopic [0] 0x7f9fe2e8d420 sending final LEAVE for removal by localhost:9092/1001
%7|1562359631.056|TOPPARREMOVE|rdkafka#producer-1| [thrd:main]: Removing toppar anothertopic [-1] 0x7f9fe2e8c350
%7|1562359631.056|DESTROY|rdkafka#producer-1| [thrd:main]: anothertopic [-1]: 0x7f9fe2e8c350 DESTROY_FINAL
%7|1562359631.056|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to localhost:9092/1001
%7|1562359631.056|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1562359631.056|BROKERFAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Undefined error: 0)
%7|1562359631.056|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 71ms in state INIT)
%7|1562359631.056|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1562359631.056|TOPBRK|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Topic anothertopic [0]: leaving broker (0 messages in xmitq, next leader (none), rktp 0x7f9fe2e8d420)
%7|1562359631.056|FETCHADD|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Removed anothertopic [0] from active list (0 entries, opv 0)
%7|1562359631.056|TOPBRK|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Topic anothertopic [0]: no next leader, failing 0 message(s) in partition queue
%7|1562359631.056|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x7f9fe24fb728), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1562359631.056|BROKERFAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Undefined error: 0)
%7|1562359631.056|TOPPARREMOVE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: Removing toppar anothertopic [0] 0x7f9fe2e8d420
%7|1562359631.056|DESTROY|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: anothertopic [0]: 0x7f9fe2e8d420 DESTROY_FINAL
%7|1562359631.056|TERM|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Received TERMINATE op in state UP: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1562359631.056|BROKERFAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: failed: err: Local: Broker handle destroyed: (errno: Resource temporarily unavailable)
%7|1562359631.056|FAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Client is terminating (after 57ms in state UP)
%7|1562359631.056|STATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Broker changed state UP -> DOWN
%7|1562359631.056|TERMINATE|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: Handle is terminating in state DOWN: 1 refcnts (0x7f9fe24da128), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1562359631.056|BROKERFAIL|rdkafka#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/1001: failed: err: Local: Broker handle destroyed: (errno: Resource temporarily unavailable)

Process finished with exit code 0

1 Answer

1 vote
/ July 5, 2019

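Your topic-creation request is forwarded to Zookeeper and does not come back through the producer API. The log shows the bootstrap connection to localhost:9092 succeeding, after which the broker advertises itself as 455b444cbea7:9092, a container hostname that cannot be resolved from the host (Host resolution failure).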
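A quick way to spot this kind of failure in a long debug log is to filter it for BROKERFAIL lines. The following is only a minimal sketch: the regular expression is inferred from the log lines in the question, and find_broker_failures is a hypothetical helper name, not part of confluent_kafka or librdkafka.

```python
import re

# Shape of the librdkafka debug lines shown in the question (an assumption
# based on that log, not an official format specification):
# %7|<timestamp>|<FACILITY>|<client>| [thrd:<thread>]: <message>
LOG_LINE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>[\d.]+)\|(?P<fac>[A-Z_]+)\|(?P<client>[^|]+)\| "
    r"\[thrd:(?P<thread>.*?)\]: (?P<msg>.*)$"
)


def find_broker_failures(log_text):
    """Return (broker, error) pairs for every BROKERFAIL line in log_text."""
    failures = []
    for line in log_text.splitlines():
        match = LOG_LINE.match(line.strip())
        if match is None or match.group("fac") != "BROKERFAIL":
            continue
        # The message looks like "<broker>: failed: err: <error text>".
        broker, _, error = match.group("msg").partition(": failed: err: ")
        failures.append((broker, error))
    return failures
```

Run against the first log in the question, this would list 455b444cbea7:9092/1001 together with the host resolution error, which points at the advertised address rather than at the bootstrap connection.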

You need to configure the Docker image differently to be able to produce and consume from outside the container (for example, from Python running on the host).

For the Bitnami images, I was able to get this working with:

ports:
    - 9092:9092
    - 29092:29092
environment:
    - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    - KAFKA_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
    - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092

And add your existing environment variables:
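To make the two advertised addresses easier to read, here is a small sketch that splits a listener string into its named endpoints. parse_listeners is a hypothetical helper written for illustration only, and it assumes the simple NAME://host:port form used in this answer.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host2:port2' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        # rpartition keeps an empty host intact, as in "PLAINTEXT://:29092".
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


advertised = parse_listeners(
    "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
)
# Clients on the Docker network would connect to the PLAINTEXT address;
# clients on the host machine would connect to the PLAINTEXT_HOST one.
for name, (host, port) in advertised.items():
    print(f"{name}: {host}:{port}")
```

The point of having two listeners is that each kind of client is handed an address it can actually resolve: kafka inside the Compose network, localhost from the host.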

...
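While debugging a setup like this, it can also help to stop the script from hanging forever. The sketch below relies on flush() accepting a timeout in seconds and returning the number of messages still in the queue, as confluent_kafka.Producer does. flush_with_timeout is a hypothetical wrapper name, and the 10 second default is an arbitrary choice.

```python
def flush_with_timeout(producer, timeout_s=10.0):
    """Flush the producer and return how many messages are still undelivered.

    `producer` is expected to behave like confluent_kafka.Producer, whose
    flush(timeout) returns the number of messages still in the queue.
    """
    remaining = producer.flush(timeout_s)
    if remaining > 0:
        print(
            f"{remaining} message(s) still undelivered after {timeout_s}s; "
            "check that the advertised broker address is reachable"
        )
    return remaining
```

In the test script from the question, flush_with_timeout(p, 5) could stand in for the bare p.flush() call, so that an unreachable leader shows up as a message rather than as a hang.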