Skip to content

Concurrency

For concurrency Inko uses "lightweight processes", also known as green threads. Processes are isolated from each other and don't share memory, making race conditions impossible. Communication between processes is done by sending messages, which look like regular method calls. Messages are processed in FIFO order. Values passed with these messages have their ownership transferred to the receiving process.

Processes run concurrently, and Inko's scheduler takes care of balancing the workload across OS threads.

A process finishes when it has no more messages to process, and no references to the process exist.

Processes are cheap to spawn, with a single empty process needing less than 1 KiB of memory.

Note

The size of processes is subject to change, and we expect it to grow to 2 KiB in the future (similar to Go and Erlang).

A process is defined using the syntax class async:

class async Counter {}

The main process

Each program starts with a single process called "Main". This process runs on the main thread, while other processes run on different threads. For this the Inko runtime uses a pool of threads, balancing work between these threads automatically.

The main process must be defined explicitly, and must define the async method "main":

class async Main {
  fn async main {

  }
}

When the main process finishes and no references to it exist, the program is ended; even if other processes are still running.

Defining fields

A process can define zero or more fields:

class async Counter {
  let @value: Int
}

While the field types don't have to be sendable, when creating an instance the assigned value must be sendable. For example:

class async List {
  let @values: Array[Int]
}

To assign the @values field when creating an instance of List, we'd have to assign it a unique value:

List { @values = recover [10, 20] }

Since a uni Array[Int] can be moved into a Array[Int], this is valid. Had we assigned it a regular array the program would not compile, because Array[Int] isn't sendable.

Spawning processes

A process is spawned by creating an instance of its class. In the above example, List { ... } spawns the process for us, then gives us an owned value pointing to the process. When spawning a process it doesn't start running right away, instead it will wait for its first message.

Defining messages

Messages are defined by defining methods with the async keyword. A message is just an asynchronous method call, optionally writing its result to a future. The arguments, return type and throw type of an async method must be sendable (see Memory management for more information).

Here's how you'd define a message that just writes to STDOUT:

import std::stdio::STDOUT

class async Example {
  fn async write(message: String) {
    STDOUT.new.print(message)
  }
}

Sending messages

Sending messages uses the same syntax as regular method calls:

class async Counter {
  let @value: Int

  fn async mut increment {
    @value += 1
  }

  fn async value -> Int {
    @value.clone
  }
}

class async Main {
  fn async main {
    let counter = Counter { @value = 0 }

    counter.increment
    counter.value # => 1
  }
}

When using this syntax, the sender is suspended until the receiver produces a response. If we don't want to wait right away, we can do so using the async keyword:

let counter = Counter { @value = 0 }

async counter.increment # => Future[Nil, Never]
async counter.value     # => Future[Int, Never]

When using the async keyword we get back a value of type Future[T, E], where T is the message's return type, and E the message's throw type (defaulting to Never). To resolve the future you have to call await on it:

let counter = Counter { @value = 0 }

async counter.increment

let future = async counter.value

future.await # => 1

If the message may throw, the error has to be handled when calling await. This isn't needed if the future's error type is Never, such as in the above example.

await waits until a result is produced. If you want to limit the amount of time spent waiting, use Future.await_for instead:

import std::time::Duration

class async Counter {
  let @value: Int

  fn async mut increment {
    @value += 1
  }

  fn async value -> Int {
    @value.clone
  }
}

class async Main {
  fn async main {
    let counter = Counter { @value = 0 }

    async counter.increment

    let future = async counter.value

    future.await_for(Duration.from_secs(1))
  }
}

In this case the return type of await_for is Option[Int], and a None is produced if a result wasn't produced within the specified time limit.

Polling futures

Sometimes you have multiple futures, and you want to act as soon as the underlying message produces its result. For this there's the poll method from the std::process module. This method takes an array of futures and waits until one or more futures are ready. Its return value is an array of ready futures, and the input array is modified in place so it no longer contains these futures:

import std::process::(poll)

class async Runner {
  fn async run {
    # Just imagine this doing something that may take a long time
    # ...
  }
}

class async Main {
  fn async main {
    let runner1 = Runner {}
    let runner2 = Runner {}
    let pending = [async runner1.run, async runner2.run]

    while pending.length > 0 {
      poll(pending).into_iter.each fn (future) {
        # ...
      }
    }
  }
}

The run time of poll() is O(n) where n is the number of futures to poll. If you have a large list of futures to poll, it may be better to poll it in smaller chunks, or refactor your code such that polling isn't necessary in the first place.

Dropping processes

The owned value for a process is a value type. Internally processes use atomic reference counting to keep track of the number of incoming references. Imagine a process being a server, and each owned value/reference being a client. When the count reaches zero, the process is instructed to drop itself after it finishes running any remaining messages. This means that there may be some time between when the last reference to a process is dropped, and when the process itself is dropped.