blob: b103487849de7fd5df197246985c6da653041cac [file] [log] [blame] [edit]
#![cfg(feature = "bilock")]
use futures::executor::block_on;
use futures::future;
use futures::stream;
use futures::task::{Context, Poll};
use futures::Future;
use futures::StreamExt;
use futures_test::task::noop_context;
use futures_util::lock::BiLock;
use std::pin::Pin;
use std::thread;
#[test]
fn smoke() {
let future = future::lazy(|cx| {
let (a, b) = BiLock::new(1);
{
let mut lock = match a.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => panic!("poll not ready"),
};
assert_eq!(*lock, 1);
*lock = 2;
assert!(b.poll_lock(cx).is_pending());
assert!(a.poll_lock(cx).is_pending());
}
assert!(b.poll_lock(cx).is_ready());
assert!(a.poll_lock(cx).is_ready());
{
let lock = match b.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => panic!("poll not ready"),
};
assert_eq!(*lock, 2);
}
assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
Ok::<(), ()>(())
});
assert_eq!(block_on(future), Ok(()));
}
#[test]
fn concurrent() {
const N: usize = 10000;
let mut cx = noop_context();
let (a, b) = BiLock::new(0);
let a = Increment { a: Some(a), remaining: N };
let b = stream::iter(0..N).fold(b, |b, _n| async {
let mut g = b.lock().await;
*g += 1;
drop(g);
b
});
let t1 = thread::spawn(move || block_on(a));
let b = block_on(b);
let a = t1.join().unwrap();
match a.poll_lock(&mut cx) {
Poll::Ready(l) => assert_eq!(*l, 2 * N),
Poll::Pending => panic!("poll not ready"),
}
match b.poll_lock(&mut cx) {
Poll::Ready(l) => assert_eq!(*l, 2 * N),
Poll::Pending => panic!("poll not ready"),
}
assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
struct Increment {
remaining: usize,
a: Option<BiLock<usize>>,
}
impl Future for Increment {
type Output = BiLock<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
loop {
if self.remaining == 0 {
return self.a.take().unwrap().into();
}
let a = self.a.as_mut().unwrap();
let mut a = match a.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => return Poll::Pending,
};
*a += 1;
drop(a);
self.remaining -= 1;
}
}
}
}