I'm trying to write a solution for this problem:
Simulate a simple stock exchange comprising a 4 threads, a stock generator, incrementor, and 2 broadcasters. The generator continually iterates through a stock vector, sending stocks to the incrementor. The incrementor receives the stock and adjusts the value/price of the stock, sending it to one of the 2 broadcasters. The broadcaster prints out the name and current value of the stock.
I'm always stuck at the part where I'm unable to update a stock. The main issue I always get is that the variable containing a mutable reference to stock is dropped too early. I've tried to do signalling with a condvar but my lecturer says that it is more complicated that it needs to be. So, I'm reworking it from the ground up. I understand that my entire concept/model on how to solve the problem is wrong but I am lost in what I'm supposed to actually do.
This is my code so far
fn main() {
let mut stocks = vec![
Stock::new("Apple"),
Stock::new("Microsoft"),
Stock::new("Tesla"),
Stock::new("Amazon"),
];
let stocks = Arc::new(Mutex::new(stocks.as_mut_slice()));
let (stock_tx, stock_rx) = unbounded(); // from crossbeam_channel
loop {
stock_generator(stocks.clone(), stock_tx.clone());
stock_incrementor(stocks_rx.clone());
}
}
fn stock_generator(stocks: Arc<Mutex<&mut [Stock]>>, stock_tx: CrossSender<&mut Stock>) {
thread::spawn(move || {
if let Ok(mut guard) = stocks.lock() {
for stock in guard.iter_mut() {
stock_tx.send(stock).unwrap();
}
}
});
}
fn stock_incrementor(stock_rx: CrossReceiver<&mut Stock>) {
thread::spawn(move || loop {
let stock = stock_rx.recv().unwrap();
stock.price += 1;
});
}
This is the crate I'm using for the unbounded
call where I create stock_tx
and stock_rx
.html
I also tried other things such as this:
fn main() {
let stocks = vec![
Stock::new("Apple"),
Stock::new("Microsoft"),
Stock::new("Tesla"),
Stock::new("Amazon"),
];
let stocks = Arc::new(Mutex::new(stocks));
let (stock_tx, stock_rx) = unbounded();
stock_generator(stocks.clone(), stock_tx.clone(), stock_rx.clone());
stock_incrementor(stock_tx.clone(), stock_rx.clone());
broadcaster(1, stock_rx.clone());
broadcaster(2, stock_rx.clone());
loop {}
}
fn stock_generator(
stocks: Arc<Mutex<Vec<Stock>>>,
stock_tx: CrossSender<Stock>,
stock_rx: CrossReceiver<Stock>,
) {
thread::spawn(move || loop {
// get updated stock if there is one
// otherwise use original vec
if let Ok(updated_stock) = stock_rx.try_recv() {
let mut guard = stocks.lock().unwrap();
for stock in guard.iter_mut() {
if stock.name == updated_stock.name {
*stock = updated_stock.clone();
}
}
} // guard is dropped here
let guard = stocks.lock().unwrap();
for stock in &*guard {
// send over to incrementor
stock_tx.send(stock.clone()).unwrap();
} // guard is dropped here
});
}
fn stock_incrementor(stock_tx: CrossSender<Stock>, stock_rx: CrossReceiver<Stock>) {
thread::spawn(move || loop {
let mut stock = stock_rx.recv().unwrap();
stock.price = stock.price.saturating_add(1);
stock_tx.send(stock).unwrap(); // send updated stock price back to generator and broadcaster
});
}
fn broadcaster(id: u8, stock_rx: CrossReceiver<Stock>) {
thread::spawn(move || loop {
if let Ok(stock) = stock_rx.try_recv() {
println!("{id}: {} at {}", stock.name, stock.price);
}
});
}
I get output which is wrong. Or it's the code that's wrong. I'm not sure because threads don't go in order so I have no idea what's going on right now.
1: Microsoft at 109
2: Apple at 136
1: Microsoft at 109 <--
2: Apple at 136
1: Apple at 136
2: Apple at 137
1: Microsoft at 107 <--
2: Tesla at 239
1: Amazon at 129
So Microsoft was at 109 but at the bottom it appears as 107. As to why this happens I have my guess. So what I have in my head is like this. The first iteration.
try_recv()
fails.guard
is obtained- Looping through the
Vec
inside theguard
and each item is sent over tostock_incrementor
via a channel.
So here stock_incrementor
gest to work. It receives stock
updates the price and sends it to the broadcaster via another channel. It also sends it to the stock_generator
thread.
guard
is dropped when the for loop instock_generator
finishes.
stock_generator
's work is done, it can just loop again. So it does. This is iteration two:
try_recv()
succeeds (assuming it does)- Repeat.
This second iteration only works properly if the stock_incrementor
has finished it's operation of basically += 1
. But what happens if it doesn't? It basically gets duplicate data sent from stock_generator
.
I edited the code a few more times and it didn't help much. I'm very much lost and I'm not sure exactly I should be correcting if there needs to be any correction. Any help is greatly appreciated.
I'm trying to write a solution for this problem:
Simulate a simple stock exchange comprising a 4 threads, a stock generator, incrementor, and 2 broadcasters. The generator continually iterates through a stock vector, sending stocks to the incrementor. The incrementor receives the stock and adjusts the value/price of the stock, sending it to one of the 2 broadcasters. The broadcaster prints out the name and current value of the stock.
I'm always stuck at the part where I'm unable to update a stock. The main issue I always get is that the variable containing a mutable reference to stock is dropped too early. I've tried to do signalling with a condvar but my lecturer says that it is more complicated that it needs to be. So, I'm reworking it from the ground up. I understand that my entire concept/model on how to solve the problem is wrong but I am lost in what I'm supposed to actually do.
This is my code so far
fn main() {
let mut stocks = vec![
Stock::new("Apple"),
Stock::new("Microsoft"),
Stock::new("Tesla"),
Stock::new("Amazon"),
];
let stocks = Arc::new(Mutex::new(stocks.as_mut_slice()));
let (stock_tx, stock_rx) = unbounded(); // from crossbeam_channel
loop {
stock_generator(stocks.clone(), stock_tx.clone());
stock_incrementor(stocks_rx.clone());
}
}
fn stock_generator(stocks: Arc<Mutex<&mut [Stock]>>, stock_tx: CrossSender<&mut Stock>) {
thread::spawn(move || {
if let Ok(mut guard) = stocks.lock() {
for stock in guard.iter_mut() {
stock_tx.send(stock).unwrap();
}
}
});
}
fn stock_incrementor(stock_rx: CrossReceiver<&mut Stock>) {
thread::spawn(move || loop {
let stock = stock_rx.recv().unwrap();
stock.price += 1;
});
}
This is the crate I'm using for the unbounded
call where I create stock_tx
and stock_rx
https://docs.rs/crossbeam/latest/crossbeam/channel/index.html
I also tried other things such as this:
fn main() {
let stocks = vec![
Stock::new("Apple"),
Stock::new("Microsoft"),
Stock::new("Tesla"),
Stock::new("Amazon"),
];
let stocks = Arc::new(Mutex::new(stocks));
let (stock_tx, stock_rx) = unbounded();
stock_generator(stocks.clone(), stock_tx.clone(), stock_rx.clone());
stock_incrementor(stock_tx.clone(), stock_rx.clone());
broadcaster(1, stock_rx.clone());
broadcaster(2, stock_rx.clone());
loop {}
}
fn stock_generator(
stocks: Arc<Mutex<Vec<Stock>>>,
stock_tx: CrossSender<Stock>,
stock_rx: CrossReceiver<Stock>,
) {
thread::spawn(move || loop {
// get updated stock if there is one
// otherwise use original vec
if let Ok(updated_stock) = stock_rx.try_recv() {
let mut guard = stocks.lock().unwrap();
for stock in guard.iter_mut() {
if stock.name == updated_stock.name {
*stock = updated_stock.clone();
}
}
} // guard is dropped here
let guard = stocks.lock().unwrap();
for stock in &*guard {
// send over to incrementor
stock_tx.send(stock.clone()).unwrap();
} // guard is dropped here
});
}
fn stock_incrementor(stock_tx: CrossSender<Stock>, stock_rx: CrossReceiver<Stock>) {
thread::spawn(move || loop {
let mut stock = stock_rx.recv().unwrap();
stock.price = stock.price.saturating_add(1);
stock_tx.send(stock).unwrap(); // send updated stock price back to generator and broadcaster
});
}
fn broadcaster(id: u8, stock_rx: CrossReceiver<Stock>) {
thread::spawn(move || loop {
if let Ok(stock) = stock_rx.try_recv() {
println!("{id}: {} at {}", stock.name, stock.price);
}
});
}
I get output which is wrong. Or it's the code that's wrong. I'm not sure because threads don't go in order so I have no idea what's going on right now.
1: Microsoft at 109
2: Apple at 136
1: Microsoft at 109 <--
2: Apple at 136
1: Apple at 136
2: Apple at 137
1: Microsoft at 107 <--
2: Tesla at 239
1: Amazon at 129
So Microsoft was at 109 but at the bottom it appears as 107. As to why this happens I have my guess. So what I have in my head is like this. The first iteration.
try_recv()
fails.guard
is obtained- Looping through the
Vec
inside theguard
and each item is sent over tostock_incrementor
via a channel.
So here stock_incrementor
gest to work. It receives stock
updates the price and sends it to the broadcaster via another channel. It also sends it to the stock_generator
thread.
guard
is dropped when the for loop instock_generator
finishes.
stock_generator
's work is done, it can just loop again. So it does. This is iteration two:
try_recv()
succeeds (assuming it does)- Repeat.
This second iteration only works properly if the stock_incrementor
has finished it's operation of basically += 1
. But what happens if it doesn't? It basically gets duplicate data sent from stock_generator
.
I edited the code a few more times and it didn't help much. I'm very much lost and I'm not sure exactly I should be correcting if there needs to be any correction. Any help is greatly appreciated.
Share edited Mar 13 at 0:40 cafce25 28k5 gold badges45 silver badges58 bronze badges asked Mar 11 at 8:09 YJH16120YJH16120 5431 gold badge7 silver badges19 bronze badges 1- If you have a solution to your problem you should post it as an answer to the question. Do not add such information to the question, that gets confusing quickly. – cafce25 Commented Mar 13 at 0:41
2 Answers
Reset to default 1Your queue sequencing makes no sense: the exercise talks about a sequence of generator -> incrementor -> (broadcast1 | broadcast2)
, but you're using a single queue for all communication, and the generator pulls back from that queue as well, so they're all competing on that and the entire thing races itself depending on the OS's scheduling decisions: your stocks could cycle multiple times through the incrementor or bounce between the generator and incrementor, or go straight from generator to broadcaster.
So Microsoft was at 109 but at the bottom it appears as 107. As to why this happens I have my guess.
There's a trivial way for that to happen: the generator sends it out, the incrementor pulls it, increments it twice (because it increments, sends, then immediately retrieves the same stock), then sends it back out and a broadcaster grabs it and prints it. Since it never went back through the generator that never got the information, so next time it sends that stock it's with the original value, and if that is pulled directly by a broadcaster you get the original value.
Thanks to everyone's help I was able to get the solution I wished. I'll repeat the goal I wish to achieve for convenience and what my code does. If there are any inconsistencies in the code, feel free to point it out.
- The stock generator sends stocks to stock incrementor.
- The stock incrementor increments the price of the stock.
- The stock incrementor broadcasts the updated stock.
- The two broadcasters will "fight" for the right to announce the stock (basically print it).
This is what my code does to achieve it. In this case instead of sending the stock, the channels will instead send a receive the index to each stock. Each thread will have a reference to a Mutex<Vec<Stock>>
through an Arc
.
use std::{
sync::{Arc, Mutex},
thread::{self, sleep},
time::Duration,
};
use crossbeam_channel::{unbounded, Receiver as CrossReceiver, Sender as CrossSender};
#[derive(Clone, Debug)]
struct Stock {
name: String,
price: usize,
}
impl Stock {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
price: 1,
}
}
}
fn main() {
let stocks = vec![
Stock::new("Apple"),
Stock::new("Microsoft"),
Stock::new("Tesla"),
Stock::new("Amazon"),
];
let stocks = Arc::new(Mutex::new(stocks));
let (stock_tx, stock_rx) = unbounded();
let (broadcaster_tx, broadcaster_rx) = unbounded();
generator(stocks.clone(), stock_tx);
incrementor(stocks.clone(), stock_rx, broadcaster_tx);
broadcaster(1, stocks.clone(), broadcaster_rx.clone());
broadcaster(2, stocks, broadcaster_rx);
loop {}
}
fn generator(stocks: Arc<Mutex<Vec<Stock>>>, stock_tx: CrossSender<usize>) {
thread::spawn(move || loop {
let guard = stocks.lock().unwrap();
for (index, _) in guard.iter().enumerate() {
stock_tx.send(index).unwrap();
}
});
}
fn incrementor(
stocks: Arc<Mutex<Vec<Stock>>>,
stock_rx: CrossReceiver<usize>,
broadcaster_tx: CrossSender<usize>,
) {
thread::spawn(move || loop {
let mut guard = stocks.lock().unwrap();
let index = stock_rx.recv().unwrap();
let stock = guard.get_mut(index).unwrap();
stock.price += 1;
broadcaster_tx.send(index).unwrap();
sleep(Duration::from_millis(500));
});
}
fn broadcaster(id: u8, stocks: Arc<Mutex<Vec<Stock>>>, broadcaster_rx: CrossReceiver<usize>) {
thread::spawn(move || loop {
let guard_result = stocks.try_lock();
if let Ok(mut guard) = guard_result {
if let Ok(index) = broadcaster_rx.try_recv() {
let stock = guard.get_mut(index).unwrap();
println!("{}: {} {}", id, stock.name, stock.price);
}
}
});
}