Я тестирую потоки Akka.Net, чтобы посмотреть, смогу ли я интегрировать его в свой проект, и я немного запутался с GroupedWithin
.
Если я использую его таким образом, он не разделяет поток на части, но выполняется только один раз:
Akka.Streams.Dsl.Source.Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(1), new { })
.Select(x => new Measurement { Guid = Guid.NewGuid(), Timestamp = DateTime.Now.Ticks })
.GroupedWithin(1000, TimeSpan.FromMilliseconds(80))
.SelectAsync(2, BulkInsertAsync)
.RunWith(Sink.Ignore<IEnumerable<Measurement>>(), mat);
Если я использую его таким образом, он ведет себя как ожидалось:
Akka.Streams.Dsl.Source.Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(1), new { })
.Select(x => new Measurement { Guid = Guid.NewGuid(), Timestamp = DateTime.Now.Ticks })
.GroupedWithin(1000, TimeSpan.FromMilliseconds(80))
.RunForeach(async x => await BulkInsertAsync(x), mat);`
Я что-то упустил или это так должно работать?
Заранее спасибо.
Обновление:
Это работает для меня:
Akka.Streams.Dsl.Source.Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(1), new { })
.Select(x => new Measurement { Guid = Guid.NewGuid(), Timestamp = DateTime.Now.Ticks })
.GroupedWithin(1000, TimeSpan.FromMilliseconds(80))
.SelectAsync(2, x => {
await BulkInsertAsync(x);
return x;
})
.RunWith(Sink.Ignore<IEnumerable<Measurement>>(), mat);