mirror of
https://github.com/fluencelabs/wasm-bindgen
synced 2025-04-03 02:41:06 +00:00
Merge pull request #1380 from alexcrichton/rayon-example
Rewrite the parallel raytracing example with `rayon`
This commit is contained in:
commit
3aa803c55c
@ -164,14 +164,11 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- template: ci/azure-install-rust.yml
|
- template: ci/azure-install-rust.yml
|
||||||
parameters:
|
parameters:
|
||||||
toolchain: nightly-2019-04-01
|
toolchain: nightly
|
||||||
- template: ci/azure-install-sccache.yml
|
- template: ci/azure-install-sccache.yml
|
||||||
- script: rustup component add rust-src
|
- script: rustup component add rust-src
|
||||||
displayName: "install rust-src"
|
displayName: "install rust-src"
|
||||||
- script: |
|
- script: cargo install xargo
|
||||||
set -e
|
|
||||||
curl -L https://github.com/japaric/xargo/releases/download/v0.3.13/xargo-v0.3.13-x86_64-unknown-linux-musl.tar.gz | tar xzf -
|
|
||||||
echo "##vso[task.prependpath]$PWD"
|
|
||||||
displayName: "install xargo"
|
displayName: "install xargo"
|
||||||
- script: |
|
- script: |
|
||||||
set -e
|
set -e
|
||||||
|
@ -11,6 +11,8 @@ crate-type = ["cdylib"]
|
|||||||
console_error_panic_hook = "0.1"
|
console_error_panic_hook = "0.1"
|
||||||
futures = "0.1"
|
futures = "0.1"
|
||||||
js-sys = "0.3.22"
|
js-sys = "0.3.22"
|
||||||
|
rayon = "1.1.0"
|
||||||
|
rayon-core = "1.5.0"
|
||||||
raytracer = { git = 'https://github.com/alexcrichton/raytracer', branch = 'update-deps' }
|
raytracer = { git = 'https://github.com/alexcrichton/raytracer', branch = 'update-deps' }
|
||||||
wasm-bindgen = { version = "0.2.45", features = ['serde-serialize'] }
|
wasm-bindgen = { version = "0.2.45", features = ['serde-serialize'] }
|
||||||
wasm-bindgen-futures = "0.3.22"
|
wasm-bindgen-futures = "0.3.22"
|
||||||
|
@ -44,6 +44,7 @@ function run() {
|
|||||||
|
|
||||||
// Configure various buttons and such.
|
// Configure various buttons and such.
|
||||||
button.onclick = function() {
|
button.onclick = function() {
|
||||||
|
button.disabled = true;
|
||||||
console.time('render');
|
console.time('render');
|
||||||
let json;
|
let json;
|
||||||
try {
|
try {
|
||||||
@ -82,22 +83,28 @@ class State {
|
|||||||
this.running = true;
|
this.running = true;
|
||||||
this.counter = 1;
|
this.counter = 1;
|
||||||
|
|
||||||
this.interval = setInterval(() => this.updateTimer(), 100);
|
this.interval = setInterval(() => this.updateTimer(true), 100);
|
||||||
|
|
||||||
wasm.promise()
|
wasm.promise()
|
||||||
.then(() => {
|
.then(data => {
|
||||||
this.updateTimer();
|
this.updateTimer(false);
|
||||||
|
this.updateImage(data);
|
||||||
this.stop();
|
this.stop();
|
||||||
})
|
})
|
||||||
.catch(console.error);
|
.catch(console.error);
|
||||||
}
|
}
|
||||||
|
|
||||||
updateTimer() {
|
updateTimer(updateImage) {
|
||||||
const dur = performance.now() - this.start;
|
const dur = performance.now() - this.start;
|
||||||
timingVal.innerText = `${dur}ms`;
|
timingVal.innerText = `${dur}ms`;
|
||||||
this.counter += 1;
|
this.counter += 1;
|
||||||
if (this.wasm && this.counter % 3 == 0)
|
|
||||||
this.wasm.requestUpdate();
|
if (updateImage && this.wasm && this.counter % 3 == 0)
|
||||||
|
this.updateImage(this.wasm.imageSoFar());
|
||||||
|
}
|
||||||
|
|
||||||
|
updateImage(data) {
|
||||||
|
ctx.putImageData(data, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
@ -105,9 +112,9 @@ class State {
|
|||||||
return;
|
return;
|
||||||
console.timeEnd('render');
|
console.timeEnd('render');
|
||||||
this.running = false;
|
this.running = false;
|
||||||
pool = this.wasm.cancel(); // this frees `wasm`, returning the worker pool
|
|
||||||
this.wasm = null;
|
this.wasm = null;
|
||||||
clearInterval(this.interval);
|
clearInterval(this.interval);
|
||||||
|
button.disabled = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,6 +123,5 @@ function render(scene) {
|
|||||||
rendering.stop();
|
rendering.stop();
|
||||||
rendering = null;
|
rendering = null;
|
||||||
}
|
}
|
||||||
rendering = new State(scene.render(parseInt(concurrency.value), pool, ctx));
|
rendering = new State(scene.render(parseInt(concurrency.value), pool));
|
||||||
pool = null; // previous call took ownership of `pool`, zero it out here too
|
|
||||||
}
|
}
|
||||||
|
@ -1,25 +1,21 @@
|
|||||||
use std::cell::RefCell;
|
|
||||||
use std::cmp;
|
|
||||||
use std::rc::Rc;
|
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
|
|
||||||
use std::sync::{Arc, Mutex, MutexGuard};
|
|
||||||
|
|
||||||
use futures::sync::oneshot;
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use js_sys::{Array, Error, Promise, Uint8ClampedArray, WebAssembly};
|
use js_sys::{Promise, Uint8ClampedArray, WebAssembly};
|
||||||
|
use rayon::prelude::*;
|
||||||
use wasm_bindgen::prelude::*;
|
use wasm_bindgen::prelude::*;
|
||||||
use wasm_bindgen::JsCast;
|
use wasm_bindgen::JsCast;
|
||||||
use web_sys::{CanvasRenderingContext2d, ErrorEvent, Event, Worker};
|
|
||||||
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
|
|
||||||
|
|
||||||
macro_rules! console_log {
|
macro_rules! console_log {
|
||||||
($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
|
($($t:tt)*) => (crate::log(&format_args!($($t)*).to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mod pool;
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#[wasm_bindgen(js_namespace = console)]
|
#[wasm_bindgen(js_namespace = console)]
|
||||||
fn log(s: &str);
|
fn log(s: &str);
|
||||||
|
#[wasm_bindgen(js_namespace = console, js_name = log)]
|
||||||
|
fn logv(x: &JsValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
@ -27,10 +23,10 @@ pub struct Scene {
|
|||||||
inner: raytracer::scene::Scene,
|
inner: raytracer::scene::Scene,
|
||||||
}
|
}
|
||||||
|
|
||||||
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
|
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
impl Scene {
|
impl Scene {
|
||||||
|
/// Creates a new scene from the JSON description in `object`, which we
|
||||||
|
/// deserialize here into an actual scene.
|
||||||
#[wasm_bindgen(constructor)]
|
#[wasm_bindgen(constructor)]
|
||||||
pub fn new(object: &JsValue) -> Result<Scene, JsValue> {
|
pub fn new(object: &JsValue) -> Result<Scene, JsValue> {
|
||||||
console_error_panic_hook::set_once();
|
console_error_panic_hook::set_once();
|
||||||
@ -41,304 +37,118 @@ impl Scene {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Renders this scene with the provided concurrency and worker pool.
|
||||||
|
///
|
||||||
|
/// This will spawn up to `concurrency` workers which are loaded from or
|
||||||
|
/// spawned into `pool`. The `RenderingScene` state contains information to
|
||||||
|
/// get notifications when the render has completed.
|
||||||
pub fn render(
|
pub fn render(
|
||||||
self,
|
self,
|
||||||
concurrency: usize,
|
concurrency: usize,
|
||||||
pool: WorkerPool,
|
pool: &pool::WorkerPool,
|
||||||
ctx: CanvasRenderingContext2d,
|
|
||||||
) -> Result<RenderingScene, JsValue> {
|
) -> Result<RenderingScene, JsValue> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let scene = self.inner;
|
||||||
let rx = rx.then(|_| Ok(JsValue::undefined()));
|
let height = scene.height;
|
||||||
|
let width = scene.width;
|
||||||
|
|
||||||
let data = Rc::new(RefCell::new(None::<Render>));
|
// Allocate the pixel data which our threads will be writing into.
|
||||||
|
let pixels = (width * height) as usize;
|
||||||
|
let mut rgb_data = vec![0; 4 * pixels];
|
||||||
|
let base = rgb_data.as_ptr() as usize;
|
||||||
|
let len = rgb_data.len();
|
||||||
|
|
||||||
let pixels = (self.inner.width * self.inner.height) as usize;
|
// Configure a rayon thread pool which will pull web workers from
|
||||||
let mut r = Render {
|
// `pool`.
|
||||||
tx: Some(tx),
|
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||||
callback: None,
|
.num_threads(concurrency - 1)
|
||||||
shared: Arc::new(Shared {
|
.spawn_handler(|thread| Ok(pool.run(|| thread.run()).unwrap()))
|
||||||
id: NEXT_ID.fetch_add(1, SeqCst),
|
.build()
|
||||||
need_update: AtomicBool::new(false),
|
.unwrap();
|
||||||
scene: self.inner,
|
|
||||||
next_pixel: AtomicUsize::new(0),
|
|
||||||
remaining: AtomicUsize::new(concurrency),
|
|
||||||
rgb_data: Mutex::new(vec![0; 4 * pixels]),
|
|
||||||
}),
|
|
||||||
ctx,
|
|
||||||
};
|
|
||||||
|
|
||||||
let data2 = data.clone();
|
// And now execute the render! The entire render happens on our worker
|
||||||
let callback = Closure::wrap(Box::new(move |msg: Event| -> Result<(), JsValue> {
|
// threads so we don't lock up the main thread, so we ship off a thread
|
||||||
let mut slot = data2.borrow_mut();
|
// which actually does the whole rayon business. When our returned
|
||||||
if let Some(mut data) = slot.take() {
|
// future is resolved we can pull out the final version of the image.
|
||||||
match data.event(&msg) {
|
let done = pool
|
||||||
Ok(true) => {}
|
.run_notify(move || {
|
||||||
Ok(false) => *slot = Some(data),
|
thread_pool.install(|| {
|
||||||
Err(e) => {
|
rgb_data
|
||||||
*slot = Some(data);
|
.par_chunks_mut(4)
|
||||||
return Err(e);
|
.enumerate()
|
||||||
}
|
.for_each(|(i, chunk)| {
|
||||||
}
|
let i = i as u32;
|
||||||
}
|
let x = i % width;
|
||||||
Ok(())
|
let y = i / width;
|
||||||
}) as Box<dyn FnMut(_) -> Result<(), JsValue>>);
|
let ray = raytracer::Ray::create_prime(x, y, &scene);
|
||||||
|
let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba();
|
||||||
for worker in &pool.workers[..concurrency] {
|
chunk[0] = result.data[0];
|
||||||
let ptr_to_send = Arc::into_raw(r.shared.clone()) as u32;
|
chunk[1] = result.data[1];
|
||||||
let ptr_to_send = JsValue::from(ptr_to_send);
|
chunk[2] = result.data[2];
|
||||||
worker.post_message(&ptr_to_send)?;
|
chunk[3] = result.data[3];
|
||||||
worker.set_onmessage(Some(callback.as_ref().unchecked_ref()));
|
});
|
||||||
worker.set_onerror(Some(callback.as_ref().unchecked_ref()));
|
});
|
||||||
}
|
rgb_data
|
||||||
|
})?
|
||||||
r.callback = Some(callback);
|
.map(move |_data| image_data(base, len, width, height).into());
|
||||||
*data.borrow_mut() = Some(r);
|
|
||||||
|
|
||||||
Ok(RenderingScene {
|
Ok(RenderingScene {
|
||||||
inner: data,
|
promise: wasm_bindgen_futures::future_to_promise(done),
|
||||||
promise: wasm_bindgen_futures::future_to_promise(rx),
|
base,
|
||||||
pool,
|
len,
|
||||||
|
height,
|
||||||
|
width,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[wasm_bindgen]
|
|
||||||
pub struct WorkerPool {
|
|
||||||
workers: Vec<Worker>,
|
|
||||||
callback: Closure<dyn FnMut(Event)>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasm_bindgen]
|
|
||||||
impl WorkerPool {
|
|
||||||
#[wasm_bindgen(constructor)]
|
|
||||||
pub fn new(max: u32) -> Result<WorkerPool, JsValue> {
|
|
||||||
let callback = Closure::wrap(Box::new(|event: Event| {
|
|
||||||
console_log!("unhandled event: {}", event.type_());
|
|
||||||
}) as Box<dyn FnMut(Event)>);
|
|
||||||
let mut workers = Vec::new();
|
|
||||||
for _ in 0..max {
|
|
||||||
// TODO: what do do about `./worker.js`:
|
|
||||||
//
|
|
||||||
// * the path is only known by the bundler. How can we, as a
|
|
||||||
// library, know what's going on?
|
|
||||||
// * How do we not fetch a script N times? It internally then
|
|
||||||
// causes another script to get fetched N times...
|
|
||||||
let worker = Worker::new("./worker.js")?;
|
|
||||||
let array = js_sys::Array::new();
|
|
||||||
array.push(&wasm_bindgen::module());
|
|
||||||
|
|
||||||
// TODO: memory allocation error handling here is hard:
|
|
||||||
//
|
|
||||||
// * How to we make sure that our strong ref made it to a client
|
|
||||||
// thread?
|
|
||||||
// * Need to handle the `?` on `post_message` as well.
|
|
||||||
array.push(&wasm_bindgen::memory());
|
|
||||||
worker.post_message(&array)?;
|
|
||||||
worker.set_onmessage(Some(callback.as_ref().unchecked_ref()));
|
|
||||||
worker.set_onerror(Some(callback.as_ref().unchecked_ref()));
|
|
||||||
workers.push(worker);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(WorkerPool { workers, callback })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for WorkerPool {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
for worker in self.workers.iter() {
|
|
||||||
worker.terminate();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
pub struct RenderingScene {
|
pub struct RenderingScene {
|
||||||
inner: Rc<RefCell<Option<Render>>>,
|
base: usize,
|
||||||
|
len: usize,
|
||||||
promise: Promise,
|
promise: Promise,
|
||||||
pool: WorkerPool,
|
width: u32,
|
||||||
}
|
height: u32,
|
||||||
|
|
||||||
#[wasm_bindgen]
|
|
||||||
impl RenderingScene {
|
|
||||||
pub fn promise(&self) -> Promise {
|
|
||||||
self.promise.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasm_bindgen(js_name = requestUpdate)]
|
|
||||||
pub fn request_update(&self) {
|
|
||||||
if let Some(render) = self.inner.borrow().as_ref() {
|
|
||||||
render.shared.need_update.store(true, SeqCst);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cancel(self) -> WorkerPool {
|
|
||||||
if let Some(render) = self.inner.borrow_mut().take() {
|
|
||||||
// drain the rest of the pixels to cause all workers to cancel ASAP.
|
|
||||||
let pixels = render.shared.scene.width * render.shared.scene.height;
|
|
||||||
render.shared.next_pixel.fetch_add(pixels as usize, SeqCst);
|
|
||||||
}
|
|
||||||
for worker in self.pool.workers.iter() {
|
|
||||||
worker.set_onmessage(Some(&self.pool.callback.as_ref().unchecked_ref()));
|
|
||||||
worker.set_onerror(Some(&self.pool.callback.as_ref().unchecked_ref()));
|
|
||||||
}
|
|
||||||
self.pool
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Render {
|
|
||||||
callback: Option<Closure<dyn FnMut(Event) -> Result<(), JsValue>>>,
|
|
||||||
tx: Option<oneshot::Sender<()>>,
|
|
||||||
shared: Arc<Shared>,
|
|
||||||
ctx: CanvasRenderingContext2d,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Shared {
|
|
||||||
id: usize,
|
|
||||||
need_update: AtomicBool,
|
|
||||||
scene: raytracer::scene::Scene,
|
|
||||||
next_pixel: AtomicUsize,
|
|
||||||
remaining: AtomicUsize,
|
|
||||||
rgb_data: Mutex<Vec<u8>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inline the definition of `ImageData` here because `web_sys` uses
|
||||||
|
// `&Clamped<Vec<u8>>`, whereas we want to pass in a JS object here.
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
extern "C" {
|
extern "C" {
|
||||||
type ImageData;
|
pub type ImageData;
|
||||||
|
|
||||||
#[wasm_bindgen(constructor, catch)]
|
#[wasm_bindgen(constructor, catch)]
|
||||||
fn new(data: &Uint8ClampedArray, width: f64, height: f64) -> Result<ImageData, JsValue>;
|
fn new(data: &Uint8ClampedArray, width: f64, height: f64) -> Result<ImageData, JsValue>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Render {
|
|
||||||
fn event(&mut self, event: &Event) -> Result<bool, JsValue> {
|
|
||||||
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
|
|
||||||
let msg = format!("error in worker: {}", error.message());
|
|
||||||
return Err(Error::new(&msg).into());
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(msg) = event.dyn_ref::<MessageEvent>() {
|
|
||||||
let data = msg.data();
|
|
||||||
if let Some(data) = data.dyn_ref::<Array>() {
|
|
||||||
let id = data.pop();
|
|
||||||
let done = data.pop();
|
|
||||||
let image = data.pop();
|
|
||||||
if let Some(id) = id.as_f64() {
|
|
||||||
if id == self.shared.id as f64 {
|
|
||||||
self.ctx.put_image_data(image.unchecked_ref(), 0.0, 0.0)?;
|
|
||||||
return Ok(done.as_bool() == Some(true));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
console_log!("unhandled message: {:?}", data);
|
|
||||||
return Ok(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
console_log!("unhandled event: {}", event.type_());
|
|
||||||
|
|
||||||
Ok(false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[wasm_bindgen]
|
#[wasm_bindgen]
|
||||||
pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
|
impl RenderingScene {
|
||||||
let ptr = unsafe { Arc::from_raw(ptr as *const Shared) };
|
/// Returns the JS promise object which resolves when the render is complete
|
||||||
assert_send(&ptr);
|
pub fn promise(&self) -> Promise {
|
||||||
|
self.promise.clone()
|
||||||
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
|
|
||||||
ptr.work(&global)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
|
|
||||||
fn assert_send<T: Send + 'static>(_: &T) {}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Shared {
|
/// Return a progressive rendering of the image so far
|
||||||
fn work(&self, global: &DedicatedWorkerGlobalScope) -> Result<(), JsValue> {
|
#[wasm_bindgen(js_name = imageSoFar)]
|
||||||
// Once we're done raytracing a pixel we need to actually write its rgb
|
pub fn image_so_far(&self) -> ImageData {
|
||||||
// value into the shared memory buffer for our image. This, however,
|
image_data(self.base, self.len, self.width, self.height)
|
||||||
// requires synchronization with other threads (as currently
|
|
||||||
// implemented). To help amortize the cost of synchronization each
|
|
||||||
// thread processes a chunk of pixels at a time, and this number is how
|
|
||||||
// many pixes will be rendered synchronously before committing them back
|
|
||||||
// to memory.
|
|
||||||
const BLOCK: usize = 1024;
|
|
||||||
|
|
||||||
let width = self.scene.width as usize;
|
|
||||||
let height = self.scene.height as usize;
|
|
||||||
let end = width * height;
|
|
||||||
|
|
||||||
// Thread-local storage for our RGB data, commited back in one batch to
|
|
||||||
// the main image memory.
|
|
||||||
let mut local_rgb = [0; BLOCK * 4];
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// First up, grab a block of pixels to render using an atomic add.
|
|
||||||
// If we're beyond the end then we're done!
|
|
||||||
let start = self.next_pixel.fetch_add(BLOCK, SeqCst);
|
|
||||||
if start >= end {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Raytrace all our pixels synchronously, writing all the results
|
|
||||||
// into our local memory buffer.
|
|
||||||
let len = cmp::min(end, start + BLOCK) - start;
|
|
||||||
for (i, dst) in local_rgb.chunks_mut(4).enumerate().take(len) {
|
|
||||||
let x = (start + i) % width;
|
|
||||||
let y = (start + i) / width;
|
|
||||||
let ray = raytracer::Ray::create_prime(x as u32, y as u32, &self.scene);
|
|
||||||
let result = raytracer::cast_ray(&self.scene, &ray, 0).to_rgba();
|
|
||||||
dst[0] = result.data[0];
|
|
||||||
dst[1] = result.data[1];
|
|
||||||
dst[2] = result.data[2];
|
|
||||||
dst[3] = result.data[3];
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ok, time to synchronize and commit this data back into the main
|
|
||||||
// image buffer for other threads and the main thread to see.
|
|
||||||
let mut data = self.rgb_data.lock().unwrap();
|
|
||||||
data[start * 4..(start + len) * 4].copy_from_slice(&mut local_rgb[..len * 4]);
|
|
||||||
|
|
||||||
// As a "nifty feature" we try to have a live progressive rendering.
|
|
||||||
// That means that we need to periodically send an `ImageData` to
|
|
||||||
// the main thread. Do so whenever the main thread requests it.
|
|
||||||
if self.need_update.swap(false, SeqCst) {
|
|
||||||
self.update_image(false, data, global)?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we're the last thread out, be sure to update the main thread's
|
fn image_data(base: usize, len: usize, width: u32, height: u32) -> ImageData {
|
||||||
// image as this is the last chance we'll get!
|
// Use the raw access available through `memory.buffer`, but be sure to
|
||||||
if self.remaining.fetch_sub(1, SeqCst) == 1 {
|
// use `slice` instead of `subarray` to create a copy that isn't backed
|
||||||
let data = self.rgb_data.lock().unwrap();
|
// by `SharedArrayBuffer`. Currently `ImageData` rejects a view of
|
||||||
self.update_image(true, data, global)?;
|
// `Uint8ClampedArray` that's backed by a shared buffer.
|
||||||
}
|
//
|
||||||
|
// FIXME: that this may or may not be UB based on Rust's rules. For example
|
||||||
Ok(())
|
// threads may be doing unsynchronized writes to pixel data as we read it
|
||||||
}
|
// off here. In the context of wasm this may or may not be UB, we're
|
||||||
|
// unclear! In any case for now it seems to work and produces a nifty
|
||||||
fn update_image(
|
// progressive rendering. A more production-ready application may prefer to
|
||||||
&self,
|
// instead use some form of signaling here to request an update from the
|
||||||
done: bool,
|
// workers instead of synchronously acquiring an update, and that way we
|
||||||
data: MutexGuard<'_, Vec<u8>>,
|
// could ensure that even on the Rust side of things it's not UB.
|
||||||
global: &DedicatedWorkerGlobalScope,
|
|
||||||
) -> Result<(), JsValue> {
|
|
||||||
// This is pretty icky. We can't create an `ImageData` backed by
|
|
||||||
// `SharedArrayBuffer`, so we need to copy the memory into a local
|
|
||||||
// JS array using `slice`. This means we can't use
|
|
||||||
// `web_sys::ImageData` right now but rather we have to use our own
|
|
||||||
// binding.
|
|
||||||
let mem = wasm_bindgen::memory().unchecked_into::<WebAssembly::Memory>();
|
let mem = wasm_bindgen::memory().unchecked_into::<WebAssembly::Memory>();
|
||||||
let mem = Uint8ClampedArray::new(&mem.buffer()).slice(
|
let mem = Uint8ClampedArray::new(&mem.buffer()).slice(base as u32, (base + len) as u32);
|
||||||
data.as_ptr() as u32,
|
ImageData::new(&mem, width as f64, height as f64).unwrap()
|
||||||
data.as_ptr() as u32 + data.len() as u32,
|
|
||||||
);
|
|
||||||
drop(data); // unlock the lock, we've copied the data now
|
|
||||||
let data = ImageData::new(&mem, self.scene.width as f64, self.scene.height as f64)?;
|
|
||||||
let arr = Array::new();
|
|
||||||
arr.push(&data);
|
|
||||||
arr.push(&JsValue::from(done));
|
|
||||||
arr.push(&JsValue::from(self.id as f64));
|
|
||||||
global.post_message(&arr)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
295
examples/raytrace-parallel/src/pool.rs
Normal file
295
examples/raytrace-parallel/src/pool.rs
Normal file
@ -0,0 +1,295 @@
|
|||||||
|
//! A small module that's intended to provide an example of creating a pool of
|
||||||
|
//! web workers which can be used to execute `rayon`-style work.
|
||||||
|
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
use futures::Future;
|
||||||
|
use std::cell::{RefCell, UnsafeCell};
|
||||||
|
use std::mem;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use wasm_bindgen::prelude::*;
|
||||||
|
use wasm_bindgen::JsCast;
|
||||||
|
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
|
||||||
|
use web_sys::{ErrorEvent, Event, Worker};
|
||||||
|
|
||||||
|
#[wasm_bindgen]
|
||||||
|
pub struct WorkerPool {
|
||||||
|
state: Rc<PoolState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PoolState {
|
||||||
|
workers: RefCell<Vec<Worker>>,
|
||||||
|
callback: Closure<dyn FnMut(Event)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Work {
|
||||||
|
func: Box<dyn FnOnce() + Send>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[wasm_bindgen]
|
||||||
|
impl WorkerPool {
|
||||||
|
/// Creates a new `WorkerPool` which immediately creates `initial` workers.
|
||||||
|
///
|
||||||
|
/// The pool created here can be used over a long period of time, and it
|
||||||
|
/// will be initially primed with `initial` workers. Currently workers are
|
||||||
|
/// never released or gc'd until the whole pool is destroyed.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns any error that may happen while a JS web worker is created and a
|
||||||
|
/// message is sent to it.
|
||||||
|
#[wasm_bindgen(constructor)]
|
||||||
|
pub fn new(initial: usize) -> Result<WorkerPool, JsValue> {
|
||||||
|
let pool = WorkerPool {
|
||||||
|
state: Rc::new(PoolState {
|
||||||
|
workers: RefCell::new(Vec::with_capacity(initial)),
|
||||||
|
callback: Closure::wrap(Box::new(|event: Event| {
|
||||||
|
console_log!("unhandled event: {}", event.type_());
|
||||||
|
crate::logv(&event);
|
||||||
|
}) as Box<dyn FnMut(Event)>),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
for _ in 0..initial {
|
||||||
|
let worker = pool.spawn()?;
|
||||||
|
pool.state.push(worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(pool)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Unconditionally spawns a new worker
|
||||||
|
///
|
||||||
|
/// The worker isn't registered with this `WorkerPool` but is capable of
|
||||||
|
/// executing work for this wasm module.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns any error that may happen while a JS web worker is created and a
|
||||||
|
/// message is sent to it.
|
||||||
|
fn spawn(&self) -> Result<Worker, JsValue> {
|
||||||
|
console_log!("spawning new worker");
|
||||||
|
// TODO: what do do about `./worker.js`:
|
||||||
|
//
|
||||||
|
// * the path is only known by the bundler. How can we, as a
|
||||||
|
// library, know what's going on?
|
||||||
|
// * How do we not fetch a script N times? It internally then
|
||||||
|
// causes another script to get fetched N times...
|
||||||
|
let worker = Worker::new("./worker.js")?;
|
||||||
|
|
||||||
|
// With a worker spun up send it the module/memory so it can start
|
||||||
|
// instantiating the wasm module. Later it might receive further
|
||||||
|
// messages about code to run on the wasm module.
|
||||||
|
let array = js_sys::Array::new();
|
||||||
|
array.push(&wasm_bindgen::module());
|
||||||
|
array.push(&wasm_bindgen::memory());
|
||||||
|
worker.post_message(&array)?;
|
||||||
|
|
||||||
|
Ok(worker)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetches a worker from this pool, spawning one if necessary.
|
||||||
|
///
|
||||||
|
/// This will attempt to pull an already-spawned web worker from our cache
|
||||||
|
/// if one is available, otherwise it will spawn a new worker and return the
|
||||||
|
/// newly spawned worker.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns any error that may happen while a JS web worker is created and a
|
||||||
|
/// message is sent to it.
|
||||||
|
fn worker(&self) -> Result<Worker, JsValue> {
|
||||||
|
match self.state.workers.borrow_mut().pop() {
|
||||||
|
Some(worker) => Ok(worker),
|
||||||
|
None => self.spawn(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Executes the work `f` in a web worker, spawning a web worker if
|
||||||
|
/// necessary.
|
||||||
|
///
|
||||||
|
/// This will acquire a web worker and then send the closure `f` to the
|
||||||
|
/// worker to execute. The worker won't be usable for anything else while
|
||||||
|
/// `f` is executing, and no callbacks are registered for when the worker
|
||||||
|
/// finishes.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns any error that may happen while a JS web worker is created and a
|
||||||
|
/// message is sent to it.
|
||||||
|
fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result<Worker, JsValue> {
|
||||||
|
let worker = self.worker()?;
|
||||||
|
let work = Box::new(Work { func: Box::new(f) });
|
||||||
|
let ptr = Box::into_raw(work);
|
||||||
|
match worker.post_message(&JsValue::from(ptr as u32)) {
|
||||||
|
Ok(()) => Ok(worker),
|
||||||
|
Err(e) => {
|
||||||
|
unsafe {
|
||||||
|
drop(Box::from_raw(ptr));
|
||||||
|
}
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures an `onmessage` callback for the `worker` specified for the
|
||||||
|
/// web worker to be reclaimed and re-inserted into this pool when a message
|
||||||
|
/// is received.
|
||||||
|
///
|
||||||
|
/// Currently this `WorkerPool` abstraction is intended to execute one-off
|
||||||
|
/// style work where the work itself doesn't send any notifications and
|
||||||
|
/// whatn it's done the worker is ready to execute more work. This method is
|
||||||
|
/// used for all spawned workers to ensure that when the work is finished
|
||||||
|
/// the worker is reclaimed back into this pool.
|
||||||
|
fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) {
|
||||||
|
let state = Rc::downgrade(&self.state);
|
||||||
|
let worker2 = worker.clone();
|
||||||
|
let reclaim_slot = Rc::new(RefCell::new(None));
|
||||||
|
let slot2 = reclaim_slot.clone();
|
||||||
|
let mut on_finish = Some(on_finish);
|
||||||
|
let reclaim = Closure::wrap(Box::new(move |event: Event| {
|
||||||
|
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
|
||||||
|
console_log!("error in worker: {}", error.message());
|
||||||
|
// TODO: this probably leaks memory somehow? It's sort of
|
||||||
|
// unclear what to do about errors in workers right now.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is a completion event then we can execute our `on_finish`
|
||||||
|
// callback and we can also deallocate our own callback by clearing
|
||||||
|
// out `slot2` which contains our own closure.
|
||||||
|
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
|
||||||
|
on_finish.take().unwrap()();
|
||||||
|
if let Some(state) = state.upgrade() {
|
||||||
|
state.push(worker2.clone());
|
||||||
|
}
|
||||||
|
*slot2.borrow_mut() = None;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
console_log!("unhandled event: {}", event.type_());
|
||||||
|
crate::logv(&event);
|
||||||
|
// TODO: like above, maybe a memory leak here?
|
||||||
|
}) as Box<dyn FnMut(Event)>);
|
||||||
|
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
|
||||||
|
*reclaim_slot.borrow_mut() = Some(reclaim);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerPool {
|
||||||
|
/// Executes `f` in a web worker.
|
||||||
|
///
|
||||||
|
/// This pool manages a set of web workers to draw from, and `f` will be
|
||||||
|
/// spawned quickly into one if the worker is idle. If no idle workers are
|
||||||
|
/// available then a new web worker will be spawned.
|
||||||
|
///
|
||||||
|
/// Once `f` returns the worker assigned to `f` is automatically reclaimed
|
||||||
|
/// by this `WorkerPool`. This method provides no method of learning when
|
||||||
|
/// `f` completes, and for that you'll need to use `run_notify`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// If an error happens while spawning a web worker or sending a message to
|
||||||
|
/// a web worker, that error is returned.
|
||||||
|
pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
|
||||||
|
let worker = self.execute(f)?;
|
||||||
|
self.reclaim_on_message(worker, || {});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Executes the closure `f` in a web worker, returning a future of the
|
||||||
|
/// value that `f` produces.
|
||||||
|
///
|
||||||
|
/// This method is the same as `run` execept that it allows recovering the
|
||||||
|
/// return value of the closure `f` in a nonblocking fashion with the future
|
||||||
|
/// returned.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// If an error happens while spawning a web worker or sending a message to
|
||||||
|
/// a web worker, that error is returned.
|
||||||
|
pub fn run_notify<T>(
|
||||||
|
&self,
|
||||||
|
f: impl FnOnce() -> T + Send + 'static,
|
||||||
|
) -> Result<impl Future<Item = T, Error = JsValue> + 'static, JsValue>
|
||||||
|
where
|
||||||
|
T: Send + 'static,
|
||||||
|
{
|
||||||
|
// FIXME(#1379) we should just use the `oneshot` directly as the future,
|
||||||
|
// but we have to use JS callbacks to ensure we don't have futures cross
|
||||||
|
// threads as that's currently not safe to do so.
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let storage = Arc::new(AtomicValue::new(None));
|
||||||
|
let storage2 = storage.clone();
|
||||||
|
let worker = self.execute(move || {
|
||||||
|
assert!(storage2.replace(Some(f())).is_ok());
|
||||||
|
})?;
|
||||||
|
self.reclaim_on_message(worker, move || match storage.replace(None) {
|
||||||
|
Ok(Some(val)) => drop(tx.send(val)),
|
||||||
|
_ => unreachable!(),
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(rx.map_err(|_| JsValue::undefined()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A small helper struct representing atomic access to an internal value `T`
|
||||||
|
///
|
||||||
|
/// This struct only supports one API, `replace`, which will either succeed and
|
||||||
|
/// replace the internal value with another (returning the previous one), or it
|
||||||
|
/// will fail returning the value passed in. Failure happens when two threads
|
||||||
|
/// try to `replace` at the same time.
|
||||||
|
///
|
||||||
|
/// This is only really intended to help safely transfer information between
|
||||||
|
/// threads, it doesn't provide any synchronization capabilities itself other
|
||||||
|
/// than a guaranteed safe API.
|
||||||
|
struct AtomicValue<T> {
|
||||||
|
modifying: AtomicBool,
|
||||||
|
slot: UnsafeCell<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl<T: Send> Send for AtomicValue<T> {}
|
||||||
|
unsafe impl<T: Send> Sync for AtomicValue<T> {}
|
||||||
|
|
||||||
|
impl<T> AtomicValue<T> {
|
||||||
|
fn new(val: T) -> AtomicValue<T> {
|
||||||
|
AtomicValue {
|
||||||
|
modifying: AtomicBool::new(false),
|
||||||
|
slot: UnsafeCell::new(val),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn replace(&self, val: T) -> Result<T, T> {
|
||||||
|
if self.modifying.swap(true, SeqCst) {
|
||||||
|
return Err(val);
|
||||||
|
}
|
||||||
|
let ret = unsafe { mem::replace(&mut *self.slot.get(), val) };
|
||||||
|
self.modifying.store(false, SeqCst);
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PoolState {
|
||||||
|
fn push(&self, worker: Worker) {
|
||||||
|
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
|
||||||
|
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
|
||||||
|
let mut workers = self.workers.borrow_mut();
|
||||||
|
for prev in workers.iter() {
|
||||||
|
let prev: &JsValue = prev;
|
||||||
|
let worker: &JsValue = &worker;
|
||||||
|
assert!(prev != worker);
|
||||||
|
}
|
||||||
|
workers.push(worker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Entry point invoked by `worker.js`, a bit of a hack but see the "TODO" above
|
||||||
|
/// about `worker.js` in general.
|
||||||
|
#[wasm_bindgen]
|
||||||
|
pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
|
||||||
|
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
|
||||||
|
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
|
||||||
|
(ptr.func)();
|
||||||
|
global.post_message(&JsValue::undefined())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
@ -28,5 +28,9 @@ function run(ptr) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
lastPtr = null;
|
lastPtr = null;
|
||||||
|
try {
|
||||||
wasm_bindgen.child_entry_point(ptr);
|
wasm_bindgen.child_entry_point(ptr);
|
||||||
|
} catch (e) {
|
||||||
|
throw new Error(e.message + "\n\n" + e.stack);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user