Interface Actor<T>

Type Parameters:
T - the message type
All Superinterfaces:
AutoCloseable
All Known Implementing Classes:
DefaultActor

public interface Actor<T> extends AutoCloseable
A lightweight message-passing actor for concurrent state management.

Each actor has a dedicated thread that processes messages sequentially from a queue. This guarantees that the actor's state is never accessed concurrently — no locks needed.

Two factory patterns provide the common shapes:

Each pattern has a context-aware overload (reactor(ReactorHandler) and stateful(Object, StatefulHandler)) whose handler receives an ActorContext as a first argument. The context exposes self() so the handler can stop the actor without a captured self-reference.

 // Reactor: stateless message processing
 def doubler = Actor.reactor { msg -> msg * 2 }
 assert await(doubler.sendAndGet(5)) == 10

 // Stateful: accumulates state across messages
 def counter = Actor.stateful(0) { state, msg ->
     switch (msg) {
         case 'increment': return state + 1
         case 'decrement': return state - 1
         default: return state
     }
 }
 counter.send('increment')
 counter.send('increment')
 assert await(counter.sendAndGet('increment')) == 3

 // Self-stop from a handler via the context
 def bot = Actor.stateful(0) { ctx, count, msg ->
     def next = count + 1
     if (next >= 3) ctx.self().stop()
     next
 }
 

For FSM-style actors, the context-aware handler shapes (ReactorHandler / StatefulHandler) receive an ActorContext that supports ctx.become(...) to swap the active handler, and ctx.stash() / ctx.unstashAll() to defer messages received in the wrong phase and replay them later. See ActorContext for the full semantics.

Actors use virtual threads on JDK 21+ for efficient scheduling. Millions of actors can coexist without pool tuning.

Inspired by GPars actors, Erlang processes, and Clojure agents.

Since:
6.0.0
See Also:
  • Method Details

    • send

      void send(T message)
      Sends a message to this actor. The message is queued and processed asynchronously. Fire-and-forget — no reply is expected.

      Bounded-mailbox interaction (see ActorOptions.Overflow):

      • BLOCK: the calling thread blocks until queue capacity is available, then enqueues.
      • FAIL: throws IllegalStateException when the mailbox is full.
      • DROP_NEWEST: the message is silently dropped — there is no reply to carry the failure, so a fire-and-forget overflow is invisible to the sender. If you need drop visibility, prefer sendAndGet(T) (which binds the dropped reply to IllegalStateException) or a different overflow policy.
      Parameters:
      message - the message to send
      Throws:
      IllegalStateException - if the actor has been stopped, or if the mailbox is bounded with ActorOptions.Overflow.FAIL and is full
    • sendAndGet

      <R> Awaitable<R> sendAndGet(T message)
      Sends a message and returns an Awaitable that completes with the reply. For reactors, the reply is the handler's return value. For stateful actors, the reply is the new state.

      Bounded-mailbox interaction (see ActorOptions.Overflow):

      Type Parameters:
      R - the reply type
      Parameters:
      message - the message to send
      Returns:
      an awaitable reply
      Throws:
      IllegalStateException - if the actor has been stopped, or if the mailbox is bounded with ActorOptions.Overflow.FAIL and is full
    • isActive

      boolean isActive()
      Returns true while the actor is accepting new sends.

      The actor lifecycle has three states, expressed via this method and isTerminated():

      • acceptingisActive() == true, isTerminated() == false: the actor accepts new sends and is processing them.
      • drainingisActive() == false, isTerminated() == false: entered immediately when stop() is called. Further sends throw IllegalStateException, but messages already queued (or sent in a race with stop) continue to run.
      • terminatedisActive() == false, isTerminated() == true: the worker has exited; the queue and any stash have been processed (or, for stashed sendAndGet replies, rejected).
    • isTerminated

      default boolean isTerminated()
      Returns true once the worker has fully exited — i.e. the queue and any stashed messages have been processed (or rejected, in the case of stashed sendAndGet replies). Always false while isActive() is true; becomes true some time after stop() is called, once draining completes.
      Since:
      6.0.0
    • stop

      void stop()
      Stops this actor gracefully. Messages already in the queue are processed before the actor shuts down. New sends after stop throw IllegalStateException.
    • close

      default void close()
      Stops this actor. Equivalent to stop().
      Specified by:
      close in interface AutoCloseable
    • onError

      default Actor<T> onError(BiConsumer<Throwable,? super T> handler)
      Registers a handler invoked when the message processor throws.

      Fire-and-forget send(T) otherwise has no way to surface a handler exception; this hook is the supported way to log, record metrics for, or react to those failures. For sendAndGet(T) the failure is still reported through the returned Awaitable; the onError handler runs in addition.

      To stop the actor from inside an error handler, prefer the context-aware overload onError(TriConsumer)ctx.self().stop() works on any actor. Alternatively, if the actor was built with withCurrentSelf(true), call Actor.currentSelf().stop(); without that opt-in currentSelf() throws.

      Exceptions thrown from the handler itself are caught and discarded so the actor's processing loop is not destabilised. Replacing a previously registered handler replaces it wholesale — there is no chaining.

      Register before the first send. An onError call that happens after a message is already in flight may not see that message's failure: only handlers visible to the worker by the time it dispatches a given message are invoked for it. To guarantee coverage, chain onError into actor construction — as in the example below — and avoid registering it later.

      
       def actor = Actor.reactor(handler).onError { Throwable t, msg ->
           log.warn("actor failed processing {}", msg, t)
       }
       
      Parameters:
      handler - invoked as (throwable, message)
      Returns:
      this actor, for chaining
      Throws:
      UnsupportedOperationException - if the implementation does not support custom error handling
      Since:
      6.0.0
    • onError

      default Actor<T> onError(TriConsumer<ActorContext<T>,Throwable,? super T> handler)
      Context-aware variant of onError(BiConsumer). The handler receives an ActorContext that can be used to stop the actor via ctx.self().stop().
      Parameters:
      handler - invoked as (context, throwable, message)
      Returns:
      this actor, for chaining
      Throws:
      UnsupportedOperationException - if the implementation does not support custom error handling
      Since:
      6.0.0
    • currentSelf

      static <T> Actor<T> currentSelf()
      Returns the actor whose handler is currently executing on this thread.

      This convenience lets callers using the simple Function / BiFunction factories self-stop without restructuring to the context-aware overloads. Prefer the context-aware overloads where possible.

      Support is opt-in per actor: the actor must be configured with ActorOptions.withCurrentSelf(true). The default options do not publish the thread-local, so this method throws IllegalStateException unless the actor was constructed with the flag enabled.

      Type Parameters:
      T - the actor's message type, inferred at the call site
      Returns:
      the actor currently executing a handler on this thread
      Throws:
      IllegalStateException - if called outside an actor handler, or from an actor not configured with ActorOptions.withCurrentSelf(boolean)
      Since:
      6.0.0
    • reactor

      static <T, R> Actor<T> reactor(Function<T,R> handler)
      Creates a stateless reactor actor. Each message is passed to the handler function, and the return value becomes the reply for sendAndGet(T) callers.
      
       var doubler = Actor.reactor(n -> (int) n * 2);
       System.out.println(AsyncSupport.await(doubler.sendAndGet(5))); // 10
       
      Type Parameters:
      T - the message type
      R - the reply type
      Parameters:
      handler - the message processing function
      Returns:
      a started actor
    • reactor

      static <T, R> Actor<T> reactor(Function<T,R> handler, ActorOptions options)
      Creates a stateless reactor actor with explicit ActorOptions controlling the mailbox and executor.
      Since:
      6.0.0
    • reactor

      static <T, R> Actor<T> reactor(ReactorHandler<T,R> handler)
      Context-aware variant of reactor(Function). The handler receives an ActorContext alongside the message and can stop the actor via ctx.self().stop().
      Since:
      6.0.0
    • reactor

      static <T, R> Actor<T> reactor(ReactorHandler<T,R> handler, ActorOptions options)
      Context-aware variant of reactor(Function, ActorOptions).
      Since:
      6.0.0
    • stateful

      static <T, S> Actor<T> stateful(S initialState, BiFunction<S,T,S> handler)
      Creates a stateful actor. The handler receives the current state and the message, and returns the new state. For sendAndGet(T) callers, the new state is the reply.
      
       var counter = Actor.stateful(0, (state, msg) -> {
           if ("increment".equals(msg)) return (int) state + 1;
           return state;
       });
       counter.send("increment");
       System.out.println(AsyncSupport.await(counter.sendAndGet("increment"))); // 2
       

      Note: the state type S is fixed at construction; if a later ctx.become(...) call swaps in a StatefulHandler expecting an incompatible S, the resulting ClassCastException surfaces on the next dispatch rather than at the swap site.

      Type Parameters:
      T - the message type
      S - the state type
      Parameters:
      initialState - the initial state
      handler - receives (state, message), returns new state
      Returns:
      a started actor
    • stateful

      static <T, S> Actor<T> stateful(S initialState, BiFunction<S,T,S> handler, ActorOptions options)
      Creates a stateful actor with explicit ActorOptions controlling the mailbox and executor.
      Since:
      6.0.0
    • stateful

      static <T, S> Actor<T> stateful(S initialState, StatefulHandler<S,T> handler)
      Context-aware variant of stateful(Object, BiFunction). The handler receives an ActorContext alongside the state and message and can stop the actor via ctx.self().stop().
      Since:
      6.0.0
    • stateful

      static <T, S> Actor<T> stateful(S initialState, StatefulHandler<S,T> handler, ActorOptions options)
      Since:
      6.0.0