4 #include "notifier.hpp" 5 #include "observer.hpp" 6 #include "taskflow.hpp" 50 Notifier::Waiter* waiter;
52 TaskQueue<Node*> wsq[NUM_DOMAINS];
53 Node* cache {nullptr};
57 Worker* worker {nullptr};
74 size_t M = cuda_num_devices()
152 template<typename P, typename C>
201 template <typename Observer, typename... Args>
207 template <typename Observer>
217 const size_t _VICTIM_BEG;
218 const size_t _VICTIM_END;
219 const size_t _MAX_STEALS;
220 const size_t _MAX_YIELDS;
226 size_t _num_topologies {0};
231 #ifdef TF_ENABLE_CUDA
235 Notifier _notifier[NUM_DOMAINS];
237 TaskQueue<Node*> _wsq[NUM_DOMAINS];
239 size_t _id_offset[NUM_DOMAINS] = {0};
249 PerThread& _per_thread() const;
251 bool _wait_for_task(Worker&, Node*&);
253 void _instantiate_tfprof();
254 void _flush_tfprof();
255 void _observer_prologue(Worker&, Node*);
256 void _observer_epilogue(Worker&, Node*);
257 void _spawn(size_t, Domain);
258 void _worker_loop(Worker&);
259 void _exploit_task(Worker&, Node*&);
260 void _explore_task(Worker&, Node*&);
261 void _schedule(Node*, bool);
262 void _schedule(PassiveVector<Node*>&);
263 void _invoke(Worker&, Node*);
264 void _invoke_static_work(Worker&, Node*);
265 void _invoke_dynamic_work(Worker&, Node*, bool&);
266 void _invoke_condition_work(Worker&, Node*);
267 void _invoke_module_work(Worker&, Node*, bool&);
269 #ifdef TF_ENABLE_CUDA
270 void _invoke_cudaflow_work(Worker&, Node*);
271 void _invoke_cudaflow_work_impl(Worker&, Node*);
274 void _set_up_topology(Topology*);
275 void _tear_down_topology(Topology**);
276 void _increment_topology();
277 void _decrement_topology();
278 void _decrement_topology_and_notify();
282 #ifdef TF_ENABLE_CUDA
286 _VICTIM_END {N + M - 1},
287 _MAX_STEALS {(N + M + 1) << 1},
290 _cuda_devices {cuda_num_devices()},
291 _notifier {Notifier(N), Notifier(M)} {
294 TF_THROW("no cpu workers to execute taskflows");
298 TF_THROW("no gpu workers to execute cudaflows");
301 for(int i=0; i<NUM_DOMAINS; ++i) {
302 _num_actives[i].store(0, std::memory_order_relaxed);
303 _num_thieves[i].store(0, std::memory_order_relaxed);
307 for(size_t i=0; i<_cuda_devices.size(); ++i) {
308 _cuda_devices[i].streams.resize(M);
309 cudaScopedDevice ctx(i);
310 for(size_t m=0; m<M; ++m) {
312 cudaStreamCreate(&(_cuda_devices[i].streams[m])),
313 "failed to create a cudaStream for worker ", m, " on device ", i
322 _instantiate_tfprof();
330 _MAX_STEALS {(N + 1) << 1},
333 _notifier {Notifier(N)} {
336 TF_THROW("no cpu workers to execute taskflows");
339 for(int i=0; i<NUM_DOMAINS; ++i) {
340 _num_actives[i].store(0, std::memory_order_relaxed);
341 _num_thieves[i].store(0, std::memory_order_relaxed);
347 _instantiate_tfprof();
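As a usage note for the two constructors above, here is a minimal construction sketch. It assumes the library header installs as taskflow/taskflow.hpp and that everything lives in namespace tf; the worker counts default as declared (std::thread::hardware_concurrency() CPU workers and, under TF_ENABLE_CUDA, cuda_num_devices() GPU workers), and passing zero CPU workers hits the TF_THROW shown above.

#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor;     // N = std::thread::hardware_concurrency() CPU workers
  tf::Executor four(4);      // four CPU workers
  // With TF_ENABLE_CUDA defined, tf::Executor(N, M) also spawns M GPU workers.
  // tf::Executor bad(0);    // throws: "no cpu workers to execute taskflows"
  return 0;
}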
360 for(int i=0; i<NUM_DOMAINS; ++i) {
361 _notifier[i].notify(true);
364 for(auto& t : _threads){
368 #ifdef TF_ENABLE_CUDA
370 for(size_t i=0; i<_cuda_devices.size(); ++i) {
371 cudaScopedDevice ctx(i);
372 for(size_t m=0; m<_cuda_devices[i].streams.size(); ++m) {
373 cudaStreamDestroy(_cuda_devices[i].streams[m]);
383 inline void Executor::_instantiate_tfprof() {
385 _tfprof = get_env("TF_ENABLE_PROFILER").empty() ?
386 nullptr : make_observer<TFProfObserver>().get();
390 inline void Executor::_flush_tfprof() {
393 fpath << get_env("TF_ENABLE_PROFILER") << _tfprof->_uuid << ".tfp";
401 return _workers.size();
411 return _num_topologies;
415 inline Executor::PerThread& Executor::_per_thread() const {
416 thread_local PerThread pt;
422 auto worker = _per_thread().worker;
423 return worker ? static_cast<int>(worker->id) : -1;
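A hedged sketch of what the thread-local lookup above implies for callers: the worker threads spawned in _spawn populate their PerThread entry, so this_worker_id() returns a nonnegative id when called from inside a task, while any thread outside the executor sees a null worker and gets -1. The tf::Taskflow::emplace call below is the library's usual task-creation API, which this listing does not show.

#include <cassert>
#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor(2);
  tf::Taskflow taskflow;

  taskflow.emplace([&](){
    assert(executor.this_worker_id() >= 0);  // executed on a worker thread
  });

  executor.run(taskflow).wait();
  assert(executor.this_worker_id() == -1);   // the main thread is not a worker
  return 0;
}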
427 inline void Executor::_spawn(size_t N, Domain d) {
429 auto id = _threads.size();
433 for(size_t i=0; i<N; ++i, ++id) {
435 _workers[id].id = id;
436 _workers[id].victim = id;
437 _workers[id].domain = d;
438 _workers[id].executor = this;
439 _workers[id].waiter = &_notifier[d]._waiters[i];
441 _threads.emplace_back([this] (Worker& w) -> void {
443 PerThread& pt = _per_thread();
455 if(_wait_for_task(w, t) == false) {
466 inline void Executor::_explore_task(Worker& w, Node*& t) {
471 const auto d = w.domain;
473 size_t num_steals = 0;
474 size_t num_yields = 0;
497 t = (w.id == w.victim) ? _wsq[d].steal() : _workers[w.victim].wsq[d].steal();
503 if(num_steals++ > _MAX_STEALS) {
505 if(num_yields++ > _MAX_YIELDS) {
510 w.victim = rdvtm(w.rdgen);
516 inline void Executor::_exploit_task(Worker& w, Node*& t) {
522 const auto d = w.domain;
524 if(_num_actives[d].fetch_add(1) == 0 && _num_thieves[d] == 0) {
525 _notifier[d].notify(false);
528 auto tpg = t->_topology;
529 auto par = t->_parent;
530 auto exe = size_t{1};
544 if(t->_parent == par) {
551 t->_topology->_join_counter.fetch_sub(exe);
554 auto ret = par->_join_counter.fetch_sub(exe);
556 if(par->domain() == d) {
560 _schedule(par, false);
571 if(tpg->_join_counter.fetch_sub(exe) == exe) {
573 _tear_down_topology(&tpg);
583 if(par->_join_counter.fetch_sub(exe) == exe) {
584 if(par->domain() == d) {
590 _schedule(par, false);
603 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
605 const auto d = worker.domain;
615 _explore_task(worker, t);
618 if(_num_thieves[d].fetch_sub(1) == 1) {
619 _notifier[d].notify(false);
624 _notifier[d].prepare_wait(worker.waiter);
627 if(!_wsq[d].empty()) {
629 _notifier[d].cancel_wait(worker.waiter);
634 if(_num_thieves[d].fetch_sub(1) == 1) {
635 _notifier[d].notify(false);
640 worker.victim = worker.id;
646 _notifier[d].cancel_wait(worker.waiter);
647 for(int i=0; i<NUM_DOMAINS; ++i) {
648 _notifier[i].notify(true);
654 if(_num_thieves[d].fetch_sub(1) == 1) {
655 if(_num_actives[d]) {
656 _notifier[d].cancel_wait(worker.waiter);
660 for(auto& w : _workers) {
661 if(!w.wsq[d].empty()) {
662 worker.victim = w.id;
663 _notifier[d].cancel_wait(worker.waiter);
670 _notifier[d].commit_wait(worker.waiter);
676 template<typename Observer, typename... Args>
681 "Observer must be derived from ObserverInterface"
685 auto ptr = std::make_shared<Observer>(std::forward<Args>(args)...);
687 ptr->set_up(_workers.size());
689 _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
695 template <typename Observer>
700 "Observer must be derived from ObserverInterface"
703 _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
708 return _observers.size();
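The static_assert fragments above require Observer to derive from ObserverInterface, and _observer_prologue/_observer_epilogue further down invoke set_up(num_workers), on_entry(worker_id, TaskView) and on_exit(worker_id, TaskView) on every registered observer. Below is a minimal custom-observer sketch under that assumption; the exact ObserverInterface virtual signatures are inferred from how they are called in this listing, not confirmed by it.

#include <atomic>
#include <iostream>
#include <taskflow/taskflow.hpp>

// Counts every task executed while the observer is attached.
struct CountingObserver : public tf::ObserverInterface {
  std::atomic<size_t> count {0};
  void set_up(size_t num_workers) override {
    std::cout << "observing " << num_workers << " workers\n";
  }
  void on_entry(size_t, tf::TaskView) override {}
  void on_exit(size_t, tf::TaskView) override { ++count; }
};

int main() {
  tf::Executor executor;
  auto obs = executor.make_observer<CountingObserver>();

  tf::Taskflow taskflow;
  taskflow.emplace([](){});
  executor.run(taskflow).wait();

  std::cout << obs->count << " task(s) observed\n";
  executor.remove_observer(obs);   // num_observers() drops back to 0
  return 0;
}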
714 inline void Executor::_schedule(Node* node, bool bypass_hint) {
718 const auto d = node->domain();
721 auto worker = _per_thread().worker;
723 if(worker != nullptr && worker->executor == this) {
725 assert(!worker->cache);
726 worker->cache = node;
729 worker->wsq[d].push(node);
730 if(worker->domain != d) {
731 if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
732 _notifier[d].notify(false);
745 _notifier[d].notify(false);
751 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
757 const auto num_nodes = nodes.size();
764 auto worker = _per_thread().worker;
767 size_t tcount[NUM_DOMAINS] = {0};
769 if(worker != nullptr && worker->executor == this) {
770 for(size_t i=0; i<num_nodes; ++i) {
771 const auto d = nodes[i]->domain();
772 worker->wsq[d].push(nodes[i]);
776 for(int d=0; d<NUM_DOMAINS; ++d) {
777 if(tcount[d] && d != worker->domain) {
778 if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
779 _notifier[d].notify_n(tcount[d]);
790 for(size_t k=0; k<num_nodes; ++k) {
791 const auto d = nodes[k]->domain();
792 _wsq[d].push(nodes[k]);
797 for(int d=0; d<NUM_DOMAINS; ++d) {
798 _notifier[d].notify_n(tcount[d]);
804 inline void Executor::_invoke(Worker& worker, Node* node) {
810 const auto num_successors = node->num_successors();
813 auto& c = (node->_parent) ? node->_parent->_join_counter :
814 node->_topology->_join_counter;
817 switch(node->_handle.index()) {
819 case Node::STATIC_WORK:{
820 _invoke_static_work(worker, node);
825 case Node::MODULE_WORK: {
826 bool first_time = !node->_has_state(Node::SPAWNED);
827 bool emptiness = false;
828 _invoke_module_work(worker, node, emptiness);
829 if(first_time && !emptiness) {
836 case Node::DYNAMIC_WORK: {
838 if(!node->_has_state(Node::SPAWNED)) {
840 _invoke_dynamic_work(worker, node, join);
849 case Node::CONDITION_WORK: {
850 _invoke_condition_work(worker, node);
855 #ifdef TF_ENABLE_CUDA
856 case Node::CUDAFLOW_WORK: {
857 _invoke_cudaflow_work(worker, node);
872 if(node->_has_state(Node::BRANCH)) {
874 node->_join_counter = node->num_strong_dependents();
877 node->_join_counter = node->num_dependents();
880 node->_unset_state(Node::SPAWNED);
883 Node* cache {nullptr};
885 for(size_t i=0; i<num_successors; ++i) {
886 if(--(node->_successors[i]->_join_counter) == 0) {
887 if(node->_successors[i]->domain() != worker.domain) {
889 _schedule(node->_successors[i], false);
894 _schedule(cache, false);
896 cache = node->_successors[i];
902 _schedule(cache, true);
907 inline void Executor::_observer_prologue(Worker& worker, Node* node) {
908 for(auto& observer : _observers) {
909 observer->on_entry(worker.id, TaskView(node));
914 inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
915 for(auto& observer : _observers) {
916 observer->on_exit(worker.id, TaskView(node));
921 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
922 _observer_prologue(worker, node);
923 nstd::get<Node::StaticWork>(node->_handle).work();
924 _observer_epilogue(worker, node);
928 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, bool& join) {
932 _observer_prologue(worker, node);
934 auto& handle = nstd::get<Node::DynamicWork>(node->_handle);
936 handle.subgraph.clear();
937 Subflow fb(handle.subgraph);
941 node->_set_state(Node::SPAWNED);
943 if(!handle.subgraph.empty()) {
945 PassiveVector<Node*> src;
947 for(auto n : handle.subgraph._nodes) {
949 n->_topology = node->_topology;
950 n->_set_up_join_counter();
956 if(n->num_dependents() == 0) {
964 node->_topology->_join_counter.fetch_add(src.size());
967 node->_join_counter.fetch_add(src.size());
969 node->_parent ? node->_parent->_join_counter.fetch_add(1) :
970 node->_topology->_join_counter.fetch_add(1);
976 _observer_epilogue(worker, node);
980 inline void Executor::_invoke_condition_work(Worker& worker, Node* node) {
982 _observer_prologue(worker, node);
984 if(node->_has_state(Node::BRANCH)) {
985 node->_join_counter = node->num_strong_dependents();
988 node->_join_counter = node->num_dependents();
991 auto id = nstd::get<Node::ConditionWork>(node->_handle).work();
993 if(id >= 0 && static_cast<size_t>(id) < node->num_successors()) {
994 auto s = node->_successors[id];
995 s->_join_counter.store(0);
997 if(s->domain() == worker.domain) {
1001 node->_parent ? node->_parent->_join_counter.fetch_add(1) :
1002 node->_topology->_join_counter.fetch_add(1);
1003 _schedule(s, false);
1007 _observer_epilogue(worker, node);
1010 #ifdef TF_ENABLE_CUDA
1012 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
1013 _observer_prologue(worker, node);
1014 _invoke_cudaflow_work_impl(worker, node);
1015 _observer_epilogue(worker, node);
1019 inline void Executor::_invoke_cudaflow_work_impl(Worker& w, Node* node) {
1021 assert(w.domain == node->domain());
1023 auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
1027 cudaFlow cf(h.graph, [repeat=1] () mutable { return repeat-- == 0; });
1031 if(h.graph.empty()) {
1037 const int d = cf._device;
1039 cudaScopedDevice ctx(d);
1041 auto s = cf._stream ? *(cf._stream) :
1042 _cuda_devices[d].streams[w.id - _id_offset[w.domain]];
1044 h.graph._make_native_graph();
1046 cudaGraphExec_t exec;
1049 cudaGraphInstantiate(&exec, h.graph._native_handle, nullptr, nullptr, 0),
1050 "failed to create an executable cudaGraph"
1053 while(!cf._predicate()) {
1055 cudaGraphLaunch(exec, s), "failed to launch cudaGraph on stream ", s
1059 cudaStreamSynchronize(s), "failed to synchronize stream ", s
1064 cudaGraphExecDestroy(exec), "failed to destroy an executable cudaGraph"
1070 inline void Executor::_invoke_module_work(Worker& worker, Node* node, bool& ept) {
1073 if(node->_has_state(Node::SPAWNED)) {
1077 _observer_prologue(worker, node);
1080 node->_set_state(Node::SPAWNED);
1082 auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
1084 if(module->empty()) {
1089 PassiveVector<Node*> src;
1091 for(auto n: module->_graph._nodes) {
1093 n->_topology = node->_topology;
1095 n->_set_up_join_counter();
1097 if(n->num_dependents() == 0) {
1102 node->_join_counter.fetch_add(src.size());
1104 if(node->_parent == nullptr) {
1105 node->_topology->_join_counter.fetch_add(1);
1108 node->_parent->_join_counter.fetch_add(1);
1114 _observer_epilogue(worker, node);
1119 return run_n(f, 1, [](){});
1123 template <typename C>
1125 return run_n(f, 1, std::forward<C>(c));
1130 return run_n(f, repeat, [](){});
1134 template <typename C>
1136 return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
1140 template<typename P>
1142 return run_until(f, std::forward<P>(pred), [](){});
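To make the forwarding chain above concrete: run() is run_n() with one repeat, run_n() is run_until() with a counting predicate ([repeat]() mutable { return repeat-- == 0; }), and the two-argument run_until() supplies an empty callback. The sketch below is a hedged usage example; tf::Taskflow::emplace comes from the library's task-construction API rather than from this listing.

#include <atomic>
#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;

  std::atomic<int> passes {0};
  taskflow.emplace([&](){ ++passes; });

  executor.run(taskflow).wait();       // one pass
  executor.run_n(taskflow, 3).wait();  // three more passes

  // Re-run until the predicate returns true, then invoke the callback once.
  executor.run_until(
    taskflow,
    [&](){ return passes >= 10; },
    [&](){ /* called after the last pass */ }
  ).wait();

  return 0;
}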
1146 inline void Executor::_set_up_topology(Topology* tpg) {
1148 tpg->_sources.clear();
1151 for(auto node : tpg->_taskflow._graph._nodes) {
1153 node->_topology = tpg;
1154 node->_clear_state();
1156 if(node->num_dependents() == 0) {
1157 tpg->_sources.push_back(node);
1160 node->_set_up_join_counter();
1163 tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
1167 inline void Executor::_tear_down_topology(Topology** tpg) {
1169 auto &f = (*tpg)->_taskflow;
1174 if(! (*tpg)->_pred() ) {
1177 assert((*tpg)->_join_counter == 0);
1178 (*tpg)->_join_counter = (*tpg)->_sources.size();
1180 _schedule((*tpg)->_sources);
1185 if((*tpg)->_call != nullptr) {
1192 if(f._topologies.size() > 1) {
1194 assert((*tpg)->_join_counter == 0);
1197 (*tpg)->_promise.set_value();
1198 f._topologies.pop_front();
1202 _decrement_topology();
1204 *tpg = &(f._topologies.front());
1206 _set_up_topology(*tpg);
1207 _schedule((*tpg)->_sources);
1219 assert(f._topologies.size() == 1);
1225 f._topologies.pop_front();
1232 _decrement_topology_and_notify();
1241 template <typename P, typename C>
1244 _increment_topology();
1247 if(f.empty() || pred()) {
1249 promise.set_value();
1250 _decrement_topology_and_notify();
1251 return promise.get_future();
1255 bool run_now {false};
1264 f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1265 tpg = &(f._topologies.back());
1266 future = tpg->_promise.get_future();
1268 if(f._topologies.size() == 1) {
1278 _set_up_topology(tpg);
1279 _schedule(tpg->_sources);
1286 inline void Executor::_increment_topology() {
1292 inline void Executor::_decrement_topology_and_notify() {
1294 if(--_num_topologies == 0) {
1295 _topology_cv.notify_all();
1300 inline void Executor::_decrement_topology() {
1308 _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
Referenced symbols (brief descriptions from the generated documentation):

Executor - execution interface for running a taskflow graph (executor.hpp:43)
Executor(size_t N=std::thread::hardware_concurrency(), size_t M=cuda_num_devices()) - constructs the executor with N/M cpu/gpu worker threads (executor.hpp:284)
~Executor() - destructs the executor (executor.hpp:352)
std::future<void> run(Taskflow& taskflow) - runs the taskflow once (executor.hpp:1118)
std::future<void> run_n(Taskflow& taskflow, size_t N) - runs the taskflow N times (executor.hpp:1129)
std::future<void> run_until(Taskflow& taskflow, P&& pred) - runs the taskflow multiple times until the predicate becomes true and then invokes a callback (executor.hpp:1141)
void wait_for_all() - waits for all pending graphs to complete (executor.hpp:1306)
int this_worker_id() const - queries the id of the caller thread in this executor (executor.hpp:421)
size_t num_workers() const - queries the number of worker threads (can be zero) (executor.hpp:400)
size_t num_domains() const - queries the number of worker domains (executor.hpp:405)
size_t num_topologies() const - queries the number of running topologies at the time of this call (executor.hpp:410)
std::shared_ptr<Observer> make_observer(Args&&... args) - constructs an observer to inspect the activities of worker threads (executor.hpp:677)
void remove_observer(std::shared_ptr<Observer> observer) - removes the associated observer (executor.hpp:696)
size_t num_observers() const - queries the number of observers (executor.hpp:707)
Taskflow - main entry to create a task dependency graph (taskflow.hpp:18)
bool empty() const - queries the emptiness of the taskflow (taskflow.hpp:132)
TFProfObserver - observer designed based on the taskflow board format (observer.hpp:265)
void dump(std::ostream& ostream) const - dumps the timelines in JSON format to an ostream (observer.hpp:386)
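Putting the public interface summarized above together, a short end-to-end sketch: build a small dependency graph, submit it several times, and block with wait_for_all(). The tf::Taskflow::emplace and Task::precede calls come from the library's graph-building API, which this listing only references through the Taskflow class.

#include <iostream>
#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor;
  std::cout << executor.num_workers() << " workers, "
            << executor.num_domains() << " domain(s)\n";

  tf::Taskflow taskflow;
  auto [a, b, c] = taskflow.emplace(
    [](){ std::cout << "A\n"; },
    [](){ std::cout << "B\n"; },
    [](){ std::cout << "C\n"; }
  );
  a.precede(b, c);   // A runs before B and C

  executor.run(taskflow);       // futures may be kept or discarded
  executor.run_n(taskflow, 2);  // topologies of the same taskflow are queued
  executor.wait_for_all();      // block until num_topologies() reaches zero
  return 0;
}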