Using Akka Streams For Creating A Pipeline From A Paged API — Evolving Scenario

Roy Ashcenazi
7 min read · May 12, 2021

I have recently started a new job, transitioning from a corporate to a startup (which might lead to a different post), and one of my first tasks was building a service to retrieve a list of users' data from an external system that exposes several APIs, or in other words, a pipeline.

When brainstorming with the team on how to do it, a suggestion came up to use Akka Streams as the infrastructure for the pipeline. Since we are working with Scala & Play, this seemed like a good fit.

Some important points to take in mind before reading further

  • I am new to the framework and might have an “imperfect” understanding of it.
  • You might see my solution and have a better one in mind — that's great, feel free to share :)
  • This article is not about Big Data, but about a single-application pipeline using reactive streams, where the amount of data is not the issue; the interesting topics are backpressure, flow control, and throttling, which I'll touch on more later.
  • This article demonstrates a specific Akka Streams usage and is not a beginner's tutorial (there are many good sources, including the Akka Streams documentation itself; I really liked this post by Colin Breck).
  • I am using drawings to depict the evolving graph of the pipeline, as I haven't found any good visualization tools for the framework; they are based on my understanding and should be taken with a grain of salt.
  • I assume you have read a bit about the framework and have very basic graph theory knowledge (understanding what a node, an edge, a directed graph, a source, and a sink are is enough).

Let's begin with a naive approach

We are starting with the basic use case: retrieving multiple pages for a specific user from a single API.
Since we don't know how many pages the API holds for a given id, and we are using Scala, we will solve it with recursion (semi-pseudo code):

// semi-pseudo code: recursively fetch pages until the API marks the last one
def fetchUserData(api: String, userId: Int, pageNum: Int = 0): Seq[JsValue] = {
  val resp = callApi(api, userId, pageNum)
  if ((resp \ "endPage").validate[Boolean].get) Seq(resp)
  else resp +: fetchUserData(api, userId, pageNum + 1)
}

Now that we have defined the method to fetch pages, let's create the pipeline that fetches all of them and writes them to the DB of our choice.

val source = Source.single(fetchUserData(api, userId))
val sink = Sink.foreach(db.insert("tableName", _))
val graph = source.to(sink)
graph.run()

This is a very naive usage of Akka Streams that ignores most of its abilities, but still, some would say it is better than calling the API directly and writing to the DB, as we now have our flow framed. Don't worry, we will start improving from here :)

But first, let's dig in a bit into the code.
We defined a single-element source, meaning a node that emits exactly one element downstream.
We used the Sink.foreach builder, meaning that every incoming element (only one in our current graph) will be processed with the method we used to initialize it.

For now, this is what the graph looks like:

The first node processes all of the pages and aggregates them, and the second node saves the data to the database.
To keep things gradual, let's say the next step is creating some kind of document from the aggregated data before saving it to the database.

// a simple "stupid" parser
def myParser(value: String): Int = value.toInt

val parsingFlow = Flow.fromFunction(myParser)
val graph = source.via(parsingFlow).to(sink)
graph.run()

The Flow.fromFunction builder infers the types from the given parsing method. Let's say your parser converts a String to an Int (though you will probably use case classes); when adding a type annotation you will see that the type is Flow[String, Int, NotUsed].
That means the input to the flow is a String, its output is an Int, and the materialized value (we will touch on it a bit later) is NotUsed, which is similar to Unit/Void.
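
For reference, this is the same flow with its inferred type written out explicitly (a minimal sketch, assuming the String-to-Int parser above):

import akka.NotUsed
import akka.stream.scaladsl.Flow

// same flow as before, only with the type annotation spelled out
val parsingFlow: Flow[String, Int, NotUsed] = Flow.fromFunction(myParser)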

Our slightly improved graph:

Bear with me if this feels simple; things are now starting to get complicated.
Have you asked yourself why we use recursion? Isn't the whole purpose of using Akka Streams for it to handle the flow?

Introducing the unfold magic

Akka Streams has many APIs for building sources, sinks, and flows, including the Graph DSL (which many of the "simpler" APIs use internally). One of them, relevant for our use case, is the unfold method for creating a source, which we are going to dive into.

case class PageInfo(shouldStop: Boolean, userId: Int, pageNum: Int)

def createSource(userId: Int) = {
  val initialState = PageInfo(shouldStop = false, userId, 0)
  Source.unfold(initialState) { pageInfo =>
    if (pageInfo.shouldStop) None
    else {
      val resp = callApi(api, userId, pageInfo.pageNum)
      val nextPageInfo =
        if ((resp \ "endOfPage").validate[Boolean].get) pageInfo.copy(shouldStop = true)
        else pageInfo.copy(pageNum = pageInfo.pageNum + 1)
      Some((nextPageInfo, resp))
    }
  }
}

The unfold method expects a zero state (in our case the first page number), and for each "iteration", or more correctly, for each state, it either emits an element downstream or stops.
The contract is that when the state causes a None value to be returned, the source stops emitting elements downstream; otherwise, a Tuple2 is returned, holding the new state and the element produced from the current state.
You need to be careful here and make sure you will actually reach the final state (in case this is your intention).
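
To make the contract concrete, here is a minimal sketch unrelated to our API: a source that emits 0 through 4 and then completes.

// the state is a simple counter
val countdown = Source.unfold(0) { n =>
  if (n > 4) None       // stop: no more elements
  else Some((n + 1, n)) // (next state, element to emit)
}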

So, we have a more clever usage of the framework, but since we no longer aggregate the pages but just emit them downstream, we need to add the aggregation logic that previously lived in our recursive method.

val aggregationFlow = parsingFlow.fold(Seq.empty[SomeCaseClass])((acc, data) => acc :+ data)
source.via(aggregationFlow).to(sink).run()

We have already defined parsingFlow above, and we are using it to create aggregationFlow with the fold method that more experienced Scala developers probably already know: it receives the initial accumulator and a method that folds each element into it.
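
For intuition, the same fold idea on a plain Scala collection (a small sketch, reusing the String-to-Int conversion from before):

// accumulate parsed elements into a Seq, exactly like the flow's fold does
Seq("1", "2", "3").foldLeft(Seq.empty[Int])((acc, s) => acc :+ s.toInt) // Seq(1, 2, 3)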

Current graph:

That's cool, our pipeline is starting to get more complex. But wait: this pipeline only indexes data for a single user, and in a common scenario we will want to index data for multiple entities.

Multiple entities flow

We want to retrieve pages for a list of users, but our source fetches data for a single userId .
We will solve this by creating a single source, combined from a list of sources, one per userId .

def createCombinedSource(userIds: Seq[Int]) = {
  val sourceList = userIds.map(userId => createSource(userId))
  Source.combine(sourceList(0), sourceList(1), sourceList.drop(2): _*)(Merge(_))
}

val combinedSource = createCombinedSource(userIds)

We have created a unified source using the Merge(_) strategy, meaning that the order of downstream elements is not pre-determined (unlike the Concat(_) strategy), which causes pages associated with different users to be interleaved downstream.
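
A minimal sketch of the difference between the two strategies, using toy integer sources instead of our API sources (assuming the standard akka.stream.scaladsl imports are in scope):

// Concat drains the first source fully before starting the second
val concatenated = Source.combine(Source(1 to 3), Source(4 to 6))(Concat(_)) // always 1,2,3,4,5,6
// Merge pulls from both as elements become available, so order is not guaranteed
val merged = Source.combine(Source(1 to 3), Source(4 to 6))(Merge(_))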

Since we are now emitting pages for different users, we need to group them according to their ids:

val groupedSource = combinedSource.via(parsingFlow)
  .groupBy(userIds.size, _.userId)
  .fold(Seq.empty[SomeCaseClass])((acc, data) => acc :+ data)

And finally, we want to save each user's aggregated data to the DB:

groupedSource.mergeSubstreams
  .mapAsync(userIds.size)(aggData => db.insert(aggData))
  .runWith(Sink.ignore)

Assuming the save-to-DB action is asynchronous (Future, anybody?), we use mapAsync so the transactions happen concurrently. (I defined the parallelism according to the number of ids, but you can pick a different number depending on the use case.)
This parameter is used by Akka Streams both for backpressure (asking the source to stop emitting data, one of the key elements of the framework) and for concurrency.
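
As a minimal sketch of the mechanism (asyncInsert here is a hypothetical function returning a Future), at most four inserts are in flight at any moment, and upstream is backpressured once that limit is reached:

Source(1 to 100)
  .mapAsync(parallelism = 4)(i => asyncInsert(i)) // at most 4 futures in flight at once
  .runWith(Sink.ignore)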

Finally, we have a nice looking graph :)

As I promised, we have a framed, concurrent pipeline with backpressure support.
There are many other features that could improve it, but one we mentioned before is throttling.
This can be added simply to our combined sources (each of them):

source.throttle(1, 500.millis)

Please notice where you place the throttling: since our source(s) call a different server, they seem like the appropriate place.
Of course, you can add throttling to any other flow.
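
For example, here is a sketch of throttling each per-user source before combining them, so each API client is rate-limited independently (reusing the createSource defined earlier; the duration import is needed for the .millis syntax):

import scala.concurrent.duration._

def createCombinedSource(userIds: Seq[Int]) = {
  // throttle every per-user source to at most one page request per 500ms
  val sourceList = userIds.map(userId => createSource(userId).throttle(1, 500.millis))
  Source.combine(sourceList(0), sourceList(1), sourceList.drop(2): _*)(Merge(_))
}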

A few points to conclude

  • Remember the NotUsed type we saw in Flow[String, Int, NotUsed]? This is the materialized value of our graph, meaning the processing result, but since we do not have a final value (as the last step is saving an object to the DB), we (and possibly many other pipeline implementations) ignore it.
  • The code is semi-pseudo and might not work with a copy-paste; feel free to reach out for assistance.
  • To simplify the pipeline, I have ignored the use of Futures; of course, Akka Streams has full support for processing them, including the unfoldAsync method, which is equivalent to the one we used except that it takes an async function to generate the next state tuples (see the sketch after this list).
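
A minimal sketch of that async variant, assuming the PageInfo case class from above, that callApi returns a Future of the page response, and an implicit ExecutionContext in scope:

def createSourceAsync(userId: Int) =
  Source.unfoldAsync(PageInfo(shouldStop = false, userId, 0)) { pageInfo =>
    if (pageInfo.shouldStop) Future.successful(None)
    else callApi(api, userId, pageInfo.pageNum).map { resp =>
      // compute the next state from the async response, then emit the page downstream
      val next =
        if ((resp \ "endOfPage").validate[Boolean].get) pageInfo.copy(shouldStop = true)
        else pageInfo.copy(pageNum = pageInfo.pageNum + 1)
      Some((next, resp))
    }
  }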

Thanks for reading, happy pipelining :)
