Applications are often designed with a throttling mechanism. Sometimes we want to limit the number of requests to improve our application’s security and performance. Sometimes the application simply cannot handle more than a certain number of connections. For instance, if you poll messages from a queue as fast as you can and do not limit the number of connections, your application will soon exhaust its connection pool and run into Denial of Service errors. In this scenario, a mechanism that limits the number of tasks being processed at the same time will help improve your application’s performance.
In this article, I want to share how we can create a throttling mechanism with Queue, one of FS2’s concurrency primitives.
Before we start, I want you to imagine an application that continuously polls elements from upstream. We leverage FS2 to handle throttling/back-pressure by setting a maximum buffer size, which caps how many elements can be in flight at once. When the internal Queue is full, it won’t enqueue another message until some of the tasks are finished.
Essentially the user of the application can use it like this:
Therefore, with a maximum size of 100, the application stops polling elements once the internal Queue holds 100 of them, and resumes when the Queue has some space again.
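The back-pressure rule described above can be seen in isolation with a plain bounded queue. This is a minimal sketch, assuming fs2 2.x and cats-effect 2.x (the versions in which fs2.concurrent.Queue exists); the object name BoundedQueueDemo and the capacity of 2 are made up for illustration. It uses offer1, which reports whether an element was accepted, so the example finishes instead of waiting the way enqueue1 would.

```scala
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Hypothetical demo object, not part of the article's code.
object BoundedQueueDemo {
  // Concurrent[IO] in cats-effect 2.x needs a ContextShift in scope.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Offer three elements to a queue that holds at most two.
  val offers: IO[List[Boolean]] =
    for {
      q      <- Queue.bounded[IO, Int](2)
      first  <- q.offer1(1) // there is room
      second <- q.offer1(2) // there is room, the queue is now full
      third  <- q.offer1(3) // no room left; enqueue1 would wait here instead
    } yield List(first, second, third)
}
```

Running BoundedQueueDemo.offers.unsafeRunSync() shows which offers were accepted. The same limit is what makes enqueue1 wait when the buffer is full.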
There are 2 parts to creating this:
- Consumer is a type class that subscribes to an upstream by constantly enqueuing its values to an internal Queue.
- Subscriber is a type class that wraps the Consumer, dequeues chunks from the internal Queue, and processes those values.
Depending on the use case of the application, we can encapsulate either the Subscriber portion or the Consumer portion. In this article, it will be the Consumer, meaning the user specifies which upstream they want to poll from and gets access to the result through the Subscriber. The last article encapsulated the Subscriber portion.
The Consumer will subscribe to the upstream. Therefore, we want a function that is like this:
The function receives any value from upstream and enqueues it to the internal Queue.
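One possible shape for such a function is sketched below, assuming fs2 2.x and cats-effect 2.x. The method name subscribe comes from the article; the ConsumerShape object, the recording instance, and the Ref-backed log that stands in for the internal Queue are all hypothetical and exist only to make the sketch runnable.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

// Assumed shape of the Consumer type class: feed an upstream somewhere.
trait Consumer[F[_], A] {
  def subscribe(upStream: Stream[F, A]): F[Unit]
}

// Hypothetical demo object, not part of the article's code.
object ConsumerShape {
  // Toy instance: a Ref-backed log stands in for the internal Queue.
  def recording[A](log: Ref[IO, Vector[A]]): Consumer[IO, A] =
    new Consumer[IO, A] {
      def subscribe(upStream: Stream[IO, A]): IO[Unit] =
        upStream.evalMap(a => log.update(_ :+ a)).compile.drain
    }

  // Feeds three elements through the toy instance and reads the log back.
  val seen: IO[Vector[Int]] =
    for {
      log <- Ref.of[IO, Vector[Int]](Vector.empty)
      _   <- recording(log).subscribe(Stream(1, 2, 3).covary[IO])
      out <- log.get
    } yield out
}
```

The important part is the signature: subscribe takes a Stream[F, A] and returns F[Unit], so the caller decides when and where the subscription runs.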
Let’s create the initial subscribe method. We create the instance of the Consumer type class by letting the caller initialize the internal Queue and inject it when the instance is created.
We will use NoneTerminatedQueue so that the Queue can be terminated once the upstream stops sending messages to the Consumer. This way, the Consumer can tell the Subscriber to stop the stream.
Does that sound like resource acquisition?
You are right! Essentially, we want to acquire a resource and guarantee that some cleanup action runs once the resource has been acquired. Therefore, we will create a resource helper method.
We take each value from upStream and enqueue1 it to our internal Queue. Then, we call compile.drain to run the stream and consume all the input coming from upStream. Once all the input is drained, or if any error occurs during the computation, the resource cleans up by enqueuing None to our internal Queue, which tells the Subscriber to stop its stream.
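The steps above could be sketched as follows, assuming fs2 2.x and cats-effect 2.x. This is not necessarily the article's own code: Consumer.create, terminateOnExit, and ConsumerDemo are hypothetical names. The draining is placed inside use rather than in the acquire step on purpose, because Resource only runs its release action after a successful acquire, and here the None must be enqueued even when upStream fails.

```scala
import cats.effect.{Concurrent, ContextShift, IO, Resource}
import fs2.Stream
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import scala.concurrent.ExecutionContext

trait Consumer[F[_], A] {
  def subscribe(upStream: Stream[F, A]): F[Unit]
}

object Consumer {
  // The caller creates the queue and injects it here (hypothetical factory).
  def create[F[_], A](q: NoneTerminatedQueue[F, A])(implicit F: Concurrent[F]): Consumer[F, A] =
    new Consumer[F, A] {
      // Acquires nothing; the release step enqueues None so readers stop.
      private val terminateOnExit: Resource[F, Unit] =
        Resource.make(F.unit)(_ => q.enqueue1(None))

      def subscribe(upStream: Stream[F, A]): F[Unit] =
        terminateOnExit.use { _ =>
          // Wrap every element in Some and push it into the queue.
          upStream.evalMap(a => q.enqueue1(Some(a))).compile.drain
        }
    }
}

// Hypothetical demo object, not part of the article's code.
object ConsumerDemo {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The queue is larger than the input, so subscribe can finish before we read.
  val received: IO[List[Int]] =
    for {
      q   <- Queue.boundedNoneTerminated[IO, Int](10)
      _   <- Consumer.create(q).subscribe(Stream(1, 2, 3).covary[IO])
      out <- q.dequeue.compile.toList
    } yield out
}
```

Because the None arrives after the last element, q.dequeue emits everything that was enqueued and then ends on its own.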
When we call subscribe, we follow it with start, which runs the effect on a new fiber. Without start, the entire process would be sequential, meaning it would enqueue all the values first and only then dequeue them. If our internal Queue fills up, the program will hang there, because nothing is dequeuing yet. With start, subscribe executes on a separate fiber, concurrently with the dequeuing side.
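To make the difference concrete, here is a small sketch, assuming fs2 2.x and cats-effect 2.x. StartDemo and produce are hypothetical names, and produce inlines a simplified version of the subscription so the block stands on its own. The queue holds 2 elements while the producer wants to push 10, so the producer can only finish if a reader runs at the same time.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import scala.concurrent.ExecutionContext

// Hypothetical demo object, not part of the article's code.
object StartDemo {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Pushes 0 to 9 into the queue, then always signals termination with None.
  def produce(q: NoneTerminatedQueue[IO, Int]): IO[Unit] =
    Stream
      .range(0, 10)
      .covary[IO]
      .evalMap(i => q.enqueue1(Some(i)))
      .compile
      .drain
      .guarantee(q.enqueue1(None))

  val received: IO[List[Int]] =
    for {
      q   <- Queue.boundedNoneTerminated[IO, Int](2)
      _   <- produce(q).start // runs on its own fiber, so the next line can read
      out <- q.dequeue.compile.toList
    } yield out
}
```

If the .start were removed, produce(q) would wait on a full queue before the dequeue line is ever reached, which is the hang described above.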
We want the Subscriber to poll repeatedly and return a Stream[F, A] back to the caller. Therefore, we can create a type class that exposes that stream. With the Consumer in place, we need to create an instance of Subscriber that takes the maximum queue size and the upstream as parameters:
We need to subscribe to the upstream, run that subscription on another fiber, and instantiate the Subscriber.
We create the internal Queue with boundedNoneTerminated, using the maxBufferSize that the caller provides. Then, we make our Consumer with the help of the Consumer type class, subscribe to the upstream, and start the subscription on another fiber. We return the Queue so that we can wire it to the Subscriber. Once the consumer is created, we can connect it with the Subscriber. Like this:
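A minimal sketch of this wiring, assuming fs2 2.x and cats-effect 2.x. The names pollRepeat, feed, and ThrottleDemo are hypothetical, and feed is a simplified stand-in for Consumer.subscribe so that the block is self-contained. It is an illustration of the idea, not necessarily the article's original code.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import cats.syntax.flatMap._
import cats.syntax.functor._
import fs2.Stream
import fs2.concurrent.{NoneTerminatedQueue, Queue}
import scala.concurrent.ExecutionContext

trait Subscriber[F[_], A] {
  def pollRepeat: Stream[F, A]
}

object Subscriber {
  // Stand-in for Consumer.subscribe: fill the queue, then always enqueue None.
  private def feed[F[_], A](q: NoneTerminatedQueue[F, A], upStream: Stream[F, A])(
      implicit F: Concurrent[F]
  ): F[Unit] =
    F.guarantee(upStream.evalMap(a => q.enqueue1(Some(a))).compile.drain)(q.enqueue1(None))

  def create[F[_], A](maxBufferSize: Int, upStream: Stream[F, A])(
      implicit F: Concurrent[F]
  ): F[Subscriber[F, A]] =
    for {
      q <- Queue.boundedNoneTerminated[F, A](maxBufferSize)
      _ <- F.start(feed(q, upStream)) // the producer side runs on its own fiber
    } yield new Subscriber[F, A] {
      // The consumer side: a stream that ends once None is dequeued.
      def pollRepeat: Stream[F, A] = q.dequeue
    }
}

// Hypothetical demo object, not part of the article's code.
object ThrottleDemo {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // At most 2 elements are buffered while the caller works through the stream.
  val program: IO[List[Int]] =
    Subscriber
      .create[IO, Int](maxBufferSize = 2, upStream = Stream.range(0, 5))
      .flatMap(_.pollRepeat.compile.toList)

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync())
}
```

The caller only sees pollRepeat. The Queue and the producing fiber stay hidden behind create, which is what encapsulating the Consumer portion means here.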
This is the program that uses the Subscriber instance. You can use it as a throttling mechanism anywhere in your application:
You will need to call unsafeRunSync at the end:
Adding a throttling mechanism can be challenging, especially if you need to do it in a concurrent environment. Luckily, with the help of FS2, constructing a throttling mechanism in any application can be done in a few lines of code.
We create the Consumer type class to subscribe to any source. Then, we use the Subscriber to dequeue continuously and concurrently. With resource acquisition, we can guarantee that the Queue is cleaned up before we stop the stream.
I hope you find this post useful in learning more about FS2, Scala, or Functional Programming in general. If you spot anything that might cause an error, feel free to point it out so I can also learn from you.
All source code is on GitHub.