Повторно разделите канал на большие куски, используя комбинаторы

Я пытаюсь построитьConduit который получает в качестве входных данныхByteStrings (размером около 1 КБ на чанк) и производит как объединенный выводByteStringс 512kb кусков.

Кажется, это должно быть просто сделать, но у меня много проблем, большинство стратегий, которые я пробовал использовать, преуспели только в том, чтобы разделить порции на более мелкие порции, мне не удалось объединить более крупные порции.

Я начал пытатьсяisolate, затемtakeExactlyE и в конце концовconduitVector, но безрезультатно. В конце концов я остановился на этом:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

Постскриптум Я решил не разбивать большие фрагменты для этой функции, но это было просто удобное упрощение.

Тем не менее, это кажется очень многословным, учитывая все функции канала, которые имеют дело с порцией[1,2,3,4], Пожалуйста помоги! Конечно, должен быть лучший способ сделать это с помощью комбинаторов, но мне не хватает какой-то части интуиции!

P.P.S. Можно ли использовать для буфера lazy bytestring, как я сделал? Мне немного непонятно внутреннее представление для bytestring и поможет ли это, тем более что я используюBL.length который, я думаю, может оценить Thunk в любом случае?

Заключение

Просто чтобы уточнить ответ и комментарии Майкла, я закончил с этим каналом:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base) 
         => Int 
         -> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector

Я понимаю, чтоvectorBuilder заплатит за объединение меньших кусков на ранней стадии, получая агрегированные куски как строгие строки.

Из того, что я могу сказать, альтернативная реализация, которая производит ленивые фрагменты байтовых строк (т.е."чанки") может быть желательным, когда агрегированные порции очень велики и / или передаются в естественно потоковый интерфейс, такой как сетевой сокет. Вот моя лучшая попытка версии "lazy bytestring":

import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index lazy
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize

Ответы на вопрос(1)

Ваш ответ на вопрос