std.sync.Channel

type pub inline Channel[T]

An unbounded multiple-publisher, multiple-consumer channel, implemented using a process and the Future and Promise types.

A Channel is useful when you have multiple processes that need to consume work from some sort of shared queue, with the work being balanced across those processes automatically.
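
As a rough illustration of the shared-queue pattern described above, the sketch below starts two worker processes that take jobs from one channel and report results on another. The Worker and Main types and their method names are hypothetical and exist only for this example; the Channel calls (new, clone, send, receive) are the ones documented on this page, and the sketch assumes the `type` keyword used in the Channel signature above.

```inko
import std.sync (Channel)

# Hypothetical worker process, used only for illustration.
type async Worker {
  fn async run(jobs: uni Channel[Int], results: uni Channel[Int]) {
    # Block until a job is available, then report its double.
    results.send(jobs.receive * 2)
  }
}

type async Main {
  fn async main {
    let jobs = Channel.new
    let results = Channel.new

    # Each worker gets its own clone; all clones share the same state.
    Worker().run(recover jobs.clone, recover results.clone)
    Worker().run(recover jobs.clone, recover results.clone)

    jobs.send(1)
    jobs.send(2)

    # Wait for both results so Main doesn't finish before the workers do.
    let _ = results.receive
    let _ = results.receive
  }
}
```

Which worker picks up which job is not determined by this code, so the two results may arrive in either order.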

Values are sent and received in First In First Out (FIFO) order.

Getting values without blocking

Due to the shared state being implemented as a process, which requires asynchronous message passing, there is no way to see if a value is present and return it without blocking (e.g. a try_get method). If such a method were provided, it would likely return no value (e.g. an Option.None) even if values are in fact present, as the call to Future.try_get would finish before the underlying process has time to try and resolve the corresponding Promise.

If you need to retrieve a value without blocking indefinitely, use Channel.receive_until with an appropriate deadline instead.
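
A minimal sketch of such a bounded wait, using receive_until with a short Duration and falling back to a default when nothing arrives in time. The fallback value 0 and the variable names are assumptions made for this example, not part of the API.

```inko
import std.sync (Channel)
import std.time (Duration)

let chan = Channel.new

chan.send(42)

# Wait at most 10 milliseconds; use 0 if no value arrives before the deadline.
let value = match chan.receive_until(Duration.from_millis(10)) {
  case Some(received) -> received
  case _ -> 0
}
```

A longer deadline trades responsiveness for a lower chance of missing a value that is about to be sent.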

Sharing channels

Channel implements Clone, and cloning a Channel results in the copy using the same underlying shared state. To send a Channel to a different process, combine clone with recover. For example:

import std.sync (Channel)

type async Example {
  fn async example(channel: uni Channel[Int]) {}
}

let chan1 = Channel.new
let chan2 = recover chan1.clone

Example().example(chan2)

Static methods

new

fn pub static new -> Channel[uni T] {
  Channel(ChannelState.new)
}

Returns a new Channel.

Instance methods

clone

fn pub clone -> Channel[T] {
  Channel(@state)
}

Creates a clone of self.
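
Because a clone uses the same underlying shared state as the original, a value sent through one can be received through the other. A short sketch in the style of the other examples on this page (the variable names are arbitrary):

```inko
import std.sync (Channel)

let chan1 = Channel.new
let chan2 = chan1.clone

# Send through the clone, receive through the original.
chan2.send(42)
chan1.receive # => 42
```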

receive

fn pub receive -> uni T {
  match Future.new {
    case (future, promise) -> {
      @state.receive(promise)
      future.get
    }
  }
}

Receives a value from the channel, blocking the calling process until a value is available.

Examples

import std.sync (Channel)

let chan = Channel.new

chan.send(42)
chan.receive # => 42

receive_until

fn pub receive_until[D: ToInstant](deadline: ref D) -> Option[uni T] {
  match Future.new {
    case (future, promise) -> {
      @state.receive(promise)
      future.get_until(deadline)
    }
  }
}

Receives a value from the channel, blocking the calling process until a value is available or the deadline expires.

If a value is received within the given deadline, an Option.Some containing the value is returned; otherwise an Option.None is returned.

Examples

import std.sync (Channel)
import std.time (Duration)

let chan = Channel.new

chan.receive_until(Duration.from_millis(10)) # => Option.None
chan.send(42)
chan.receive_until(Duration.from_millis(10)) # => Option.Some(42)

send

fn pub send(value: uni T) {
  @state.send(value)
}

Sends a new value to self.

This method never blocks the calling process.

Examples

import std.sync (Channel)

let chan = Channel.new

chan.send(42)

Implemented traits

std.clone.Clone

impl Clone[Channel[T]] for Channel