пакет не может быть получен из раздела - PullRequest
0 голосов
/ 31 октября 2018

Я не могу использовать тему и не уверен, что с моим кодом или конфигурацией kafka что-то не так. Проблема, которую я получаю, заключается в том, что она застряла в операторе print «Starts», поэтому она не получает сообщение из канала <-partitionConsumer.Messages (). </p>

Это шаги, которые я предпринял для настройки kafka (https://kafka.apache.org/quickstart) с некоторыми сообщениями в них, и я точно знаю, что они существуют, потому что, когда я запускаю следующую команду, я вижу значения.

bin / kafka-console-consumer.sh - раздел 0 - топический тест --bootstrap-server localhost: 9092 --offset самыми ранними

  1. bin / zookeeper-server-start.sh config / zookeeper.properties
  2. bin / kafka-server-start.sh config / server.properties
  3. bin / kafka-topics.sh --create --zookeeper localhost: 2181 - коэффициент репликации 1 - разделы 1 - топический тест
  4. bin / kafka-console-producer.sh --broker-list localhost: 9092 - топический тест
func RetrieveConsumed() (int){
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        fmt.Println("ERROR")
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            fmt.Println("ERROR")
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Println("ERROR")
        panic(err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            fmt.Println("ERROR")
        }
    }()

    // Trap SIGINT to trigger a shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    consumed := 0
    ConsumerLoop:
        for {
            fmt.Println("Starts")
            select {
            case msg := <-partitionConsumer.Messages():
                fmt.Printf("Consumed message offset %d\n", msg.Offset)
                consumed++
            case <- partitionConsumer.Errors():
                break ConsumerLoop
            case <-signals:
                break ConsumerLoop
            }
        }

        fmt.Printf("Consumed: %d\n", consumed)
        return consumed
}
...