Last week at work, I needed to investigate raw data from a Dead Letter Queue (DLQ). One thing I wanted to do was poll messages from the SQS DLQ and write them to a file for further investigation. I thought this would be a great use case for FS2.
FS2 is a lightweight streaming library that covers a lot of data-processing needs. A common one is pulling data from an upstream source and writing it to a file for later inspection.
I want to share how we can create a simple type class for writing data to a file system with FS2.
We will start with the simplest use case, writing values to a file. We will then explore how to put a Queue in front of the writer to relieve the back-pressure of writing to a file.
Let’s start by defining our function.
The FS2 guide already has an example of writing to a file.
That example reads everything from an input file, skips blank lines and comments, converts each remaining line with the function `fahrenheitToCelsius`, then encodes the result to bytes and writes it to `testdata/celsius.txt`.
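For reference, here is a sketch of that guide example as I recall it for FS2 2.x with cats-effect 2. The `Converter` object name and the explicit `ContextShift` are assumptions made so the snippet stands on its own; the guide itself wraps the code in an `IOApp`.

```scala
import java.nio.file.Paths
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text, Stream}

object Converter {
  // cats-effect 2 needs a ContextShift to move blocking file I/O off the main pool.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def fahrenheitToCelsius(f: Double): Double =
    (f - 32.0) * (5.0 / 9.0)

  val converter: Stream[IO, Unit] =
    Stream.resource(Blocker[IO]).flatMap { blocker =>
      io.file
        .readAll[IO](Paths.get("testdata/fahrenheit.txt"), blocker, 4096)
        .through(text.utf8Decode)                               // bytes -> strings
        .through(text.lines)                                    // split into lines
        .filter(s => s.trim.nonEmpty && !s.startsWith("//"))    // skip blanks and comments
        .map(line => fahrenheitToCelsius(line.toDouble).toString)
        .intersperse("\n")
        .through(text.utf8Encode)                               // strings -> bytes
        .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
    }
}
```

Running `Converter.converter.compile.drain.unsafeRunSync()` expects `testdata/fahrenheit.txt` to exist.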
`through` applies a `Pipe` to a stream, and `text.utf8Encode` returns a `Pipe[F, String, Byte]`. `Pipe[F[_], -I, +O]` is simply a type alias for `Stream[F, I] => Stream[F, O]`.
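To make the relationship between `through` and `Pipe` concrete, here is a minimal pure sketch. The names `PipeSketch` and `shout` are made up for illustration.

```scala
import fs2.{Pipe, Pure, Stream, text}

object PipeSketch {
  // A Pipe[F, I, O] is just a function Stream[F, I] => Stream[F, O].
  val shout: Pipe[Pure, String, String] = in => in.map(_.toUpperCase)

  // `through` applies the pipe to the stream.
  val shouted: List[String] = Stream("a", "b").through(shout).toList

  // text.utf8Encode is a Pipe[F, String, Byte]: each string becomes its UTF-8 bytes.
  val bytes: List[Byte] = Stream("hi").through(text.utf8Encode[Pure]).toList
}
```

Since a pipe is an ordinary function, `shout(Stream("a", "b"))` is equivalent to `Stream("a", "b").through(shout)`.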
That answers our first question: how to write a stream to a file.
We can now write an initial function that takes an upstream and writes it to a file. Let’s create `toFile(fileName: String, upstream: Stream[IO, String]): Stream[IO, Unit]`.
We need a `Blocker` for `writeAll` because writing to a file blocks the calling thread. cats-effect provides `Blocker[IO]`, which wraps a dedicated thread pool, so that blocking operations are handled explicitly.
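A minimal sketch of this first version of `toFile`, assuming FS2 2.x and cats-effect 2. The enclosing `ToFileV1` object and the global `ContextShift` are assumptions added only to make the snippet self-contained.

```scala
import java.nio.file.Paths
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text, Stream}

object ToFileV1 {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  def toFile(fileName: String, upstream: Stream[IO, String]): Stream[IO, Unit] =
    // Acquire the blocking thread pool for the lifetime of the stream.
    Stream.resource(Blocker[IO]).flatMap { blocker =>
      upstream
        .through(text.utf8Encode) // String -> Byte
        .through(io.file.writeAll(Paths.get(fileName), blocker))
    }
}
```

Note that `writeAll` defaults to the `CREATE` open option, so writing to an existing file overwrites from the start without truncating it.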
So far, so good. However, since `toFile` takes `upstream: Stream[IO, String]` as input and returns another stream, it is a natural fit for a `Pipe`. Let’s refactor `toFile` to return a `Pipe[IO, String, Unit]`.
Then, we can run this operation by sending any stream of strings through `toFile`.
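Here is a sketch of the refactored `toFile` together with one way to run it, under the same FS2 2.x assumptions. `ToFilePipe` and `run` are illustrative names, not part of the original code.

```scala
import java.nio.file.Paths
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text, Pipe, Stream}

object ToFilePipe {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The upstream is no longer a parameter: it is the input of the returned Pipe.
  def toFile(fileName: String): Pipe[IO, String, Unit] = upstream =>
    Stream.resource(Blocker[IO]).flatMap { blocker =>
      upstream
        .through(text.utf8Encode)
        .through(io.file.writeAll(Paths.get(fileName), blocker))
    }

  // Hypothetical usage: write three newline-separated items to the file.
  def run(fileName: String): IO[Unit] =
    Stream("one", "two", "three")
      .intersperse("\n")
      .covary[IO]
      .through(toFile(fileName))
      .compile
      .drain
}
```

Calling `ToFilePipe.run("output.txt").unsafeRunSync()` performs the write.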
Now that `toFile` covers the essential operation of writing to a file, let’s go further and use a Queue to relieve the back-pressure of writing to the file.
Essentially, the caller will want to call `write(item: String)`, and the function will handle writing those items to a file. Let’s start by defining the interface: a `WriteToFile[F[_]]` trait with a single method, `write(item: String): F[Unit]`.
Each time the caller calls `write(item)`, the item should be put on a queue, while a separate fiber concurrently polls that Queue and writes the values to the file.
How can we create that internal Queue?
Fortunately, FS2 has a `concurrent` package with a `Queue` implementation that we can use as the internal queue.
Working with the Queue has two sides: enqueue and dequeue.
Depending on your application, you may want to leave one of the two sides to the caller so that they have full control over it.
In this scenario, we expose enqueue to the caller as `write` and implement dequeue ourselves by writing each dequeued value to the file.
Dequeue from FS2 Queue
Let’s implement the dequeue side first. We call it `constantPoll`: it continuously polls the Queue and pipes the resulting stream through the `toFile` function that we created earlier.
We use a Queue of type `Queue[IO, Option[Either[Throwable, String]]]`.
Then, we wrap the polling in `Stream.bracket`, whose release step closes the Queue by calling `enqueue1` with `None`. This is a common workaround if you are not using `NoneTerminatedQueue`.
The stream inside `Stream.bracket` is the downstream consumer. It terminates when it receives a `None`, meaning there are no more values left in the Queue.
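The dequeue side described above can be sketched as follows, assuming FS2 2.x (`fs2.concurrent.Queue` with `enqueue1` and `dequeue`). The `DequeueSketch` object, the `Item` alias, and the `demo` method are assumptions added for illustration; `demo` keeps the fiber handle so it can wait for the writer to finish.

```scala
import java.nio.file.Paths
import scala.concurrent.ExecutionContext

import cats.effect.{Blocker, ContextShift, IO}
import fs2.{io, text, Pipe, Stream}
import fs2.concurrent.Queue

object DequeueSketch {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  type Item = Option[Either[Throwable, String]]

  def toFile(fileName: String): Pipe[IO, String, Unit] = upstream =>
    Stream.resource(Blocker[IO]).flatMap { blocker =>
      upstream
        .through(text.utf8Encode)
        .through(io.file.writeAll(Paths.get(fileName), blocker))
    }

  def constantPoll(q: Queue[IO, Item], dst: String): Stream[IO, Unit] =
    // The release step enqueues None, the termination signal for the queue.
    Stream.bracket(IO.unit)(_ => q.enqueue1(None)).flatMap { _ =>
      q.dequeue
        .unNoneTerminate // stop at the first None
        .rethrow         // Left(err) fails the stream, Right(s) emits s
        .through(toFile(dst))
    }

  // Hypothetical demo: poll in the background, enqueue two items, then signal the end.
  def demo(dst: String): IO[Unit] =
    for {
      q     <- Queue.unbounded[IO, Item]
      fiber <- constantPoll(q, dst).compile.drain.start
      _     <- q.enqueue1(Some(Right("hello\n")))
      _     <- q.enqueue1(Some(Right("world\n")))
      _     <- q.enqueue1(None)
      _     <- fiber.join // wait until everything has been written
    } yield ()
}
```

Enqueuing a `Some(Left(error))` instead would make `rethrow` fail the stream with that error.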
`write(item: String)` enqueues the item onto the internal Queue, wrapped as `Some(Right(item))` to match the Queue’s element type.
Finally, we can wire enqueue and dequeue together by running dequeue on a separate fiber while handing the caller a `WriteToFile` instance.
The key here is the `start` method, which spawns a fiber and runs `constantPoll` in the background. Without `start`, the operation would be sequential: the caller would have to wait for the polling stream to finish before getting the `WriteToFile` instance back.
We refactor our function by letting the user invoke `WriteToFile.create`. The user supplies the Queue and the file destination.
Therefore, `WriteToFile` ends up as a trait with a single `write` method, plus a companion object that provides `create`.
Then, we can write a simple program that uses our implementation and, finally, execute it with `program.unsafeRunSync()`.
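Putting the pieces together, here is a sketch of what `WriteToFile` and a small program could look like under the FS2 2.x and cats-effect 2 assumptions used throughout. The `Item` alias, the `Program` object, and the `dst` parameter on `program` are assumptions. Because `create` does not expose the background fiber, this demo signals the end with `None` and then sleeps briefly, which is a crude wait rather than a recommended pattern.

```scala
import java.nio.file.Paths
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import cats.effect.{Blocker, ContextShift, IO, Timer}
import fs2.{io, text, Pipe, Stream}
import fs2.concurrent.Queue

trait WriteToFile[F[_]] {
  def write(item: String): F[Unit]
}

object WriteToFile {
  type Item = Option[Either[Throwable, String]]

  def toFile(fileName: String)(implicit cs: ContextShift[IO]): Pipe[IO, String, Unit] =
    upstream =>
      Stream.resource(Blocker[IO]).flatMap { blocker =>
        upstream
          .through(text.utf8Encode)
          .through(io.file.writeAll(Paths.get(fileName), blocker))
      }

  def create(q: Queue[IO, Item], dst: String)(implicit cs: ContextShift[IO]): IO[WriteToFile[IO]] = {
    // Dequeue side: poll the queue and write every value to the file.
    val constantPoll: Stream[IO, Unit] =
      Stream.bracket(IO.unit)(_ => q.enqueue1(None)).flatMap { _ =>
        q.dequeue.unNoneTerminate.rethrow.through(toFile(dst))
      }

    // `start` runs the polling on its own fiber, so `create` returns right away.
    constantPoll.compile.drain.start.map { _ =>
      new WriteToFile[IO] {
        // Enqueue side: wrap the item to match the queue's element type.
        def write(item: String): IO[Unit] = q.enqueue1(Some(Right(item)))
      }
    }
  }
}

object Program {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  def program(dst: String): IO[Unit] =
    for {
      q      <- Queue.unbounded[IO, WriteToFile.Item]
      writer <- WriteToFile.create(q, dst)
      _      <- writer.write("hello\n")
      _      <- writer.write("world\n")
      _      <- q.enqueue1(None)   // tell the background fiber we are done
      _      <- IO.sleep(1.second) // crude wait for the background writer
    } yield ()
}
```

With this shape, the program is executed as `Program.program("output.txt").unsafeRunSync()`. A sturdier variant would return the fiber (or a `Resource`) from `create` so the caller can join it instead of sleeping.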
Conclusion
First, we created the `toFile` method, which takes a stream of strings, encodes it to bytes, and writes it to a file.
Then, we built the `WriteToFile` wrapper, which encapsulates all the complexity and uses an FS2 primitive, `Queue[F, A]`, in the background to handle large volumes of data. The caller just calls `write(item: String): F[Unit]`, and the wrapper does the job for them.
Lastly, we wired the `WriteToFile` trait to `toFile`, producing a small library that lets the user write values to an external file.
We leveraged the FS2 API to express complex logic in just a few lines of code. Building a concurrent internal queue by hand would be challenging and error-prone without the API’s combinators.
All the source code is here.