How to Write Data to a File with FS2

Photo by Aaron Burden

Last week, I tried to investigate raw data from Dead Letter Queue (DLQ) at work. One of the functionalities that I want to do is poll sources from the SQS DLQ and write those data in a file for further investigation. I thought using FS2 will be a great use-case for this.

FS2 is a light-weight streaming library that can help you do a lot of data processing functionality. Often, we want to get some data from upstream and write them to a file for any investigation.

I want to share how we can create a simple type class for writing data to a file system with FS2.

We will break it down by the simplest use-case of writing any value to a file. We will then explore how we can incorporate Queue to decrease the back-pressure of writing to a file.

Let’s start by defining our function.

In the fs2 guide, there is already a sample example of how we want to write to an external file system.

The code specified that we will read all from the file and do filtering on some function fahrenheitToCelsion. Then, encode it into a byte and write it to the testdata/celsius.txt.

through will combine one stream to another, and text.utf8Encode returns a Pipe[F[_], I+, O-] which is equivalent to Stream[F,I] => Stream[F,O].

Therefore, we got our first questions answered!

We can write our initial function that takes in an upstream and write it to file. Let’s create toFile(fileName:String, upstream:Stream[IO, String]): Stream[IO, Unit]:

We need a blocker for writeAll because it is an operation that will block the thread. Therefore, cats-effect provide a dedicated thread pool Blocker[IO] to explicitly handle blocking operations.

So far it looks great. However, we can make toFile as a Pipe since it needs upstream:Stream[IO,String] as a dependency. Let’s refactor our toFile to return a Pipe:

Then, we can run this operation like this:

Now that we have toFile essential operation of writing a file let’s expand more by using Queue to alleviate the back-pressure of writing to the file.

Essentially, the caller will want to call write(item: String), and the function will handle writing those items to a file. Let’s start by defining our function arguments:

We want to create a way when each time the caller calls write(item), it will concurrently write the item to a queue, and there will another thread that concurrently polls the value from that Queue and write those value to the file.

How can we create that internal Queue?

Fortunately, FS2 has a concurrent package with a queue implemented that helps create an internal queue.

There are two portions when implementing the Queue - enqueue and dequeue.

Based on your application, sometimes, you want to leave either one of the portions to the caller to have full control over it.

In this scenario, we will encapsulate enqueue in the equivalent of write to the caller and implement dequeue by writing the file’s value.

Dequeue from FS2 Queue

Let’s implement our dequeue method first. The code snippet below is equivalent to constantPoll from the Queue and pipe the stream will toFile function that we created earlier.

We create a Queue of Queue[IO, Option[Either[Throwable, String]]].

Then, we create a Resource that will close the Queue if there is no more value from the upstream by enqueue1 with a None type. This is often the workaround if you are not using NoneTerminatedQueue.

The value inside of Stream.bracket is downstream. It will terminate if it gets a None, meaning we have no more amount left in the Queue.

write(item:String) enqueue the item to the internal Queue:

Finally, we can wire the enqueue and dequeue together by spawning dequeue into another thread while letting our caller access the WriteToFile instances.

The key here is the start method, where it will spawn a fiber and run the constantPoll in the background. If you remove the start the operation will be sequential.

We refactor our function by letting the user invoke WriteToFile.create. The user will need to supply the Queue and the file destination - something like this:

Therefore, our WriteToFile function looks like this:

Then, we can write a simple program to leverage our implementation like this:

Finally, execute the program:

program.unsafeRunSync

Conclusion

First, we create the toFile method that takes a chunk of stream, encode it to byte code and write it to the file.

Then, we try to create a wrapper around WriteToFile that encapsulates all the complexity and uses FS2 primitive, Queue[F,A], on the background to handle loads of data. The caller can call write(item: String): F[Unit] and do the job for them.

Lastly, we wired everything and untangled the complex logic wired our WriteToFile trait to toFile and created a library that enables the user to write any value to an external file.

We leveraged the FS2 API to write a complex logic with a little line of code. Concurrently creating an internal queue will be challenging and error-prone without all the combinator of the API.

All source codes are here.

Like this Article?

Sign up for my newsletter to get notified for new articles!


Related Posts

5 Anti Pattern for Writing Code in a Functional Programming Language

No 1. Nested Asynchronous Function

Why Do Functional Programmers Prefer For-Comprehension Over Imperative Code Block

Short Answer - Readability

How to Turn Domain Model into DynamoDB AttributeValue

A brief introduction about Dynosaur

Why is REST more popular than RPC?

REST is much more flexible

Want to Safe more money and Resources for your Company? Treat Your Software as a Living Organism

Codebase are not machines but a living organism