
Structured Concurrency in Kotlin using Arrow's parMap
Discover how to implement efficient parallel processing in Kotlin using Arrow's parMap. Learn structured concurrency patterns, proper error handling, and performance optimization techniques with real-world examples.
Arrow's parMap
provides a powerful way to handle concurrent operations in Kotlin while maintaining structured concurrency. Let's explore how to use it effectively.
Setup
First, add Arrow dependencies to your build.gradle.kts
:
dependencies {
implementation("io.arrow-kt:arrow-core:1.2.0")
implementation("io.arrow-kt:arrow-fx-coroutines:1.2.0")
}
Basic Example: Processing User Data
Let's create a practical example where we need to fetch user data and their associated posts from different APIs concurrently:
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.delay
data class User(val id: Int, val name: String)
data class UserPosts(val userId: Int, val posts: List<String>)
data class EnrichedUser(val user: User, val posts: List<String>)
class UserService {
// Simulate API call to fetch user details
suspend fun fetchUser(id: Int): User {
delay(1000) // Simulate network delay
return User(id, "User $id")
}
// Simulate API call to fetch user posts
suspend fun fetchUserPosts(userId: Int): UserPosts {
delay(1500) // Simulate network delay
return UserPosts(userId, listOf("Post 1", "Post 2", "Post 3"))
}
}
suspend fun main() {
val userService = UserService()
val userIds = listOf(1, 2, 3, 4, 5)
// Process users concurrently with parMap
val enrichedUsers = userIds.parMap { userId ->
val user = userService.fetchUser(userId)
val posts = userService.fetchUserPosts(userId)
EnrichedUser(user, posts.posts)
}
enrichedUsers.forEach { user ->
println("User ${user.user.name} has ${user.posts.size} posts")
}
}
Advanced Example: Error Handling and Timeouts
Here's a more advanced example incorporating error handling and timeouts:
import arrow.core.Either
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
class UserProcessor(
private val userService: UserService,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
suspend fun processUsersWithErrorHandling(
userIds: List<Int>
): List<Either<ProcessingError, EnrichedUser>> {
return userIds.parMap { userId ->
Either.catch {
withContext(dispatcher) {
withTimeout(3000L) { // 3 seconds timeout
val user = userService.fetchUser(userId)
val posts = userService.fetchUserPosts(userId)
EnrichedUser(user, posts.posts)
}
}
}.mapLeft { error ->
when (error) {
is kotlinx.coroutines.TimeoutCancellationException ->
ProcessingError.Timeout(userId)
else -> ProcessingError.Unknown(userId, error.message ?: "Unknown error")
}
}
}
}
}
sealed class ProcessingError {
data class Timeout(val userId: Int) : ProcessingError()
data class Unknown(val userId: Int, val message: String) : ProcessingError()
}
// Usage
suspend fun main() {
val userProcessor = UserProcessor(UserService())
val userIds = listOf(1, 2, 3, 4, 5)
val results = userProcessor.processUsersWithErrorHandling(userIds)
results.forEach { result ->
when (result) {
is Either.Right -> {
val user = result.value
println("Successfully processed ${user.user.name}")
}
is Either.Left -> {
when (val error = result.value) {
is ProcessingError.Timeout ->
println("Timeout processing user ${error.userId}")
is ProcessingError.Unknown ->
println("Error processing user ${error.userId}: ${error.message}")
}
}
}
}
}
Key Benefits of parMap
- Structured Concurrency: All concurrent operations are contained within the scope of the
parMap
call. - Resource Management: Resources are automatically cleaned up when the scope completes.
- Error Handling: Provides clean error handling with Arrow's
Either
type. - Performance: Efficiently processes items in parallel while maintaining control over concurrency.
Best Practices
- Choose Appropriate Dispatcher: Use
Dispatchers.IO
for I/O operations andDispatchers.Default
for CPU-intensive tasks. - Handle Errors Gracefully: Use Arrow's
Either
type for proper error handling. - Set Timeouts: Implement timeouts to prevent operations from hanging indefinitely.
- Monitor Resources: Be mindful of resource usage when processing large collections.
- Limit Concurrency: Consider using
parMapN
when you need to limit the number of concurrent operations.
Comparison with Other Approaches
// Traditional sequential approach
val sequentialResults = userIds.map { userId ->
userService.fetchUser(userId)
}
// Using async/await (less structured)
val asyncResults = userIds.map { userId ->
async { userService.fetchUser(userId) }
}.awaitAll()
// Using parMap (structured concurrency)
val parMapResults = userIds.parMap { userId ->
userService.fetchUser(userId)
}
The parMap
approach provides a clean, structured way to handle concurrency while maintaining error handling and resource management.
To get started right away, you can find the complete source code for this tutorial in our GitHub repository. If you have any questions or run into issues, feel free to open a GitHub issue!