线程同步:消息传递
注意:在 rust 线程中借用外部的引用必须拥有 'static
生命周期。
在多线程间有多种方式可以共享和传递数据,最常用有两种:
对于消息传递,在编程界有一个大名鼎鼎的 Actor 线程模型为其背书,典型的有 Erlang 语言,还有 Go 语言。
在 Go 语言中有一句很经典的话:
Do not communicate by sharing memory; instead, share memory by communicating
不要通过共享内存来进行通信,而是通过通信来共享内存
简单理解:尽量避免访问同一块内存空间来通信,因为它会造成的并发问题如竞争条件(Race condition),死锁(Deadlocks)等。
而是应该通过消息通知(触发)进行数据传递,例如消息队列、Socket 等方法。不同进程或线程之间通过这些通信机制共享数据,避免共享内存造成的并发问题。
与 Go 语言直接内置 chan 关键字不同,rust 通过标准库的 channel
提供消息通道。
消息常常被视为信息的反映形式之一,是信息的外壳。但消息/信息没有个统一认可的定义。在香浓的《通信数学理论》中,他认为:
从通信角度看,信息是通信的内容。通信的目的就是要减少或消除接收端(信宿)对于发出端(信源)可能会发出哪些消息的不确定性。
所谓不确定性,就是指人们对客观事物的不了解或不清楚程度。
人们通过某种方式或手段,获取了新的情况或知识,就可从对客观事物的不清楚变为较清楚或完全清楚,不确定性也就减少或消除了。
这种使人们减少或消除不确定性的东西就是信息。
简单理解,消息是发送者发信息给接收者的音讯,它更多的是指一个音讯整体,包含发送者和接收者。
消息通过消息通道进行传播,一个消息通道可以传播多个消息,因此消息通道应该支持多个发送者和接收者。
在实际使用中,需要使用不同的库来满足诸如:多发送者 -> 单接收者
,多发送者 -> 多接收者
等场景形式。
消息管道一般不区分单发送者和多发送者,因为支持多发送者就是支持单发送者。
- 在实际应用中,通常需要多个发送者向同一个接收者发送消息,单发送者的场景相对较少。
- 多发送者形式更加灵活和通用,能满足单个发送者功能。
- 从设计的角度来看,多发送者形式更加符合消息管道的本质。消息管道的目的是将消息从发送者传递到接收者,而不管发送者和接收者的数量。
当发送者或接收者任一被丢弃时可以认为通道被关闭(closed)了。
多发送者,单接收者
标准库提供了通道 std::sync::mpsc
,其中 mpsc
是 multiple producer, single consumer
的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。
当然,支持多个发送者也意味着支持单个发送者。
在实际使用过程中,发送者 transmitter
常被简写为 tx
,接收者 receiver
被简写为 rx
。
单发送者,单接收者
1 2 3 4 5 6 7 8 9 10 11
|
let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || { tx.send(1).unwrap();
}); println!("{}", rx.recv().unwrap());
|
以上代码并不复杂,但仍有几点需要注意:
- tx,rx 对应发送者和接收者,它们的类型由编译器自动推导: 因为 tx.send(1) 发送了整数,所以编译器推导它们分别是
mpsc::Sender<i32>
和 mpsc::Receiver<i32>
类型
- 由于通道内部是泛型实现,一旦类型被推导确定,该通道就只能传递对应类型的值,否则会导致类型错误。
- 接收消息的操作 rx.recv() 会阻塞当前线程,直到读取到值,或者通道被关闭
- 需要使用 move 将 tx 的所有权转移到子线程的闭包中
send 方法返回一个 Result<T,E>
,说明它有可能返回一个错误,例如接收者被 drop 导致发送的值不会被任何人接收,此时继续发送毫无意义,因此返回一个错误最为合适。
同样的,对于 recv 方法来说,当发送者关闭时,它也会接收到一个错误,用于说明不会再有任何值被发送过来。
不阻塞的 try_recv
recv 方法在通道中没有消息时会阻塞当前线程,如果不希望阻塞线程,可以使用 try_recv,try_recv 会尝试接收一次消息,如果通道中没有消息,会立刻返回一个错误。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| let (tx, rx) = mpsc::channel(); thread::spawn(move || { tx.send(1); }); match rx.try_recv() { Ok(n) => println!("{n}"), Err(e) => eprintln!("{}", e), } match rx.recv() { Ok(n) => println!("{n}"), Err(e) => eprintln!("{}", e), } match rx.try_recv() { Ok(n) => println!("{n}"), Err(e) => eprintln!("{}", e), }
|
由于子线程的创建需要时间,第一个 match rx.try_recv
执行时子线程的消息还未发出。因为消息没有发出,try_recv 在立即尝试读取一次消息后就会报错,返回 empty channel 错误。
当子线程创建成功且发送消息后,主线程会接收到 Ok(1) 的消息内容,紧接着子线程结束,发送者也随着被 drop,此时接收者又会报错,但是这次错误原因有所不同:closed channel 代表发送者已经被关闭。
传输数据的所有权
使用通道来传输数据,一样要遵循 Rust 的所有权规则:
- 若值的类型实现了 Copy 特征,则直接复制一份该值,然后传输
- 若值没有实现 Copy 特征,则它的所有权会被转移给接收端,在发送端继续使用该值将报错
1 2 3 4 5 6 7 8
| let (tx, rx) = mpsc::channel(); thread::spawn(move || { let s = String::from("Hello World"); tx.send(s); }); println!("{}", rx.recv().unwrap());
|
假如没有所有权的保护,String 字符串将被两个线程同时持有,任何一个线程对字符串内容的修改都会导致另外一个线程持有的字符串被改变,除非故意这么设计,否则这就是不安全的隐患。
循环接收消息
消息通道中的消息数量是不确定的,为了方便接收所有消息以及在通道关闭时自动停止接收者接收消息,rust 为接收者 Receiver 实现了可迭代特征协议(IntoIterator)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| impl<T> Iterator for IntoIter<T> { type Item = T; fn next(&mut self) -> Option<T> { self.rx.recv().ok() } }
impl<T> IntoIterator for Receiver<T> { type Item = T; type IntoIter = IntoIter<T>;
fn into_iter(self) -> IntoIter<T> { IntoIter { rx: self } } }
|
rx.recv()
阻塞当前线程直到发送者或通道关闭,结合迭代器说明可以对 rx
进行循环操作,即可取出通道内的所有消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| let (tx, rx) = mpsc::channel(); thread::spawn(move || { let msgs = vec![ String::from("Test"), String::from("Hello"), String::from("World"), String::from("!"), ]; for msg in msgs { tx.send(msg); } });
match rx.recv() { Ok(msg) => println!("{msg}"), Err(e) => eprintln!("{e}"), }
for msg in rx { print!("{msg}"); }
|
mpsc 的多发送者
发送者 Sender 和 Arc 一样实现了 Send 特征,可以在多线程中共享数据。
使用多发送者时,和在多线程中使用 Arc 一样,复制一份引用即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| let (tx, rx) = mpsc::channel(); let count = 5; let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(count); for i in 0..count { let _tx = Sender::clone(&tx); handles.push(thread::spawn(move || { _tx.send(i).unwrap(); })); }
drop(tx);
for msg in rx { println!("{}", msg); }
|
有几点需要注意:
- 需要所有的发送者都被 drop 掉后,接收者 rx 才会收到错误,进而跳出 for 循环,最终结束主线程,因此要提前销毁 tx
- 由于子线程谁先创建完成是未知的,因此哪条消息先发送也是未知的,最终主线程消息的输出顺序也不确定
同步和异步通道
Rust 标准库的 mpsc 通道其实分为两种类型:同步和异步。
异步通道
异步:发送操作不会阻塞当前线程,无论消息是否被接收,继续执行当前线程。即无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| let (tx, rx) = mpsc::channel(); let count = 2; let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(count); for i in 0..count { let _tx = Sender::clone(&tx); handles.push(thread::spawn(move || { println!("发送之前"); _tx.send(i).unwrap(); println!("发送之后"); })); }
thread::sleep(Duration::from_secs(2));
drop(tx);
for msg in rx { println!("{}", msg); }
|
主线程因为睡眠阻塞了 2 秒,并没有进行消息接收,而子线程却在此期间轻松完成了消息的发送。发送之前和发送之后是连续输出的,没有受到接收端主线程的任何影响,
等睡眠结束后,主线程才姗姗来迟的从通道中接收了子线程老早之前发送的消息。因此通过 mpsc::channel
创建的通道是异步通道。
同步通道
与异步通道相反,同步通道的发送者发送操作是可以阻塞当前线程的,只有等发送者发出的消息被接收后,发送者所在的线程才会解除阻塞并继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| let (tx, rx) = mpsc::sync_channel(0); let count = 3; for i in 0..count { let _tx = SyncSender::clone(&tx); thread::spawn(move || { println!("同步通道,发送之前,idx = {i}"); _tx.send(i).unwrap(); println!("同步通道,发送之后,idx = {i}"); }); }
thread::sleep(Duration::from_secs(2)); drop(tx); for msg in rx { println!("同步通道,接收消息,idx = {}", msg); }
|
在同步通道中,当发送者发出消息时,只有等发出的消息被接收后,发送者所在的线程才会解除阻塞并继续执行。
消息缓存
同步通道可以指定通道长度,即同步通道的消息缓存条数。
当设定为 N 时,发送者可以无阻塞的往通道中发送 N 条消息。
当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么从第 N+1 条消息开始将触发发送阻塞)。
在同步通道中,在生成通道时指定通道长度:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
let (tx, rx) = mpsc::sync_channel(2); let count = 3; for i in 0..count { let _tx = SyncSender::clone(&tx); thread::spawn(move || { println!("前,idx = {i}"); _tx.send(i).unwrap(); println!("后,idx = {i}"); }); }
drop(tx); for msg in rx { println!("接收,idx = {}", msg); }
|
异步通道没有这个缓冲值参数 mpsc::channel(),事实上异步通道的缓冲上限取决于内存大小。
虽然异步消息非常高效不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。
因此,在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险。
关闭通道
所有发送者被 drop 或者所有接收者被 drop 后,引用计数归 0,rust 根据发送者和接收者实现的 Drop 特征自动关闭通道。
这种设计平衡了性能和灵活性,维护开销非常低并且是在线程安全的基础上进行的。
传输多种类型的数据
一个消息通道只能传输一种类型的数据,如果需要传输多种类型的数据,可以为每个类型创建一个通道,或者使用枚举类型来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #[derive(Debug)] enum Fruit { Apple(u8), Orange(String), } let count = 5; let (tx, rx) = mpsc::sync_channel(3); for i in 0..count { let _tx = SyncSender::clone(&tx); thread::spawn(move || { if i < 3 { _tx.send(Fruit::Apple(i as u8)).unwrap(); } else { _tx.send(Fruit::Orange(i.to_string())); } }); } drop(tx); for msg in rx { println!("{:?}", msg); }
|
有一点需要注意,Rust 会按照枚举中占用内存最大的那个成员进行内存对齐,这意味着即使传输枚举中占用内存最小的成员,它占用的内存依然和最大的成员相同, 因此会造成内存上的浪费。
可能遇到的问题
在以上使用循环生成线程、发送消息的代码中,都会在调用接收者之前手动释放发送者的一个引用。
如果去掉手动释放这个引用的过程,会发现运行后主线程会一直阻塞,最后一行打印输出也不会被执行。
原因在于:子线程拿走的是复制后 send 的所有权,这些拷贝会在子线程结束后被 drop,因此无需担心,但是 send 本身要到 main 函数的结束才会被 drop。
mpsc 通道关闭的两种方式:发送者全部 drop 或接收者被 drop,要结束 for 循环显然是要求发送者全部 drop,但是由于 send 自身没有被 drop,会导致该循环永远无法结束,最终主线程会一直阻塞。
更好的性能
如果需要 mpmc(多发送者,多接收者)或者需要更高的性能,可以考虑第三方库:
- crossbeam-channel, 老牌强库,功能较全,性能较强,之前是独立的库,但是后面合并到了 crossbeam 主仓库中
- flume, 官方给出的性能数据某些场景要比 crossbeam 更好些
Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
| fn main() { let (tx, rx) = mpsc::channel();
let handle = thread::spawn(move || { tx.send(1).unwrap();
}); println!("{}", rx.recv().unwrap());
let (tx, rx) = mpsc::channel(); thread::spawn(move || { tx.send(1); }); match rx.try_recv() { Ok(n) => println!("{n}"), Err(e) => eprintln!("{}", e), } match rx.recv() { Ok(n) => println!("{n}"), Err(e) => eprintln!("{}", e), } match rx.try_recv() { Ok(n) => println!("{n}"), Err(e) => eprintln!("{}", e), }
let (tx, rx) = mpsc::channel(); thread::spawn(move || { let s = String::from("Hello World"); tx.send(s); }); println!("{}", rx.recv().unwrap());
let (tx, rx) = mpsc::channel(); thread::spawn(move || { let msgs = vec![ String::from("Test"), String::from("Hello"), String::from("World"), String::from("!"), ]; for msg in msgs { tx.send(msg); } }); match rx.recv() { Ok(msg) => println!("{msg}"), Err(e) => eprintln!("{e}"), } for msg in rx { println!("{msg}"); }
let (tx, rx) = mpsc::channel(); let count = 5; let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(count); for i in 0..count { let _tx = Sender::clone(&tx); handles.push(thread::spawn(move || { _tx.send(i).unwrap(); })); } drop(tx); for msg in rx { println!("{}", msg); }
let (tx, rx) = mpsc::channel(); let count = 2; let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(count); for i in 0..count { let _tx = Sender::clone(&tx); handles.push(thread::spawn(move || { println!("发送之前"); _tx.send(i).unwrap(); println!("发送之后"); })); } thread::sleep(Duration::from_secs(2)); drop(tx); for msg in rx { println!("{}", msg); }
let (tx, rx) = mpsc::sync_channel(0); let count = 3; for i in 0..count { let _tx = SyncSender::clone(&tx); thread::spawn(move || { println!("同步通道,发送之前,idx = {i}"); _tx.send(i).unwrap(); println!("同步通道,发送之后,idx = {i}"); }); } thread::sleep(Duration::from_secs(2)); drop(tx); for msg in rx { println!("同步通道,接收消息,idx = {}", msg); }
println!("====================================================="); let (tx, rx) = mpsc::sync_channel(2); let count = 3; for i in 0..count { let _tx = SyncSender::clone(&tx); thread::spawn(move || { println!("前,idx = {i}"); _tx.send(i).unwrap(); println!("后,idx = {i}"); }); } drop(tx); for msg in rx { println!("接收,idx = {}", msg); }
#[derive(Debug)] enum Fruit { Apple(u8), Orange(String), } let count = 5; let (tx, rx) = mpsc::sync_channel(3); for i in 0..count { let _tx = SyncSender::clone(&tx); thread::spawn(move || { if i < 3 { _tx.send(Fruit::Apple(i as u8)).unwrap(); } else { _tx.send(Fruit::Orange(i.to_string())); } }); } drop(tx); for msg in rx { println!("{:?}", msg); } }
|