Я только что заметил, что когда я создаю одно сообщение в раздел, мой потребитель не получает его. Только после того, как я создаю еще несколько сообщений в том же разделе, потребитель получает их. Мой fetch.min.bytes
установлен на 1.
Есть ли другие конфигурации, которые могут повлиять на это?
У меня есть выделенный потребитель для каждого раздела.
Код потребителя для соответствующей части. Мой потребитель запускает несколько тем для разных тем, которые определены configs['stream']
. Использует https://github.com/mmustala/rdkafka-ruby, который является вилкой из оригинального потребительского драгоценного камня. Я добавил пакетный метод потребления. И способ управляемого отключения потребителя.
key = configs['app_key']
consumer = Rdkafka::Config.new(config(configs)).consumer
topic = "#{topic_prefix}#{app_env}_#{configs['stream']}"
consumer.subscribe(topic)
logger.info "#{rand}| Starting consumer for #{key} with topic #{topic}"
begin
retry_counter = 0
retries_started_at = nil
current_assignment = nil
partitions = []
consumer.each_batch(configs['max_messages_per_partition'] || 5, 100, rand) do |messages|
partitions = messages.collect {|m| m.partition}.uniq.sort
logger.info "#{rand}| Batch started. Received #{messages.length} messages from partitions #{partitions} for app #{key}"
current_assignment = consumer.assignment.to_h
values = messages.collect {|m| JSON.parse(m.payload)}
skip_commit = false
begin
values.each_slice((values.length / ((retry_counter * 2) + 1).to_f).ceil) do |slice|
logger.info "#{rand}| Sending #{slice.length} messages to lambda"
result = invoke_lambda(key, slice)
if result.status_code != 200 || result.function_error
logger.info "#{rand}| Batch finished with error #{result.function_error}"
raise LambdaError, result.function_error.to_s
end
end
rescue LambdaError => e
logger.warn "#{rand}| #{e}"
if consumer.running? && current_assignment == consumer.assignment.to_h
retry_counter += 1
retries_started_at ||= Time.now
if retry_counter <= 5 && Time.now - retries_started_at < 600
logger.warn "#{rand}| Retrying from: #{e.cause}, app_key: #{key}"
Rollbar.warning("Retrying from: #{e.cause}", app_key: key, thread: rand, partitions: partitions.join(', '))
sleep 5
retry if consumer.running? && current_assignment == consumer.assignment.to_h
else
raise e # Raise to exit the retry loop so that consumers are rebalanced.
end
end
skip_commit = true
end
retry_counter = 0
retries_started_at = nil
if skip_commit
logger.info "#{rand}| Commit skipped"
else
consumer.commit
logger.info "#{rand}| Batch finished"
end
end
consumer.close
logger.info "#{rand}| Stopped #{key}"
rescue Rdkafka::RdkafkaError => e
logger.warn "#{rand}| #{e}"
logger.info "#{rand}| assignment: #{consumer.assignment.to_h}"
if e.to_s.index('No offset stored')
retry
else
raise e
end
end
конфигурация
def config(app_config)
{
"bootstrap.servers": brokers,
"group.id": app_configs['app_key'],
"enable.auto.commit": false,
"enable.partition.eof": false,
"log.connection.close": false,
"session.timeout.ms": 30*1000,
"fetch.message.max.bytes": ['sources'].include?(app_configs['stream']) ? 102400 : 10240,
"queued.max.messages.kbytes": ['sources'].include?(app_configs['stream']) ? 250 : 25,
"queued.min.messages": (app_configs['max_messages_per_partition'] || 5) * 10,
"fetch.min.bytes": 1,
"partition.assignment.strategy": 'roundrobin'
}
end
Код производителя использует https://github.com/zendesk/ruby-kafka
def to_kafka(stream_name, data, batch_size)
stream_name_with_env = "#{Rails.env}_#{stream_name}"
topic = [Rails.application.secrets.kafka_topic_prefix, stream_name_with_env].compact.join
partitions_count = KAFKA.partitions_for(topic)
Rails.logger.info "Partition count for #{topic}: #{partitions_count}"
if @job.active? && @job.partition.blank?
@job.connect_to_partition
end
partition = @job.partition&.number.to_i % partitions_count
producer = KAFKA.producer
if data.is_a?(Array)
data.each_slice(batch_size) do |slice|
producer.produce(JSON.generate(slice), topic: topic, partition: partition)
end
else
producer.produce(JSON.generate(data), topic: topic, partition: partition)
end
producer.deliver_messages
Rails.logger.info "records sent to topic #{topic} partition #{partition}"
producer.shutdown
end
ОБНОВЛЕНИЕ: похоже, что количество сообщений не имеет значения. Я только что произвел более 100 сообщений в один раздел, и потребитель еще не начал их использовать.
ОБНОВЛЕНИЕ2: Ночью сообщения не начинались. Но сегодня утром, когда я создал новый набор сообщений в том же разделе, он проснулся и начал потреблять только что созданные сообщения. Он пропустил сообщения, созданные вчера вечером.