Structured Concurrency in Kotlin using Arrow's parMap

December 21, 2024 3 min read Intermediate

Arrow's parMap provides a powerful way to handle concurrent operations in Kotlin while maintaining structured concurrency. Let's explore how to use it effectively.

Setup

First, add Arrow dependencies to your build.gradle.kts:

dependencies {
    implementation("io.arrow-kt:arrow-core:1.2.0")
    implementation("io.arrow-kt:arrow-fx-coroutines:1.2.0")
}

Basic Example: Processing User Data

Let's create a practical example where we need to fetch user data and their associated posts from different APIs concurrently:

import arrow.fx.coroutines.parMap
import kotlinx.coroutines.delay

data class User(val id: Int, val name: String)
data class UserPosts(val userId: Int, val posts: List<String>)
data class EnrichedUser(val user: User, val posts: List<String>)

class UserService {
    // Simulate API call to fetch user details
    suspend fun fetchUser(id: Int): User {
        delay(1000) // Simulate network delay
        return User(id, "User $id")
    }

    // Simulate API call to fetch user posts
    suspend fun fetchUserPosts(userId: Int): UserPosts {
        delay(1500) // Simulate network delay
        return UserPosts(userId, listOf("Post 1", "Post 2", "Post 3"))
    }
}

suspend fun main() {
    val userService = UserService()
    val userIds = listOf(1, 2, 3, 4, 5)

    // Process users concurrently with parMap
    val enrichedUsers = userIds.parMap { userId ->
        val user = userService.fetchUser(userId)
        val posts = userService.fetchUserPosts(userId)
        EnrichedUser(user, posts.posts)
    }

    enrichedUsers.forEach { user ->
        println("User ${user.user.name} has ${user.posts.size} posts")
    }
}

Advanced Example: Error Handling and Timeouts

Here's a more advanced example incorporating error handling and timeouts:

import arrow.core.Either
import arrow.fx.coroutines.parMap
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

class UserProcessor(
    private val userService: UserService,
    private val dispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    suspend fun processUsersWithErrorHandling(
        userIds: List<Int>
    ): List<Either<ProcessingError, EnrichedUser>> {
        return userIds.parMap { userId ->
            Either.catch {
                withContext(dispatcher) {
                    withTimeout(3000L) { // 3 seconds timeout
                        val user = userService.fetchUser(userId)
                        val posts = userService.fetchUserPosts(userId)
                        EnrichedUser(user, posts.posts)
                    }
                }
            }.mapLeft { error ->
                when (error) {
                    is kotlinx.coroutines.TimeoutCancellationException -> 
                        ProcessingError.Timeout(userId)
                    else -> ProcessingError.Unknown(userId, error.message ?: "Unknown error")
                }
            }
        }
    }
}

sealed class ProcessingError {
    data class Timeout(val userId: Int) : ProcessingError()
    data class Unknown(val userId: Int, val message: String) : ProcessingError()
}

// Usage
suspend fun main() {
    val userProcessor = UserProcessor(UserService())
    val userIds = listOf(1, 2, 3, 4, 5)

    val results = userProcessor.processUsersWithErrorHandling(userIds)
    
    results.forEach { result ->
        when (result) {
            is Either.Right -> {
                val user = result.value
                println("Successfully processed ${user.user.name}")
            }
            is Either.Left -> {
                when (val error = result.value) {
                    is ProcessingError.Timeout -> 
                        println("Timeout processing user ${error.userId}")
                    is ProcessingError.Unknown -> 
                        println("Error processing user ${error.userId}: ${error.message}")
                }
            }
        }
    }
}

Key Benefits of parMap

  1. Structured Concurrency: All concurrent operations are contained within the scope of the parMap call.
  2. Resource Management: Resources are automatically cleaned up when the scope completes.
  3. Error Handling: Provides clean error handling with Arrow's Either type.
  4. Performance: Efficiently processes items in parallel while maintaining control over concurrency.

Best Practices

  1. Choose Appropriate Dispatcher: Use Dispatchers.IO for I/O operations and Dispatchers.Default for CPU-intensive tasks.
  2. Handle Errors Gracefully: Use Arrow's Either type for proper error handling.
  3. Set Timeouts: Implement timeouts to prevent operations from hanging indefinitely.
  4. Monitor Resources: Be mindful of resource usage when processing large collections.
  5. Limit Concurrency: Consider using parMapN when you need to limit the number of concurrent operations.

Comparison with Other Approaches

// Traditional sequential approach
val sequentialResults = userIds.map { userId ->
    userService.fetchUser(userId)
}

// Using async/await (less structured)
val asyncResults = userIds.map { userId ->
    async { userService.fetchUser(userId) }
}.awaitAll()

// Using parMap (structured concurrency)
val parMapResults = userIds.parMap { userId ->
    userService.fetchUser(userId)
}

The parMap approach provides a clean, structured way to handle concurrency while maintaining error handling and resource management.

To get started right away, you can find the complete source code for this tutorial in our GitHub repository. If you have any questions or run into issues, feel free to open a GitHub issue!

Latest Articles

Structured Concurrency in Kotlin using Arrow's parMap

Structured Concurrency in Kotlin using Arrow's parMap

3 min read Arrow · Functional Programming

Learn how to implement parallel processing in Kotlin using Arrow's parMap for efficient, structured concurrency with proper error handling