Taskflow  2.7.0
C2: Executor

After you create a task dependency graph, you need to submit it to threads for execution. In this chapter, we will show you how to execute a task dependency graph.

Create an Executor

To execute a taskflow, you need to create an executor of type tf::Executor. An executor is a thread-safe object that manages a set of worker threads and executes tasks through an efficient work-stealing algorithm. Issuing a call to run a taskflow creates a topology, a data structure to keep track of the execution status of a running graph. The constructor of tf::Executor takes an unsigned integer for the number of worker threads; the default value is std::thread::hardware_concurrency.

tf::Executor executor1; // create an executor of std::thread::hardware_concurrency worker threads
tf::Executor executor2(4); // create an executor of 4 worker threads

An executor can be reused to execute multiple taskflows. In most workloads, you may need only one executor to run multiple taskflows where each taskflow represents a part of a parallel decomposition.
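For example, the sketch below reuses one executor to run two taskflows; the taskflow names and task bodies are placeholders for illustration.

tf::Executor executor;           // one executor shared by all taskflows
tf::Taskflow taskflow1, taskflow2;

taskflow1.emplace([](){ std::cout << "task in taskflow1\n"; });
taskflow2.emplace([](){ std::cout << "task in taskflow2\n"; });

executor.run(taskflow1);         // submit the first taskflow
executor.run(taskflow2);         // submit the second taskflow
executor.wait_for_all();         // block until both executions finish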

Execute a Taskflow

tf::Executor provides a set of run_* methods, tf::Executor::run, tf::Executor::run_n, and tf::Executor::run_until, to run a taskflow once, multiple times, or until a given predicate evaluates to true. All methods accept an optional callback to invoke after the execution completes, and return a std::future for users to access the execution status. The code below shows several ways to run a taskflow.

1: // Declare an executor and a taskflow
2: tf::Executor executor;
3: tf::Taskflow taskflow;
4:
5: // Add three tasks into the taskflow
6: tf::Task A = taskflow.emplace([] () { std::cout << "This is TaskA\n"; });
7: tf::Task B = taskflow.emplace([] () { std::cout << "This is TaskB\n"; });
8: tf::Task C = taskflow.emplace([] () { std::cout << "This is TaskC\n"; });
9:
10: // Build precedence between tasks
11: A.precede(B, C);
12:
13: std::future<void> fu = executor.run(taskflow);
14: fu.wait(); // block until the execution completes
15:
16: executor.run(taskflow, [](){ std::cout << "end of one execution\n"; }).wait();
17: executor.run_n(taskflow, 4);
18: executor.wait_for_all(); // block until all associated executions finish
19: executor.run_n(taskflow, 4, [](){ std::cout << "end of four executions\n"; }).wait();
20: executor.run_until(taskflow, [cnt=0] () mutable { return (++cnt == 10); });

Debrief:

  • Lines 6-8 create a taskflow of three tasks A, B, and C
  • Lines 13-14 run the taskflow once and use std::future::wait to wait for completion
  • Line 16 runs the taskflow once with a callback to invoke when the execution finishes
  • Lines 17-18 run the taskflow four times and use tf::Executor::wait_for_all to wait for completion
  • Line 19 runs the taskflow four times and invokes a callback at the end of the fourth execution
  • Line 20 keeps running the taskflow until the predicate returns true

Issuing multiple runs on the same taskflow will automatically synchronize to a sequential chain of executions in the order of run calls.

executor.run(taskflow); // execution 1
executor.run_n(taskflow, 10); // execution 2
executor.run(taskflow); // execution 3
executor.wait_for_all(); // execution 1 -> execution 2 -> execution 3

A key point to notice is that a running taskflow must remain alive during its execution. It is your responsibility to ensure that a taskflow is not destructed while it is running. For example, the code below can result in undefined behavior.

tf::Executor executor; // create an executor

// create a taskflow whose lifetime is restricted to the scope below
{
  tf::Taskflow taskflow;

  // add tasks to the taskflow
  // ...

  // run the taskflow
  executor.run(taskflow);

} // at this point, the taskflow may get destructed while it is still running, resulting in undefined behavior

Similarly, you should avoid modifying a taskflow while it is running.

tf::Taskflow taskflow;

// add tasks to the taskflow
// ...

// declare an executor
tf::Executor executor;

std::future<void> future = executor.run(taskflow); // non-blocking return

// altering the taskflow while it is running leads to undefined behavior
taskflow.emplace([](){ std::cout << "Add a new task\n"; });

A rule of thumb is to always keep a taskflow alive in your function scope while it is participating in an execution.
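A minimal sketch of this rule, assuming a hypothetical helper function run_graph that owns a local taskflow and blocks on the returned std::future before returning:

void run_graph(tf::Executor& executor) {
  tf::Taskflow taskflow;  // local taskflow (hypothetical example); its lifetime ends with this function
  taskflow.emplace([](){ std::cout << "TaskA\n"; });
  executor.run(taskflow).wait();  // block so the taskflow outlives its execution
}  // safe to destruct the taskflow here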

Thread Safety

All run_* methods are thread-safe. You can have multiple threads call these methods on the same executor to run different taskflows. However, the order in which the taskflows start running is non-deterministic and is up to the runtime.

tf::Executor executor;

for(int i=0; i<10; ++i) {
  std::thread([&, i](){
    // ... modify my taskflow
    executor.run(taskflows[i]); // run my taskflow
  }).detach();
}

It is your responsibility to ensure that all taskflows submitted from different threads remain alive during their executions; otherwise, the program can exhibit unexpected behavior or crash.
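The sketch below shows one way to meet this requirement, assuming the taskflows are stored in a container whose lifetime covers every execution; the container, thread count, and task bodies are illustrative choices, not part of the Taskflow API.

#include <taskflow/taskflow.hpp>
#include <iostream>
#include <thread>
#include <vector>

int main() {

  tf::Executor executor;

  // the container outlives every thread and every execution
  std::vector<tf::Taskflow> taskflows(10);

  std::vector<std::thread> threads;
  for(int i=0; i<10; ++i) {
    threads.emplace_back([&, i](){
      taskflows[i].emplace([i](){ std::cout << "taskflow " << i << '\n'; });
      executor.run(taskflows[i]);
    });
  }

  for(auto& t : threads) {
    t.join();               // all runs have been submitted
  }
  executor.wait_for_all();  // block until every execution finishes

  return 0;
}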

Run a Task Asynchronously

In addition to running a taskflow, the executor provides an STL-style method, tf::Executor::async, for you to run a callable object (e.g., a function or a lambda) asynchronously. A common use case of this method is to use the executor as a thread pool.

std::future<int> future = executor.async([](){
  // do some heavy work
  // ...
  return 1;
});
executor.wait_for_all();
assert(future.get() == 1);

The method, tf::Executor::async, is thread-safe and can be called from any thread or from within the execution of a task. Our scheduler autonomously detects whether the async task is submitted from an external thread or a worker thread and executes it in the work-stealing loop.

taskflow.emplace([&](){
  // do some work
  // ...
  // launch a task asynchronously
  executor.async([](){
    // do another asynchronous work
  });
});
executor.run(taskflow);
executor.wait_for_all(); // wait for all tasks to finish

Observe Thread Activities

Using tf::ObserverInterface, you can observe thread activities in an executor, i.e., when a worker thread enters and leaves the execution of a task. tf::ObserverInterface is an interface class that provides a set of methods for you to define what to do when a thread enters and leaves the execution context of a task.

class ObserverInterface {
  virtual ~ObserverInterface() = default;
  virtual void set_up(size_t num_workers) = 0;
  virtual void on_entry(size_t worker_id, tf::TaskView task_view) = 0;
  virtual void on_exit(size_t worker_id, tf::TaskView task_view) = 0;
};

There are three methods you must define in your derived class: tf::ObserverInterface::set_up, tf::ObserverInterface::on_entry, and tf::ObserverInterface::on_exit. The method tf::ObserverInterface::set_up is a constructor-like method that the executor calls when the observer is constructed. It passes the number of worker threads in the executor as an argument. You may use it to preallocate or initialize data storage, e.g., an independent vector for each worker. The methods tf::ObserverInterface::on_entry and tf::ObserverInterface::on_exit are called by a worker thread before and after it executes a task, respectively. You may use them to record time points and calculate the elapsed time of a task.

You can associate an executor with one or multiple observers (though one is common) using tf::Executor::make_observer. We use std::shared_ptr to manage the ownership of an observer. The executor loops through each observer and invokes the corresponding methods accordingly.

#include <taskflow/taskflow.hpp>

struct MyObserver : public tf::ObserverInterface {

  MyObserver(const std::string& name) {
    std::cout << "constructing observer " << name << '\n';
  }

  void set_up(size_t num_workers) override final {
    std::cout << "setting up observer with " << num_workers << " workers\n";
  }

  void on_entry(size_t w, tf::TaskView tv) override final {
    std::ostringstream oss;
    oss << "worker " << w << " ready to run " << tv.name() << '\n';
    std::cout << oss.str();
  }

  void on_exit(size_t w, tf::TaskView tv) override final {
    std::ostringstream oss;
    oss << "worker " << w << " finished running " << tv.name() << '\n';
    std::cout << oss.str();
  }

};

int main(){

  tf::Executor executor(4);

  // Create a taskflow of eight tasks
  tf::Taskflow taskflow;

  auto A = taskflow.emplace([] () { std::cout << "1\n"; }).name("A");
  auto B = taskflow.emplace([] () { std::cout << "2\n"; }).name("B");
  auto C = taskflow.emplace([] () { std::cout << "3\n"; }).name("C");
  auto D = taskflow.emplace([] () { std::cout << "4\n"; }).name("D");
  auto E = taskflow.emplace([] () { std::cout << "5\n"; }).name("E");
  auto F = taskflow.emplace([] () { std::cout << "6\n"; }).name("F");
  auto G = taskflow.emplace([] () { std::cout << "7\n"; }).name("G");
  auto H = taskflow.emplace([] () { std::cout << "8\n"; }).name("H");

  // create an observer
  std::shared_ptr<MyObserver> observer = executor.make_observer<MyObserver>("MyObserver");

  // run the taskflow
  executor.run(taskflow).get();

  // remove the observer (optional)
  executor.remove_observer(std::move(observer));

  return 0;
}

The above code produces the following output:

constructing observer MyObserver
setting up observer with 4 workers
worker 2 ready to run A
1
worker 2 finished running A
worker 2 ready to run B
2
worker 1 ready to run C
worker 2 finished running B
3
worker 2 ready to run D
worker 3 ready to run E
worker 1 finished running C
4
5
worker 1 ready to run F
worker 2 finished running D
worker 3 finished running E
6
worker 2 ready to run G
worker 3 ready to run H
worker 1 finished running F
7
8
worker 2 finished running G
worker 3 finished running H

It is expected that the lines of std::cout interleave with each other, since four workers participate in task scheduling. However, for each task, the ready message always appears before the task's own output (i.e., its number), which in turn appears before the finished message.