Я думаю, что вложенные итерации - правильный подход, но в этом случае есть некоторые уникальные проблемы, которые немного отличают его от большинства распространенных примеров.
Куски и группы
Первая проблема - правильно настроить источник данных.По сути, описанные вами логические деления дают вам поток, эквивалентный [[ByteString]]
.Если вы создадите перечислитель для непосредственного создания этого элемента, каждый элемент в потоке будет полной группой фрагментов, которых, вероятно, вы хотите избежать (по соображениям памяти).Вы можете объединить все в один [ByteString]
, но тогда вам нужно будет заново ввести границы, что было бы довольно расточительно, поскольку БД делает это за вас.
Игнорирование потока групп на данный моментПохоже, что вам нужно разделить данные на куски самостоятельно.Я бы смоделировал это как:
enumGroup :: Enumerator ByteString IO a
enumGroup = enumFromCallback cb ()
where
cb () = do
(code, data) <- getResultData
case code of
OPERATION_SUCCEEDED -> return $ Right ((True, ()), data)
NO_MORE_DATA -> return $ Right ((False, ()), data)
GET_DATA_FAILED -> return $ Left MyException
Поскольку куски имеют фиксированный размер, вы можете легко разделить это на части с помощью Data.Iteratee.group
.
enumGroupChunked :: Iteratee [ByteString] IO a -> IO (Iteratee ByteString IO a)
enumGroupChunked = enumGroup . joinI . group groupSize
Сравните тип этого с Enumerator
type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)
Так что enumGroupChunked
- это в основном причудливый перечислитель, который меняет тип потока.Это означает, что он принимает потребителя [ByteString] iteratee и возвращает его, который использует простые строки байтов.Часто тип возвращаемого значения перечислителя не имеет значения;это просто итератор, который вы оцениваете с помощью run
(или tryRun
), чтобы получить на выходе, так что вы можете сделать то же самое здесь:
evalGroupChunked :: Iteratee [ByteString] IO a -> IO a
evalGroupChunked i = enumGroupChunked i >>= run
Если у вас есть более сложная обработка для каждогоgroup, проще всего сделать это с помощью функции enumGroupChunked
.
Поток групп
Теперь это не так, что с этим делатьпоток групп?Ответ зависит от того, как вы хотите их потреблять.Если вы хотите по существу обрабатывать каждую группу в потоке независимо, я бы сделал что-то похожее на это:
foldStream :: Iteratee [ByteString] IO a -> (b -> a -> b) -> b -> IO b
foldStream iter f acc0 = do
val <- evalGroupChunked iter
res <- getNextItem
case res of
OPERATION_SUCCEEDED -> foldStream iter f $! f acc0 val
NO_MORE_DATA -> return $ f acc0 val
GET_DATA_FAILED -> error "had a problem"
Однако, допустим, вы хотите выполнить некоторую потоковую обработку всего набора данных, а не толькоотдельные группы.То есть у вас есть
bigProc :: Iteratee [ByteString] IO a
, который вы хотите запустить по всему набору данных.Здесь полезен итератор возврата перечислителя.Некоторый предыдущий код теперь будет немного отличаться:
enumGroupChunked' :: Iteratee [ByteString] IO a
-> IO (Iteratee ByteString IO (Iteratee [ByteString] IO a))
enumGroupChunked' = enumGroup . group groupSize
procStream :: Iteratee [ByteString] IO a -> a
procStream iter = do
i' <- enumGroupChunked' iter >>= run
res <- getNextItem
case res of
OPERATION_SUCCEEDED -> procStream i'
NO_MORE_DATA -> run i'
GET_DATA_FAILED -> error "had a problem"
Такое использование вложенных итераторов (т.е. Iteratee s1 m (Iteratee s2 m a)
) немного необычно, но особенно полезно, когда вы хотите последовательно обрабатывать данные из нескольких перечислителей.Ключ в том, чтобы признать, что run
внешний итератор даст вам итератора, который готов получать больше данных.Это модель, которая хорошо работает в этом случае, потому что вы можете перечислять каждую группу независимо, но обрабатывать их как один поток.
Одно предупреждение: внутренний итератор будет в том состоянии, в котором он был оставлен. Предположим, чтопоследний кусок группы может быть меньше, чем полный кусок, например,
Group A Group B Group C
1024, 1024, 512 1024, 1024, 1024 1024, 1024, 1024
В этом случае произойдет следующее: поскольку group
объединяет данные в порции размером 1024, он объединяет последниекусок группы A с первыми 512 байтами группы B. Это не проблема с примером foldStream
, потому что этот код завершает внутреннюю итерацию (с joinI
).Это означает, что группы действительно независимы, поэтому вы должны относиться к ним как к таковым.Если вы хотите объединить группы, как в procStream
, вам нужно продумать весь поток.Если это ваш случай, вам нужно будет использовать что-то более сложное, чем просто group
.
Data.Iteratee vs Data.Enumerator
Не входя вобсуждение достоинств любого из пакетов, не говоря уже о IterIO (я по общему признанию предвзят), я хотел бы указать на то, что я считаю наиболее существенным различием между этими двумя понятиями: абстракция потока.
В Data.Iteratee потребитель Iteratee ByteString m a
работает с условной байтовой строкой некоторой длины с доступом к одному фрагменту ByteString
за один раз.
В Data.Enumerator,потребитель Iteratee ByteString m a
работает с условной [ByteString] с доступом к одному или нескольким элементам (строкам байтов) одновременно.
Это означает, что большинство операций Data.Iteratee ориентированы на элементы, то есть с Iteratee ByteString
они будут работать на одном Word8
, тогда как операции Data.Enumerator ориентированы на чанки и работают на ByteString
.
Можно вспомнить Data.Iteratee.Iteratee [s] m a
=== Data.Enumerator.Iteratee s m a
.