Structured Concurrency
Structured Concurrency
Section titled “Structured Concurrency”Build concurrent programs where tasks never leak and errors never hide.
Time: 60 minutes
Level: Intermediate
Prerequisites: Error Handling tutorial, basic Janus syntax
What you’ll learn: Nurseries, spawn, await, channels, select, and concurrent error handling
Profile: :service
Why Structured Concurrency?
Section titled “Why Structured Concurrency?”If you come from Go, you know the pattern: go func() launches a goroutine and you hope nothing goes wrong. If the parent returns, the goroutine becomes an orphan. If it panics, the stack trace is useless. If you forget to drain a channel, you deadlock at 3 AM.
Janus rejects this. In Janus, every concurrent task belongs to a nursery — a structured scope that enforces a simple guarantee:
No task can outlive its nursery. When a nursery exits, all children are done.
This is not a convention. It is enforced by the compiler.
The Mental Model
Section titled “The Mental Model”Think of a nursery as a supervisor at a swimming pool. Children can run around inside the pool area (the nursery scope), but when the supervisor says “time to go” — every child must be accounted for before anyone leaves. No child is left behind. No orphans.
Step 1: Your First Nursery (10 min)
Section titled “Step 1: Your First Nursery (10 min)”The Nursery Pattern
Section titled “The Nursery Pattern”A nursery creates a structured concurrency scope. You spawn tasks inside it. The nursery waits for all tasks to complete before continuing.
func main() do println("Before nursery")
nursery do spawn say_hello("Alice") spawn say_hello("Bob") spawn say_hello("Charlie") end
// This line only executes after ALL spawned tasks finish println("After nursery -- all tasks complete")end
func say_hello(name: String) do println("Hello from ", name)end$ janus run hello_concurrent.janBefore nurseryHello from AliceHello from BobHello from CharlieAfter nursery -- all tasks completeThe order of “Hello from” messages may vary between runs — they execute concurrently on the M:N fiber scheduler. But the final line always prints last. That is the nursery guarantee.
What Happens Under the Hood
Section titled “What Happens Under the Hood”nursery do...endcreates a concurrency scope on the scheduler- Each
spawnlaunches a fiber — a lightweight green thread (~4KB stack) - The scheduler distributes fibers across OS worker threads (work-stealing)
- When all spawned fibers finish, execution continues past
end
The fibers are cooperative — they yield at natural points (channel operations, await, explicit yield). No preemption surprises.
Step 2: Getting Results with Await (10 min)
Section titled “Step 2: Getting Results with Await (10 min)”Task Handles and Await
Section titled “Task Handles and Await”spawn returns a task handle. Use await to yield the current fiber until that task completes and retrieve its result.
func main() do nursery do let handle_a = spawn compute(21) let handle_b = spawn compute(11)
// await yields the current fiber until the target completes let result_a = await handle_a let result_b = await handle_b
println("Results: ", result_a, " + ", result_b, " = ", result_a + result_b) endend
func compute(x: i64) -> i64 do // Simulate work return x * 2end$ janus run compute.janResults: 42 + 22 = 64Await Semantics
Section titled “Await Semantics”await is a fiber yield — not a busy-wait. When you await handle_a:
- If the task already finished,
awaitreturns immediately (fast path — atomic CAS check) - If the task is still running, the current fiber suspends and the scheduler runs other work
- When the awaited task completes, the scheduler wakes the waiting fiber
This means await is cheap. You are not blocking a thread; you are parking a fiber.
Await with Error Handling
Section titled “Await with Error Handling”If the spawned task can fail, await propagates the error:
func main() do nursery do let handle = spawn fetch_data("https://api.example.com/data")
let data = await handle catch |err| do println("Fetch failed: ", err) return end
println("Got data: ", data) endend
func fetch_data(url: String) !String do // ... network request that might fail if url == "" do fail EmptyUrl end return "response body"endStep 3: Channels — Communicating Between Tasks (15 min)
Section titled “Step 3: Channels — Communicating Between Tasks (15 min)”The Problem: Shared State
Section titled “The Problem: Shared State”Shared mutable state is the root of most concurrency bugs. Two fibers writing to the same variable without synchronization produces data races. Locks solve this but introduce deadlocks and priority inversion.
The Solution: CSP Channels
Section titled “The Solution: CSP Channels”Janus uses Communicating Sequential Processes (CSP) — the same model behind Go channels and Erlang message passing. Instead of sharing memory, tasks communicate by sending messages through typed channels.
“Don’t communicate by sharing memory. Share memory by communicating.”
Channel Basics
Section titled “Channel Basics”func main() do // Create a channel with capacity 1 let ch = channel(1)
nursery do spawn producer(ch) spawn consumer(ch) endend
func producer(ch: Channel(String)) do ch.send("Hello") ch.send("World") ch.close()end
func consumer(ch: Channel(String)) do while let msg = ch.recv() do println("Received: ", msg) end println("Channel closed, consumer done")end$ janus run channels.janReceived: HelloReceived: WorldChannel closed, consumer doneChannel Capacity
Section titled “Channel Capacity”The capacity parameter controls buffering:
channel(0)— Unbuffered.sendblocks until a receiver is ready. Tight synchronization.channel(1)— Single buffer.sendblocks only if the buffer is full.channel(N)— Buffered. Up to N messages queued beforesendblocks.
The Producer-Consumer Pattern
Section titled “The Producer-Consumer Pattern”This is the bread and butter of concurrent programming. One task produces work items, another consumes them:
func main() do let jobs = channel(10) let results = channel(10)
nursery do // Producer: generate work spawn func() do for i in 0..20 do jobs.send(i) end jobs.close() end
// Workers: process jobs concurrently for w in 0..4 do spawn worker(w, jobs, results) end
// Collector: gather results spawn func() do var total: i64 = 0 for i in 0..20 do let result = results.recv() total = total + result end println("Total: ", total) results.close() end endend
func worker(id: i64, jobs: Channel(i64), results: Channel(i64)) do while let job = jobs.recv() do // Simulate processing let result = job * job results.send(result) endendChannel API Reference
Section titled “Channel API Reference”| Method | Description |
|---|---|
channel(cap) | Create a new typed channel with given capacity |
ch.send(val) | Send a value. Blocks if buffer is full. |
ch.recv() | Receive a value. Blocks if buffer is empty. Returns null when closed. |
ch.trySend(val) | Non-blocking send. Returns false if buffer is full. |
ch.tryRecv() | Non-blocking receive. Returns null if buffer is empty. |
ch.close() | Close the channel. Pending recv() calls return null. |
ch.isClosed() | Check if the channel has been closed. |
Step 4: Select — Multiplexing Channels (10 min)
Section titled “Step 4: Select — Multiplexing Channels (10 min)”The Problem: Multiple Channels
Section titled “The Problem: Multiple Channels”When a task needs to listen on multiple channels simultaneously, you cannot just recv() on one — you would block and miss messages on the others.
The Solution: Select
Section titled “The Solution: Select”select waits on multiple channel operations and executes the first one that becomes ready:
func main() do let data_ch = channel(1) let quit_ch = channel(1)
nursery do // Producer sends data then quit signal spawn func() do for i in 0..5 do data_ch.send(i) end quit_ch.send(true) end
// Consumer uses select to multiplex spawn func() do var running = true while running do select do recv data_ch |val| do println("Data: ", val) end recv quit_ch |_| do println("Quit signal received") running = false end end end end endend$ janus run select.janData: 0Data: 1Data: 2Data: 3Data: 4Quit signal receivedSelect with Send
Section titled “Select with Send”You can also use select for non-blocking sends:
select do send output_ch(result) do println("Sent result") end recv cancel_ch |_| do println("Cancelled before send") endendSelect with Timeout
Section titled “Select with Timeout”Add a timeout case to avoid blocking forever:
select do recv data_ch |val| do process(val) end timeout 5000 do println("No data received in 5 seconds") endendSelect with Default
Section titled “Select with Default”A default case makes the entire select non-blocking:
select do recv ch |val| do process(val) end default do // No channel ready -- do something else do_other_work() endendStep 5: Error Handling in Nurseries (10 min)
Section titled “Step 5: Error Handling in Nurseries (10 min)”Fail-Fast Semantics
Section titled “Fail-Fast Semantics”When any task inside a nursery fails, the nursery cancels all remaining siblings and propagates the error to the parent scope. This is fail-fast behavior — no silent failures, no zombie tasks.
func main() do let result = nursery do spawn reliable_task() spawn failing_task() spawn another_task() end catch |err| do println("Nursery failed: ", err) endend
func reliable_task() do println("Reliable: started") // Simulated work println("Reliable: finished")end
func failing_task() !void do println("Failing: started") fail ConnectionTimeoutend
func another_task() do println("Another: started") // This task will be cancelled when failing_task fails println("Another: finished")endWhen failing_task fails:
- The nursery’s cancel token fires
- All siblings (
reliable_task,another_task) receive cancellation - The nursery waits for all fibers to wind down (via
defercleanup) - The error propagates to the
catchblock inmain()
Catching Errors at the Nursery Boundary
Section titled “Catching Errors at the Nursery Boundary”The nursery boundary is where you decide recovery strategy:
func process_batch(items: []Item) do for item in items do // Each item gets its own nursery -- isolated error domain nursery do spawn validate(item) spawn enrich(item) spawn store(item) end catch |err| do println("Failed to process item: ", item.id, " -- ", err) // Continue with next item -- the failed nursery is fully cleaned up end endendEach loop iteration creates a fresh nursery. If processing one item fails, the nursery cleans up its tasks, the error is caught, and the next item proceeds. This is an isolated error domain — failure in one nursery cannot corrupt another.
Nested Nurseries and Error Propagation
Section titled “Nested Nurseries and Error Propagation”Errors propagate outward through nested nurseries:
func service() do nursery |root| do spawn func() do // Inner nursery -- isolated scope nursery |inner| do spawn risky_operation() end // If risky_operation fails, inner nursery catches it // and propagates to THIS task, which fails root nursery end end catch |err| do println("Service error: ", err) endendStep 6: Real-World Pattern — Session Supervisor (5 min)
Section titled “Step 6: Real-World Pattern — Session Supervisor (5 min)”Here is a production-grade pattern that combines everything: a session supervisor that manages multiple client connections with isolated error domains, graceful shutdown, and channel-based coordination.
error SessionError { AuthFailed, Timeout, ProtocolViolation,}
func run_server(port: i64) do let shutdown_ch = channel(1) let connections = channel(100)
nursery |root| do // Acceptor: listens for incoming connections spawn func() do while not shutdown_ch.isClosed() do let conn = accept(port) connections.send(conn) end end
// Session supervisor: manages client sessions spawn func() do while let conn = connections.recv() do // Each session gets its own nursery -- isolated error domain spawn func() do handle_session(conn) catch |err| do match err { SessionError.AuthFailed => do log("Auth failed for ", conn.addr) end, SessionError.Timeout => do log("Session timed out: ", conn.addr) end, _ => do log("Session error: ", err) end, } end end end end
// Shutdown listener spawn func() do wait_for_signal() println("Shutting down...") shutdown_ch.close() connections.close() end end // root nursery ensures ALL connections are drained, // ALL sessions are cleaned up, and ALL fibers are done // before this line executes. println("Server stopped cleanly")end
func handle_session(conn: Connection) !void do let user = try authenticate(conn) defer conn.close()
nursery do // Reader fiber spawn func() do while let msg = conn.read() do process_message(user, msg) end end
// Heartbeat fiber spawn func() do while not conn.isClosed() do sleep(30_000) conn.ping() catch |_| do fail SessionError.Timeout end end end endendThis pattern gives you:
- No orphan connections — nursery guarantee
- Isolated failures — one client crash does not affect others
- Graceful shutdown — closing channels triggers orderly wind-down
- Resource cleanup —
defer conn.close()runs on all exit paths
What You Learned
Section titled “What You Learned”Concurrency Primitives
Section titled “Concurrency Primitives”| Primitive | Purpose |
|---|---|
nursery do...end | Structured concurrency scope — no orphan tasks |
spawn fn(args) | Launch a fiber task, returns a handle |
await handle | Yield fiber until target completes, get result |
channel(cap) | Typed CSP channel for inter-task communication |
select do...end | Multiplex over multiple channel operations |
Key Principles
Section titled “Key Principles”- Nurseries enforce structure — all tasks complete before the scope exits
- Spawn returns handles —
awaitretrieves results without blocking threads - Channels replace shared state — typed, bounded, closeable
- Select multiplexes — listen on many channels, act on the first ready
- Fail-fast in nurseries — one failure cancels siblings, error propagates up
- Nested nurseries isolate — each nursery is its own error domain
Challenges
Section titled “Challenges”- Spawn 10 fibers that each print their index. Observe non-deterministic ordering.
- Create a channel-based ping-pong between two fibers.
- Use
awaitto fan out 5 HTTP requests and collect results.
Medium
Section titled “Medium”- Build a concurrent file processor: one fiber reads lines, another processes them, a third writes output.
- Implement a rate limiter using a channel as a token bucket.
- Create a timeout wrapper: spawn a task and a timer,
selecton whichever finishes first.
Advanced
Section titled “Advanced”- Build a connection pool with bounded capacity using channels.
- Implement a pub/sub broker where publishers and subscribers are fibers communicating through channels.
- Design a pipeline of processing stages connected by channels, with backpressure propagation.
Next Steps
Section titled “Next Steps”- Concurrency Reference — complete API documentation for all primitives
- Scheduler Internals — how the M:N work-stealing scheduler operates
- Janus vs Elixir — how Janus concurrency compares to BEAM
“A nursery is a promise: no child left behind, no task forgotten, no error silenced.”