как использовать последние данные текстового файла, отправленные в kafka и записать обратно в другой текстовый файл, используя c#? - PullRequest
0 голосов
/ 09 апреля 2020

Я отправил один текстовый файл данных производителю Kafka после прочтения этого файла в строку. Теперь я хочу использовать те же данные в текстовом файле. Как его использовать?

var fileName = @"D:\kafka_examples\new2.txt";
var options = new KafkaOptions(new Uri("http://localhost:9092"),
              new Uri("http://localhost:9092"));
var router = new BrokerRouter(options);
var consumer = new KafkaNet.Consumer(new ConsumerOptions("Hello-Kafka",
               new BrokerRouter(options)));
var text="";
//Consume returns a blocking IEnumerable (ie: never ending stream)
if (File.Exists(fileName))
{
  File.Delete(fileName);
}

foreach (var message in consumer.Consume())
{
  Console.WriteLine("Response: P{0},O{1} : {2}",
                   message.Meta.PartitionId, message.Meta.Offset,
                  text= Encoding.UTF8.GetString(message.Value));
                using (StreamWriter sw = File.CreateText(fileName))
  {
    sw.WriteLine(text);
  }
}            

Я пробовал это, но файл не записан в данном текстовом файле. Все сообщения приходят. Я хочу только последнее сообщение.

1 Ответ

0 голосов
/ 10 апреля 2020

Нет понятия "последнее" сообщение в потоке; они бесконечны.

Но вы можете попытаться найти текущее последнее смещение при запуске кода, затем вычесть одно (или количество строк в файле), затем seek группу потребителей и break for l oop при чтении только этих многочисленных сообщений.

т.е.

var filename = ...
var lines = linesInFile(filename)
var consumer = ... // (with a consumer group id)
var latestOffset = seekToEnd(consumer, -1 * lines) // second param is the delta offset from the end

var i = lines;
foreach (var message in consumer.Consume()) {
   ... 

   if (i <= 0) break;
}

Кроме того, Kafka не является службой http. Удалите http:// и ваш дубликат локального адреса из кода

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...