Haskell – idiom prefetching in streaming Library

I am using the streaming library, but I will accept the answer of using pipeline or pipeline

Say I have

import Streaming (Stream,Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
  flip fix 0 $\go thingID ->
    unless (thingID > lastID) $do
      thing <- highLatencyGet thingID
      S.yield thing
      go (thingID+1)

To reduce latency, I want to fork highlatency get to retrieve the next thing while processing the previous thing in the consumer

Obviously, I can convert my function to create a new Mvar and allocate the next batch before calling yield, etc

But I wonder if there is a conventional (composable) way to do this so that it can be packaged in a library and used on any IO streams Ideally, we can also configure pre values, such as:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()

Solution

This solution uses pipes, but it can easily adapt to the use of streaming media Specifically, it requires pipes, pipes - concurrent and asynchronous packages

It cannot work in a "direct" way It not only simply converts producers, but also needs a "folding function" that consumes producers This continuation delivery method is necessary to set up and dismantle the concurrency mechanism

import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (out@R_215_2419@,in@R_215_2419@,seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput out@R_215_2419@)))
        *>
        Concurrently (cutcord (foldfunc (fromInput in@R_215_2419@)))

The output of the original producer is redirected to a bounded queue At the same time, we apply the producer - folding function to the producers read from the queue

Whenever the concurrent operation is completed, we will carefully close the channel immediately to avoid hanging on the other side

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>