How to Write a Data Processing Application in FS2


In my previous article, I discussed how to write a data processing application with classic Akka actors.

In this article, I would like to switch gears and use a functional programming streaming library, FS2, to process HTTP logs from a CSV file and count the occurrences of each HTTP status in the log file.

I found that learning FS2 is, in general, a bit hard, especially if you come from a background where you have never heard of streams. Therefore, before we dive into the codebase, I would like to begin by defining what a Stream is and covering some essential stream operations in FS2.

Note: I will be using FS2 with the effect type cats.effect.IO in the code below. If you are interested in knowing more about cats.effect.IO, check out their documentation.

What is a Stream?

A stream, according to Wikipedia, is a continuous flow of something, usually liquid, air, or gas, coming from a single source. In computing, it means to transmit or receive data as a steady, continuous flow.

The simplest way I can describe it is as an iterator on steroids: it may emit a finite or an infinite number of elements. In OOP terms, you can think of it as an iterator over a finite or infinite collection. While you iterate through those elements, you can do all sorts of things with them, including interacting with the outside world.

A Stream has a lot of useful features that a List doesn't.

First, each element in a Stream is lazily evaluated: when you write the code, nothing is evaluated eagerly until you command the stream to execute. This feature is a big deal, because it enables you to load, transform, and write big data sets without blowing up your heap.
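Here is a minimal sketch of that laziness, assuming cats.effect.IO as the effect type: nothing happens when the stream is defined, only when we run it.

import cats.effect.IO
import fs2.Stream

// Defining the stream prints nothing: we are only describing the effect.
val logged: Stream[IO, Unit] = Stream.eval(IO(println("evaluated!")))

// Only running the compiled stream executes the effect.
logged.compile.drain.unsafeRunSync() // prints "evaluated!"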

Second, a stream doesn't mutate the data. If you map an element A to an element B, it doesn't alter the original value of A. This helps when you are operating on big data sets and files in a concurrent environment.

Lastly, you can do many kinds of operations on a stream of data, like performing IO in an intermediate processing step or combining it with another stream.

Basic Stream Operations

FS2 has the type Stream[F[_], O]. It means you get a stream that emits values of type O. F[_] is the effect type, which implies some sort of interaction with the outside world, such as writing to a DB or doing other IO. Usually, this is IO. If you don't have any effect, for instance a basic calculation that only touches your CPU, the effect type is often Pure.
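For example, a sketch of both flavors:

import cats.effect.IO
import fs2.{Pure, Stream}

val pureStream: Stream[Pure, Int] = Stream(1, 2, 3)              // no interaction with the outside world
val effectful: Stream[IO, Unit]   = Stream.eval(IO(println(42))) // an IO effect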

You can compose a stream like how you would compose a List:

Stream(1,2,3)

When you create the Stream above, it only creates a Pure description of a stream. It doesn't run the program yet; you are describing your application. That means you need to explicitly execute it, for instance by converting it to a List with Stream(1,2,3).toList.
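For a pure stream, the conversion both runs the description and collects the output:

Stream(1, 2, 3).toList
// res: List[Int] = List(1, 2, 3)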

Creating an Effectful Stream

You can create a Stream with an effect in it and execute it:

Stream(1,2,3).covary[IO].compile.toList.unsafeRunSync()

The above code lifts the stream to the IO effect type with covary. Then, compile turns the stream into a single effect, and toList collects the emitted values into a List. At that point, we have an IO(List(1,2,3)). We still need to run the IO with unsafeRunSync to get the List(1,2,3).
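Breaking the chain apart makes the type of each step clearer:

val s: Stream[IO, Int] = Stream(1, 2, 3).covary[IO]
val io: IO[List[Int]]  = s.compile.toList
val result: List[Int]  = io.unsafeRunSync() // List(1, 2, 3)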

Resource Safety

FS2 provides bracket and resource to interact with files. These are similar to cats-effect's Resource: they acquire a resource and release it afterward, regardless of any problem that happens while using it.

import java.io.{BufferedReader, File, FileReader}

Stream.bracket {
  IO {
    new BufferedReader(new FileReader(new File("yourfile.txt")))
  }
}(f => IO(f.close()))

The above code describes how to acquire the resource and release it afterward.
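A minimal sketch of using the bracketed reader, assuming yourfile.txt exists: we read it line by line and terminate when readLine returns null.

Stream
  .bracket(IO(new BufferedReader(new FileReader("yourfile.txt"))))(r => IO(r.close()))
  .flatMap { reader =>
    // Emit one line at a time; unNoneTerminate ends the stream at end of file.
    Stream.repeatEval(IO(Option(reader.readLine()))).unNoneTerminate
  }
  .compile
  .toList
  .unsafeRunSync()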

Dealing with Infinite Stream

Stream.constant creates an infinite stream that keeps emitting the same value. To limit the number of values we want to pull from the source, we use take:

Stream.constant(42).take(5).toList
// res0: List[Int] = List(42, 42, 42, 42, 42)

It takes five elements from an infinite pure stream that keeps emitting 42.

Concurrent Processing

To process elements concurrently, we can use parEvalMap(nWorker: Int)(f: A => F[A1]). For people who are familiar with Akka Streams, it is analogous to mapAsync.

It evaluates up to nWorker effects concurrently and emits the results downstream in their original order. Under the hood, it uses fs2.concurrent.Queue to run the effects concurrently while preserving the order for the downstream.
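A minimal sketch with four workers and a toy IO computation (the ContextShift line is only needed outside an IOApp in cats-effect 2):

import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

Stream(1, 2, 3, 4, 5)
  .covary[IO]
  .parEvalMap(4)(n => IO(n * 2))
  .compile
  .toList
  .unsafeRunSync() // List(2, 4, 6, 8, 10)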

There are a lot more operations that you can use in FS2. If you are interested, check out this guide.

Now, we can go forth and conquer processing data with FS2.

Thought Process

We want to get the data from weblog.csv. We sanitize the incoming data by filtering out all lines with an invalid IP. Then, we send the lines to the workers to process the data, converting each line into a Log instance. Lastly, we count the HTTP statuses in the logs and write the result to the out.txt file.

Stream Illustration

Execution

Let’s start from top to bottom. Here is the main function:

object Main extends IOApp with Processor {

  def run(args: List[String]): IO[ExitCode] =
    processData("/weblog.csv", "/out.txt", 100).compile.drain.map(_ => ExitCode.Success)
}

processData receives the source file, /weblog.csv, the sink file, /out.txt, and the number of parallel workers it will spawn. Then, after describing the pipeline in processData, we execute it by calling compile.drain.

drain discards all the values emitted by the stream and simply executes it for its effects. Once all the values have been emitted, we get a single Unit as the result, because the output is written to out.txt inside the processData function.
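In types, the whole program collapses into a single effect (a sketch):

val program: IO[Unit] = processData("/weblog.csv", "/out.txt", 100).compile.drain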

We will use Stream.resource to acquire and release resources:

Stream.resource(Blocker[IO]).flatMap { blocker =>
  val inResource = getClass.getResource(in)
  val outResource = getClass.getResource(out)
  io.file
    .readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    // ... our processing logic here
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(outResource.toURI), blocker))
}

The code above creates an inResource and an outResource to read data from and write data to the files. It reads the data from the input file by decoding the bytes to UTF-8 and splitting them into lines; after our processing logic, it encodes the output back to UTF-8 and writes it incrementally to out.txt.

A line of the logs looks like this:

10.128.2.1,[29/Nov/2017:06:58:55,GET /login.php HTTP/1.1,200

The // ... placeholder is where we will fill in all the operations described in the thought process.

Before that, this is the Log and Date model that each worker will convert into:

case class Log(ip: String, time: Date, url: String, status: String)
case class Date(year: Int, month: String, date: Int, time: String)

Processing Logic

Filter invalid IP

Once we get each line from the file, we filter out the invalid IP addresses:

.filter(isValidIp) // keep only the lines with a valid IP

The isValidIp function looks like this:

import scala.util.matching.Regex

def isValidIp(line: String): Boolean = {
  // Note: the regex only checks the shape (1-3 digits per octet), not the 0-255 range.
  val ipRegex: Regex = """.*?(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3}).*""".r
  ipRegex.pattern.matcher(line.split(",")(0)).matches()
}

Send lines to Workers

We send each of the filtered lines to the workers so they can convert them to the Log model. Unlike parEvalMap, parEvalMapUnordered does not preserve the input order, which is fine here because we only count statuses.

.parEvalMapUnordered(parallelism)(convertToLog)

The convertToLog function looks like this:

def convertToLog(line: String): IO[Log] = line.split(",").toList match {
  case ip :: time :: url :: status :: _ =>
    IO(Log(ip, convertToDate(time), url, status))
  case _ =>
    IO.raiseError(new IllegalArgumentException(s"Malformed log line: $line")) // make the match total
}
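The convertToDate helper is not shown in this article; a minimal, hypothetical sketch, assuming the [dd/MMM/yyyy:HH:mm:ss timestamp format in the sample line above, could look like this:

// Hypothetical helper (not the original implementation): parses a timestamp
// like "[29/Nov/2017:06:58:55" into our Date model.
def convertToDate(time: String): Date = {
  val Array(datePart, hh, mm, ss) = time.stripPrefix("[").split(":", 4)
  val Array(day, month, year)     = datePart.split("/")
  Date(year.toInt, month, day.toInt, s"$hh:$mm:$ss")
}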

Count HTTP Status Code

Once the workers finish converting the lines into Logs, the stream goes through a folding phase. Inside the folding phase, we accumulate the number of occurrences of each HTTP status into a Map of status -> count (this fold also appears in the full pipeline below):
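.fold(Map.empty[String, Int]) { (map, currLog) =>
  val updatedStatus = map.getOrElse(currLog.status, 0) + 1
  map + (currLog.status -> updatedStatus)
}

Then, we turn the resulting Map into one human-readable line per status: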

.flatMap { m =>
  Stream.fromIterator[IO](m.keys.map { key =>
    s"Status : ${key} has a total of ${m(key)} amount "
  }.iterator)
}

All Together

To sum up, if we put all the logic together, it becomes something like this:

io.file
  .readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
  .through(text.utf8Decode)
  .through(text.lines)
  .filter(isValidIp)
  .parEvalMapUnordered(parallelism)(convertToLog)
  .fold(Map.empty[String, Int]) { (map, currLog) =>
    val updatedStatus = map.getOrElse(currLog.status, 0) + 1
    map + (currLog.status -> updatedStatus)
  }
  .flatMap { m =>
    Stream.fromIterator[IO](m.keys.map { key =>
      s"Status : ${key} has a total of ${m(key)} amount "
    }.iterator)
  }
  .through(text.utf8Encode)
  .through(io.file.writeAll(Paths.get(outResource.toURI), blocker))

We could also run the fold outside of the stream with compile.fold and then write the result to the out.txt file. In that case, we would need to acquire and release the resource for the writeAll operation ourselves.
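A minimal sketch of that alternative, assuming logs is the Stream[IO, Log] produced by the steps above (the writing part is omitted):

// Hypothetical: fold outside the stream with compile.fold.
val counts: IO[Map[String, Int]] =
  logs.compile.fold(Map.empty[String, Int]) { (map, log) =>
    map + (log.status -> (map.getOrElse(log.status, 0) + 1))
  }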

Summary

In this article, we touched upon a complicated stream processing library, FS2.

We discussed what a Stream is and how it differs from a List. We also talked about the essential stream operations. Then, we dove deep into how you can create a data processing application with FS2.

Here is the full source code.

If you are interested in learning more about FS2, check out the official FS2 guide and documentation.
