Rust Concurrency
Safe parallel programming with threads
🧵 What is Rust Concurrency?
Concurrency in Rust allows multiple threads to run simultaneously while maintaining memory safety. Rust prevents data races at compile time using ownership rules, making concurrent programming safer and more reliable.
use std::thread;
// Simple thread example: start one thread and wait for it to finish.
// `thread::spawn` runs the closure on a new thread and returns a `JoinHandle`.
let handle = thread::spawn(|| {
println!("Hello from thread!");
});
// `join` blocks the current thread until the spawned closure has returned;
// `unwrap` panics here if the spawned thread itself panicked.
handle.join().unwrap();
Concurrency Concepts
Threads
OS threads for parallel execution
use std::thread;
// Run the closure on a newly spawned thread. The `JoinHandle` returned by
// `spawn` is discarded, so nothing in this snippet waits for the thread.
thread::spawn(|| {
println!("In a thread!");
});
Channels
Message passing between threads
use std::sync::mpsc;
// `mpsc::channel` returns a (sender, receiver) pair.
let (tx, rx) = mpsc::channel();
// Queue one message on the channel; `unwrap` panics if the send fails.
// This snippet never reads from `rx`.
tx.send("Hello").unwrap();
Mutex
Mutual exclusion for shared data
use std::sync::Mutex;
// Put the value 5 behind a mutex.
let m = Mutex::new(5);
// `lock` blocks until the mutex is acquired and returns a guard that gives
// access to the inner value; `unwrap` panics if the mutex is poisoned.
// The lock is released when the guard `num` goes out of scope.
let mut num = m.lock().unwrap();
Arc
Atomic reference counting for sharing
use std::sync::Arc;
// Move the vector into an atomically reference-counted allocation.
let data = Arc::new(vec![1, 2, 3]);
// `Arc::clone` creates another handle to the same vector and bumps the
// reference count; the vector's contents are not copied.
let data_clone = Arc::clone(&data);
🔹 Basic Thread Example
Creating and joining threads:
use std::thread;
use std::time::Duration;
/// Runs a counting loop on a background thread alongside a shorter, slower
/// loop on the main thread, then blocks until the background thread is done.
fn main() {
    let thread_pause = Duration::from_millis(100);
    let main_pause = Duration::from_millis(150);

    // Background thread: prints 1 through 9, pausing after every line.
    let worker = thread::spawn(move || {
        (1..10).for_each(|step| {
            println!("Thread: {}", step);
            thread::sleep(thread_pause);
        });
    });

    // Main thread: prints 1 through 4 while the worker is running.
    (1..5).for_each(|step| {
        println!("Main: {}", step);
        thread::sleep(main_pause);
    });

    // Block until the worker has returned; `unwrap` panics if the worker panicked.
    worker.join().unwrap();
    println!("All threads finished!");
}
Output (the exact interleaving of lines may vary between runs):
Main: 1
Thread: 1
Thread: 2
Main: 2
Thread: 3
Thread: 4
Main: 3
...
All threads finished!
🔹 Message Passing with Channels
Communicate between threads using channels:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Sends four words from a spawned thread to the main thread over an mpsc
/// channel and prints each one as it arrives.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // The sender is moved into the producer thread, so it is dropped when that
    // thread finishes, which is what ends the receive loop below.
    thread::spawn(move || {
        let pause = Duration::from_millis(500);
        let words = vec!["Hello", "from", "the", "thread"];
        words.into_iter().for_each(|word| {
            sender.send(word).unwrap();
            thread::sleep(pause);
        });
    });

    // `recv` blocks for the next message and returns `Err` once the sender is
    // gone and the channel is empty.
    while let Ok(word) = receiver.recv() {
        println!("Received: {}", word);
    }
    println!("All messages received!");
}
Output:
Received: Hello
Received: from
Received: the
Received: thread
All messages received!
🔹 Shared State with Mutex
Share data safely between threads using Mutex:
use std::sync::{Arc, Mutex};
use std::thread;
/// Increments one shared counter from ten threads and prints the final total.
fn main() {
    // The counter is wrapped in `Arc<Mutex<_>>`: `Arc` lets every thread own a
    // handle to it, and `Mutex` serializes the increments.
    let total = Arc::new(Mutex::new(0));

    // Start ten threads, each holding its own `Arc` handle to the counter.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let shared = Arc::clone(&total);
            thread::spawn(move || {
                // The lock guard is a temporary, released at the end of this statement.
                *shared.lock().unwrap() += 1;
            })
        })
        .collect();

    // Wait for every increment to land before reading the counter.
    workers.into_iter().for_each(|worker| worker.join().unwrap());

    let final_value = *total.lock().unwrap();
    println!("Result: {}", final_value);
}
Output:
Result: 10
🔹 Producer-Consumer Pattern
Classic concurrency pattern with multiple producers:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Starts three producer threads that each send five messages into one
/// channel, and a consumer thread that prints messages until the channel closes.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // Each producer gets its own clone of the sending half.
    for producer_id in 0..3 {
        let producer_tx = sender.clone();
        thread::spawn(move || {
            let pause = Duration::from_millis(100);
            for seq in 0..5 {
                producer_tx
                    .send(format!("Producer {} - Message {}", producer_id, seq))
                    .unwrap();
                thread::sleep(pause);
            }
        });
    }

    // Release the original sender: the channel only closes once every sender
    // (the original and all clones) has been dropped.
    drop(sender);

    // The consumer drains the channel until `recv` reports that it is closed.
    let consumer = thread::spawn(move || {
        while let Ok(text) = receiver.recv() {
            println!("Consumer received: {}", text);
        }
        println!("Consumer finished!");
    });
    consumer.join().unwrap();
}
Output (the order in which producers' messages arrive may vary between runs):
Consumer received: Producer 0 - Message 0
Consumer received: Producer 1 - Message 0
Consumer received: Producer 2 - Message 0
Consumer received: Producer 0 - Message 1
...
Consumer finished!
🔹 Thread Pool Example
Manage multiple worker threads:
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
/// A fixed-size pool of worker threads fed from one shared job queue.
///
/// Dropping the pool closes the queue and blocks until every worker has
/// finished the jobs that were already submitted.
struct ThreadPool {
    workers: Vec<Worker>,
    /// Sending half of the job queue. It is `Some` for the pool's whole
    /// lifetime and is taken (set to `None`) only inside `Drop`.
    sender: Option<mpsc::Sender<Job>>,
}

/// A unit of work: a boxed closure that is run exactly once on a worker thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// One worker thread together with the id used in its log lines.
struct Worker {
    id: usize,
    /// Join handle of the worker thread; taken when the pool is dropped.
    thread: Option<thread::JoinHandle<()>>,
}

impl ThreadPool {
    /// Creates a pool with `size` worker threads, all pulling from one queue.
    ///
    /// With `size == 0` no worker holds the receiving end of the queue, so a
    /// later call to [`ThreadPool::execute`] panics.
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel();
        // The single receiver is shared by all workers behind a mutex.
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    /// Queues `f` to be run by whichever worker picks it up next.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be queued because no worker is left to
    /// receive it.
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job: Job = Box::new(f);
        self.sender
            .as_ref()
            .expect("the sender is only removed while the pool is being dropped")
            .send(job)
            .expect("no worker is alive to receive the job");
    }
}

impl Drop for ThreadPool {
    /// Closes the job queue and waits for every worker thread to exit.
    fn drop(&mut self) {
        // Dropping the only sender makes `recv` fail in each worker once the
        // queue has been drained, which ends the worker loops.
        drop(self.sender.take());

        for worker in &mut self.workers {
            if let Some(handle) = worker.thread.take() {
                // Panicking inside `drop` could abort the process, so a
                // panicked worker is reported instead of unwrapped.
                if handle.join().is_err() {
                    eprintln!("Worker {} panicked while running a job", worker.id);
                }
            }
        }
    }
}

impl Worker {
    /// Spawns a thread that runs jobs from `receiver` until the queue closes.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The lock guard is a temporary that is released at the end of this
            // statement, so other workers can wait for jobs while this one runs.
            let message = receiver
                .lock()
                .expect("the queue mutex is never held while a job runs")
                .recv();
            match message {
                Ok(job) => {
                    println!("Worker {} executing job", id);
                    job();
                }
                // Every sender is gone and the queue is empty: shut down.
                Err(_) => break,
            }
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}
/// Submits eight short tasks to a four-worker pool and keeps the process
/// alive while they run.
fn main() {
    let pool = ThreadPool::new(4);
    let task_duration = std::time::Duration::from_millis(500);

    (0..8).for_each(|task_id| {
        pool.execute(move || {
            println!("Task {} completed", task_id);
            thread::sleep(task_duration);
        });
    });

    // Keep the main thread alive long enough for the queued tasks to run
    // (eight 500 ms tasks spread over four workers).
    thread::sleep(std::time::Duration::from_secs(3));
}