Я никогда раньше не использовал кафку.У меня есть две тестовые программы Go, обращающиеся к локальному экземпляру kafka: читатель и писатель.Я пытаюсь настроить параметры своего производителя, потребителя и сервера kafka, чтобы получить определенное поведение.
Мой писатель:
package main
import (
"fmt"
"math/rand"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
rand.Seed(time.Now().UnixNano())
topics := []string{
"policymanager-100",
"policymanager-200",
"policymanager-300",
}
progress := make(map[string]int)
for _, t := range topics {
progress[t] = 0
}
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0",
})
if err != nil {
panic(err)
}
defer producer.Close()
fmt.Println("producing messages...")
for i := 0; i < 30; i++ {
index := rand.Intn(len(topics))
topic := topics[index]
num := progress[topic]
num++
fmt.Printf("%s => %d\n", topic, num)
msg := &kafka.Message{
Value: []byte(strconv.Itoa(num)),
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
}
err = producer.Produce(msg, nil)
if err != nil {
panic(err)
}
progress[topic] = num
time.Sleep(time.Millisecond * 100)
}
fmt.Println("DONE")
}
В моей локальной kafka существуют три темы:policymanager-100, policymanager-200, policymanager-300.Каждый из них имеет только 1 раздел, чтобы гарантировать, что все сообщения будут отсортированы к моменту их получения.Мой автор случайным образом выберет одну из этих тем и выдаст сообщение, состоящее из числа, которое увеличивается только для этой темы.Когда он закончится, я ожидаю, что очереди будут выглядеть примерно так (названия тем сокращены для удобства чтения):
100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12
Пока все хорошо.Я пытаюсь настроить вещи так, чтобы любое количество потребителей можно было ускорить и использовать эти сообщения по порядку.Под «по порядку» я подразумеваю, что ни один потребитель не должен получать сообщение 2 по теме 100 до тех пор, пока сообщение 1 не ЗАПОЛНЕНО (не только начато).Если работает сообщение 1 для темы 100, потребители могут использовать другие темы, которые в настоящее время не обрабатывают сообщение.Если сообщение темы было отправлено потребителю, вся эта тема должна стать «заблокированной» до тех пор, пока либо не истечет время ожидания того, что потребитель отказал, либо потребитель подтвердит сообщение, тогда тема «разблокирована», чтобы сделать следующее сообщение выполненным.доступны для потребления.
Мой читатель:
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
count := 2
for i := 0; i < count; i++ {
go consumer(i + 1)
}
fmt.Println("cosuming...")
// hold this thread open indefinitely
select {}
}
func consumer(id int) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "0", // strconv.Itoa(id),
"enable.auto.commit": "false",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
for {
msg, err := c.ReadMessage(-1)
if err != nil {
panic(err)
}
fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
time.Sleep(time.Second)
_, err = c.CommitMessage(msg)
if err != nil {
fmt.Printf("ERROR commiting: %+v\n", err)
}
}
}
Исходя из моего нынешнего понимания, я могу достичь этого путем правильной настройки своего потребителя.Я пробовал много разных вариантов этой программы.Я пытался, чтобы у всех моих программ были одни и те же потребители.Я пытался использовать разные group.id
для каждой программы.Ни одна из них не была правильной конфигурацией, чтобы получить поведение, за которым я следую.
То, что делает размещенный код, очищает одну тему за раз.Несмотря на наличие нескольких процедур, процесс будет читать все 100, затем переходить к 200, а затем к 300, и только одна программа будет фактически выполнять все чтение.Когда я даю каждой goroutine различную group.id
, тогда сообщения читаются несколькими goroutines, которые я бы хотел предотвратить.
Мой примерный потребитель просто разбивает вещи с помощью goroutines, но когда я начинаю работать над этим проектом в свойИспользование варианта на работе, мне нужно это для работы с несколькими экземплярами kubernetes, которые не будут общаться друг с другом, поэтому использование всего, что взаимодействует между goroutines, не будет работать, как только 2 экземпляра на 2 кубах.Вот почему я надеюсь заставить Кафку делать привратники, которые я хочу.