Structured Concurrency in Kotlin using Arrow's parMap
Learn how to implement parallel processing in Kotlin using Arrow's parMap for efficient, structured concurrency with proper error handling
Arrow's parMap
provides a powerful way to handle concurrent operations in Kotlin while maintaining structured concurrency. Let's explore how to use it effectively.
First, add Arrow dependencies to your build.gradle.kts
:
dependencies {
implementation("io.arrow-kt:arrow-core:1.2.0")
implementation("io.arrow-kt:arrow-fx-coroutines:1.2.0")
}
Let's create a practical example where we need to fetch user data and their associated posts from different APIs concurrently:
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.delay
data class User(val id: Int, val name: String)
data class UserPosts(val userId: Int, val posts: List<String>)
data class EnrichedUser(val user: User, val posts: List<String>)
class UserService {
// Simulate API call to fetch user details
suspend fun fetchUser(id: Int): User {
delay(1000) // Simulate network delay
return User(id, "User $id")
}
// Simulate API call to fetch user posts
suspend fun fetchUserPosts(userId: Int): UserPosts {
delay(1500) // Simulate network delay
return UserPosts(userId, listOf("Post 1", "Post 2", "Post 3"))
}
}
suspend fun main() {
val userService = UserService()
val userIds = listOf(1, 2, 3, 4, 5)
// Process users concurrently with parMap
val enrichedUsers = userIds.parMap { userId ->
val user = userService.fetchUser(userId)
val posts = userService.fetchUserPosts(userId)
EnrichedUser(user, posts.posts)
}
enrichedUsers.forEach { user ->
println("User ${user.user.name} has ${user.posts.size} posts")
}
}
Here's a more advanced example incorporating error handling and timeouts:
import arrow.core.Either
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
class UserProcessor(
private val userService: UserService,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
suspend fun processUsersWithErrorHandling(
userIds: List<Int>
): List<Either<ProcessingError, EnrichedUser>> {
return userIds.parMap { userId ->
Either.catch {
withContext(dispatcher) {
withTimeout(3000L) { // 3 seconds timeout
val user = userService.fetchUser(userId)
val posts = userService.fetchUserPosts(userId)
EnrichedUser(user, posts.posts)
}
}
}.mapLeft { error ->
when (error) {
is kotlinx.coroutines.TimeoutCancellationException ->
ProcessingError.Timeout(userId)
else -> ProcessingError.Unknown(userId, error.message ?: "Unknown error")
}
}
}
}
}
sealed class ProcessingError {
data class Timeout(val userId: Int) : ProcessingError()
data class Unknown(val userId: Int, val message: String) : ProcessingError()
}
// Usage
suspend fun main() {
val userProcessor = UserProcessor(UserService())
val userIds = listOf(1, 2, 3, 4, 5)
val results = userProcessor.processUsersWithErrorHandling(userIds)
results.forEach { result ->
when (result) {
is Either.Right -> {
val user = result.value
println("Successfully processed ${user.user.name}")
}
is Either.Left -> {
when (val error = result.value) {
is ProcessingError.Timeout ->
println("Timeout processing user ${error.userId}")
is ProcessingError.Unknown ->
println("Error processing user ${error.userId}: ${error.message}")
}
}
}
}
}
parMap
call.Either
type.Dispatchers.IO
for I/O operations and Dispatchers.Default
for CPU-intensive tasks.Either
type for proper error handling.parMapN
when you need to limit the number of concurrent operations.// Traditional sequential approach
val sequentialResults = userIds.map { userId ->
userService.fetchUser(userId)
}
// Using async/await (less structured)
val asyncResults = userIds.map { userId ->
async { userService.fetchUser(userId) }
}.awaitAll()
// Using parMap (structured concurrency)
val parMapResults = userIds.parMap { userId ->
userService.fetchUser(userId)
}
The parMap
approach provides a clean, structured way to handle concurrency while maintaining error handling and resource management.
To get started right away, you can find the complete source code for this tutorial in our GitHub repository. If you have any questions or run into issues, feel free to open a GitHub issue!
Learn how to implement parallel processing in Kotlin using Arrow's parMap for efficient, structured concurrency with proper error handling