std.sync.Channel
type pub inline Channel[T]
An unbounded multiple-publisher, multiple-consumer channel, implemented using a
process and the Future and Promise types.
A Channel is useful when you have multiple processes that need to consume
work from some sort of shared queue, with the work being balanced across those
processes automatically.
Values are sent and received in First In First Out (FIFO) order.
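As a rough illustration of the shared work queue described above, the sketch below starts two worker processes that consume from the same channel and report back over a second channel. The `Worker` type, its `run` method, and the doubling "work" are hypothetical names chosen for this example; only `Channel.new`, `clone`, `send`, and `receive` come from this page.

```inko
import std.sync (Channel)

# Hypothetical worker process, not part of std.sync.
type async Worker {
  fn async run(jobs: uni Channel[Int], results: uni Channel[Int]) {
    # Each worker takes the next available job, so jobs end up spread across
    # whichever workers are free.
    loop { results.send(jobs.receive * 2) }
  }
}

let jobs = Channel.new
let results = Channel.new

# Every worker gets its own clone; all clones share the same underlying state.
Worker().run(recover jobs.clone, recover results.clone)
Worker().run(recover jobs.clone, recover results.clone)

jobs.send(1)
jobs.send(2)

# Results may arrive in either order, so combine them in an order-independent
# way.
let sum = results.receive + results.receive
```

Which worker handles which job is not specified, so code consuming the results should not assume a particular worker produced a particular value.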
Getting values without blocking
Due to the shared state being implemented as a process, which requires
asynchronous message passing, there is no way to see if a value is present and
return it without blocking (e.g. a try_get method). If such a method were
provided, it would likely return no value (e.g. an Option.None) even if
values are in fact present, as the call to Future.try_get would finish
before the underlying process has time to try and resolve the corresponding
Promise.
If you need to retrieve a value without blocking indefinitely, use
Channel.receive_until with an appropriate deadline instead.
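A minimal sketch of the deadline-based approach, using `receive_until` as documented further down this page. The 5 millisecond deadline and the fallback value of 0 are arbitrary choices for illustration, not recommendations.

```inko
import std.sync (Channel)
import std.time (Duration)

let chan = Channel.new

chan.send(42)

# Wait for at most 5 milliseconds instead of blocking indefinitely.
let value = match chan.receive_until(Duration.from_millis(5)) {
  case Some(v) -> v
  # Nothing arrived before the deadline expired, so fall back to a default.
  case None -> 0
}
```

The deadline should be long enough for the underlying process to handle the request; a very short deadline may yield no value even when one has been sent.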
Sharing channels
Channel implements Clone, and cloning a Channel results in the copy
using the same underlying shared state. To send a Channel to a different
process, combine clone with recover. For example:
import std.sync (Channel)

type async Example {
  fn async example(channel: uni Channel[Int]) {}
}

let chan1 = Channel.new
let chan2 = recover chan1.clone

Example().example(chan2)
Static methods
new
fn pub static new -> Channel[uni T] {
  Channel(ChannelState.new)
}
fn pub static new -> Channel[uni T]
Returns a new Channel.
Instance methods
clone
fn pub clone -> Channel[T] {
  Channel(@state)
}
fn pub clone -> Channel[T]
Creates a clone of self.
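Because a clone uses the same underlying shared state as the original, a value sent through one handle can be received through the other. A small sketch of that behaviour, with `chan1` and `chan2` as illustrative names:

```inko
import std.sync (Channel)

let chan1 = Channel.new
let chan2 = chan1.clone

# Both handles refer to the same queue.
chan1.send(42)
chan2.receive # => 42
```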
receive
fn pub receive -> uni T {
  match Future.new {
    case (future, promise) -> {
      @state.receive(promise)
      future.get
    }
  }
}
fn pub receive -> uni T
Receives a value from the channel, blocking the calling process until a value is available.
Examples
import std.sync (Channel)
let chan = Channel.new
chan.send(42)
chan.receive # => 42
receive_until
fn pub receive_until[D: ToInstant](deadline: ref D) -> Option[uni T] {
  match Future.new {
    case (future, promise) -> {
      @state.receive(promise)
      future.get_until(deadline)
    }
  }
}
fn pub receive_until[D: ToInstant](deadline: ref D) -> Option[uni T]
Receives a value from the channel, blocking the calling process until a value is available or the deadline expires.
If a value is received within the given deadline, an Option.Some is
returned containing the value, otherwise an Option.None is returned.
Examples
import std.sync (Channel)
import std.time (Duration)
let chan = Channel.new
chan.receive_until(Duration.from_millis(10)) # => Option.None
chan.send(42)
chan.receive_until(Duration.from_millis(10)) # => Option.Some(42)
send
fn pub send(value: uni T) {
  @state.send(value)
}
fn pub send(value: uni T)
Sends a new value to self.
This method never blocks the calling process.
Examples
import std.sync (Channel)
let chan = Channel.new
chan.send(42)
Implemented traits
Clone
impl Clone[Channel[T]] for Channel