пожалуйста, помогите.У меня проблемы с настройкой потока данных Consumer for Kinesis в консольном приложении .NET.
Я сделал все в соответствии с документацией, но у меня все равно появляется пустой экран консоли, когда я запускаю потребителя.Пока продюсер работает нормально, а учетные данные AWS работают.
- У меня в системе хорошо настроен JDK (не новичок в разработке Java)
- У меня есть все необходимые политики, прикрепленные к моему пользователю IAM
- Я могуобратите внимание, что производитель может программно создавать потоки, потоки дескрипторов и т. д. с теми же учетными данными AWS
Я могу достичь точки останова в программе при создании KclProcess, но не могу достичь ни одной точки останова внутри класса KinesisTest ниже
Что касается потребителя, я создал класс Program.cs, как показано ниже:
class Program
{
public static void Main(string[] args)
{
//added these lines after trying everything
Environment.SetEnvironmentVariable("AWS_ACCESS_KEY_ID", "***");
Environment.SetEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "***");
Environment.SetEnvironmentVariable("AWS_REGION", "us-east-1");
try
{
KclProcess.Create(new KinesisTest()).Run();
}
catch (Exception e)
{
Console.Error.WriteLine("ERROR: " + e);
}
}
}
и еще один класс
public class KinesisTest: IRecordProcessor
{
private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);
private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);
private static readonly int NumRetries = 10;
/// <value>The shard ID on which this record processor is working.</value>
private string _kinesisShardId;
private DateTime _nextCheckpointTime = DateTime.UtcNow;
public void Initialize(InitializationInput input)
{
Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
this._kinesisShardId = input.ShardId;
}
public void ProcessRecords(ProcessRecordsInput input)
{
Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);
ProcessRecordsWithRetries(input.Records);
// Checkpoint once every checkpoint interval.
if (DateTime.UtcNow >= _nextCheckpointTime)
{
Checkpoint(input.Checkpointer);
_nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
}
}
public void Shutdown(ShutdownInput input)
{
Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
// Checkpoint after reaching end of shard, so we can start processing data from child shards.
if (input.Reason == ShutdownReason.TERMINATE)
{
Checkpoint(input.Checkpointer);
}
}
private void ProcessRecordsWithRetries(List<Record> records)
{
foreach (Record rec in records)
{
bool processedSuccessfully = false;
string data = null;
for (int i = 0; i < NumRetries; ++i)
{
try
{
data = System.Text.Encoding.UTF8.GetString(rec.Data);
Console.Error.WriteLine( String.Format("Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}", rec.PartitionKey, rec.SequenceNumber, data));
// Your own logic to process a record goes here.
processedSuccessfully = true;
break;
}
catch (Exception e)
{
Console.Error.WriteLine("Exception processing record data: " + data, e);
}
//Back off before retrying upon an exception.
Thread.Sleep(Backoff);
}
if (!processedSuccessfully)
{
Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
}
}
}
private void Checkpoint(Checkpointer checkpointer)
{
Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);
checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
}
}
и, наконец, файл kcl.properties:
executableName = dotnet KinesisTest.dll
streamName = testStream
applicationName = KinesisTest
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
processingLanguage = C#
initialPositionInStream = TRIM_HORIZON
regionName = us-east-1
maxRecords = 5000
idleTimeBetweenReadsInMillis = 1000
# failoverTimeMillis = 10000
# workerId =
# shardSyncIntervalMillis = 60000
# callProcessRecordsEvenForEmptyRecordList = false
# parentShardPollIntervalMillis = 10000
# cleanupLeasesUponShardCompletion = true
# taskBackoffTimeMillis = 500
# metricsBufferTimeMillis = 10000
# metricsMaxQueueSize = 10000
# validateSequenceNumberBeforeCheckpointing = true
# maxActiveThreads = 0
Пожалуйста, дайте мне знать, если я делаю что-то не так.
Я ожидал, что Потребитель обрабатывает данные из потока, но это просто пустая консоль