Повторно разделите канал на большие куски, используя комбинаторы
Я пытаюсь построитьConduit
который получает в качестве входных данныхByteString
s (размером около 1 КБ на чанк) и производит как объединенный выводByteString
с 512kb кусков.
Кажется, это должно быть просто сделать, но у меня много проблем, большинство стратегий, которые я пробовал использовать, преуспели только в том, чтобы разделить порции на более мелкие порции, мне не удалось объединить более крупные порции.
Я начал пытатьсяisolate
, затемtakeExactlyE
и в конце концовconduitVector
, но безрезультатно. В конце концов я остановился на этом:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
where
loop buffer n = do
mchunk <- C.await
case mchunk of
Nothing ->
-- Yield last remaining bytes
when (n < chunkSize) (C.yield buffer)
Just chunk -> do
-- Yield when the buffer has been filled and start over
let buffer' = buffer <> BL.fromStrict chunk
l = B.length chunk
if n <= l
then C.yield buffer' >> loop BL.empty chunkSize
else loop buffer' (n - l)
Постскриптум Я решил не разбивать большие фрагменты для этой функции, но это было просто удобное упрощение.
Тем не менее, это кажется очень многословным, учитывая все функции канала, которые имеют дело с порцией[1,2,3,4], Пожалуйста помоги! Конечно, должен быть лучший способ сделать это с помощью комбинаторов, но мне не хватает какой-то части интуиции!
P.P.S. Можно ли использовать для буфера lazy bytestring, как я сделал? Мне немного непонятно внутреннее представление для bytestring и поможет ли это, тем более что я используюBL.length
который, я думаю, может оценить Thunk в любом случае?
Просто чтобы уточнить ответ и комментарии Майкла, я закончил с этим каналом:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base)
=> Int
-> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector
Я понимаю, чтоvectorBuilder
заплатит за объединение меньших кусков на ранней стадии, получая агрегированные куски как строгие строки.
Из того, что я могу сказать, альтернативная реализация, которая производит ленивые фрагменты байтовых строк (т.е."чанки") может быть желательным, когда агрегированные порции очень велики и / или передаются в естественно потоковый интерфейс, такой как сетевой сокет. Вот моя лучшая попытка версии "lazy bytestring":
import qualified Data.Sequences.Lazy as SL
import qualified Data.Sequences as S
import qualified Data.Conduit.List as CL
-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
=> S.Index lazy
-> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize