Implementing Cooperative Multitasking
OxidizedOS is a multicore, x86-64 kernel written in Rust. For more information, see An Introduction.
The previous post in the series, Threads and Context Switching, should be read before this one.
The code for this post is located here.
Intro
In this post, we will implement cooperative multitasking. For simplicity, we will use a round-robin scheduler, which runs ready threads in FIFO order.
What is a cooperative scheduler? Threads can run as long as they want, and can let other threads run by yielding to them. The problem? If threads refuse to yield, other threads will be unable to run.
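To make the round-robin idea concrete before touching kernel code, here is a small user-space model. It is only a sketch: `ToyTask`, `step`, and `run_round_robin` are hypothetical names invented for illustration, and a "yield" is modeled as returning from `step` rather than as a real context switch.

```rust
use std::collections::VecDeque;

/// A toy "thread": each call to `step` runs until the task would yield.
/// (Hypothetical type for illustration; not part of OxidizedOS.)
struct ToyTask {
    name: &'static str,
    remaining: u32,
}

impl ToyTask {
    /// Runs one slice of work and returns `true` once the task is finished.
    fn step(&mut self, log: &mut Vec<&'static str>) -> bool {
        log.push(self.name);
        self.remaining = self.remaining.saturating_sub(1);
        self.remaining == 0
    }
}

/// Runs tasks round-robin: pop the front, let it run one step,
/// and push it to the back if it still has work left.
fn run_round_robin(tasks: Vec<ToyTask>) -> Vec<&'static str> {
    let mut ready: VecDeque<ToyTask> = tasks.into();
    let mut log = Vec::new();
    while let Some(mut task) = ready.pop_front() {
        let finished = task.step(&mut log);
        if !finished {
            ready.push_back(task);
        }
    }
    log
}
```

A task that never returned from `step` would starve every other task in this model, which is exactly the weakness of cooperative scheduling described above.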
Thread API
The Thread API looks like this:
/// Yield to another thread. Since yield is a reserved word in Rust,
/// we have to use a synonym here. If we call this, the current thread
/// will be placed on a ready queue and eventually run again.
pub fn surrender();
/// Stop the current thread. The current thread will halt execution,
/// and never run again, allowing its memory to be reclaimed.
pub fn stop();
/// Schedules a thread to be run by putting it in the ready queue.
pub fn schedule(thread: Box<dyn TCB>);
Let’s start with the easiest part: putting a thread in the ready queue. As our queue, we’ll use the VecDeque from Rust’s alloc crate. Since this requires heap allocation, we need to do the initialization at runtime, so we use the lazy_static macro for this.
lazy_static! {
    pub static ref READY: Mutex<VecDeque<Box<dyn TCB>>> = Mutex::new(VecDeque::new());
}
Since multiple threads can access this queue at the same time, we protect the queue with a Mutex.
To add a thread to the ready queue:
pub fn schedule(tcb: Box<dyn TCB>) {
    READY.lock().push_back(tcb);
}
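The FIFO behavior of a mutex-protected VecDeque can be checked outside the kernel. The sketch below assumes `std::sync::Mutex` (whose `lock` returns a `Result`, unlike the kernel's lock, which appears to hand back the guard directly), and `ReadyQueue` is a hypothetical wrapper, not a type from OxidizedOS.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Stand-in for the kernel's READY queue; `T` plays the role of `Box<dyn TCB>`.
struct ReadyQueue<T> {
    inner: Mutex<VecDeque<T>>,
}

impl<T> ReadyQueue<T> {
    fn new() -> Self {
        ReadyQueue { inner: Mutex::new(VecDeque::new()) }
    }

    /// Mirrors `schedule`: append to the back of the queue.
    fn schedule(&self, item: T) {
        self.inner.lock().unwrap().push_back(item);
    }

    /// Takes the item that has waited longest, if any.
    fn next(&self) -> Option<T> {
        self.inner.lock().unwrap().pop_front()
    }
}
```

Pushing to the back and popping from the front is what gives the scheduler its round-robin order.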
Now we can begin writing surrender.
Switching Threads
In order to switch to another thread, we need the currently running thread’s TCB and the TCB of the thread we want to switch to, which we pass to context_switch. To keep track of the currently running threads, we maintain an array with one slot per core, each holding that core’s active thread. Initially, only the bootstrap threads are running on each core, so we create an abstraction for them.
#[repr(C)]
struct BootstrapTCB {
    tcb_info: TCBInfo,
    // TODO store this so we can reclaim the page representing the bootstrap stack
    stack_frame_start: Option<usize>,
}

impl BootstrapTCB {
    pub fn new() -> BootstrapTCB {
        BootstrapTCB {
            tcb_info: TCBInfo::new(0),
            stack_frame_start: None,
        }
    }
}

impl TCB for BootstrapTCB {
    fn get_info(&mut self) -> *mut TCBInfo {
        &mut self.tcb_info as *mut TCBInfo
    }
    fn get_work(&mut self) -> Box<Task> {
        panic!("BootstrapTCB has no work to do!");
    }
}

lazy_static! {
    pub static ref ACTIVE: [Mutex<Option<Box<dyn TCB>>>; 16] = {
        let mut active: [MaybeUninit<Mutex<Option<Box<dyn TCB>>>>; 16] =
            unsafe { MaybeUninit::uninit().assume_init() };
        for i in 0..16 {
            active[i] = MaybeUninit::new(Mutex::new(Some(Box::new(BootstrapTCB::new()))));
        }
        unsafe { core::mem::transmute::<_, [Mutex<Option<Box<dyn TCB>>>; 16]>(active) }
    };
}
This pattern is essentially C-style array initialization, where each index in the array is initialized to some value at runtime. We initialize each index to a BootstrapTCB to represent the bootstrap thread currently running on each core. Since every element is initialized before the transmute, the transmute is sound. We use an Option type so that a core can have no active thread, which will be useful when we implement preemption in the next post.
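On toolchains that provide it (Rust 1.63 and later), `core::array::from_fn` can build the same kind of array without any unsafe code. This is an alternative sketch rather than what the post's kernel does; `ToyTcb`, `make_active`, and `core_of` are hypothetical names, and `std::sync::Mutex` stands in for the kernel's lock.

```rust
use std::sync::Mutex;

const MAX_CORES: usize = 16;

/// Placeholder for `Box<dyn TCB>`; it only remembers which core it belongs to.
struct ToyTcb {
    core: usize,
}

/// Builds the per-core array without `unsafe`: `from_fn` calls the closure
/// once per index and assembles the results into a fixed-size array.
fn make_active() -> [Mutex<Option<Box<ToyTcb>>>; MAX_CORES] {
    std::array::from_fn(|core| Mutex::new(Some(Box::new(ToyTcb { core }))))
}

/// Reads the core number stored in a slot, if the slot is occupied.
fn core_of(slot: &Mutex<Option<Box<ToyTcb>>>) -> Option<usize> {
    slot.lock().unwrap().as_ref().map(|tcb| tcb.core)
}
```

The closure receives the index being initialized, so each slot can be constructed differently if needed.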
To get the current TCB, we do the following:
pub fn swap_active(swap_to: Option<Box<dyn TCB>>) -> Option<Box<dyn TCB>> {
    let mut result = swap_to;
    core::mem::swap(&mut result, &mut ACTIVE[smp::me()].lock());
    result
}
smp::me() returns the core’s LAPIC id. Each core has a unique LAPIC id, so this provides a way to differentiate between cores on the system, and provides an index into the array of active threads. For example, if we start QEMU with -smp 4, each id will be in the range of [0, 3].
Since each index is only ever accessed by one core (core i only accesses ACTIVE[i]), a Mutex is not strictly necessary for mutual exclusion. However, a static must be Sync, and wrapping each slot in a Mutex is a simple way to satisfy that requirement. For simplicity, the array holds 16 elements, which limits us to 16 cores, but this can be changed relatively easily (by using a boxed slice).
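The swap itself can be modeled in user space. In this sketch the core id is passed in explicitly instead of coming from `smp::me()`, `String` stands in for `Box<dyn TCB>`, and the slots live in a `Vec` sized at runtime, which is one way the 16-core limit might be lifted. `ActiveSlots` is a hypothetical name.

```rust
use std::sync::Mutex;

/// Per-core slots; `String` stands in for `Box<dyn TCB>`.
struct ActiveSlots {
    slots: Vec<Mutex<Option<String>>>,
}

impl ActiveSlots {
    /// Every core starts out running its bootstrap thread.
    fn new(cores: usize) -> Self {
        let slots = (0..cores)
            .map(|core| Mutex::new(Some(format!("bootstrap-{}", core))))
            .collect();
        ActiveSlots { slots }
    }

    /// Installs `swap_to` as the active thread on `core` and returns
    /// whatever was there before. `core` replaces the call to `smp::me()`.
    fn swap_active(&self, core: usize, swap_to: Option<String>) -> Option<String> {
        let mut slot = self.slots[core].lock().unwrap();
        std::mem::replace(&mut *slot, swap_to)
    }
}
```

Calling `swap_active(core, None)` takes ownership of the current TCB and leaves the slot empty, which is how `surrender` obtains the TCB it later hands to a cleanup task.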
Now we can implement the first part of surrender:
pub fn surrender() {
    let mut current_thread: Box<dyn TCB> = match swap_active(None) {
        Some(tcb) => tcb,
        None => panic!("No active thread!"),
    };
    let current_thread_info = current_thread.get_info();
    // Put ourselves on the ready queue now?
    block(current_thread_info);
}
(Note: Splitting up the logic into separate functions (surrender and block) will be useful when we implement semaphores (a synchronization primitive) in another post.)
We face an important question: when do we put the current thread on the ready queue?
When are we ready?
Intuitively, a thread should only be on the ready queue if it is ready to run (hence the name). If we add the thread to the ready queue now, it is certainly not ready to run: it’s currently running on a core! Another core could pop it off the queue and switch to it, with disastrous consequences: the same thread would be running on two cores at the same time, sharing one stack, so each core would corrupt the other’s data.
Let’s try to come up with ways to mitigate this:
1. We could hold the lock on READY until we’ve finished context switching. This would mean no other core could context switch while we’re context switching, which would massively slow down the system. This is clearly unacceptable.
2. We could add an extra flag to the TCB which would represent whether or not the TCB was actually ready to run. This doesn’t feel like a great idea, as we’re still putting TCBs in the ready queue before they’re actually ready. However, this is significantly better than making surrender a critical section.
3. We could add ourselves to the ready queue when we’re ready to run, that is, after we context switch. This seems problematic: by then we are no longer the running thread, having switched to some other kernel thread. What if we asked that thread to execute this task on our behalf?
Let’s explore approach #3. We can represent adding the current thread back into the ready queue as a closure:
let add_to_ready = move || {
    READY.lock().push_back(current_thread);
};
Since this is a move closure, the closure now owns the current thread’s TCB, so it can be run independently of the current thread’s state. Let’s design a data structure that can hold these tasks (this is quite similar to our TCB).
type Cleanup = dyn FnOnce() + Send + Sync;

pub struct TaskHolder {
    tasks: VecDeque<Box<Cleanup>>,
}

impl TaskHolder {
    pub fn new() -> TaskHolder {
        TaskHolder { tasks: VecDeque::new() }
    }
    pub fn add_task(&mut self, task: Box<Cleanup>) {
        self.tasks.push_back(task);
    }
    pub fn get_task(&mut self) -> Option<Box<Cleanup>> {
        self.tasks.pop_front()
    }
}
This is simply a queue of Cleanup tasks to be performed. A Cleanup function is simply something that can be run one time, and is Send + Sync, as a different thread will be executing the tasks.
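The following user-space sketch shows why the closure has to own the TCB and be Send: it is created on one thread and run on another. It assumes OS threads from `std::thread` in place of kernel threads, uses `String` in place of `Box<dyn TCB>`, and the names `Deferred`, `make_requeue_task`, and `requeue_on_other_thread` are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// A deferred task; `Send` lets it be handed to another thread.
type Deferred = Box<dyn FnOnce() + Send>;

/// Builds a task that owns `tcb` and pushes it onto `ready` when run.
/// `String` stands in for `Box<dyn TCB>`.
fn make_requeue_task(ready: Arc<Mutex<VecDeque<String>>>, tcb: String) -> Deferred {
    Box::new(move || {
        ready.lock().unwrap().push_back(tcb);
    })
}

/// Creates the task on the calling thread and runs it on a different one.
/// Returns the queue length before and after the task ran.
fn requeue_on_other_thread() -> (usize, usize) {
    let ready = Arc::new(Mutex::new(VecDeque::new()));
    let task = make_requeue_task(Arc::clone(&ready), "worker".to_string());
    let before = ready.lock().unwrap().len();
    thread::spawn(move || task()).join().unwrap();
    let after = ready.lock().unwrap().len();
    (before, after)
}
```

Nothing reaches the queue until the other thread runs the closure, which is the property we want: the TCB only becomes visible to other cores after its original thread has stopped running.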
Each active thread can submit cleanup tasks to be run by the next thread to run on the core, and since there are as many active threads as there are cores, we need one TaskHolder per core. Again, we use lazy_static:
lazy_static! {
    pub static ref CLEANUP: [Mutex<Box<TaskHolder>>; 16] = {
        let mut cleanup: [MaybeUninit<Mutex<Box<TaskHolder>>>; 16] =
            unsafe { MaybeUninit::uninit().assume_init() };
        for i in 0..16 {
            cleanup[i] = MaybeUninit::new(Mutex::new(Box::new(TaskHolder::new())));
        }
        unsafe { core::mem::transmute::<_, [Mutex<Box<TaskHolder>>; 16]>(cleanup) }
    };
}
Running all the cleanup tasks is simple:
fn cleanup() {
    let me = smp::me();
    let mut cleanup_work = CLEANUP[me].lock();
    loop {
        match cleanup_work.get_task() {
            Some(work) => work(),
            None => break,
        }
    }
}
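The same drain can be written with `while let`, which some readers may find easier to scan than the loop/match/break form. The sketch below assumes `std::sync::Mutex` and a plain VecDeque instead of TaskHolder; `Deferred`, `run_all`, and `drain_demo` are hypothetical names.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

type Deferred = Box<dyn FnOnce() + Send>;

/// Runs every pending task in FIFO order; the loop ends at the first `None`.
fn run_all(tasks: &Mutex<VecDeque<Deferred>>) {
    let mut pending = tasks.lock().unwrap();
    while let Some(task) = pending.pop_front() {
        task();
    }
}

/// Queues two tasks that record their labels, then drains the queue.
fn drain_demo() -> Vec<&'static str> {
    let log: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
    let tasks: Mutex<VecDeque<Deferred>> = Mutex::new(VecDeque::new());
    for label in ["requeue previous thread", "assert next thread active"] {
        let log = Arc::clone(&log);
        tasks.lock().unwrap().push_back(Box::new(move || {
            log.lock().unwrap().push(label);
        }));
    }
    run_all(&tasks);
    let result = log.lock().unwrap().clone();
    result
}
```

The order matters: tasks run in the order they were submitted, so a task queued first is guaranteed to finish before the next one starts.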
Now we can finish our implementation of surrender and move on to block:
pub fn surrender() {
    let mut current_thread: Box<dyn TCB> = match swap_active(None) {
        Some(tcb) => tcb,
        None => panic!("No active thread!"),
    };
    let current_thread_info = current_thread.get_info();
    let me = smp::me();
    let add_to_ready = move || {
        READY.lock().push_back(current_thread);
    };
    // Have the next thread add us back to the ready queue
    CLEANUP[me].lock().add_task(Box::new(add_to_ready));
    block(current_thread_info);
}
Blocking
To block, we need to find something to switch to, switch to it, and set it as the active thread (using our cleanup tasks). We have one interesting question: what if we have nothing to switch to? For simplicity, we can just create something to switch to that does nothing, as most of the time, we’ll probably have something to switch to.
pub fn block(current_thread_info: *mut TCBInfo) {
    // Find something to switch to
    let mut next_thread: Box<dyn TCB> = match READY.lock().pop_front() {
        Some(tcb) => tcb,
        None => {
            // Implementation Note: Potentially a trade off to switch to something that switches back,
            // but most of the time, there should be something in the ready queue
            let work = || {};
            let busy_work = Box::new(TCBImpl::new(Box::new(work)));
            busy_work
        }
    };
    let next_thread_info = next_thread.get_info();
    let assert_as_active = move || {
        // The next thread will now assert itself as the active thread
        swap_active(Some(next_thread));
    };
    CLEANUP[smp::me()].lock().add_task(Box::new(assert_as_active));
    unsafe {
        machine::context_switch(current_thread_info, next_thread_info);
    }
    cleanup();
}
We run cleanup at the end of block because, when this thread is switched back in, it must run the cleanup tasks left by the thread that previously ran on this core. However, a thread that is starting for the first time must also run the cleanup tasks, so this requires changing thread_entry_point.
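The fallback in the None arm of block can be modeled on its own: take the next ready item, or build a do-nothing placeholder when the queue is empty. In this sketch the work items return a label purely so the chosen branch can be observed; `Work` and `next_or_idle` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Work item; it returns a label so the caller can see which branch ran.
type Work = Box<dyn FnOnce() -> &'static str>;

/// Pops the next ready item, or builds a do-nothing placeholder when the
/// queue is empty, mirroring the `None` arm of `block`.
fn next_or_idle(ready: &mut VecDeque<Work>) -> Work {
    ready
        .pop_front()
        .unwrap_or_else(|| Box::new(|| "idle") as Work)
}
```

The placeholder costs an allocation each time the queue is empty, which is the trade-off the implementation note in block refers to.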
Creating a thread entry point
Let’s improve thread_entry_point by running the cleanup tasks, and stopping when we finish work.
#[no_mangle]
pub extern "C" fn thread_entry_point() -> ! {
    cleanup();
    {
        let mut active = match swap_active(None) {
            Some(active) => active,
            None => panic!("No thread available in thread entry point"),
        };
        let task = active.get_work();
        swap_active(Some(active));
        task();
    }
    stop();
    loop {}
}
Now we need to implement stop.
Implementing stop
Stop is really just surrendering without putting ourselves back on the ready queue. Using this fact, we can move the shared logic into a helper function that both surrender and stop call:
pub fn surrender() {
    surrender_help(true);
}

pub fn stop() {
    surrender_help(false);
}

pub fn surrender_help(run_again: bool) {
    let mut current_thread: Box<dyn TCB> = match swap_active(None) {
        Some(tcb) => tcb,
        None => panic!("No active thread!"),
    };
    let current_thread_info = current_thread.get_info();
    let me = smp::me();
    if run_again {
        // Have the next thread add us back to the ready queue
        let add_to_ready = move || {
            READY.lock().push_back(current_thread);
        };
        CLEANUP[me].lock().add_task(Box::new(add_to_ready));
    } else {
        // Have the next thread free all the memory associated with the current TCB
        let drop_current = move || {
            drop(current_thread);
        };
        CLEANUP[me].lock().add_task(Box::new(drop_current));
    }
    block(current_thread_info);
}
Instead of having the next thread add the current TCB to the ready queue, we simply allow the current TCB to be dropped, freeing all the memory associated with the TCB.
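To see that the memory really is released only when the cleanup task runs, we can use a stand-in TCB whose Drop implementation flips a flag. This is a user-space sketch; `ToyTcb` and `drop_demo` are hypothetical names and no context switch is involved.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Stand-in for a TCB; flips a shared flag when its memory is released.
struct ToyTcb {
    freed: Arc<AtomicBool>,
}

impl Drop for ToyTcb {
    fn drop(&mut self) {
        self.freed.store(true, Ordering::SeqCst);
    }
}

/// Returns whether the TCB had been freed before and after the task ran.
fn drop_demo() -> (bool, bool) {
    let freed = Arc::new(AtomicBool::new(false));
    let tcb = Box::new(ToyTcb { freed: Arc::clone(&freed) });
    // The closure owns `tcb`; nothing is freed until the closure runs.
    let drop_current: Box<dyn FnOnce() + Send> = Box::new(move || drop(tcb));
    let before = freed.load(Ordering::SeqCst);
    drop_current();
    let after = freed.load(Ordering::SeqCst);
    (before, after)
}
```

Because the next thread runs the closure after the context switch, the stopped thread's stack is no longer in use by the time it is freed.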
Using the scheduler
Now we can schedule tasks. Let’s try something to see if it works.
pub fn cooperative_scheduler_test() {
    println!("running cooperative scheduler test");
    let counter = Arc::new(AtomicU32::new(0));
    for _ in 0..10 {
        let c = Arc::clone(&counter);
        let x = TCBImpl::new(Box::new(move || {
            for _ in 0..10 {
                c.fetch_add(1, Ordering::SeqCst);
                surrender();
            }
        }));
        schedule(Box::new(x));
    }
    println!("scheduled all threads");
    while counter.load(Ordering::SeqCst) < 100 {
        surrender();
    }
    println!("counter: {}", counter.load(Ordering::SeqCst));
}
This simply creates 10 threads, all incrementing the same atomic variable 10 times, surrendering between each increment.
When we run this, we’ll see
counter: 100
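For comparison, the same experiment can be run in user space with OS threads. This is only an analogy: `std::thread::yield_now` is a hint to a preemptive OS scheduler rather than a true cooperative yield, and `count_with_yields` is a hypothetical helper.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Spawns `threads` OS threads that each bump a shared counter `rounds`
/// times, yielding after every increment, and returns the final count.
fn count_with_yields(threads: u32, rounds: u32) -> u32 {
    let counter = Arc::new(AtomicU32::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..rounds {
                    c.fetch_add(1, Ordering::SeqCst);
                    // Only a scheduling hint; it stands in for `surrender`.
                    thread::yield_now();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}
```

With 10 threads and 10 rounds the result matches the kernel test, since every increment is atomic regardless of how the threads interleave.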
In my next post, I will introduce preemption to OxidizedOS.