1. Introduction

Actors and agents provide thread-safe concurrent state management through message passing. Instead of protecting shared state with locks, you send messages to an actor or agent, which processes them one at a time on a dedicated thread. Because only that thread ever touches the state, data races on it are eliminated by design.

Groovy provides two levels of abstraction:

  • Agent — a thread-safe mutable value updated via functions

  • Actor — a message-processing entity with flexible dispatch

Both use virtual threads on JDK 21+ for efficient scaling.
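The principle can be sketched without the library. In the illustration below, which is not how Agent or Actor is implemented, a plain JDK single-thread executor stands in for the worker: four producer threads hand it update closures and never touch the state themselves. The names worker, total and done are hypothetical.

```groovy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// One worker thread owns the state; nothing else reads or writes it.
def worker = Executors.newSingleThreadExecutor()
int total = 0

// Four producers "send" 100 updates each instead of taking a lock.
def producers = (1..4).collect {
    Thread.start {
        100.times { worker.execute { total += 1 } }
    }
}
producers*.join()

// Queue a read behind every pending update and wait for its answer.
def done = new CompletableFuture<Integer>()
worker.execute { done.complete(total) }
def result = done.get(5, TimeUnit.SECONDS)
worker.shutdown()

assert result == 400
```

The read is itself a queued task, so it observes every update submitted before it. This is the same idea as the consistent read described for agents in the next section.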

2. Agent

An Agent wraps a value that can be read by any thread but modified only through serialized update functions:

import groovy.concurrent.Agent

def counter = Agent.create(0)
counter.send { it + 1 }
counter.send { it + 1 }
counter.send { it + 1 }

assert await(counter.getAsync()) == 3

2.1. Reading

// Snapshot read — non-blocking, returns current value
def current = counter.get()

// Consistent read — waits for pending updates to complete
def consistent = await(counter.getAsync())

2.2. Updating

// Fire-and-forget update
counter.send { it + 1 }

// Update and get new value
def newValue = await(counter.sendAndGet { it * 2 })

2.3. Complex state

Agents work with any value type:

def inventory = Agent.create([:])

inventory.send { state -> state + [apples: 10] }
inventory.send { state -> state + [bananas: 5] }
inventory.send { state ->
    state.collectEntries { k, v -> [k, v * 2] }
}

assert await(inventory.getAsync()) == [apples: 20, bananas: 10]

2.4. Observing state changes

An agent exposes a Flow.Publisher<T> of state transitions via changes(). Each successful update emits the new value to every subscriber that is already subscribed at the time of the update. The stream is hot (no replay of prior state), per-subscriber buffered, and closes with onComplete when shutdown() is called:

def agent = Agent.create(0)
try {
    async {
        3.times { agent.send { it + 1 } }
        Thread.sleep(50)
        agent.shutdown()
    }
    def seen = []
    for await (v in agent.changes()) {
        seen << v
    }
    assert seen == [1, 2, 3]
} finally {
    agent.shutdown()
}

Slow subscribers drop newly-offered values rather than backpressuring the agent’s update loop — buffered values are still delivered in order, but the most recent update may be skipped if a subscriber’s buffer is full (default size 256). If changes() is first called after shutdown(), the returned publisher is already closed and subscribers receive onComplete immediately.
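The late-subscription case can be sketched as follows, assuming a Groovy version that ships groovy.concurrent and the for await syntax used above. The variable names are illustrative only.

```groovy
import groovy.concurrent.Agent

def agent = Agent.create(0)
agent.shutdown()

// changes() is first called after shutdown(), so the publisher is
// already closed: the loop body never runs and the loop ends at once.
def seen = []
for await (v in agent.changes()) {
    seen << v
}

assert seen.isEmpty()
```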

3. Actor

An Actor processes messages from a queue on a dedicated thread. Two factory methods cover the common patterns:

3.1. Reactor (stateless)

A reactor applies a function to each message. The return value becomes the reply for sendAndGet callers:

import groovy.concurrent.Actor

def doubler = Actor.reactor { it * 2 }
assert await(doubler.sendAndGet(5)) == 10
assert await(doubler.sendAndGet(21)) == 42
doubler.stop()

Reactors are ideal for pure-function message processing — validators, transformers, calculators:

def validator = Actor.reactor { msg ->
    if (msg instanceof String && msg.length() > 0) 'valid'
    else 'invalid'
}
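A short usage sketch for the validator, restated here so the snippet is self-contained. It assumes the same Actor.reactor and sendAndGet calls shown above; the sample inputs are arbitrary.

```groovy
import groovy.concurrent.Actor

def validator = Actor.reactor { msg ->
    if (msg instanceof String && msg.length() > 0) 'valid'
    else 'invalid'
}

// Each reply is whatever the reactor function returned for that message.
def nonEmpty  = await(validator.sendAndGet('hello'))
def empty     = await(validator.sendAndGet(''))
def notString = await(validator.sendAndGet(42))
validator.stop()

assert nonEmpty  == 'valid'
assert empty     == 'invalid'
assert notString == 'invalid'
```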

3.2. Stateful

A stateful actor maintains state across messages. The handler receives (state, message) and returns the new state:

def counter = Actor.stateful(0) { state, msg ->
    switch (msg) {
        case 'increment': return state + 1
        case 'decrement': return state - 1
        case 'reset':     return 0
        default:          return state
    }
}

counter.send('increment')
counter.send('increment')
counter.send('decrement')
assert await(counter.sendAndGet('increment')) == 2
counter.stop()

For sendAndGet, the new state is the reply. This makes it easy to query the current state:

// Send a no-op message to read the state
def currentState = await(counter.sendAndGet('query'))

3.3. Typed message dispatch

Closures used as switch cases let an actor dispatch on the shape of each message, which suits richer message protocols:

def account = Actor.stateful(0.0) { balance, msg ->
    switch (msg) {
        case { it instanceof Map && it.deposit }:
            return balance + msg.deposit
        case { it instanceof Map && it.withdraw }:
            if (msg.withdraw > balance)
                throw new RuntimeException('Insufficient funds')
            return balance - msg.withdraw
        default:
            return balance
    }
}

account.send([deposit: 100])
account.send([withdraw: 30])
// A zero deposit leaves the balance unchanged, so it doubles as a query
def balance = await(account.sendAndGet([deposit: 0]))
assert balance == 70.0
account.stop()

4. Construction options

Actors take an optional ActorOptions configuring the mailbox, the executor, and a handful of opt-in behaviours. The builder is value-based; each with* method returns a new configuration.

import groovy.concurrent.Actor
import groovy.concurrent.ActorOptions

def options = ActorOptions.DEFAULTS
    .withBoundedMailbox(1000, ActorOptions.Overflow.BLOCK)

def actor = Actor.reactor(handler, options)

4.1. Bounded mailbox

By default the mailbox is unbounded. For backpressure — or to cap memory when producers can outrun the actor — configure a capacity and an overflow policy:

import groovy.concurrent.ActorOptions.Overflow

// BLOCK — sender blocks until capacity is free
ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.BLOCK)

// FAIL — send throws IllegalStateException when the mailbox is full
ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.FAIL)

// DROP_NEWEST — the incoming message is silently dropped; for
// sendAndGet, the returned Awaitable binds to IllegalStateException
ActorOptions.DEFAULTS.withBoundedMailbox(100, Overflow.DROP_NEWEST)

A handler that calls ctx.self().send(…​) on a full BLOCK mailbox would deadlock (the handler is the actor’s only consumer). The actor detects this and fails fast with IllegalStateException rather than parking the worker thread.

4.2. Per-actor executor

Actors default to a shared async executor (virtual threads on JDK 21+). For workload isolation, hand the actor its own executor:

import java.util.concurrent.Executors

def pool = Executors.newSingleThreadExecutor()
def actor = Actor.reactor(handler,
    ActorOptions.DEFAULTS.withExecutor(pool))

Two further options exist: withStashBound, covered in the FSM section where it is used directly, and withCurrentSelf.

5. Lifecycle

Both actors and agents support lifecycle management. An actor has a three-state lifecycle:

isActive()   isTerminated()   Meaning
true         false            Accepting new sends and processing them.
false        false            Draining: stop() was called, no new sends are accepted, but already-queued messages are still being processed.
false        true             Terminated: the worker thread has exited; all queued messages are processed and any stashed sendAndGet replies have been rejected.

def actor = Actor.reactor { it }
assert actor.isActive()
assert !actor.isTerminated()

actor.stop()                                  // flips isActive() to false immediately
assert !actor.isActive()                       // new sends fail from here (a send racing with stop() may still land)

// The worker may still be draining queued messages. Poll for terminated
// when you actually need to be sure the actor has finished shutting down.
while (!actor.isTerminated()) Thread.sleep(10)
assert actor.isTerminated()

Actors implement AutoCloseable, so they work with try-with-resources (Groovy or Java) and with Groovy's withCloseable:

Actor.reactor { it * 2 }.withCloseable { actor ->
    assert await(actor.sendAndGet(5)) == 10
}
// actor is stopped

6. Error Handling

Exceptions thrown by a handler are delivered to sendAndGet callers through the awaited reply:

def risky = Actor.reactor { throw new RuntimeException('oops') }
try {
    await(risky.sendAndGet('anything'))
    assert false : 'expected the handler exception to propagate'
} catch (RuntimeException e) {
    assert e.message == 'oops'
}
risky.stop()

Fire-and-forget send calls have no reply to carry the exception. To observe handler failures from those messages — for logging, metrics, or shutting the actor down — register an onError callback:

def actor = Actor.reactor { msg -> process(msg) }
    .onError { Throwable t, msg ->
        log.warn("handler failed on {}: {}", msg, t.message)
    }

The context-aware overload receives an ActorContext first, useful when the callback needs to react beyond logging — for example, stopping the actor:

actor.onError { ctx, t, msg -> ctx.self().stop() }

Exceptions thrown from inside the onError callback itself are caught and discarded so they cannot destabilise the dispatch loop.

7. FSM-style actors

A stateful actor models a single behaviour. For multi-phase protocols — where the actor responds differently depending on which "state" it is in — three additional capabilities on ActorContext cover most of what you’d reach for from a dedicated FSM library:

  • ctx.become(newHandler) — swap the active handler at runtime; state is preserved across the swap.

  • ctx.stash() / ctx.unstashAll() — defer the current message for later replay (typically because it arrived in the wrong phase).

  • ctx.scheduleOnce(msg, delay) / ctx.scheduleAtFixedRate(msg, initialDelay, interval) — self-send a message after a delay, with a cancellable handle.

These capabilities are only available via the context-aware factory overloads, which give the handler an ActorContext parameter.

7.1. Swapping behaviour with become

import groovy.concurrent.Actor
import groovy.concurrent.ActorContext
import groovy.concurrent.StatefulHandler

// Phase A increments by 1; phase B increments by 10.
StatefulHandler phaseB = { ctx, s, m -> s + 10 } as StatefulHandler

def actor = Actor.stateful(0, { ActorContext ctx, int s, m ->
    if (m == 'swap') { ctx.become(phaseB); return s }
    s + 1
} as StatefulHandler)

assert await(actor.sendAndGet('inc'))  == 1
assert await(actor.sendAndGet('swap')) == 1    // state preserved
assert await(actor.sendAndGet('inc'))  == 11   // now phase B

The swap takes effect on the next message: the current handler invocation completes normally (including binding any reply). State is preserved verbatim; the new handler receives the same state value the current one would have returned.

7.2. Deferring messages with stash

When a message arrives while the actor is in the wrong phase, defer it with ctx.stash(). When the phase transitions to one that can handle it, call ctx.unstashAll() to replay all deferred messages in FIFO order at the head of the mailbox:

StatefulHandler connected = { ctx, s, m -> "got($m)" } as StatefulHandler

def actor = Actor.stateful('init', { ActorContext ctx, s, m ->
    if (m == 'ready') {
        ctx.become(connected)
        ctx.unstashAll()      // replay anything stashed while not ready
        return s
    }
    ctx.stash()               // any other message is deferred for now
    s
} as StatefulHandler)

def r1 = actor.sendAndGet('A')    // stashed
def r2 = actor.sendAndGet('B')    // stashed
actor.send('ready')               // → connected, then replays A and B
assert await(r1) == 'got(A)'
assert await(r2) == 'got(B)'

The stash is unbounded by default.

An actor that stashes messages from a source whose volume you do not control (network input, external clients, untrusted callers) can grow the stash without limit and exhaust the JVM heap if the phase transition that would call unstashAll() never arrives. For any such actor, set a bound via withStashBound at construction time.

def options = ActorOptions.DEFAULTS
    .withStashBound(64, ActorOptions.StashOverflow.REJECT)

Policies are FAIL (throw from stash()), DROP_OLDEST (evict the oldest stashed message, binding its sendAndGet reply to error), and REJECT (bind the current message’s reply to error without stashing it).

7.3. Scheduled self-sends

For state timeouts, heartbeats, retries, and periodic work, the context exposes one-shot and recurring schedules over the shared async scheduler:

import java.time.Duration

def actor = Actor.stateful(0, { ActorContext ctx, s, m ->
    if (m == 'start') {
        ctx.scheduleAtFixedRate('tick',
            Duration.ofSeconds(1), Duration.ofSeconds(5))
        return s
    }
    if (m == 'tick') return s + 1
    s
} as StatefulHandler)

scheduleOnce returns a Cancellable so a phase transition can call off a pending timer:

def timer = ctx.scheduleOnce('auth-timeout', Duration.ofSeconds(30))
// ... later, when AuthResult arrives:
timer.cancel()

Actor.stop() cancels every outstanding timer the actor scheduled — no zombies after shutdown.

7.4. End-to-end: a connection handshake

A common FSM shape (disconnected → authenticating → connected), combining become with stash() and unstashAll():

import groovy.concurrent.Actor
import groovy.concurrent.ActorContext
import groovy.concurrent.StatefulHandler

StatefulHandler disconnected, authenticating, connected

disconnected = { ctx, s, m ->
    if (m == 'connect') { ctx.become(authenticating); return s }
    s
} as StatefulHandler

authenticating = { ctx, s, m ->
    if (m == 'auth-ok') {
        ctx.become(connected)
        ctx.unstashAll()
        return s
    }
    ctx.stash()                // commands arriving during auth are deferred
    s
} as StatefulHandler

connected = { ctx, s, m ->
    // Real connected handlers use s — here we bump an "ops processed"
    // counter to demonstrate that state survives every transition.
    [ops: s.ops + 1]
} as StatefulHandler

def actor = Actor.stateful([ops: 0], disconnected)
actor.send('connect')
def r1 = actor.sendAndGet([cmd: 'read'])    // stashed in authenticating
actor.send('auth-ok')                       // → connected; r1 now processes
assert await(r1) == [ops: 1]                // state preserved across phases
actor.stop()

8. Choosing Between Agent and Actor

Aspect                  Agent                                           Actor
State                   Single value, updated via function              Arbitrary, managed by handler
Messages                Update functions only                           Any message type with dispatch
Reply                   getAsync() / sendAndGet(fn) returns new value   sendAndGet(msg) returns handler result
Multi-phase behaviour   -                                               become / stash / scheduled self-sends
Use case                Thread-safe counters, caches, accumulators      State machines, services, typed protocols

Both guarantee sequential message processing — no locks needed.
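To make the comparison concrete, here is the same counter written both ways. This is a sketch that uses only the calls shown earlier in this guide; the 'inc' and 'query' message names are arbitrary.

```groovy
import groovy.concurrent.Actor
import groovy.concurrent.Agent

// Agent: the "message" is the update function itself.
def agentCounter = Agent.create(0)
3.times { agentCounter.send { it + 1 } }
def agentTotal = await(agentCounter.getAsync())
agentCounter.shutdown()

// Actor: the message is data, and the handler decides what it means.
def actorCounter = Actor.stateful(0) { state, msg ->
    msg == 'inc' ? state + 1 : state
}
3.times { actorCounter.send('inc') }
def actorTotal = await(actorCounter.sendAndGet('query'))
actorCounter.stop()

assert agentTotal == 3
assert actorTotal == 3
```

The agent version is shorter when every update is a simple function of the old value. The actor version pays off once messages need names, validation, or phase-dependent handling.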

9. @ActiveObject / @ActiveMethod

For a more OOP approach, annotate a class with @ActiveObject and its methods with @ActiveMethod. The compiler automatically routes annotated method calls through an internal actor — callers just see a normal class:

import groovy.transform.ActiveObject
import groovy.transform.ActiveMethod

@ActiveObject
class Account {
    private double balance = 0

    @ActiveMethod
    void deposit(double amount) { balance += amount }

    @ActiveMethod
    void withdraw(double amount) {
        if (amount > balance) throw new RuntimeException('Insufficient funds')
        balance -= amount
    }

    @ActiveMethod
    double getBalance() { balance }
}

def account = new Account()
account.deposit(100)         // serialized through the actor; the caller blocks until done
account.deposit(50)
account.withdraw(30)
assert account.getBalance() == 120.0

Methods without @ActiveMethod run on the caller’s thread as normal.
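The difference can be observed by asking each kind of method which thread it runs on. The Probe class below is hypothetical and assumes the annotations behave as described above; the name of the actor's worker thread is not specified here, so the sketch only prints it.

```groovy
import groovy.transform.ActiveObject
import groovy.transform.ActiveMethod

@ActiveObject
class Probe {
    // Annotated: routed through the internal actor.
    @ActiveMethod
    String activeThread() { Thread.currentThread().name }

    // Not annotated: an ordinary call on the caller's thread.
    String plainThread() { Thread.currentThread().name }
}

def probe = new Probe()
def caller = Thread.currentThread().name

assert probe.plainThread() == caller
println "active method ran on: '${probe.activeThread()}'"
```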

9.1. Blocking vs non-blocking

By default, @ActiveMethod calls block until the actor processes them. For non-blocking calls, set blocking = false:

@ActiveObject
class Service {
    @ActiveMethod(blocking = false)
    def compute(int x) { x * x }
}

def svc = new Service()
def result = svc.compute(7)  // returns Awaitable immediately
assert await(result) == 49

9.2. Thread safety by annotation

The key benefit: you write normal-looking classes with normal methods. All @ActiveMethod calls are serialized through the actor, so state that is touched only from @ActiveMethod methods needs no locks or concurrent collections and cannot be raced on. Methods without the annotation still run on the caller's thread and get no such protection.
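A minimal sketch of that guarantee under contention, assuming the default blocking behaviour described in the previous subsection. The Tally class is hypothetical: it uses a plain int field and an unsynchronized increment, which would lose updates if four threads called it directly.

```groovy
import groovy.transform.ActiveObject
import groovy.transform.ActiveMethod

@ActiveObject
class Tally {
    private int count = 0

    @ActiveMethod
    void bump() { count++ }      // not atomic on its own

    @ActiveMethod
    int total() { count }
}

def tally = new Tally()

// Four caller threads, 250 blocking calls each.
def callers = (1..4).collect {
    Thread.start { 250.times { tally.bump() } }
}
callers*.join()

def finalTotal = tally.total()
assert finalTotal == 1000
```

Because each blocking call returns only after the actor has processed it, joining the caller threads is enough to know that every increment has been applied.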

This also makes the code highly readable for AI tools — the @ActiveObject annotation explicitly declares the concurrency model, and each @ActiveMethod contains pure business logic without messaging boilerplate.