Interface AsyncChannel<T>

Type Parameters:
T - the payload type
All Superinterfaces:
Iterable<T>
All Known Implementing Classes:
DefaultAsyncChannel

public interface AsyncChannel<T> extends Iterable<T>
An asynchronous channel for inter-task communication with optional buffering.

A channel coordinates producers and consumers without exposing explicit locks or shared mutable state, following the CSP (Communicating Sequential Processes) paradigm popularized by Go's channels.

Channels support both unbuffered (rendezvous) and buffered modes:

  • Unbufferedcreate() or create(0). Each send suspends until a matching receive arrives.
  • Bufferedcreate(n). Values are enqueued until the buffer fills, then senders suspend.

Channels implement Iterable, so they work with for await and regular for loops — iteration yields received values until the channel is closed and drained:


 def ch = AsyncChannel.create(2)
 async { ch.send('a'); ch.send('b'); ch.close() }
 for await (item in ch) {
     println item   // prints 'a', then 'b'
 }
 
Since:
6.0.0
See Also:
  • Method Summary

    Modifier and Type
    Method
    Description
    boolean
    Closes this channel.
    static <T> AsyncChannel<T>
    Creates an unbuffered (rendezvous) channel.
    static <T> AsyncChannel<T>
    create(int capacity)
    Creates a channel with the specified buffer capacity.
    default AsyncChannel<T>
    filter(Predicate<T> predicate)
    Returns a new channel that passes only elements matching the predicate.
    int
    Returns the number of values currently buffered.
    int
    Returns this channel's buffer capacity.
    boolean
    Returns true if this channel has been closed.
    default <R> AsyncChannel<R>
    map(Function<T,R> transform)
    Returns a new channel that transforms each element using the function.
    default AsyncChannel<T>
    merge(AsyncChannel<? extends T> other)
    Returns a new channel that receives values from both this channel and the other channel.
    Receives the next value from this channel.
    send(T value)
    Sends a value through this channel.
    default List<AsyncChannel<T>>
    split(Predicate<T> predicate)
    Returns two new channels: elements matching the predicate go to the first, non-matching to the second.
    default AsyncChannel<T>
    Returns a new channel that receives all values from this channel while also sending a copy of each value to the tap channel.

    Methods inherited from interface java.lang.Iterable

    forEach, iterator, spliterator
  • Method Details

    • create

      static <T> AsyncChannel<T> create()
      Creates an unbuffered (rendezvous) channel.
    • create

      static <T> AsyncChannel<T> create(int capacity)
      Creates a channel with the specified buffer capacity.
      Parameters:
      capacity - the maximum buffer size; 0 for unbuffered
    • getCapacity

      int getCapacity()
      Returns this channel's buffer capacity.
    • getBufferedSize

      int getBufferedSize()
      Returns the number of values currently buffered.
    • isClosed

      boolean isClosed()
      Returns true if this channel has been closed.
    • send

      Awaitable<Void> send(T value)
      Sends a value through this channel.

      The returned Awaitable completes when the value has been delivered to a receiver or buffered. Sending to a closed channel fails immediately with ChannelClosedException.

      Parameters:
      value - the value to send; must not be null
      Returns:
      an Awaitable that completes when the send succeeds
      Throws:
      NullPointerException - if value is null
    • receive

      Awaitable<T> receive()
      Receives the next value from this channel.

      The returned Awaitable completes when a value is available. Receiving from a closed, empty channel fails with ChannelClosedException.

      Returns:
      an Awaitable that yields the next value
    • close

      boolean close()
      Closes this channel. Idempotent.

      Buffered values remain receivable. Pending senders fail with ChannelClosedException. After all buffered values are drained, subsequent receives also fail.

      Returns:
      true if this call actually closed the channel
    • filter

      default AsyncChannel<T> filter(Predicate<T> predicate)
      Returns a new channel that passes only elements matching the predicate.
      Parameters:
      predicate - the filter function
      Returns:
      a new filtered channel
      Since:
      6.0.0
    • map

      default <R> AsyncChannel<R> map(Function<T,R> transform)
      Returns a new channel that transforms each element using the function.
      Type Parameters:
      R - the output element type
      Parameters:
      transform - the mapping function
      Returns:
      a new transformed channel
      Since:
      6.0.0
    • merge

      default AsyncChannel<T> merge(AsyncChannel<? extends T> other)
      Returns a new channel that receives values from both this channel and the other channel. Values are interleaved as they arrive. The output closes when both inputs are exhausted.
      Parameters:
      other - the channel to merge with
      Returns:
      a new merged channel
      Since:
      6.0.0
    • split

      default List<AsyncChannel<T>> split(Predicate<T> predicate)
      Returns two new channels: elements matching the predicate go to the first, non-matching to the second. Both are closed when this channel is exhausted.
      Parameters:
      predicate - the split condition
      Returns:
      a list of two channels: [matching, non-matching]
      Since:
      6.0.0
    • tap

      default AsyncChannel<T> tap(AsyncChannel<T> tap)
      Returns a new channel that receives all values from this channel while also sending a copy of each value to the tap channel. Useful for logging, monitoring, or forking a side pipeline.
      Parameters:
      tap - the channel to send copies to
      Returns:
      a new pass-through channel
      Since:
      6.0.0