Вопросы об ошибке при назначении смещения - PullRequest
0 голосов
/ 07 ноября 2019

Я написал функцию потребителя Kafka, которая сначала извлекает сообщение из последнего прочитанного смещения, затем начинает использовать следующее смещение, а затем фиксирует текущее смещение.

Затем я запускаю тест производительности, который вызывает эту функцию 60 раз в секунду, а затем показывает эту ошибку при вызове assign:

*** rdkafka_cgrp.c:2961:rd_kafka_cgrp_op_serve: assert: rktp->rktp_assigned ***
rd_kafka_t 0x2605000: rdkafka#consumer-5
 producer.msg_cnt 0 (0 bytes)
 rk_rep reply queue: 61 ops
 brokers:
 rd_kafka_broker_t 0x26072e0: :0/internal NodeId -1 in state INIT (for 31.797s)
  refcnt 2
  outbuf_cnt: 0 waitresp_cnt: 0
  0 messages sent, 0 bytes, 0 errors, 0 timeouts
  0 messages received, 0 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 rd_kafka_broker_t 0x2607f60: kafka-1:9091/1 NodeId 1 in state UP (for 30.787s)
  refcnt 6
  outbuf_cnt: 0 waitresp_cnt: 1
  18 messages sent, 1329 bytes, 0 errors, 0 timeouts
  17 messages received, 389654 bytes, 0 errors
  0 messageset transmissions were retried
  1 toppars:
   topic1 [0] leader kafka-1:9091/1
    refcnt 68
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
 rd_kafka_broker_t 0x26065a0: GroupCoordinator NodeId -1 in state UP (for 29.766s)
  refcnt 3
  outbuf_cnt: 0 waitresp_cnt: 0
  10 messages sent, 666 bytes, 0 errors, 0 timeouts
  10 messages received, 843 bytes, 0 errors
  0 messageset transmissions were retried
  0 toppars:
 cgrp:
  G2 in state up, flags 0x0
   coord_id 1, broker kafka-1:9091/1
  toppars:
 topics:
  topic1 with 1 partitions, state exists, refcnt 3
   topic1 [-1] leader none
    refcnt 1
    msgq:      0 messages
    xmit_msgq: 0 messages
    total:     0 messages, 0 bytes
service1    |
Metadata cache with 1 entries:
  topic1 (inserted 552ms ago, expires in 899447ms, 1 partition(s), valid)

в соответствии с этой проблемой: https://github.com/edenhill/librdkafka/issues/1983

кажется, что временное решение - добавить time.Sleep(time.Duration(1) * time.Second) после assign() part

Но я не понимаю, почему это решит эту проблему, и это необъясните причину этой проблемы либо

И я также попытался добавить время. Сон, но это не сработало, проблема все еще сохраняется


    /*
        retrieving lowest/highest offset in the partition
    */
    lowOffset, highOffset, _ := c.QueryWatermarkOffsets(topic, partition, -1)

    log.Printf("The highest offset: %v", highOffset)
    log.Printf("The lowest offset: %v", lowOffset)

    if lowOffset == highOffset {
        log.Printf("Topic %v is empty", topic)
        return
    }

    part, _ := c.Committed(kafka.TopicPartitions{{Topic: &topic, Partition: partition}}, -1)
    if len(part) == 0 {
        startOffset = lowOffset
    } 

    var partitions kafka.TopicPartitions

    visitOffset, err := kafka.NewOffset(fmt.Sprintf("%v", startOffset))

    partitions = append(partitions, kafka.TopicPartition{
        Topic:     &topic,
        Partition: partition,
        Offset:    visitOffset,
        Error:     err,
    })

    err = c.Assign(partitions)
    if err != nil {
        log.Fatalf("Assign failed: %s", err)
        return
    }
    assignment, _ := c.Assignment()
    log.Printf("Assignment %v\n", assignment)
    timeout := 6000
    ev := c.Poll(timeout)
    c.Unassign()

...