Я написал функцию потребителя Kafka, которая сначала извлекает сообщение из последнего прочитанного смещения, затем начинает использовать следующее смещение, а затем фиксирует текущее смещение.
Затем я запускаю тест производительности, который вызывает эту функцию 60 раз в секунду, а затем показывает эту ошибку при вызове assign
:
*** rdkafka_cgrp.c:2961:rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned ***
rd_kafka_t 0x2605000: rdkafka#consumer-5
producer.msg_cnt 0 (0 bytes)
rk_rep reply queue: 61 ops
brokers:
rd_kafka_broker_t 0x26072e0: :0/internal NodeId -1 in state INIT (for 31.797s)
refcnt 2
outbuf_cnt: 0 waitresp_cnt: 0
0 messages sent, 0 bytes, 0 errors, 0 timeouts
0 messages received, 0 bytes, 0 errors
0 messageset transmissions were retried
0 toppars:
rd_kafka_broker_t 0x2607f60: kafka-1:9091/1 NodeId 1 in state UP (for 30.787s)
refcnt 6
outbuf_cnt: 0 waitresp_cnt: 1
18 messages sent, 1329 bytes, 0 errors, 0 timeouts
17 messages received, 389654 bytes, 0 errors
0 messageset transmissions were retried
1 toppars:
topic1 [0] leader kafka-1:9091/1
refcnt 68
msgq: 0 messages
xmit_msgq: 0 messages
total: 0 messages, 0 bytes
rd_kafka_broker_t 0x26065a0: GroupCoordinator NodeId -1 in state UP (for 29.766s)
refcnt 3
outbuf_cnt: 0 waitresp_cnt: 0
10 messages sent, 666 bytes, 0 errors, 0 timeouts
10 messages received, 843 bytes, 0 errors
0 messageset transmissions were retried
0 toppars:
cgrp:
G2 in state up, flags 0x0
coord_id 1, broker kafka-1:9091/1
toppars:
topics:
topic1 with 1 partitions, state exists, refcnt 3
topic1 [-1] leader none
refcnt 1
msgq: 0 messages
xmit_msgq: 0 messages
total: 0 messages, 0 bytes
service1 |
Metadata cache with 1 entries:
topic1 (inserted 552ms ago, expires in 899447ms, 1 partition(s), valid)
в соответствии с этой проблемой: https://github.com/edenhill/librdkafka/issues/1983
кажется, что временное решение - добавить time.Sleep(time.Duration(1) * time.Second)
после assign()
part
Но я не понимаю, почему это решит эту проблему, и это необъясните причину этой проблемы либо
И я также попытался добавить время. Сон, но это не сработало, проблема все еще сохраняется
/*
retrieving lowest/highest offset in the partition
*/
lowOffset, highOffset, _ := c.QueryWatermarkOffsets(topic, partition, -1)
log.Printf("The highest offset: %v", highOffset)
log.Printf("The lowest offset: %v", lowOffset)
if lowOffset == highOffset {
log.Printf("Topic %v is empty", topic)
return
}
part, _ := c.Committed(kafka.TopicPartitions{{Topic: &topic, Partition: partition}}, -1)
if len(part) == 0 {
startOffset = lowOffset
}
var partitions kafka.TopicPartitions
visitOffset, err := kafka.NewOffset(fmt.Sprintf("%v", startOffset))
partitions = append(partitions, kafka.TopicPartition{
Topic: &topic,
Partition: partition,
Offset: visitOffset,
Error: err,
})
err = c.Assign(partitions)
if err != nil {
log.Fatalf("Assign failed: %s", err)
return
}
assignment, _ := c.Assignment()
log.Printf("Assignment %v\n", assignment)
timeout := 6000
ev := c.Poll(timeout)
c.Unassign()