Каков минимальный пример использования Kafka с Python? - PullRequest
0 голосов
/ 05 сентября 2018

Что я пробовал

  1. Я клонировал https://github.com/wurstmeister/kafka-docker и казнил sudo docker-compuse up.
  2. Я начал producer.py, указанный ниже.
  3. Я начал consumer.py, указанный ниже.

Это не сработало. Я изменил ports из docker-compose.yml на

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

После запуска producer.py завершил выполнение, и терминал docker-compose показал

zookeeper_1  | 2018-09-05 14:21:44,001 [myid:] - INFO  [SessionTracker:ZooKeeperServer@358] - Expiring session 0x165aa1acb900000, timeout of 6000ms exceeded
zookeeper_1  | 2018-09-05 14:21:44,002 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x165aa1acb900000
kafka_1      | [2018-09-05 14:21:44,028] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
kafka_1      | [2018-09-05 14:21:44,033] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
zookeeper_1  | 2018-09-05 14:21:44,141 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:delete cxid:0x32 zxid:0x6f txntype:-1 reqpath:n/a Error Path:/admin/reassign_partitions Error:KeeperErrorCode = NoNode for /admin/reassign_partitions
zookeeper_1  | 2018-09-05 14:21:44,152 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:delete cxid:0x34 zxid:0x70 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
zookeeper_1  | 2018-09-05 14:21:47,621 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:setData cxid:0x3c zxid:0x71 txntype:-1 reqpath:n/a Error Path:/config/topics/mytopic Error:KeeperErrorCode = NoNode for /config/topics/mytopic
kafka_1      | [2018-09-05 14:21:47,628] INFO Topic creation Map(mytopic-0 -> ArrayBuffer(1003)) (kafka.zk.AdminZkClient)
kafka_1      | [2018-09-05 14:21:47,639] INFO [KafkaApi-1003] Auto creation of topic mytopic with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)

Тема создана, это хорошо. Но потом, когда я убиваю потребителей, они ничего не делают. Но докер-композитор показывает

kafka_1      | [2018-09-05 14:24:52,566] ERROR [KafkaApi-1003] Number of alive brokers '0' does not meet the required replication factor '1' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

Как мне установить минимальную установку / настройку Kafka, чтобы увидеть, как Kafka работает с Python?

producer.py

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('mytopic', key='hello', value='world')
print("produce done")
p.flush(10)

consumer.py

from confluent_kafka import Consumer, KafkaError


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

1 Ответ

0 голосов
/ 05 сентября 2018

Попробуйте этот раздел env

  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
  KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://localhost:9094
  KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
  KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

Добавьте 9094:9094 к портам и укажите Python на localhost:9094, если вы не запускаете свой код в Docker-контейнере Python

Confluent образы имеют аналогичную настройку , но вместо порта 29092

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...