Akka.Net Streams - пакетная передача потоков с помощью GroupedWithin - PullRequest
0 голосов
/ 24 января 2019

Я тестирую потоки Akka.Net, чтобы посмотреть, смогу ли я интегрировать его в свой проект, и я немного запутался с GroupedWithin. Если я использую его таким образом, он не разделяет поток на части, но выполняется только один раз:

Akka.Streams.Dsl.Source.Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(1), new { })
.Select(x => new Measurement { Guid = Guid.NewGuid(), Timestamp = DateTime.Now.Ticks })
.GroupedWithin(1000, TimeSpan.FromMilliseconds(80))
.SelectAsync(2, BulkInsertAsync)
.RunWith(Sink.Ignore<IEnumerable<Measurement>>(), mat);

Если я использую его таким образом, он ведет себя как ожидалось:

Akka.Streams.Dsl.Source.Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(1), new { })
.Select(x => new Measurement { Guid = Guid.NewGuid(), Timestamp = DateTime.Now.Ticks })
.GroupedWithin(1000, TimeSpan.FromMilliseconds(80))
.RunForeach(async x => await BulkInsertAsync(x), mat);`

Я что-то упустил или это так должно работать?

Заранее спасибо.

Обновление: Это работает для меня:

Akka.Streams.Dsl.Source.Tick(TimeSpan.Zero, TimeSpan.FromMilliseconds(1), new { })
.Select(x => new Measurement { Guid = Guid.NewGuid(), Timestamp = DateTime.Now.Ticks })
.GroupedWithin(1000, TimeSpan.FromMilliseconds(80))
.SelectAsync(2, x => {
    await BulkInsertAsync(x);
    return x;
})
.RunWith(Sink.Ignore<IEnumerable<Measurement>>(), mat);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...