Limit the Maximum Concurrency
Contents
This chapters discusses how to limit the concurrency or the maximum number of workers in certain task sections of a graph.
Semaphore
Taskflow provides a mechanism, tf::
tf::Executor executor(8); // create an executor of 8 workers tf::Taskflow taskflow; tf::Semaphore semaphore(1); // create a semaphore with initial count 1 std::vector<tf::Task> tasks { taskflow.emplace([](){ std::cout << "A" << std::endl; }), taskflow.emplace([](){ std::cout << "B" << std::endl; }), taskflow.emplace([](){ std::cout << "C" << std::endl; }), taskflow.emplace([](){ std::cout << "D" << std::endl; }), taskflow.emplace([](){ std::cout << "E" << std::endl; }) }; for(auto & task : tasks) { // each task acquires and release the semaphore task.acquire(semaphore); task.release(semaphore); } executor.run(taskflow).wait();
The above example creates five tasks with no dependencies between them. Under normal circumstances, the five tasks would be executed concurrently. However, this example has a semaphore with initial count 1, and all tasks need to acquire that semaphore before running and release that semaphore after they are done. This organization limits the number of concurrently running tasks to only one. One possible output is shown below:
# the output is a sequential chain of five tasks A B E D C
You can use semaphores to limit the concurrency across different sections of taskflow graphs. When you submit multiple taskflows to an executor, the executor view them as a bag of dependent tasks. It does not matter which task in which taskflow graph acquires or releases a semaphore.
tf::Executor executor(8); // create an executor of 8 workers tf::Taskflow taskflow1; tf::Taskflow taskflow2; tf::Semaphore semaphore(1); // create a semaphore with initial count 1 taskflow1.emplace([](){std::cout << "task in taskflow1"; }) .acquire(semaphore) .release(semaphore); taskflow2.emplace([](){std::cout << "task in taskflow2"; }) .acquire(semaphore) .release(semaphore); executor.run(taskflow1); executor.run(taskflow2); executor.wait_for_all();
The above examples creates one task from each taskflow and submits the two taskflows to the executor. Again, under normal circumstances, the two tasks can run concurrently, but the semaphore restricts one worker to run the two task sequentially in arbitrary order.
Semaphores are powerful for limiting the maximum concurrency of not only a section of tasks but also different sections of tasks. The following example serializes the execution of five pairs of tasks using a semaphore rather than explicit dependencies.
tf::Executor executor(4); // creates an executor of 4 workers tf::Taskflow taskflow; tf::Semaphore semaphore(1); int N = 5; int counter = 0; // non-atomic integer counter for(int i=0; i<N; i++) { tf::Task f = taskflow.emplace([&](){ counter++; }) .name("from-"s + std::to_string(i)); tf::Task t = taskflow.emplace([&](){ counter++; }) .name("to-"s + std::to_string(i)); f.precede(t); f.acquire(semaphore); t.release(semaphore); } executor.run(taskflow).wait(); assert(counter == 2*N);
Under normal situation, each pair of tasks, e.g., from-0 -> to-0
, will run independently and concurrently. However, the program forces each from
task to acquire the semaphore before running its work and not to release it until its paired to
task is done. This constraint forces each pair of tasks to run sequentially, while the order of which pair runs first is up to the scheduler.
Critical Section
tf::
tf::Executor executor(8); // create an executor of 8 workers tf::Taskflow taskflow; // create a critical section of 1 worker tf::CriticalSection critical_section(1); tf::Task A = taskflow.emplace([](){ std::cout << "A" << std::endl; }); tf::Task B = taskflow.emplace([](){ std::cout << "B" << std::endl; }); tf::Task C = taskflow.emplace([](){ std::cout << "C" << std::endl; }); tf::Task D = taskflow.emplace([](){ std::cout << "D" << std::endl; }); tf::Task E = taskflow.emplace([](){ std::cout << "E" << std::endl; }); critical_section.add(A, B, C, D, E); executor.run(taskflow).wait();