1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
//! Used by the other [`EventLoop`][super::EventLoop] implementations to spawn threads for running
//! tasks in the background without blocking the GUI thread.
//!
//! This is essentially a slimmed down version of the `LinuxEventLoop`.

use crossbeam::channel;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};

use super::MainThreadExecutor;
use crate::util::permit_alloc;

/// See the module's documentation. This is a background thread that can be used to run tasks on.
/// The implementation shares a single thread between all of a plugin's instances hosted in the same
/// process.
pub(crate) struct BackgroundThread<T, E> {
    /// The object that actually executes the task `T`. We'll send a weak reference to this to the
    /// worker thread whenever a task needs to be executed. This allows multiple plugin instances to
    /// share the same worker thread.
    executor: Weak<E>,
    /// A thread that act as our worker thread. When [`schedule()`][Self::schedule()] is called,
    /// this thread will be woken up to execute the task on the executor. When the last worker
    /// thread handle gets dropped the thread is shut down.
    worker_thread: WorkerThreadHandle<T, E>,
}

/// A handle for the singleton worker thread. This lets multiple instances of the same plugin share
/// a worker thread, and when the last instance gets dropped the worker thread gets terminated.
struct WorkerThreadHandle<T, E> {
    pub(self) tasks_sender: channel::Sender<Message<T, E>>,
    /// The thread's reference count. Shared between all handles to the same thread. This is
    /// decrased by one when the struct is dropped.
    reference_count: Arc<AtomicIsize>,
    /// The thread's join handle. Joined when the reference count reaches 0.
    join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}

/// A message for communicating with the worker thread.
enum Message<T, E> {
    /// A new task for the event loop to execute along with the executor that should execute the
    /// task. A reference to the executor is sent alongside because multiple plugin instances may
    /// share the same background thread.
    Task((T, Weak<E>)),
    /// Shut down the worker thread. Send when the last reference to the thread is dropped.
    Shutdown,
}

impl<T, E> BackgroundThread<T, E>
where
    T: Send + 'static,
    E: MainThreadExecutor<T> + 'static,
{
    pub fn get_or_create(executor: Weak<E>) -> Self {
        Self {
            executor,
            // The same worker thread can be shared by multiple instances. Lifecycle management
            // happens through reference counting.
            worker_thread: get_or_create_worker_thread(),
        }
    }

    pub fn schedule(&self, task: T) -> bool {
        // NOTE: This may check the current thread ID, which involves an allocation whenever this
        //       first happens on a new thread because of the way thread local storage works
        permit_alloc(|| {
            self.worker_thread
                .tasks_sender
                .try_send(Message::Task((task, self.executor.clone())))
                .is_ok()
        })
    }
}

// Rust does not allow us to use the `T` and `E` type variable in statics, so this is a
// workaround to have a singleton that also works if for whatever reason there arem ultiple `T`
// and `E`s in a single process (won't happen with normal plugin usage, but sho knwos).
lazy_static::lazy_static! {
    static ref HANDLE_MAP: Mutex<anymap::Map<dyn anymap::any::Any + Send + 'static>> =
        Mutex::new(anymap::Map::new());
}

impl<T, E> Clone for WorkerThreadHandle<T, E> {
    fn clone(&self) -> Self {
        self.reference_count.fetch_add(1, Ordering::SeqCst);

        Self {
            tasks_sender: self.tasks_sender.clone(),
            reference_count: self.reference_count.clone(),
            join_handle: self.join_handle.clone(),
        }
    }
}

impl<T, E> Drop for WorkerThreadHandle<T, E> {
    fn drop(&mut self) {
        // If the host for whatever reason instantiates and destroys a plugin at the same time from
        // different threads, we need to make sure this doesn't do anything weird.
        let _handle_map = HANDLE_MAP.lock();

        // The thread is shut down and joined when the last handle is dropped
        if self.reference_count.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.tasks_sender
                .send(Message::Shutdown)
                .expect("Failed while sending worker thread shutdown request");
            let join_handle = self
                .join_handle
                .lock()
                .take()
                .expect("The thread has already been joined");
            join_handle.join().expect("Worker thread panicked");
        }
    }
}

/// Either acquire a handle for an existing worker thread or create one if it does not yet exists.
/// This allows multiple plugin instances to share a worker thread. Reference counting happens
/// automatically as part of this function and `WorkerThreadHandle`'s lifecycle.
fn get_or_create_worker_thread<T, E>() -> WorkerThreadHandle<T, E>
where
    T: Send + 'static,
    E: MainThreadExecutor<T> + 'static,
{
    // The map entry contains both the thread's reference count
    // NOTE: This uses `AtomicIsize` for a reason. The `HANDLE_MAP` also holds a reference to this
    //       thread handle, and its `Drop` implementation will also fire if the
    //       `Option<WorkerThreadHandle<T, E>>` is ever overwritten. This will cause the reference
    //       count to become -1 which is fine.
    let mut handle_map = HANDLE_MAP.lock();
    let (reference_count, worker_thread_handle) = handle_map
        .entry::<(Arc<AtomicIsize>, Option<WorkerThreadHandle<T, E>>)>()
        .or_insert_with(|| (Arc::new(AtomicIsize::new(0)), None));

    // When this is the first reference to the worker thread, the thread is (re)initialized
    if reference_count.fetch_add(1, Ordering::SeqCst) <= 0 {
        let (tasks_sender, tasks_receiver) = channel::bounded(super::TASK_QUEUE_CAPACITY);
        let join_handle = thread::Builder::new()
            .name(String::from("bg-worker"))
            .spawn(move || worker_thread(tasks_receiver))
            .expect("Could not spawn background worker thread");

        // This needs special handling if `worker_thread_handle` was already a `Some` value because
        // the `Drop` will decrease the reference count when it gets overwritten. There may be a
        // better alternative to this.
        if worker_thread_handle.is_some() {
            reference_count.fetch_add(1, Ordering::SeqCst);
        }

        *worker_thread_handle = Some(WorkerThreadHandle {
            tasks_sender,
            reference_count: reference_count.clone(),
            join_handle: Arc::new(Mutex::new(Some(join_handle))),
        });
    }

    worker_thread_handle.clone().unwrap()
}

/// The worker thread used in [`EventLoop`] that executes incoming tasks on the event loop's
/// executor.
fn worker_thread<T, E>(tasks_receiver: channel::Receiver<Message<T, E>>)
where
    T: Send,
    E: MainThreadExecutor<T>,
{
    loop {
        match tasks_receiver.recv() {
            Ok(Message::Task((task, executor))) => match executor.upgrade() {
                Some(e) => e.execute(task, true),
                None => {
                    nih_trace!(
                        "Received a new task but the executor is no longer alive, shutting down \
                         worker"
                    );
                    return;
                }
            },
            Ok(Message::Shutdown) => return,
            Err(err) => {
                nih_trace!(
                    "Worker thread got disconnected unexpectedly, shutting down: {}",
                    err
                );
                return;
            }
        }
    }
}