Haskell – idiomatic prefetching in a streaming library
I am working with the streaming library, but I would accept an answer that uses pipes or conduit.
Say I have:
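    import Control.Monad (unless)
    import Control.Monad.Trans.Class (lift)
    import Data.Function (fix)
    import Streaming (Stream, Of)
    import qualified Streaming.Prelude as S

    streamChunks :: Int -> Stream (Of Thing) IO ()
    streamChunks lastID =
        flip fix 0 $ \go thingID ->
            unless (thingID > lastID) $ do
                -- assumes highLatencyGet :: Int -> IO Thing
                thing <- lift (highLatencyGet thingID)
                S.yield thing
                go (thingID + 1)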
To reduce latency, I would like to fork highLatencyGet so that it retrieves the next Thing while the consumer is still processing the previous one.
Obviously I could rewrite the function above to create a new MVar and fork the next fetch before calling yield, and so on.
But I would like to know whether there is an idiomatic (composable) way to do this, so that it can be packaged in a library and used on arbitrary IO streams. Ideally, the number of prefetched values would be configurable as well, for example:

    prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
Solution
This solution uses pipes, but it could easily be adapted to use streaming. To be precise, it requires the pipes, pipes-concurrency and async packages.
It does not work in a "direct" style. Instead of simply transforming the Producer, it also takes a "folding function" that consumes a Producer. This continuation-passing style is necessary for setting up and tearing down the concurrency mechanism.
    import Pipes
    import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
    import Control.Concurrent.Async (Concurrently(..))
    import Control.Exception (finally)

    prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
    prefetching bufsize source foldfunc = do
        (outbox, inbox, seal) <- spawn' (bounded bufsize)
        let cutcord effect = effect `finally` atomically seal
        runConcurrently $
            Concurrently (cutcord (runEffect (source >-> toOutput outbox))) *>
            Concurrently (cutcord (foldfunc (fromInput inbox)))
The output of the original producer is redirected to a bounded queue. At the same time, we apply the producer-folding function to a producer that reads from that queue.
Whenever either of the concurrent actions completes, we take care to promptly seal the channel, so that the other side is not left hanging.
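Since the question is about streaming, here is a minimal sketch of how the same idea might be adapted to Stream (Of a) IO (). It assumes the streaming, pipes-concurrency and async packages. The names prefetchingS, feed, drain, slowSource and demo are hypothetical and only for illustration. slowSource stands in for a stream built from highLatencyGet.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently (..))
import Control.Exception (finally)
import Control.Monad (when)
import Pipes.Concurrent (Input (..), Output (..), atomically, bounded, spawn')
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S

-- Same shape as the pipes version: the consumer is passed in as a
-- continuation so the mailbox can be set up and sealed around it.
prefetchingS
    :: Int
    -> Stream (Of a) IO ()
    -> (Stream (Of a) IO () -> IO r)
    -> IO r
prefetchingS bufsize source foldfunc = do
    (outbox, inbox, seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (feed outbox source)) *>
        Concurrently (cutcord (foldfunc (drain inbox)))

-- Push each element into the mailbox; stop early once it has been sealed.
feed :: Output a -> Stream (Of a) IO () -> IO ()
feed outbox = go
  where
    go stream = do
        step <- S.next stream
        case step of
            Left () -> pure ()
            Right (x, rest) -> do
                accepted <- atomically (send outbox x)
                when accepted (go rest)

-- Read from the mailbox until it is sealed and empty.
drain :: Input a -> Stream (Of a) IO ()
drain inbox = S.unfoldr step ()
  where
    step () = do
        received <- atomically (recv inbox)
        pure (maybe (Left ()) (\x -> Right (x, ())) received)

-- Hypothetical slow source: each element takes 10 ms to produce.
slowSource :: Stream (Of Int) IO ()
slowSource = S.mapM (\i -> i <$ threadDelay 10000) (S.each [1 .. 5 :: Int])

-- Collect the prefetched stream into a list, with up to 2 elements buffered.
demo :: IO [Int]
demo = prefetchingS 2 slowSource S.toList_
```

Evaluating demo in GHCi should return [1,2,3,4,5]. As with the pipes version, the stream handed to the folding function is only valid inside the continuation, so it must be consumed there and not returned from it.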