У меня есть очередь блокировки, которая продолжает получать сообщения через какое-то приложение, теперь в приложении asp.net я пытался использовать очередь и записать вывод в файл CSV / JSON.
Здесь я хочу хранить сообщения размером до 1 МБ, которые получают из очереди блокировки, а затем записывают их, теперь снова содержат данные на 1 МБ и снова записывают ... так далее.
В приведенном ниже коде я использую буфер system.reactive
и могу хранить количество наблюдаемых и записывать в JSON, но есть ли какой-либо способ изменения размера наблюдаемых?
class Program
{
private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();
private static void Producer()
{
int ctr = 1;
while (ctr <= 11)
{
MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
Thread.Sleep(1000);
ctr++;
}
}
private static void Consumer()
{
var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();
var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
.Subscribe(ts =>
{
WriteToFile(ts.ToList());
});
}
private static void WriteToFile(List<Message> listToWrite)
{
using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
{
outFile.Write(JsonConvert.SerializeObject(listToWrite));
}
}
static void Main(string[] args)
{
var producer = Task.Factory.StartNew(() => Producer());
var consumer = Task.Factory.StartNew(() => Consumer());
Console.Read();
}
}
Реактивный метод продления,
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
TimeSpan threshold, int noOfStream)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
Класс сообщения,
public class Message
{
public int Id { get; set; }
public string Name { get; set; }
}