#include "notifier.hpp"
#include "observer.hpp"
#include "taskflow.hpp"

// Worker: per-thread scheduling state owned by the executor
Notifier::Waiter* waiter;
TaskQueue<Node*> wsq[NUM_DOMAINS];

// PerThread: thread-local handle that points back to the owning worker
Worker* worker {nullptr};

// Executor constructor declaration: the default gpu worker count is one per CUDA device
size_t M = cuda_num_devices()
// declarations inside class Executor

template <typename P, typename C>
std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);

// async: overload for callables with a non-void return type
template <typename F, typename... ArgsT>
std::enable_if_t<
  !std::is_same<typename function_traits<F>::return_type, void>::value,
  std::future<typename function_traits<F>::return_type>
>
async(F&& f, ArgsT&&... args);

// async: overload for void-returning callables
template <typename F, typename... ArgsT>
std::enable_if_t<
  std::is_same<typename function_traits<F>::return_type, void>::value,
  std::future<void>
>
async(F&& f, ArgsT&&... args);

template <typename Observer, typename... Args>
std::shared_ptr<Observer> make_observer(Args&&... args);

template <typename Observer>
void remove_observer(std::shared_ptr<Observer> observer);
const size_t _VICTIM_BEG;
const size_t _VICTIM_END;
const size_t _MAX_STEALS;
const size_t _MAX_YIELDS;

size_t _num_topologies {0};

#ifdef TF_ENABLE_CUDA
// ... per-device CUDA state (_cuda_devices, one stream per gpu worker) ...
#endif

Notifier _notifier[NUM_DOMAINS];
TaskQueue<Node*> _wsq[NUM_DOMAINS];
size_t _id_offset[NUM_DOMAINS] = {0};
PerThread& _per_thread() const;

bool _wait_for_task(Worker&, Node*&);

void _instantiate_tfprof();
void _flush_tfprof();
void _observer_prologue(Worker&, Node*);
void _observer_epilogue(Worker&, Node*);
void _spawn(size_t, Domain);
void _worker_loop(Worker&);
void _exploit_task(Worker&, Node*&);
void _explore_task(Worker&, Node*&);
void _schedule(Node*);
void _schedule(PassiveVector<Node*>&);
void _invoke(Worker&, Node*);
void _invoke_static_work(Worker&, Node*);
void _invoke_dynamic_work(Worker&, Node*);
void _invoke_dynamic_work_internal(Worker&, Node*, Graph&, bool);
void _invoke_dynamic_work_external(Node*, Graph&, bool);
void _invoke_condition_work(Worker&, Node*, int&);
void _invoke_module_work(Worker&, Node*);
void _invoke_async_work(Worker&, Node*);
void _set_up_topology(Topology*);
void _tear_down_topology(Topology*);
void _increment_topology();
void _decrement_topology();
void _decrement_topology_and_notify();
#ifdef TF_ENABLE_CUDA
void _invoke_cudaflow_work(Worker&, Node*);

template <typename P>
void _invoke_cudaflow_work_internal(Worker&, cudaFlow&, P&&);

template <typename P>
void _invoke_cudaflow_work_external(cudaFlow&, P&&);
#endif
#ifdef TF_ENABLE_CUDA

// constructs the executor with N cpu workers and M gpu workers
inline Executor::Executor(size_t N, size_t M) :
  // ...
  _VICTIM_END   {N + M - 1},
  _MAX_STEALS   {(N + M + 1) << 1},
  // ...
  _cuda_devices {cuda_num_devices()},
  _notifier     {Notifier(N), Notifier(M)} {

  if(N == 0) {
    TF_THROW("no cpu workers to execute taskflows");
  }

  if(M == 0) {
    TF_THROW("no gpu workers to execute cudaflows");
  }

  for(int i=0; i<NUM_DOMAINS; ++i) {
    _num_actives[i].store(0, std::memory_order_relaxed);
    _num_thieves[i].store(0, std::memory_order_relaxed);
  }

  // create one stream per gpu worker on every device
  for(size_t i=0; i<_cuda_devices.size(); ++i) {
    _cuda_devices[i].streams.resize(M);
    cudaScopedDevice ctx(i);
    for(size_t m=0; m<M; ++m) {
      TF_CHECK_CUDA(
        cudaStreamCreate(&(_cuda_devices[i].streams[m])),
        "failed to create a cudaStream for worker ", m, " on device ", i
      );
    }
  }

  // ... spawn the cpu and gpu workers ...

  _instantiate_tfprof();
}
#else

// constructs the executor with N cpu workers
inline Executor::Executor(size_t N) :
  // ...
  _MAX_STEALS {(N + 1) << 1},
  _notifier   {Notifier(N)} {

  if(N == 0) {
    TF_THROW("no cpu workers to execute taskflows");
  }

  for(int i=0; i<NUM_DOMAINS; ++i) {
    _num_actives[i].store(0, std::memory_order_relaxed);
    _num_thieves[i].store(0, std::memory_order_relaxed);
  }

  // ... spawn the cpu workers ...
  _instantiate_tfprof();
}

#endif
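Usage sketch (not part of the header): constructing executors with explicit worker counts, following the constructor documented at executor.hpp:310. The umbrella include path is an assumption.

#include <taskflow/taskflow.hpp>   // assumed umbrella header

int main() {
  tf::Executor e1;          // default: hardware_concurrency() cpu workers
  tf::Executor e2(4);       // 4 cpu workers
#ifdef TF_ENABLE_CUDA
  tf::Executor e3(4, 2);    // 4 cpu workers and 2 gpu workers (default M is cuda_num_devices())
#endif
  return 0;
}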
inline Executor::~Executor() {

  // wait for all topologies to complete and then shut down the workers
  wait_for_all();

  for(int i=0; i<NUM_DOMAINS; ++i) {
    _notifier[i].notify(true);
  }

  for(auto& t : _threads){
    t.join();
  }

#ifdef TF_ENABLE_CUDA
  // destroy the per-device streams
  for(size_t i=0; i<_cuda_devices.size(); ++i) {
    cudaScopedDevice ctx(i);
    for(size_t m=0; m<_cuda_devices[i].streams.size(); ++m) {
      cudaStreamDestroy(_cuda_devices[i].streams[m]);
    }
  }
#endif

  _flush_tfprof();
}
inline void Executor::_instantiate_tfprof() {
  // the TF_ENABLE_PROFILER environment variable selects the output path
  _tfprof = get_env("TF_ENABLE_PROFILER").empty() ?
            nullptr : make_observer<TFProfObserver>().get();
}

inline void Executor::_flush_tfprof() {
  if(_tfprof) {
    std::ostringstream fpath;
    fpath << get_env("TF_ENABLE_PROFILER") << _tfprof->_uuid << ".tfp";
    std::ofstream ofs(fpath.str());
    _tfprof->dump(ofs);
  }
}
inline size_t Executor::num_workers() const { return _workers.size(); }

inline size_t Executor::num_topologies() const { return _num_topologies; }
// async: runs f(args...) on a worker and returns a future of the result
template <typename F, typename... ArgsT>
auto Executor::async(F&& f, ArgsT&&... args) {   // non-void overload; SFINAE return type elided

  _increment_topology();

  using R = typename function_traits<F>::return_type;

  std::promise<R> p;
  auto fu = p.get_future();

  auto node = Graph::_node_pool().animate(
    nstd::in_place_type_t<Node::AsyncWork>{},
    [p=make_moc(std::move(p)), f=std::forward<F>(f), args...] () {
      p.object.set_value(f(args...));
    }
  );

  _schedule(node);
  return fu;
}

// overload for void-returning callables
template <typename F, typename... ArgsT>
auto Executor::async(F&& f, ArgsT&&... args) {   // void overload; SFINAE return type elided

  _increment_topology();

  std::promise<void> p;
  auto fu = p.get_future();

  auto node = Graph::_node_pool().animate(
    nstd::in_place_type_t<Node::AsyncWork>{},
    [p=make_moc(std::move(p)), f=std::forward<F>(f), args...] () {
      f(args...);
      p.object.set_value();
    }
  );

  _schedule(node);
  return fu;
}
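Usage sketch (not part of the header) for the two async overloads above; the include path is an assumption.

#include <taskflow/taskflow.hpp>   // assumed umbrella header
#include <cassert>
#include <future>

int main() {
  tf::Executor executor;

  // non-void callable: the future carries the result
  std::future<int> fi = executor.async([](int x){ return x + 1; }, 41);

  // void callable: the future only signals completion
  std::future<void> fv = executor.async([](){ /* side effects only */ });

  assert(fi.get() == 42);
  fv.wait();
  return 0;
}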
inline Executor::PerThread& Executor::_per_thread() const {
  thread_local PerThread pt;
  return pt;
}

inline int Executor::this_worker_id() const {
  auto worker = _per_thread().worker;
  return worker ? static_cast<int>(worker->id) : -1;
}
inline void Executor::_spawn(size_t N, Domain d) {

  auto id = _threads.size();

  for(size_t i=0; i<N; ++i, ++id) {

    _workers[id].id = id;
    _workers[id].vtm = id;
    _workers[id].domain = d;
    _workers[id].executor = this;
    _workers[id].waiter = &_notifier[d]._waiters[i];

    _threads.emplace_back([this] (Worker& w) -> void {

      PerThread& pt = _per_thread();
      pt.worker = &w;

      Node* t = nullptr;

      // worker loop: run local tasks, then wait for (or steal) more
      while(1) {
        _exploit_task(w, t);
        if(_wait_for_task(w, t) == false) {
          break;
        }
      }

    }, std::ref(_workers[id]));
  }
}
inline void Executor::_explore_task(Worker& w, Node*& t) {

  const auto d = w.domain;
  size_t num_steals = 0;
  size_t num_yields = 0;

  std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END);

  // steal until a task is found or the steal/yield budget runs out
  do {
    t = (w.id == w.vtm) ? _wsq[d].steal() : _workers[w.vtm].wsq[d].steal();
    if(t) break;
    if(num_steals++ > _MAX_STEALS) {
      std::this_thread::yield();
      if(num_yields++ > _MAX_YIELDS) break;
    }
    w.vtm = rdvtm(w.rdgen);   // pick the next victim
  } while(!_done);
}
inline void Executor::_exploit_task(Worker& w, Node*& t) {
  if(t) {
    const auto d = w.domain;
    if(_num_actives[d].fetch_add(1) == 0 && _num_thieves[d] == 0) {
      _notifier[d].notify(false);
    }
    while(t) {            // drain the local queue
      _invoke(w, t);
      t = w.wsq[d].pop();
    }
    _num_actives[d].fetch_sub(1);
  }
}
inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {

  const auto d = worker.domain;

  wait_for_task:

  ++_num_thieves[d];

  explore_task:

  _explore_task(worker, t);

  if(t) {
    if(_num_thieves[d].fetch_sub(1) == 1) {
      _notifier[d].notify(false);
    }
    return true;
  }

  // two-phase committal wait: announce the intent to sleep, then re-check
  _notifier[d].prepare_wait(worker.waiter);

  if(!_wsq[d].empty()) {
    _notifier[d].cancel_wait(worker.waiter);
    t = _wsq[d].steal();
    if(t) {
      if(_num_thieves[d].fetch_sub(1) == 1) {
        _notifier[d].notify(false);
      }
      return true;
    }
    worker.vtm = worker.id;
    goto explore_task;
  }

  if(_done) {
    _notifier[d].cancel_wait(worker.waiter);
    for(int i=0; i<NUM_DOMAINS; ++i) {
      _notifier[i].notify(true);
    }
    --_num_thieves[d];
    return false;
  }

  if(_num_thieves[d].fetch_sub(1) == 1) {
    if(_num_actives[d]) {
      _notifier[d].cancel_wait(worker.waiter);
      goto wait_for_task;
    }
    // check every worker queue one more time before sleeping
    for(auto& w : _workers) {
      if(!w.wsq[d].empty()) {
        worker.vtm = w.id;
        _notifier[d].cancel_wait(worker.waiter);
        goto wait_for_task;
      }
    }
  }

  _notifier[d].commit_wait(worker.waiter);
  return true;
}
template <typename Observer, typename... Args>
std::shared_ptr<Observer> Executor::make_observer(Args&&... args) {
  static_assert(std::is_base_of<ObserverInterface, Observer>::value,
                "Observer must be derived from ObserverInterface");
  auto ptr = std::make_shared<Observer>(std::forward<Args>(args)...);
  ptr->set_up(_workers.size());
  _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
  return ptr;
}

template <typename Observer>
void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
  static_assert(std::is_base_of<ObserverInterface, Observer>::value,
                "Observer must be derived from ObserverInterface");
  _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
}

inline size_t Executor::num_observers() const { return _observers.size(); }
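Usage sketch (not part of the header): attaching the bundled TFProfObserver through make_observer and dumping its timelines after the run; the include path is an assumption.

#include <taskflow/taskflow.hpp>   // assumed umbrella header
#include <iostream>

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;

  taskflow.emplace([](){ /* some work */ });

  auto observer = executor.make_observer<tf::TFProfObserver>();

  executor.run(taskflow).wait();

  observer->dump(std::cout);            // JSON timelines (see observer.hpp)
  executor.remove_observer(observer);
  return 0;
}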
inline void Executor::_schedule(Node* node) {

  const auto d = node->domain();

  // caller is a worker of this executor: push to its local queue
  auto worker = _per_thread().worker;

  if(worker != nullptr && worker->executor == this) {
    worker->wsq[d].push(node);
    if(worker->domain != d) {
      if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
        _notifier[d].notify(false);
      }
    }
    return;
  }

  // external thread: push to the shared queue
  {
    std::lock_guard<std::mutex> lock(_wsq_mutex);
    _wsq[d].push(node);
  }

  _notifier[d].notify(false);
}
inline void Executor::_schedule(PassiveVector<Node*>& nodes) {

  // capture the count first; the parent topology may be torn down underneath
  const auto num_nodes = nodes.size();

  if(num_nodes == 0) {
    return;
  }

  auto worker = _per_thread().worker;

  // number of tasks scheduled per domain
  size_t tcount[NUM_DOMAINS] = {0};

  if(worker != nullptr && worker->executor == this) {

    for(size_t i=0; i<num_nodes; ++i) {
      const auto d = nodes[i]->domain();
      worker->wsq[d].push(nodes[i]);
      tcount[d]++;
    }

    for(int d=0; d<NUM_DOMAINS; ++d) {
      if(tcount[d] && d != worker->domain) {
        if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
          _notifier[d].notify_n(tcount[d]);
        }
      }
    }

    return;
  }

  // external thread: push to the shared queues
  {
    std::lock_guard<std::mutex> lock(_wsq_mutex);
    for(size_t k=0; k<num_nodes; ++k) {
      const auto d = nodes[k]->domain();
      _wsq[d].push(nodes[k]);
      tcount[d]++;
    }
  }

  for(int d=0; d<NUM_DOMAINS; ++d) {
    _notifier[d].notify_n(tcount[d]);
  }
}
inline void Executor::_invoke(Worker& worker, Node* node) {

  // fetch the successor count first; the node may be recycled below
  const auto num_successors = node->num_successors();

  // task type and the return index of a condition task
  auto type = node->_handle.index();
  int cond = -1;

  switch(type) {
    case Node::STATIC_WORK:    { _invoke_static_work(worker, node);           } break;
    case Node::DYNAMIC_WORK:   { _invoke_dynamic_work(worker, node);          } break;
    case Node::CONDITION_WORK: { _invoke_condition_work(worker, node, cond);  } break;
    case Node::MODULE_WORK:    { _invoke_module_work(worker, node);           } break;
    case Node::ASYNC_WORK: {
      _invoke_async_work(worker, node);
      _decrement_topology_and_notify();
      return;
    }
#ifdef TF_ENABLE_CUDA
    case Node::CUDAFLOW_WORK:  { _invoke_cudaflow_work(worker, node);         } break;
#endif
    default: break;
  }

  // restore the join counter so the graph can be re-run (it may have cycles)
  if(node->_has_state(Node::BRANCHED)) {
    node->_join_counter = node->num_strong_dependents();
  }
  else {
    node->_join_counter = node->num_dependents();
  }

  // counter of the enclosing subflow or topology
  auto& c = (node->_parent) ? node->_parent->_join_counter :
                              node->_topology->_join_counter;

  // a non-condition task decrements every successor; a condition task
  // schedules only the successor selected by its return value
  if(type != Node::CONDITION_WORK) {
    for(size_t i=0; i<num_successors; ++i) {
      if(--(node->_successors[i]->_join_counter) == 0) {
        c.fetch_add(1);
        _schedule(node->_successors[i]);
      }
    }
  }
  else if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
    auto s = node->_successors[cond];
    s->_join_counter.store(0);
    c.fetch_add(1);
    _schedule(s);
  }

  // tear down the topology or notify the parent subflow
  if(node->_parent == nullptr) {
    if(node->_topology->_join_counter.fetch_sub(1) == 1) {
      _tear_down_topology(node->_topology);
    }
  }
  else {
    node->_parent->_join_counter.fetch_sub(1);
  }
}
inline void Executor::_observer_prologue(Worker& worker, Node* node) {
  for(auto& observer : _observers) {
    observer->on_entry(worker.id, TaskView(node));
  }
}

inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
  for(auto& observer : _observers) {
    observer->on_exit(worker.id, TaskView(node));
  }
}

inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
  _observer_prologue(worker, node);
  nstd::get<Node::StaticWork>(node->_handle).work();
  _observer_epilogue(worker, node);
}
inline void Executor::_invoke_dynamic_work(Worker& w, Node* node) {

  _observer_prologue(w, node);

  auto& handle = nstd::get<Node::DynamicWork>(node->_handle);

  handle.subgraph.clear();

  Subflow sf(*this, node, handle.subgraph);

  // run the user callable to build the subgraph
  handle.work(sf);

  // join the subflow here unless the user already joined or detached it
  if(sf._joinable) {
    _invoke_dynamic_work_internal(w, node, handle.subgraph, false);
  }

  _observer_epilogue(w, node);
}

inline void Executor::_invoke_dynamic_work_external(Node* p, Graph& g, bool detach) {

  auto worker = _per_thread().worker;

  assert(worker && worker->executor == this);

  _invoke_dynamic_work_internal(*worker, p, g, detach);
}
inline void Executor::_invoke_dynamic_work_internal(
  Worker& w, Node* p, Graph& g, bool detach
) {

  if(g.empty()) return;

  PassiveVector<Node*> src;

  // attach the spawned nodes to the parent topology and collect the sources
  for(auto n : g._nodes) {
    n->_topology = p->_topology;
    n->_set_up_join_counter();
    n->_parent = detach ? nullptr : p;
    if(detach) { n->_set_state(Node::DETACHED); }
    if(n->num_dependents() == 0) {
      src.push_back(n);
    }
  }

  if(detach) {
    // detached subflow: merge into the parent taskflow graph
    p->_topology->_taskflow._graph.merge(std::move(g));
    p->_topology->_join_counter.fetch_add(src.size());
    _schedule(src);
  }
  else {
    // joined subflow: schedule the sources and help run tasks until done
    p->_join_counter.fetch_add(src.size());
    _schedule(src);

    Node* t = nullptr;
    while(p->_join_counter != 0) {
      t = w.wsq[w.domain].pop();
      if(t) { _invoke(w, t); }
      else {
        t = (w.id == w.vtm) ? _wsq[w.domain].steal()
                            : _workers[w.vtm].wsq[w.domain].steal();
        if(t) { _invoke(w, t); }
        else if(p->_join_counter != 0) {
          std::this_thread::yield();
          w.vtm = rdvtm(w.rdgen);  // rdvtm: uniform pick in [_VICTIM_BEG, _VICTIM_END]
        }
      }
    }
  }
}
inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& cond) {
  _observer_prologue(worker, node);
  cond = nstd::get<Node::ConditionWork>(node->_handle).work();
  _observer_epilogue(worker, node);
}
#ifdef TF_ENABLE_CUDA

inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {

  _observer_prologue(worker, node);

  assert(worker.domain == node->domain());

  auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);

  cudaFlow cf(*this, h.graph);

  // run the user callable to build the GPU graph
  h.work(cf);

  // offload once here unless the user already offloaded and joined
  if(cf._joinable) {
    _invoke_cudaflow_work_internal(
      worker, cf, [repeat=1] () mutable { return repeat-- == 0; }
    );
    cf._joinable = false;
  }

  _observer_epilogue(worker, node);
}
template <typename P>
void Executor::_invoke_cudaflow_work_internal(
  Worker& w, cudaFlow& cf, P&& predicate
) {

  // pick the device and the stream dedicated to this gpu worker
  auto d = (cf._device == -1) ? 0 : cf._device;

  cudaScopedDevice ctx(d);

  auto s = _cuda_devices[d].streams[w.id - _id_offset[w.domain]];

  cf._graph._create_native_graph();

  // launch and synchronize the native graph until the stop predicate is true
  while(!predicate()) {
    TF_CHECK_CUDA(
      cudaGraphLaunch(cf._graph._native_handle.image, s),
      "failed to launch cudaFlow on device ", d
    );
    TF_CHECK_CUDA(
      cudaStreamSynchronize(s),
      "failed to synchronize cudaFlow on device ", d
    );
  }

  cf._graph._destroy_native_graph();
}
template <typename P>
void Executor::_invoke_cudaflow_work_external(cudaFlow& cf, P&& predicate) {

  auto w = _per_thread().worker;

  assert(w && w->executor == this);

  _invoke_cudaflow_work_internal(*w, cf, std::forward<P>(predicate));
}

#endif
inline void Executor::_invoke_module_work(Worker& w, Node* node) {
  _observer_prologue(w, node);
  auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
  _invoke_dynamic_work_internal(w, node, module->_graph, false);
  _observer_epilogue(w, node);
}

inline void Executor::_invoke_async_work(Worker& w, Node* node) {
  _observer_prologue(w, node);
  nstd::get<Node::AsyncWork>(node->_handle).work();
  _observer_epilogue(w, node);

  // async nodes do not belong to a taskflow graph; recycle the node here
  Graph::_node_pool().recycle(node);
}
inline std::future<void> Executor::run(Taskflow& f) {
  return run_n(f, 1, [](){});
}

template <typename C>
std::future<void> Executor::run(Taskflow& f, C&& c) {
  return run_n(f, 1, std::forward<C>(c));
}

inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
  return run_n(f, repeat, [](){});
}

template <typename C>
std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
  return run_until(
    f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
  );
}

template <typename P>
std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
  return run_until(f, std::forward<P>(pred), [](){});
}
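Usage sketch (not part of the header) for the run family above; every overload funnels into run_until with a count-down or user-provided predicate.

#include <taskflow/taskflow.hpp>   // assumed umbrella header
#include <iostream>

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;

  taskflow.emplace([](){ std::cout << "iteration\n"; });

  executor.run(taskflow);                                    // one run
  executor.run(taskflow, [](){ std::cout << "done\n"; });    // one run plus a completion callback
  executor.run_n(taskflow, 3);                               // three runs
  executor.run_until(taskflow, [n=2]() mutable { return n-- == 0; });  // repeat until the predicate is true

  executor.wait_for_all();   // block until every submitted run has finished
  return 0;
}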
inline void Executor::_set_up_topology(Topology* tpg) {

  tpg->_sources.clear();
  tpg->_taskflow._graph.clear_detached();

  // scan the graph and record the source nodes
  for(auto node : tpg->_taskflow._graph._nodes) {

    node->_topology = tpg;
    node->_clear_state();

    if(node->num_dependents() == 0) {
      tpg->_sources.push_back(node);
    }

    node->_set_up_join_counter();
  }

  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
}
inline void Executor::_tear_down_topology(Topology* tpg) {

  auto &f = tpg->_taskflow;

  // case 1: the stop predicate is not yet satisfied; run the topology again
  if(! tpg->_pred() ) {
    assert(tpg->_join_counter == 0);
    tpg->_join_counter = tpg->_sources.size();
    _schedule(tpg->_sources);
  }
  // case 2: this was the final run
  else {

    if(tpg->_call != nullptr) {
      tpg->_call();
    }

    // another run was submitted in the meantime
    if(f._topologies.size() > 1) {
      assert(tpg->_join_counter == 0);
      tpg->_promise.set_value();
      f._topologies.pop_front();
      _decrement_topology();

      // set up and schedule the next topology
      tpg = &(f._topologies.front());
      _set_up_topology(tpg);
      _schedule(tpg->_sources);
    }
    else {
      assert(f._topologies.size() == 1);

      // move the promise out before popping; the taskflow may be destroyed
      // as soon as the future is satisfied
      auto p {std::move(tpg->_promise)};
      f._topologies.pop_front();
      p.set_value();

      _decrement_topology_and_notify();
    }
  }
}
template <typename P, typename C>
std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {

  _increment_topology();

  // immediate completion: empty taskflow or an already-satisfied predicate
  if(f.empty() || pred()) {
    std::promise<void> promise;
    promise.set_value();
    _decrement_topology_and_notify();
    return promise.get_future();
  }

  bool run_now {false};
  Topology* tpg;
  std::future<void> future;

  // create a topology for this run
  f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
  tpg = &(f._topologies.back());
  future = tpg->_promise.get_future();

  if(f._topologies.size() == 1) {
    run_now = true;
  }

  // scheduling may tear the topology down before this function returns
  if(run_now) {
    _set_up_topology(tpg);
    _schedule(tpg->_sources);
  }

  return future;
}
inline void Executor::_increment_topology() {
  std::lock_guard<std::mutex> lock(_topology_mutex);
  ++_num_topologies;
}

inline void Executor::_decrement_topology_and_notify() {
  std::lock_guard<std::mutex> lock(_topology_mutex);
  if(--_num_topologies == 0) {
    _topology_cv.notify_all();
  }
}

inline void Executor::_decrement_topology() {
  std::lock_guard<std::mutex> lock(_topology_mutex);
  --_num_topologies;
}

inline void Executor::wait_for_all() {
  std::unique_lock<std::mutex> lock(_topology_mutex);
  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
}
inline void Subflow::join() {

  if(!_joinable) {
    TF_THROW("subflow not joinable");
  }

  _executor._invoke_dynamic_work_external(_parent, _graph, false);
  _joinable = false;
}

inline void Subflow::detach() {

  if(!_joinable) {
    TF_THROW("subflow already joined or detached");
  }

  _executor._invoke_dynamic_work_external(_parent, _graph, true);
  _joinable = false;
}
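Usage sketch (not part of the header): a dynamic task spawns a subflow and joins it explicitly; detach() would instead let the spawned graph run independently of the parent.

#include <taskflow/taskflow.hpp>   // assumed umbrella header

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;

  taskflow.emplace([](tf::Subflow& sf){
    auto a = sf.emplace([](){ /* child A */ });
    auto b = sf.emplace([](){ /* child B */ });
    a.precede(b);   // run A before B
    sf.join();      // wait here for the spawned graph to finish
  });

  executor.run(taskflow).wait();
  return 0;
}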
#ifdef TF_ENABLE_CUDA

template <typename P>
void cudaFlow::offload_until(P&& predicate) {

  if(!_joinable) {
    TF_THROW("cudaFlow already joined");
  }

  _executor._invoke_cudaflow_work_external(*this, std::forward<P>(predicate));
}

inline void cudaFlow::offload_n(size_t n) {
  offload_until([repeat=n] () mutable { return repeat-- == 0; });
}

inline void cudaFlow::offload() {
  offload_until([repeat=1] () mutable { return repeat-- == 0; });
}

template <typename P>
void cudaFlow::join_until(P&& predicate) {

  if(!_joinable) {
    TF_THROW("cudaFlow already joined");
  }

  _executor._invoke_cudaflow_work_external(*this, std::forward<P>(predicate));
  _joinable = false;
}

inline void cudaFlow::join_n(size_t n) {
  join_until([repeat=n] () mutable { return repeat-- == 0; });
}

inline void cudaFlow::join() {
  join_until([repeat=1] () mutable { return repeat-- == 0; });
}

#endif
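Usage sketch (not part of the header), assuming a TF_ENABLE_CUDA build with at least one device: a cudaflow task offloads its captured graph explicitly before joining.

#include <taskflow/taskflow.hpp>   // assumed umbrella header

int main() {
  tf::Executor executor;   // needs at least one gpu worker in this version
  tf::Taskflow taskflow;

  taskflow.emplace([](tf::cudaFlow& cf){
    // ... build the GPU graph on cf (kernels, memory operations, ...) ...
    cf.offload_n(2);   // launch the captured graph twice on this gpu worker's stream
    cf.join();         // launch once more, wait, and mark the cudaFlow as joined
  });

  executor.run(taskflow).wait();
  return 0;
}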
Member documentation (from the generated reference)

Classes
  Executor - execution interface for running a taskflow graph (Definition: executor.hpp:24)
  Taskflow - main entry to create a task dependency graph (Definition: core/taskflow.hpp:18)
  Subflow - building methods of a subflow graph in dynamic tasking (Definition: flow_builder.hpp:698)
  cudaFlow - methods for building a CUDA task dependency graph (Definition: cuda_flow.hpp:26)
  TFProfObserver - observer designed based on the taskflow board format (Definition: observer.hpp:262)

Executor
  Executor(size_t N=std::thread::hardware_concurrency(), size_t M=cuda_num_devices()) - constructs the executor with N/M cpu/gpu worker threads (Definition: executor.hpp:310)
  ~Executor() - destructs the executor (Definition: executor.hpp:378)
  std::future<void> run(Taskflow& taskflow) - runs the taskflow once (Definition: executor.hpp:1182)
  std::future<void> run_n(Taskflow& taskflow, size_t N) - runs the taskflow N times (Definition: executor.hpp:1193)
  std::future<void> run_until(Taskflow& taskflow, P&& pred) - runs the taskflow repeatedly until the predicate becomes true (Definition: executor.hpp:1207)
  std::enable_if_t<!std::is_same<typename function_traits<F>::return_type, void>::value, std::future<typename function_traits<F>::return_type>> async(F&& f, ArgsT&&... args) - runs a given function asynchronously
  void wait_for_all() - waits for all pending graphs to complete (Definition: executor.hpp:1374)
  int this_worker_id() const - queries the id of the caller thread in this executor (Definition: executor.hpp:502)
  size_t num_workers() const - queries the number of worker threads (can be zero) (Definition: executor.hpp:426)
  size_t num_domains() const - queries the number of worker domains (Definition: executor.hpp:431)
  size_t num_topologies() const - queries the number of running topologies at the time of this call (Definition: executor.hpp:436)
  std::shared_ptr<Observer> make_observer(Args&&... args) - constructs an observer to inspect the activities of worker threads (Definition: executor.hpp:701)
  void remove_observer(std::shared_ptr<Observer> observer) - removes the associated observer (Definition: executor.hpp:720)
  size_t num_observers() const - queries the number of observers (Definition: executor.hpp:731)

Subflow
  void join() - enables the subflow to join its parent task (Definition: executor.hpp:1383)
  void detach() - enables the subflow to detach from its parent task (Definition: executor.hpp:1393)

cudaFlow
  void join() - offloads the cudaFlow once and then joins the execution (Definition: executor.hpp:1448)
  void join_n(size_t N) - offloads the cudaFlow the given number of times and then joins the execution (Definition: executor.hpp:1443)
  void join_until(P&& predicate) - offloads the cudaFlow with the given stop predicate and then joins the execution (Definition: executor.hpp:1432)

Taskflow
  bool empty() const - queries the emptiness of the taskflow (Definition: core/taskflow.hpp:132)

FlowBuilder
  Graph& _graph - associated graph object (Definition: flow_builder.hpp:584)

TFProfObserver
  void dump(std::ostream& ostream) const - dumps the timelines in JSON format to an ostream (Definition: observer.hpp:395)
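A short end-to-end sketch (not part of the header) tying the documented interface together; the include path is an assumption.

#include <taskflow/taskflow.hpp>   // assumed umbrella header
#include <iostream>

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;

  std::cout << "workers: " << executor.num_workers() << '\n';

  auto A = taskflow.emplace([&](){
    // inside a worker, this_worker_id() is the worker index; outside it returns -1
    std::cout << "A on worker " << executor.this_worker_id() << '\n';
  });
  auto B = taskflow.emplace([](){ std::cout << "B\n"; });
  A.precede(B);

  executor.run_n(taskflow, 2);
  executor.wait_for_all();

  std::cout << "running topologies: " << executor.num_topologies() << '\n';
  return 0;
}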