Почему при использовании обработчика Lambda задержка обработки записи Kinesis не отображается в метрике «GetRecords.IteratorAgeMilliseconds» - PullRequest
1 голос
/ 23 декабря 2019

Я экспериментирую с Kinesis и Lambda.

Я не вижу задержки в метрике "GetRecords.IteratorAge" Kinesis, хотя она явно задерживается.

Среда эксперимента

  • Потоки данных Kinesis: 1 поток, состоящий из 1 сегмента без расширенного разветвления.
  • Источник: На локальном ПК запускается следующая producer.rb. Он помещает запись каждую секунду.
  • Потребитель: Следующее lambda_handler.rb выполняется в Lambda. Он просто помещает записи с меткой времени в таблицу DynamoDB и спит 3 секунды для каждой записи.
  • Настройка триггера:
    • Размер пакета: 50
    • Окно пакета: Нет
    • Количество одновременных пакетов на осколок: 1
    • Результат последней обработки: записей не обработано
    • Максимальный возраст записи: 604800
    • Попытки повторных попыток: 10000
    • Разделить пакет при ошибке: Нет

provider.rb

require 'aws-sdk'

kinesis = Aws::Kinesis::Client.new(region: 'ap-northeast-1')

COUNT = 300
STREAM_NAME = 'test_stream'
PKEY = 'client-001'

COUNT.times do |i|
  kinesis.put_record(
    stream_name: STREAM_NAME,
    data: (i+1).to_s,
    partition_key: PKEY
  )
  sleep 1
end

lambda_handler.rb

require 'json'
require 'aws-sdk'
require 'base64'

def lambda_handler(event:, context:)
  dynamoDB = Aws::DynamoDB::Resource.new(region: 'ap-northeast-1')
  table = dynamoDB.table(ENV['DYNAMODB_TABLE'])
  item = {
    'aws_request_id' => context.aws_request_id,
    'start' => Time.now.to_s
  }
  event['Records'].each do { sleep 3 }
  item['end'] = Time.now.to_s
  table.put_item({item: item})
  { statusCode: 200 }
end

Результат выглядел примерно так в DynamoDB, а показатели в Cloudwatch выглядели примерно так:

Обрабатывал записи между 04:09:03 и 04:24:04. Почему «GetRecords.IteratorAge» не увеличивается, даже если обработка записи не выполняется?

enter image description here

enter image description here

1 Ответ

0 голосов
/ 27 декабря 2019

Этот вопрос был решен самостоятельно.

https://youtu.be/xmacMfbrG28

Это видео предоставило подробное объяснение внутренней структуры процесса потокового источника из Lambda.

Подписчик "Poller"чтобы получить фрагменты и извлечь записи из итератора сегмента с помощью GetRecords, «Poller» вызывает функцию внешнего интерфейса и передает ее записи. Таким образом, в GetRecords не было никакой задержки, даже если лямбда-функция задерживается.

...