Concurrency in Rust: A Deep Dive into Async/Await and Futures

• 45 min read
Programming Concepts

Rust’s async/await syntax provides a powerful, zero-cost abstraction for writing concurrent code. Unlike traditional threading models, Rust’s async model is built on top of futures - lazy, poll-based computations that enable efficient concurrency without the overhead of OS threads. This comprehensive guide explores Rust’s async/await in depth, covering futures, executors, the tokio runtime, and how to build high-performance concurrent applications.

Understanding Rust’s Concurrency Model

Why Async/Await in Rust?

Goals:

  • Zero-cost abstractions (no runtime overhead)
  • Memory safety without garbage collection
  • High performance concurrent I/O
  • Explicit concurrency control

Comparison with Threading:

  • Threads: OS-managed, expensive context switches, limited scalability
  • Async: User-space, cooperative multitasking, scales to millions of tasks

The Future Trait

Future: A value that will be available at some point

trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Key Characteristics:

  • Lazy: Futures don’t execute until polled
  • Non-blocking: Returns Pending when not ready
  • Composable: Can combine futures together

Async/Await Syntax

Basic Async Functions

Defining Async Functions:

async fn fetch_data() -> Result<String, Error> {
    // Async operations
    Ok("data".to_string())
}

What async fn Returns:

  • async fn returns a Future, not the actual value
  • Must be awaited to get the value

Calling Async Functions:

// Inside async function
let result = fetch_data().await?;

// Or using block_on (blocking)
let result = futures::executor::block_on(fetch_data())?;

Async Blocks

Creating Futures from Blocks:

let future = async {
    let data1 = fetch_data1().await?;
    let data2 = fetch_data2().await?;
    Ok(data1 + &data2)
};

Combining Futures

Sequential Execution:

async fn sequential() {
    let result1 = task1().await;
    let result2 = task2().await;
    // task2 waits for task1
}

Concurrent Execution:

use futures::future;

async fn concurrent() {
    let (result1, result2) = future::join(task1(), task2()).await;
    // Both execute concurrently
}

Select First to Complete:

use futures::future;

async fn select_first() {
    match future::select(task1(), task2()).await {
        future::Either::Left((result1, task2_remaining)) => {
            // task1 completed first
        }
        future::Either::Right((result2, task1_remaining)) => {
            // task2 completed first
        }
    }
}

Understanding Futures

How Futures Work

Poll-Based Execution:

1. Executor calls poll() on future
2. Future checks if ready:
   - Ready(value) → Return value
   - Pending → Return Pending, store Waker
3. When ready, Waker notifies executor
4. Executor polls again

Example: Simple Future:

struct SimpleFuture {
    value: Option<String>,
}

impl Future for SimpleFuture {
    type Output = String;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(value) = self.value.take() {
            Poll::Ready(value)
        } else {
            // Store waker for later notification
            // In real implementation, would set up to be woken
            Poll::Pending
        }
    }
}

Waker and Context

Waker: Mechanism to notify executor when future is ready

Context: Provides access to Waker

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    // Get waker from context
    let waker = cx.waker().clone();
    
    // Store waker, will be called when ready
    self.waker.store(Some(waker));
    
    Poll::Pending
}

Executors and Runtimes

What is an Executor?

Executor: Drives futures to completion by polling them

Responsibilities:

  • Poll futures when they might be ready
  • Manage task scheduling
  • Handle waker notifications
  • Provide runtime services

Tokio Runtime

Tokio: Most popular async runtime for Rust

Features:

  • Multi-threaded work-stealing scheduler
  • Async I/O (epoll, kqueue, IOCP)
  • Timers and time utilities
  • Channels and synchronization primitives

Creating Runtime:

// Single-threaded runtime
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
    // Async code
});

// Multi-threaded runtime
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
    tokio::spawn(async {
        // Concurrent task
    }).await?;
});

Using Tokio Macros:

#[tokio::main]
async fn main() {
    // Runtime created automatically
    let result = fetch_data().await?;
}

Other Runtimes

async-std: Alternative runtime, similar API to std

smol: Small, fast runtime

glommio: Thread-per-core runtime for high performance

Async I/O

Tokio Async I/O

Async TCP Server:

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    
    loop {
        let (mut socket, _) = listener.accept().await?;
        
        tokio::spawn(async move {
            let mut buf = [0; 1024];
            
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return, // Connection closed
                    Ok(n) => {
                        if socket.write_all(&buf[0..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => return,
                }
            }
        });
    }
}

Key Points:

  • accept() returns a Future
  • read() and write() are async
  • tokio::spawn() creates concurrent tasks
  • Each connection handled concurrently

Async File I/O

use tokio::fs;

async fn read_file() -> Result<String, std::io::Error> {
    let contents = fs::read_to_string("file.txt").await?;
    Ok(contents)
}

Concurrency Patterns

Spawning Tasks

tokio::spawn: Spawn concurrent task

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Concurrent task
        "result".to_string()
    });
    
    let result = handle.await.unwrap();
}

Key Points:

  • Returns JoinHandle
  • Task runs concurrently
  • Must await handle to get result
  • Handle can be cancelled

Channels

mpsc (Multi-Producer Single-Consumer):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    
    tokio::spawn(async move {
        tx.send("message").await.unwrap();
    });
    
    while let Some(message) = rx.recv().await {
        println!("Received: {}", message);
    }
}

oneshot: Single value channel

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
    tx.send("value").unwrap();
});

let value = rx.await.unwrap();

broadcast: Multiple consumers

use tokio::sync::broadcast;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

tx.send("message").unwrap();

let msg1 = rx1.recv().await.unwrap();
let msg2 = rx2.recv().await.unwrap();

Synchronization Primitives

Mutex: Async mutex

use tokio::sync::Mutex;

let mutex = Mutex::new(0);

let mut guard = mutex.lock().await;
*guard += 1;
// Guard dropped here, lock released

RwLock: Async read-write lock

use tokio::sync::RwLock;

let lock = RwLock::new(0);

// Multiple readers
let read_guard = lock.read().await;

// Single writer
let mut write_guard = lock.write().await;
*write_guard += 1;

Semaphore: Limit concurrent access

use tokio::sync::Semaphore;

let semaphore = Semaphore::new(3); // Max 3 concurrent

for i in 0..10 {
    let permit = semaphore.acquire().await.unwrap();
    tokio::spawn(async move {
        // Critical section
        drop(permit); // Release permit
    });
}

Error Handling

Result and Error Propagation

Using ? Operator:

async fn fetch_data() -> Result<String, Error> {
    let response = http_client.get("url").await?;
    let body = response.text().await?;
    Ok(body)
}

Custom Error Types:

use thiserror::Error;

#[derive(Error, Debug)]
enum MyError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),
}

Error Handling Patterns

Combinators:

let result = fetch_data()
    .await
    .map_err(|e| CustomError::from(e))
    .and_then(|data| process_data(data));

Early Returns:

async fn process() -> Result<(), Error> {
    let data = fetch_data().await?;
    let processed = process_data(data).await?;
    save_data(processed).await?;
    Ok(())
}

Performance Considerations

Pin and Memory Layout

Pin: Ensures data doesn’t move in memory

Why Needed: Futures may contain self-referential data

use std::pin::Pin;

fn poll_future(mut future: Pin<&mut MyFuture>) -> Poll<String> {
    future.as_mut().poll(cx)
}

Zero-Cost Abstractions

Compile-Time Optimization:

  • Async/await compiles to state machines
  • No runtime overhead
  • Same performance as hand-written state machines

Example Transformation:

// Async code
async fn example() {
    let a = task1().await;
    let b = task2().await;
    a + b
}

// Compiles to state machine
enum ExampleFuture {
    State1(Task1Future),
    State2(Task1Future, Task2Future),
    Done,
}

Avoiding Blocking

Problem: Blocking operations block entire runtime

// BAD: Blocks runtime thread
async fn bad() {
    std::thread::sleep(Duration::from_secs(1));
}

// GOOD: Non-blocking sleep
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}

Blocking in Spawn Blocking:

// For CPU-bound or blocking I/O
let result = tokio::task::spawn_blocking(|| {
    // Blocking operation
    heavy_computation()
}).await?;

Advanced Patterns

Streams

Stream Trait: Async iterator

use futures::stream::{self, StreamExt};

let mut stream = stream::iter(vec![1, 2, 3]);

while let Some(item) = stream.next().await {
    println!("{}", item);
}

Processing Streams:

stream
    .map(|x| x * 2)
    .filter(|x| *x > 4)
    .collect::<Vec<_>>()
    .await;

Select and Race

select! Macro: Wait for first future to complete

use tokio::select;

select! {
    result = task1() => {
        println!("Task1 completed: {:?}", result);
    }
    result = task2() => {
        println!("Task2 completed: {:?}", result);
    }
}

race! Macro: Race multiple futures

use futures::future;

let result = future::race(task1(), task2()).await;

Timeouts and Cancellation

Timeout:

use tokio::time::{timeout, Duration};

match timeout(Duration::from_secs(5), slow_task()).await {
    Ok(result) => println!("Completed: {:?}", result),
    Err(_) => println!("Timeout"),
}

Cancellation:

use tokio::time::{sleep, Duration};

let handle = tokio::spawn(async {
    loop {
        sleep(Duration::from_secs(1)).await;
        println!("Working...");
    }
});

sleep(Duration::from_secs(5)).await;
handle.abort(); // Cancel task

Real-World Examples

HTTP Client

use reqwest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    
    let response = client
        .get("https://api.example.com/data")
        .send()
        .await?;
    
    let body = response.text().await?;
    println!("{}", body);
    
    Ok(())
}

Concurrent Requests

use futures::future;

async fn fetch_multiple() -> Result<Vec<String>, Error> {
    let urls = vec!["url1", "url2", "url3"];
    
    let futures = urls.into_iter()
        .map(|url| fetch_url(url))
        .collect::<Vec<_>>();
    
    let results = future::join_all(futures).await;
    results.into_iter().collect()
}

Database Queries

use sqlx;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = sqlx::PgPool::connect("postgres://...").await?;
    
    let row = sqlx::query("SELECT * FROM users WHERE id = $1")
        .bind(1)
        .fetch_one(&pool)
        .await?;
    
    Ok(())
}

Best Practices

1. Use Async Throughout

Don’t Mix: Avoid mixing async and blocking code

// BAD
async fn mixed() {
    std::thread::sleep(Duration::from_secs(1)); // Blocks!
    async_operation().await;
}

// GOOD
async fn async_only() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    async_operation().await;
}

2. Handle Errors Properly

Use Result Types:

async fn can_fail() -> Result<String, MyError> {
    let data = fetch().await?;
    Ok(process(data)?)
}

3. Avoid Blocking the Runtime

Use spawn_blocking for CPU-bound work:

let result = tokio::task::spawn_blocking(|| {
    cpu_intensive_task()
}).await?;

4. Use Appropriate Concurrency

Don’t Over-Spawn:

// BAD: Too many tasks
for i in 0..1_000_000 {
    tokio::spawn(async move { /* ... */ });
}

// GOOD: Limit concurrency
let semaphore = Semaphore::new(100);
for i in 0..1_000_000 {
    let permit = semaphore.acquire().await?;
    tokio::spawn(async move {
        // Work
        drop(permit);
    });
}

Comparison with Other Languages

Rust vs JavaScript Async/Await

Similarities:

  • Both use async/await syntax
  • Both handle I/O efficiently

Differences:

  • Rust: Zero-cost, compile-time checked
  • JavaScript: Runtime overhead, dynamic

Rust vs Go Goroutines

Goroutines:

  • Runtime-managed
  • M:N threading
  • Simpler model

Rust Async:

  • Compile-time
  • More control
  • Better performance

Common Pitfalls

1. Blocking the Runtime

Problem: Blocking operations block entire runtime

Solution: Use async versions or spawn_blocking

2. Holding Locks Across Await

Problem: Deadlocks possible

// BAD: Lock held across await
let mut guard = mutex.lock().await;
async_operation().await; // May deadlock
drop(guard);

// GOOD: Release lock before await
{
    let mut guard = mutex.lock().await;
    // Use guard
} // Guard dropped
async_operation().await;

3. Forgetting to Await

Problem: Future created but not executed

// BAD: Future not awaited
async_task(); // Does nothing!

// GOOD: Await the future
async_task().await;

Conclusion

Rust’s async/await provides a powerful, zero-cost abstraction for concurrent programming. By understanding futures, executors, and the tokio runtime, developers can build high-performance concurrent applications that scale efficiently.

Key takeaways:

  1. Futures are lazy - must be polled to execute
  2. Use tokio runtime - most common async runtime
  3. Avoid blocking - use async versions of operations
  4. Handle errors - use Result types properly
  5. Use appropriate concurrency - don’t over-spawn tasks
  6. Understand Pin - needed for self-referential futures

Rust’s async/await model combines the safety of Rust’s type system with the performance of zero-cost abstractions, making it an excellent choice for building high-performance concurrent systems.