public final class BroadcastChannel<T>
extends Object
A one-to-many broadcast channel where each value sent is delivered to all subscribers.
Unlike AsyncChannel (point-to-point, each value consumed by
one receiver), a BroadcastChannel delivers every value to
every subscriber that has called subscribe().
def broadcast = BroadcastChannel.create()
def sub1 = broadcast.subscribe()
def sub2 = broadcast.subscribe()
async {
broadcast.send('hello')
broadcast.send('world')
broadcast.close()
// Both subscribers receive both values
for await (msg in sub1) { println "Sub1: $msg" }
for await (msg in sub2) { println "Sub2: $msg" }
}
Inspired by GPars' DataflowBroadcast.
T - the value type| Type Params | Return Type | Name and description |
|---|---|---|
|
public Flow.Publisher<T> |
asPublisher()Returns a Flow.Publisher view of this broadcast channel. |
|
public void |
close()Closes this broadcast channel and all subscriber channels. |
<T> |
public static BroadcastChannel<T> |
create()Creates a new broadcast channel. |
|
public int |
getSubscriberCount()Returns the number of current subscribers. |
|
public boolean |
isClosed()Returns true if this broadcast channel has been closed. |
|
public Awaitable<Void> |
send(T value)Sends a value to all current subscribers. |
|
public AsyncChannel<T> |
subscribe()Creates a new subscriber channel. |
|
public AsyncChannel<T> |
subscribe(int bufferSize)Creates a new subscriber channel with the specified buffer capacity. |
Returns a Flow.Publisher view of this broadcast channel. Each call to Flow.Publisher#subscribe(Flow.Subscriber) on the returned publisher creates a new AsyncChannel subscriber under the hood, draining values to the downstream subscriber according to its requested demand.
Semantics:
request(n); the worker blocks the
broadcast send when no demand exists (sender-side backpressure).onComplete when the broadcast channel
is closed and the per-subscriber buffer drained.
Backpressure policy (important). This bridge uses lossless,
sender-gated backpressure: send(Object) awaits delivery to
every live subscriber, and each per-subscriber channel has a bounded
buffer (default 16). A subscriber that never calls request(n),
or that requests slowly, will fill its buffer; once full, the
subscriber's channel suspends its backing send, which in turn
stalls BroadcastChannel.send(...) for all
subscribers. In other words, the slowest subscriber controls producer
throughput.
This is intentional and matches the point-to-point semantics of
subscribe(): values are neither dropped nor reordered. If
you need decoupled per-subscriber policies (drop-newest, drop-oldest,
latest-only, or unbounded buffering), wrap the publisher with a
Reactive Streams operator of your choice, or use a subscriber that
drains promptly with request(Long.MAX_VALUE).
Flow.Publisher backed by per-subscriber channelsCloses this broadcast channel and all subscriber channels.
Creates a new broadcast channel.
T - the value typeReturns the number of current subscribers.
Returns true if this broadcast channel has been closed.
Sends a value to all current subscribers.
value - the value to broadcastCreates a new subscriber channel. The returned AsyncChannel will receive all values sent to this broadcast from this point forward. Each subscriber is independent — values are buffered per subscriber.
Creates a new subscriber channel with the specified buffer capacity.
bufferSize - the buffer capacity for this subscriber