Как я могу гарантировать, что мои потребители обрабатывают сообщения в темах kafka по порядку, только один раз? - PullRequest
0 голосов
/ 24 января 2019

Я никогда раньше не использовал кафку.У меня есть две тестовые программы Go, обращающиеся к локальному экземпляру kafka: читатель и писатель.Я пытаюсь настроить параметры своего производителя, потребителя и сервера kafka, чтобы получить определенное поведение.

Мой писатель:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    topics := []string{
        "policymanager-100",
        "policymanager-200",
        "policymanager-300",
    }
    progress := make(map[string]int)
    for _, t := range topics {
        progress[t] = 0
    }

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "0",
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    fmt.Println("producing messages...")
    for i := 0; i < 30; i++ {
        index := rand.Intn(len(topics))
        topic := topics[index]
        num := progress[topic]
        num++
        fmt.Printf("%s => %d\n", topic, num)
        msg := &kafka.Message{
            Value: []byte(strconv.Itoa(num)),
            TopicPartition: kafka.TopicPartition{
                Topic: &topic,
            },
        }
        err = producer.Produce(msg, nil)
        if err != nil {
            panic(err)
        }
        progress[topic] = num
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("DONE")
}

В моей локальной kafka существуют три темы:policymanager-100, policymanager-200, policymanager-300.Каждый из них имеет только 1 раздел, чтобы гарантировать, что все сообщения будут отсортированы к моменту их получения.Мой автор случайным образом выберет одну из этих тем и выдаст сообщение, состоящее из числа, которое увеличивается только для этой темы.Когда он закончится, я ожидаю, что очереди будут выглядеть примерно так (названия тем сокращены для удобства чтения):

100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12

Пока все хорошо.Я пытаюсь настроить вещи так, чтобы любое количество потребителей можно было ускорить и использовать эти сообщения по порядку.Под «по порядку» я подразумеваю, что ни один потребитель не должен получать сообщение 2 по теме 100 до тех пор, пока сообщение 1 не ЗАПОЛНЕНО (не только начато).Если работает сообщение 1 для темы 100, потребители могут использовать другие темы, которые в настоящее время не обрабатывают сообщение.Если сообщение темы было отправлено потребителю, вся эта тема должна стать «заблокированной» до тех пор, пока либо не истечет время ожидания того, что потребитель отказал, либо потребитель подтвердит сообщение, тогда тема «разблокирована», чтобы сделать следующее сообщение выполненным.доступны для потребления.

Мой читатель:

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    count := 2
    for i := 0; i < count; i++ {
        go consumer(i + 1)
    }
    fmt.Println("cosuming...")
    // hold this thread open indefinitely
    select {}
}

func consumer(id int) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "0", // strconv.Itoa(id),
        "enable.auto.commit": "false",
    })
    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            panic(err)
        }

        fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
        time.Sleep(time.Second)
        _, err = c.CommitMessage(msg)
        if err != nil {
            fmt.Printf("ERROR commiting: %+v\n", err)
        }
    }
}

Исходя из моего нынешнего понимания, я могу достичь этого путем правильной настройки своего потребителя.Я пробовал много разных вариантов этой программы.Я пытался, чтобы у всех моих программ были одни и те же потребители.Я пытался использовать разные group.id для каждой программы.Ни одна из них не была правильной конфигурацией, чтобы получить поведение, за которым я следую.

То, что делает размещенный код, очищает одну тему за раз.Несмотря на наличие нескольких процедур, процесс будет читать все 100, затем переходить к 200, а затем к 300, и только одна программа будет фактически выполнять все чтение.Когда я даю каждой goroutine различную group.id, тогда сообщения читаются несколькими goroutines, которые я бы хотел предотвратить.

Мой примерный потребитель просто разбивает вещи с помощью goroutines, но когда я начинаю работать над этим проектом в свойИспользование варианта на работе, мне нужно это для работы с несколькими экземплярами kubernetes, которые не будут общаться друг с другом, поэтому использование всего, что взаимодействует между goroutines, не будет работать, как только 2 экземпляра на 2 кубах.Вот почему я надеюсь заставить Кафку делать привратники, которые я хочу.

1 Ответ

0 голосов
/ 24 января 2019

Вообще говоря, вы не можете. Даже если у вас был один потребитель, который использовал все разделы для данной темы, разделы будут использоваться в недетерминированном порядке, и ваш общий порядок по всем разделам не будет гарантирован.

Попробуйте ключевые сообщения, подумайте, что вы можете найти это полезным для вашего случая использования.

...