KCL-клиент Amazon Kinesis для потребителей не работает в .NET - PullRequest
0 голосов
/ 05 февраля 2019

пожалуйста, помогите.У меня проблемы с настройкой потока данных Consumer for Kinesis в консольном приложении .NET.

Я сделал все в соответствии с документацией, но у меня все равно появляется пустой экран консоли, когда я запускаю потребителя.Пока продюсер работает нормально, а учетные данные AWS работают.

  1. У меня в системе хорошо настроен JDK (не новичок в разработке Java)
  2. У меня есть все необходимые политики, прикрепленные к моему пользователю IAM
  3. Я могуобратите внимание, что производитель может программно создавать потоки, потоки дескрипторов и т. д. с теми же учетными данными AWS

Я могу достичь точки останова в программе при создании KclProcess, но не могу достичь ни одной точки останова внутри класса KinesisTest ниже

Что касается потребителя, я создал класс Program.cs, как показано ниже:

class Program  
{

    public static void Main(string[] args)
    {
        //added these lines after trying everything
        Environment.SetEnvironmentVariable("AWS_ACCESS_KEY_ID", "***");
        Environment.SetEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "***");
        Environment.SetEnvironmentVariable("AWS_REGION", "us-east-1");

        try
        {
            KclProcess.Create(new KinesisTest()).Run();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine("ERROR: " + e);
        }
    }

}

и еще один класс

public class KinesisTest: IRecordProcessor
{

    private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);
    private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);
    private static readonly int NumRetries = 10;

    /// <value>The shard ID on which this record processor is working.</value>
    private string _kinesisShardId;

    private DateTime _nextCheckpointTime = DateTime.UtcNow;


    public void Initialize(InitializationInput input)
    {
        Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
        this._kinesisShardId = input.ShardId;
    }

    public void ProcessRecords(ProcessRecordsInput input)
    {
        Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);
        ProcessRecordsWithRetries(input.Records);

        // Checkpoint once every checkpoint interval.
        if (DateTime.UtcNow >= _nextCheckpointTime)
        {
            Checkpoint(input.Checkpointer);
            _nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
        }
    }

    public void Shutdown(ShutdownInput input)
    {
        Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
        // Checkpoint after reaching end of shard, so we can start processing data from child shards.
        if (input.Reason == ShutdownReason.TERMINATE)
        {
            Checkpoint(input.Checkpointer);
        }
    }

    private void ProcessRecordsWithRetries(List<Record> records)
    {
        foreach (Record rec in records)
        {
            bool processedSuccessfully = false;
            string data = null;
            for (int i = 0; i < NumRetries; ++i)
            {
                try
                {
                    data = System.Text.Encoding.UTF8.GetString(rec.Data);

                    Console.Error.WriteLine( String.Format("Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}", rec.PartitionKey, rec.SequenceNumber, data));

                    // Your own logic to process a record goes here.

                    processedSuccessfully = true;
                    break;
                }
                catch (Exception e)
                {
                    Console.Error.WriteLine("Exception processing record data: " + data, e);
                }

                //Back off before retrying upon an exception.
                Thread.Sleep(Backoff);
            }

            if (!processedSuccessfully)
            {
                Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
            }
        }
    }

    private void Checkpoint(Checkpointer checkpointer)
    {
        Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);

        checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
    }
}

и, наконец, файл kcl.properties:

executableName = dotnet KinesisTest.dll

streamName = testStream

applicationName = KinesisTest

AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

processingLanguage = C#

initialPositionInStream = TRIM_HORIZON

regionName = us-east-1

maxRecords = 5000

idleTimeBetweenReadsInMillis = 1000

# failoverTimeMillis = 10000
# workerId =
# shardSyncIntervalMillis = 60000
# callProcessRecordsEvenForEmptyRecordList = false
# parentShardPollIntervalMillis = 10000
# cleanupLeasesUponShardCompletion = true
# taskBackoffTimeMillis = 500
# metricsBufferTimeMillis = 10000
# metricsMaxQueueSize = 10000
# validateSequenceNumberBeforeCheckpointing = true
# maxActiveThreads = 0

Пожалуйста, дайте мне знать, если я делаю что-то не так.

Я ожидал, что Потребитель обрабатывает данные из потока, но это просто пустая консоль

...