Трубопровод: как?Несколько производителей к одному потребителю с противодавлением - PullRequest
0 голосов
/ 27 марта 2019

Могу ли я в любом случае выразить "противодавление", чтобы уменьшить производство значений из источника канала?

Предположим, у меня есть что-то похожее на:

source :: ConduitT () Tweet Twitter ()
source = do
  ts <- lift $ getNewTweets [("screen_name", "Bits90824664")]
  yieldMany ts
  _ <- liftIO $ threadDelay 3000000
  source

Где getNewTweets запросыего данные через ограниченный веб-API.Я успешно смог ускорить количество выполняемых запросов, добавив threadDelay к производителю.Однако я планирую добавить дополнительные источники в свой конвейер и начать циклическое потребление данных от каждого производителя.Таким образом, размещение threadDelay в производителе больше не имеет смысла.Я хотел бы разместить задержку где-то вверх по течению.

Я пробовал такие вещи, как добавление задержки для потребителя или добавление iterMC с threadDelay в середину конвейера, но не делаетчто-нибудь.Я предполагаю, что многопоточность происходит, поэтому задержки не работают правильно?

Или новая мысль!Большую часть времени мой производитель вообще не выдает никаких значений и делает yieldMany [] ... поэтому задержка вне потребителя может не сработать, если я не обработаю [] как нормальное значение потока и переписываю производителя вбыть ConduitT () [Value] IO (). Есть какие-нибудь идеи о том, будет ли это работать с кусочными (CE) вариантами потребителей или как-то будут закорочены пустые чанки? Я думаю, я бы использовал преобразователь без кусочков, чтобы добавитьзадержка, чтобы он запускался с каждым запросом и использовал один из вариантов CE-потребителей в конце конвейера.Я буду экспериментировать, когда получу шанс.

Если есть какой-то другой метод для кодирования такого поведения в Haskell, я был бы благодарен за любой совет, поскольку я относительно новичок в Haskell, особенно в библиотеке каналов;Я даже не уверен, как / если я смогу переключаться между несколькими источниками.

1 Ответ

0 голосов
/ 28 марта 2019

Изменение потока для возврата порций данных (включая пустые порции) позволяет добавить задержку в восходящем направлении.У меня есть грубый прототип, работающий с ZipSource, но, скорее всего, мне придется изменить мой подход, чтобы источники извлекали данные независимо друг от друга.

followUser :: Text -> ConduitT () [Tweet] Twitter ()
followUser sn = do
  ts <- lift $ getNewTweets [("screen_name", sn)]
  yield ts
  source

handleTweet :: Tweet -> IO ()
handleTweet = print

main :: IO ()
main = do
  let sources = sequenceSources [ followUser "user1"
                                , followUser "user2"
                                ]

  _ <- runTwitter $ runConduit
    $ sources
    .| iterMC (\a -> threadDelay 3000000)
    .| mapM_CE (liftIO . Prelude.mapM_ handleTweet)

  return ()

...