1. Introduction

Groovy adds parallel collection methods directly onto Collection, enabling data parallelism with familiar Groovy idioms. These methods use Java parallel streams under the hood, with pool isolation via ForkJoinPool to prevent interference with the common pool.

All methods use java.util.function types (Consumer, Function, Predicate, etc.), and Groovy closures are converted automatically via SAM coercion.

2. Getting Started

Parallel methods are available on any Collection without imports:

def result = (1..1000).toList().collectParallel { it * 2 }
assert result.sort() == (2..2000).step(2).toList()

By default, operations run on ForkJoinPool.commonPool(). For pool isolation, use ParallelScope.withPool:

import groovy.concurrent.ParallelScope
import groovy.concurrent.Pool

ParallelScope.withPool(Pool.cpu()) { scope ->
    def bigList = (1..100_000).toList()
    def sum = bigList.injectParallel(0) { a, b -> a + b }
    println "Sum: $sum"
}

3. Pool and ParallelScope

3.1. Pool

Pool is a managed thread pool that implements Executor and AutoCloseable:

import groovy.concurrent.Pool

def pool = Pool.cpu()       // Sized to available processors (ForkJoinPool)
def pool2 = Pool.fixed(8)   // Fixed-size ForkJoinPool
def pool3 = Pool.io()       // Virtual threads on JDK 21+, sized pool otherwise
def pool4 = Pool.virtual()  // Virtual-thread-per-task (JDK 21+)

Pool.cpu() and Pool.fixed() create ForkJoinPool-backed pools, ideal for CPU-bound parallel work. Pool.io() and Pool.virtual() create virtual thread pools for I/O-bound tasks.
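Since the introduction notes that these methods run on Java parallel streams with pool isolation via ForkJoinPool, the underlying trick can be sketched in plain Java: submitting the stream pipeline as a task to a dedicated ForkJoinPool makes its parallel operations run on that pool's workers rather than the common pool. A minimal sketch (class and method names are illustrative):

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class PoolIsolationSketch {
    // Submitting the stream pipeline as a task to a dedicated ForkJoinPool
    // makes its parallel operations run on that pool, not the common pool.
    static long sumSquares(List<Integer> data, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            ForkJoinTask<Long> task = pool.submit(() ->
                data.parallelStream().mapToLong(n -> (long) n * n).sum()
            );
            return task.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumSquares(List.of(1, 2, 3, 4), 4)); // 30
    }
}
```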

3.2. ParallelScope

ParallelScope.withPool binds a pool for the duration of a block. Parallel collection methods inside the block automatically use it:

import groovy.concurrent.ParallelScope
import groovy.concurrent.Pool

ParallelScope.withPool(4) { scope ->
    // All *Parallel methods use a 4-thread ForkJoinPool
    def results = bigList.collectParallel { transform(it) }
    def filtered = results.findAllParallel { it > threshold }
    filtered.eachParallel { process(it) }
}
// Pool is shut down automatically

3.3. ConcurrentConfig

Global defaults are managed by ConcurrentConfig:

import groovy.concurrent.ConcurrentConfig

// Default parallelism: processors + 1 (GPars convention)
println ConcurrentConfig.defaultParallelism

// Override via system property: -Dgroovy.concurrent.poolsize=16
// Or programmatically:
ConcurrentConfig.defaultParallelism = 16

4. Parallel Collection Methods

4.1. Transformation

Method                   Description
collectParallel { }      Transforms each element (parallel map)
collectManyParallel { }  Transforms and flattens (parallel flatMap)

def names = people.collectParallel { it.name }
def allTags = articles.collectManyParallel { it.tags }
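Because these methods sit on Java parallel streams, their under-the-hood equivalents are map and flatMap; a rough sketch in plain Java (class and method names are illustrative):

```java
import java.util.List;
import java.util.stream.Collectors;

public class TransformSketch {
    // collectParallel { it * 2 } corresponds to a parallel map;
    // collecting to a list preserves encounter order even in parallel.
    static List<Integer> doubled(List<Integer> nums) {
        return nums.parallelStream().map(n -> n * 2).collect(Collectors.toList());
    }

    // collectManyParallel { it.tags } corresponds to a parallel flatMap
    static List<String> flattened(List<List<String>> nested) {
        return nested.parallelStream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubled(List.of(1, 2, 3)));                            // [2, 4, 6]
        System.out.println(flattened(List.of(List.of("a"), List.of("b", "c")))); // [a, b, c]
    }
}
```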

4.2. Filtering

Method                Description
findAllParallel { }   Elements matching the predicate
findParallel { }      First element matching the predicate
findAnyParallel { }   Any matching element (may be faster; no order guarantee)
grepParallel(filter)  Pattern match via isCase (Class, regex, Range, etc.)
splitParallel { }     Partition into [matching, non-matching]

def adults = people.findAllParallel { it.age >= 18 }
def firstMatch = data.findParallel { it.isValid() }
def strings = mixed.grepParallel(String)
def (evens, odds) = numbers.splitParallel { it % 2 == 0 }
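The filtering methods map onto the parallel-stream operations filter, findFirst, findAny, and the partitioningBy collector; a hedged Java sketch of the correspondence (names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class FilterSketch {
    // findAllParallel corresponds to filter + collect
    static List<Integer> evens(List<Integer> nums) {
        return nums.parallelStream().filter(n -> n % 2 == 0).collect(Collectors.toList());
    }

    // findParallel corresponds to findFirst (respects encounter order);
    // findAnyParallel corresponds to findAny (no order guarantee, may be faster)
    static Optional<Integer> firstEven(List<Integer> nums) {
        return nums.parallelStream().filter(n -> n % 2 == 0).findFirst();
    }

    // splitParallel corresponds to partitioningBy: true -> matching, false -> non-matching
    static Map<Boolean, List<Integer>> split(List<Integer> nums) {
        return nums.parallelStream().collect(Collectors.partitioningBy(n -> n % 2 == 0));
    }

    public static void main(String[] args) {
        List<Integer> nums = List.of(1, 2, 3, 4, 5);
        System.out.println(evens(nums));           // [2, 4]
        System.out.println(firstEven(nums).get()); // 2
        System.out.println(split(nums).get(true)); // [2, 4]
    }
}
```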

4.3. Iteration

Method Description

eachParallel { }

Side-effect iteration

eachWithIndexParallel { item, idx → }

Iteration with index

files.eachParallel { process(it) }
items.eachWithIndexParallel { item, idx -> println "$idx: $item" }
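One way to picture eachWithIndexParallel under the hood is a parallel IntStream over the indices; note that any shared sink must be thread-safe, as covered in the best practices below. A sketch under that assumption (names are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class IterationSketch {
    // eachWithIndexParallel { item, idx -> ... } pictured as a parallel
    // IntStream over the indices, writing into a thread-safe sink.
    static Map<Integer, String> indexed(List<String> items) {
        Map<Integer, String> out = new ConcurrentHashMap<>();
        IntStream.range(0, items.size()).parallel()
                 .forEach(i -> out.put(i, i + ": " + items.get(i)));
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> result = indexed(List.of("a", "b"));
        System.out.println(result.get(0)); // 0: a
        System.out.println(result.get(1)); // 1: b
    }
}
```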

4.4. Predicates

Method             Description
anyParallel { }    True if any element matches
everyParallel { }  True if all elements match
countParallel { }  Count of matching elements

if (orders.anyParallel { it.isPriority() }) { ... }
assert positiveNumbers.everyParallel { it > 0 }
def errorCount = logs.countParallel { it.level == 'ERROR' }
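The predicate methods correspond to the short-circuiting stream operations anyMatch and allMatch, plus filter + count; a brief Java sketch (names are illustrative):

```java
import java.util.List;

public class PredicateSketch {
    // anyParallel: short-circuits as soon as one worker finds a match
    static boolean anyNegative(List<Integer> nums) {
        return nums.parallelStream().anyMatch(n -> n < 0);
    }

    // everyParallel: short-circuits as soon as one worker finds a mismatch
    static boolean allPositive(List<Integer> nums) {
        return nums.parallelStream().allMatch(n -> n > 0);
    }

    // countParallel: no short-circuit; every element is tested
    static long countEven(List<Integer> nums) {
        return nums.parallelStream().filter(n -> n % 2 == 0).count();
    }

    public static void main(String[] args) {
        List<Integer> nums = List.of(1, 2, 3, 4);
        System.out.println(anyNegative(nums)); // false
        System.out.println(allPositive(nums)); // true
        System.out.println(countEven(nums));   // 2
    }
}
```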

4.5. Aggregation

Method                            Description
sumParallel { a, b -> }           Reduce with a binary operator
injectParallel(seed) { a, b -> }  Reduce with a seed (accumulator must be associative)
minParallel { a, b -> }           Minimum by comparator
maxParallel { a, b -> }           Maximum by comparator
groupByParallel { }               Group into a map by classifier

def total = amounts.sumParallel { a, b -> a + b }
def product = numbers.injectParallel(1) { a, b -> a * b }
def youngest = people.minParallel { a, b -> a.age <=> b.age }
def byDept = employees.groupByParallel { it.department }

injectParallel requires an associative accumulator for correct parallel results. Non-associative accumulators produce undefined results.
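In parallel-stream terms, the aggregation methods line up with reduce, min/max with a Comparator, and a grouping collector; the associativity requirement comes from the stream splitting the input and combining partial results in arbitrary groupings. A hedged Java sketch (names are illustrative):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AggregationSketch {
    // injectParallel(seed) { a, b -> ... } lines up with reduce(identity, accumulator).
    // The accumulator must be associative and the seed a true identity, because
    // partial results from different chunks are combined in arbitrary groupings.
    static int product(List<Integer> nums) {
        return nums.parallelStream().reduce(1, (a, b) -> a * b);
    }

    // minParallel { a, b -> a <=> b } lines up with min(comparator)
    static int smallest(List<Integer> nums) {
        return nums.parallelStream().min(Comparator.naturalOrder()).orElseThrow();
    }

    // groupByParallel lines up with a grouping collector; groupingByConcurrent
    // trades encounter order for cheaper concurrent accumulation.
    static Map<Integer, List<String>> byLength(List<String> words) {
        return words.parallelStream()
                    .collect(Collectors.groupingByConcurrent(String::length));
    }

    public static void main(String[] args) {
        System.out.println(product(List.of(1, 2, 3, 4)));                     // 24
        System.out.println(smallest(List.of(30, 22, 41)));                    // 22
        System.out.println(byLength(List.of("a", "bb", "cc")).get(2).size()); // 2
    }
}
```

As a counter-example, an accumulator like (a, b) -> a - b is not associative, so a parallel reduce over it can return different values from run to run.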

5. @Parallel for loops

The @Parallel annotation converts a for-in loop into a parallel operation with structured completion — all iterations finish before the next statement executes:

import groovy.transform.Parallel
import java.util.concurrent.CopyOnWriteArrayList

def results = new CopyOnWriteArrayList()

@Parallel
for (item in [1, 2, 3, 4, 5]) {
    results << item * 10
}

// All iterations complete before this line
assert results.sort() == [10, 20, 30, 40, 50]

Inside a ParallelScope.withPool block, @Parallel uses the bound pool:

ParallelScope.withPool(Pool.cpu()) { scope ->
    @Parallel
    for (n in data) {
        process(n)
    }
}

Under the hood, @Parallel rewrites the for loop body into a closure passed to eachParallel. This means the loop body is no longer a traditional loop — it is a closure invocation. The loop variable is also internally renamed (e.g., item becomes $parallel_item) to avoid scope conflicts in the generated code.

Because the body becomes a closure:

  • break and continue are not supported — they are not valid inside closures and will cause a compile error.

  • return inside the body returns from the closure (skipping the current iteration), not from the enclosing method.

Shared mutable state inside @Parallel loops must be thread-safe (e.g., AtomicInteger, CopyOnWriteArrayList, ConcurrentHashMap).
For the best debugging experience (e.g., stepping through loop bodies or inspecting variable names), consider calling the *Parallel collection methods directly (eachParallel, collectParallel, etc.) instead of using @Parallel, or be prepared for the internal variable renaming described above.

6. Best Practices

6.1. Choosing between parallel collections and async/await

Parallel collections and @Parallel run on a ForkJoinPool with a small number of worker threads (typically matching your CPU core count). This is ideal for CPU-bound work — computation, transformation, aggregation — where each task keeps the CPU busy.

For I/O-bound or latency-bound work (network calls, database queries, sleeps, delays), use async/await with virtual threads instead. Virtual threads can scale to millions of concurrent tasks because blocking one doesn’t consume an OS thread.

Workload: CPU-bound (computation, parsing, math)
  Right tool: @Parallel, collectParallel, etc.
  Why: ForkJoinPool work-stealing maximises CPU utilisation

Workload: I/O-bound (HTTP calls, DB queries, file I/O)
  Right tool: async/await with virtual threads
  Why: Millions of concurrent waits without blocking OS threads

Workload: Mixed
  Right tool: async/await with Pool.io()
  Why: Virtual threads handle both compute and I/O

To illustrate, the examples below contrast the right tool for CPU-bound work, the right tool for I/O-bound work, and an anti-pattern: blocking on a short delay (simulating I/O or latency) inside a parallel loop over 100 items.

// CPU-bound work — @Parallel is the right tool
@Parallel
for (item in bigList) {
    results << expensiveComputation(item)  // keeps CPU busy, no blocking
}

// I/O-bound work — async/await is the right tool
AsyncScope.withScope { scope ->
    urls.each { url ->
        scope.async { fetchUrl(url) }  // virtual thread blocks on I/O (free)
    }
}
// All fetches complete here — structured concurrency

// Anti-pattern — blocking inside parallel collections
@Parallel
for (item in 1..100) {
    Thread.sleep(50)    // blocks a ForkJoinPool worker!
    process(item)
}
// Only ~8 items run concurrently (pool size); the rest queue behind.
// Total time: ~625ms instead of ~50ms with virtual threads.

The anti-pattern looks parallel but the ForkJoinPool has only ~8 workers. Each Thread.sleep ties up a worker, so at most ~8 items sleep concurrently. The remaining 92 wait in the queue.
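To see why virtual threads sidestep the problem, consider this JDK 21 Java sketch (class and method names are illustrative; exact timings vary by machine). Each sleeping task gets its own virtual thread, so all the sleeps overlap and total wall time stays close to a single delay rather than (tasks / workers) * delay:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class BlockingSketch {
    // With a virtual thread per task, all sleeps overlap, so total wall time
    // stays close to one delay instead of (tasks / workers) * delay.
    static long elapsedMillisVirtual(int tasks, long sleepMs) {
        long start = System.nanoTime();
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            IntStream.range(0, tasks).forEach(i -> exec.submit(() -> {
                try {
                    Thread.sleep(sleepMs);   // parks the virtual thread only
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        } // close() waits for all submitted tasks to finish
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        long ms = elapsedMillisVirtual(100, 50);
        System.out.println("100 x 50ms sleeps took ~" + ms + " ms");
    }
}
```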

await Awaitable.delay(ms) inside a parallel collection body has the same problem — await blocks the calling ForkJoinPool worker thread while waiting for the delay to complete. Use async/await outside parallel collections for delay-based work.

6.2. Use thread-safe collections for shared state

The parallel body runs on multiple threads simultaneously. Use concurrent collections or atomics for any shared mutable state:

// Bad — ArrayList is not thread-safe
def results = []
items.eachParallel { results << transform(it) }  // race condition!

// Good — use CopyOnWriteArrayList or collectParallel
def results = items.collectParallel { transform(it) }
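In plain parallel-stream terms, the same contrast looks like this; the sketch below (names are illustrative) shows a concurrent sink as a workable option and stream collection as the preferred one, since collecting eliminates shared mutable state entirely:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

public class SharedStateSketch {
    // Workable: a thread-safe collection as the shared sink.
    // (A plain ArrayList here would race: lost elements or exceptions.)
    static List<Integer> viaConcurrentSink(List<Integer> items) {
        List<Integer> out = new CopyOnWriteArrayList<>();
        items.parallelStream().forEach(n -> out.add(n * 2));
        return out; // element order is not guaranteed
    }

    // Preferred: let the stream collect — no shared mutable state at all,
    // and encounter order is preserved.
    static List<Integer> viaCollect(List<Integer> items) {
        return items.parallelStream().map(n -> n * 2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3);
        System.out.println(viaConcurrentSink(items).size()); // 3
        System.out.println(viaCollect(items));               // [2, 4, 6]
    }
}
```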