Задержка в получении клиентом Golang сообщений Kafka после подключения к Kafka - PullRequest
0 голосов
/ 07 декабря 2018

Я новичок в Golang и Kafa, так что это может показаться глупым вопросом.

После того, как мой потребитель Kafka впервые подключается к серверу Kafka, почему происходит задержка (~20 секунд) между установлением соединения с сервером Kafka и получением первого сообщения?

Он печатает сообщение прямо перед consumer.Messages() и печатает другое сообщение для каждого полученного сообщения.Задержка ~ 20 секунд находится между первым fmt.Println и вторым fmt.Println.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // Create the consumer and listen for new messages
    consumer := createConsumer()

    // Create a signal channel to know when we are done
    done := make(chan bool)

    // Start processing messages
    go func() { 
        fmt.Println("Start consuming Kafka messages")
        for msg := range consumer.Messages() {
            s := string(msg.Value[:])
            fmt.Println("Msg: ", s)
        }
    }()

    <-done

}

func createConsumer() *cluster.Consumer {
    // Define our configuration to the cluster
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = false
    config.Group.Return.Notifications = false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // Create the consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"orders"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        log.Fatal("Unable to connect consumer to Kafka")
    }
    go handleErrors(consumer)
    go handleNotifications(consumer)
    return consumer
}

docker-compose.yml

version: '2'
services:
zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.1"
    hostname: zookeeper
    ports:
    - "2181:2181"
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

broker-1:
    image: "confluentinc/cp-enterprise-kafka:5.0.1"
    hostname: broker-1
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_BROKER_RACK: rack-a
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_DELETE_TOPIC_ENABLE: "true"
    KAFKA_JMX_PORT: 9999
    KAFKA_JMX_HOSTNAME: 'broker-1'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    KAFKA_CREATE_TOPICS: "orders:1:1"

1 Ответ

0 голосов
/ 20 июня 2019

После того, как мой потребитель Kafka впервые подключается к серверу Kafka, почему происходит задержка (~ 20 секунд) между установлением соединения с сервером Kafka и получением первого сообщения?

Не удаетсятак много задержек, потому что потребитель использовал канал сообщений, получающий сообщения от кафки.Как только сообщение будет доступно в очереди kafka, оно будет отправлено на канал сообщений, который может получить потребитель.

Приходящая к вам реализация кода: -

for msg := range consumer.Messages() {
    s := string(msg.Value[:])
    fmt.Println("Msg: ", s)
}

consumer.Messages()возвращает канал, и for зацикливается на канале, который возвращает сообщение всякий раз, когда оно доступно внутри канала.

См. этот вопрос Как создать группу потребителей kafka в Голанге? для подключенияиспользуя сараму.вам не нужен sarama-cluster для соединения.

...