AWS Kinesis .NET Consumer - PullRequest
       12

AWS Kinesis .NET Consumer

8 голосов
/ 03 мая 2019

Я экспериментирую с производителем и потребителем, используя AWS Kinesis, и проблема в том, что потребитель продолжает получать первое сообщение (или запись), которое мы создали, хотя мы изменили объект данных, отправленный несколько раз. Кроме того, мы пробовали несколько ShardIteratorType, и ни один из них не работал. Последние не дают результатов, все остальные производят те же оригинальные записи.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Internal;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using BenchmarkRuleSetModel.Models;
using MongoDB.Driver;
using Newtonsoft.Json;

namespace ConsoleApp7
{
    internal class Program
    {
        private static AmazonKinesisClient _client;
        private static string _streamName;

        static async Task ReadFromStream()
        {
            var kinesisStreamName = _streamName;

            var describeRequest = new DescribeStreamRequest
            {
                StreamName = kinesisStreamName,
            };

            var describeResponse = await _client.DescribeStreamAsync(describeRequest);
            var shards = describeResponse.StreamDescription.Shards;

            foreach (var shard in shards)
            {
                var iteratorRequest = new GetShardIteratorRequest
                {
                    StreamName = kinesisStreamName,
                    ShardId = shard.ShardId,
                    ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
                    Timestamp = DateTime.MinValue
                };

                var iteratorResponse = await _client.GetShardIteratorAsync(iteratorRequest);
                var iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    var getRequest = new GetRecordsRequest
                    {
                        ShardIterator = iteratorId, Limit = 10000
                    };

                    var getResponse = await _client.GetRecordsAsync(getRequest);
                    var nextIterator = getResponse.NextShardIterator;
                    var records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (var record in records)
                        {
                            var json = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("Json string: " + json);
                        }
                    }

                    iteratorId = nextIterator;
                }
            }
        }

        private static async Task<string> Produce()
        {
            var data = new
            {
                Message = "Hello world!",
                Author = "Amir"
            };

            //convert to byte array in prep for adding to stream
            var oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

            using (var ms = new MemoryStream(oByte))
            {
                //create put request
                var requestRecord = new PutRecordRequest
                {
                    StreamName = _streamName,
                    PartitionKey = Guid.NewGuid().ToString(),
                    Data = ms
                };
                //list name of Kinesis stream
                //give partition key that is used to place record in particular shard
                //add record as memorystream

                //PUT the record to Kinesis
                var response = await _client.PutRecordAsync(requestRecord);

                return response.SequenceNumber;
            }
        }

        static void Main(string[] args)
        {
            _client = new AmazonKinesisClient("ExampleKey", "ExampleSecret", RegionEndpoint.EUWest2);

            _streamName = "SomeStream";

            Produce().Wait();

            ReadFromStream().Wait();
        }
    }
}

1 Ответ

2 голосов
/ 10 мая 2019

Прежде всего, когда я отлаживал ваш код, я заметил, что он бесконечно зацикливается во внутреннем цикле (while (!string.IsNullOrEmpty(iteratorId))) и никогда не зацикливается на всех осколках вашего потока (при условии, что у вас> 1). Причина объясняется в https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty - поскольку производитель никогда не звонил MergeShards или SplitShards, они остаются открытыми, поэтому NextShardIterator никогда не будет NULL.

Вот почему вы когда-либо видите записи, помещенные в первый шард (или, по крайней мере, я это делал при запуске вашего кода) - вы должны читать из шардов параллельно.

Что касается вашего шаблона использования, вы используете:

ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue

Таким образом, вы, по сути, говорите Кинезису: «Дайте мне все записи в потоке с начала времен» (или, по крайней мере, до тех пор, пока не истечет срок хранения). Вот почему вы продолжаете видеть одни и те же старые записи в дополнение к новым (опять же, это то, что я видел, когда запускал ваш код).

Вызов GetRecords[Async] фактически не удаляет записи из потока (см. https://stackoverflow.com/a/25741304/4940707).. Правильный способ использования Kinesis - это перемещение контрольной точки в контрольную точку. Если потребитель должен был сохранить SequenceNumber из прочитайте последнюю запись и перезапустите как таковой:

ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber

Тогда вы увидите только новые записи.

...