Taskflow 2.4 (master branch)
executor.hpp
1 #pragma once
2 
3 #include "tsq.hpp"
4 #include "notifier.hpp"
5 #include "observer.hpp"
6 #include "taskflow.hpp"
7 
8 namespace tf {
9 
10 
16 //class WorkerView {
17 //
18 // friend class Executor;
19 //
20 // public:
21 //
22 //
23 // private:
24 //
25 // Worker* _worker;
26 //
27 //};
28 
29 
30 // ----------------------------------------------------------------------------
31 // Executor Definition
32 // ----------------------------------------------------------------------------
33 
34 
43 class Executor {
44 
45  struct Worker {
46  size_t id;
47  size_t victim;
48  Domain domain;
49  Executor* executor;
50  Notifier::Waiter* waiter;
51  std::mt19937 rdgen { std::random_device{}() };
52  TaskQueue<Node*> wsq[NUM_DOMAINS];
53  Node* cache {nullptr};
54  };
55 
56  struct PerThread {
57  Worker* worker {nullptr};
58  };
59 
60 #ifdef TF_ENABLE_CUDA
61  struct cudaDevice {
62  std::vector<cudaStream_t> streams;
63  };
64 #endif
65 
66  public:
67 
68 #ifdef TF_ENABLE_CUDA
69 
72  explicit Executor(
73  size_t N = std::thread::hardware_concurrency(),
74  size_t M = cuda_num_devices()
75  );
76 #else
77 
80  explicit Executor(size_t N = std::thread::hardware_concurrency());
81 #endif
82 
86  ~Executor();
87 
95  std::future<void> run(Taskflow& taskflow);
96 
105  template<typename C>
106  std::future<void> run(Taskflow& taskflow, C&& callable);
107 
116  std::future<void> run_n(Taskflow& taskflow, size_t N);
117 
127  template<typename C>
128  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
129 
139  template<typename P>
140  std::future<void> run_until(Taskflow& taskflow, P&& pred);
141 
152  template<typename P, typename C>
153  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
154 
158  void wait_for_all();
159 
163  size_t num_workers() const;
164 
171  size_t num_topologies() const;
172 
179  size_t num_domains() const;
180 
187  int this_worker_id() const;
188 
201  template <typename Observer, typename... Args>
202  std::shared_ptr<Observer> make_observer(Args&&... args);
203 
207  template <typename Observer>
208  void remove_observer(std::shared_ptr<Observer> observer);
209 
213  size_t num_observers() const;
214 
215  private:
216 
217  const size_t _VICTIM_BEG;
218  const size_t _VICTIM_END;
219  const size_t _MAX_STEALS;
220  const size_t _MAX_YIELDS;
221 
222  std::condition_variable _topology_cv;
223  std::mutex _topology_mutex;
224  std::mutex _wsq_mutex;
225 
226  size_t _num_topologies {0};
227 
228  std::vector<Worker> _workers;
229  std::vector<std::thread> _threads;
230 
231 #ifdef TF_ENABLE_CUDA
232  std::vector<cudaDevice> _cuda_devices;
233 #endif
234 
235  Notifier _notifier[NUM_DOMAINS];
236 
237  TaskQueue<Node*> _wsq[NUM_DOMAINS];
238 
239  size_t _id_offset[NUM_DOMAINS] = {0};
240 
241  std::atomic<size_t> _num_actives[NUM_DOMAINS];
242  std::atomic<size_t> _num_thieves[NUM_DOMAINS];
243  std::atomic<bool> _done {0};
244 
245  std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
246 
247  TFProfObserver* _tfprof;
248 
249  PerThread& _per_thread() const;
250 
251  bool _wait_for_task(Worker&, Node*&);
252 
253  void _instantiate_tfprof();
254  void _flush_tfprof();
255  void _observer_prologue(Worker&, Node*);
256  void _observer_epilogue(Worker&, Node*);
257  void _spawn(size_t, Domain);
258  void _worker_loop(Worker&);
259  void _exploit_task(Worker&, Node*&);
260  void _explore_task(Worker&, Node*&);
261  void _schedule(Node*, bool);
262  void _schedule(PassiveVector<Node*>&);
263  void _invoke(Worker&, Node*);
264  void _invoke_static_work(Worker&, Node*);
265  void _invoke_dynamic_work(Worker&, Node*, bool&);
266  void _invoke_condition_work(Worker&, Node*);
267  void _invoke_module_work(Worker&, Node*, bool&);
268 
269 #ifdef TF_ENABLE_CUDA
270  void _invoke_cudaflow_work(Worker&, Node*);
271  void _invoke_cudaflow_work_impl(Worker&, Node*);
272 #endif
273 
274  void _set_up_topology(Topology*);
275  void _tear_down_topology(Topology**);
276  void _increment_topology();
277  void _decrement_topology();
278  void _decrement_topology_and_notify();
279 };
280 
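// Illustrative sketch (not part of this header): building a small task graph
// with the tf::Taskflow API from taskflow.hpp and running it on an Executor.
// The tasks A and B and the printed strings are made up for illustration.

#include <cstdio>
#include <future>
#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor(4);      // four CPU worker threads
  tf::Taskflow taskflow;

  // two tasks with a dependency: A runs before B
  auto A = taskflow.emplace([] () { std::printf("A\n"); });
  auto B = taskflow.emplace([] () { std::printf("B\n"); });
  A.precede(B);

  std::future<void> done = executor.run(taskflow);   // non-blocking submission
  done.wait();                   // or executor.wait_for_all();
  return 0;
}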
281 
282 #ifdef TF_ENABLE_CUDA
283 // Constructor
284 inline Executor::Executor(size_t N, size_t M) :
285  _VICTIM_BEG {0},
286  _VICTIM_END {N + M - 1},
287  _MAX_STEALS {(N + M + 1) << 1},
288  _MAX_YIELDS {100},
289  _workers {N + M},
290  _cuda_devices {cuda_num_devices()},
291  _notifier {Notifier(N), Notifier(M)} {
292 
293  if(N == 0) {
294  TF_THROW("no cpu workers to execute taskflows");
295  }
296 
297  if(M == 0) {
298  TF_THROW("no gpu workers to execute cudaflows");
299  }
300 
301  for(int i=0; i<NUM_DOMAINS; ++i) {
302  _num_actives[i].store(0, std::memory_order_relaxed);
303  _num_thieves[i].store(0, std::memory_order_relaxed);
304  }
305 
306  // create a per-worker stream on each cuda device
307  for(size_t i=0; i<_cuda_devices.size(); ++i) {
308  _cuda_devices[i].streams.resize(M);
309  cudaScopedDevice ctx(i);
310  for(size_t m=0; m<M; ++m) {
311  TF_CHECK_CUDA(
312  cudaStreamCreate(&(_cuda_devices[i].streams[m])),
313  "failed to create a cudaStream for worker ", m, " on device ", i
314  );
315  }
316  }
317 
318  _spawn(N, HOST);
319  _spawn(M, CUDA);
320 
321  // initiate the observer if requested
322  _instantiate_tfprof();
323 }
324 
325 #else
326 // Constructor
327 inline Executor::Executor(size_t N) :
328  _VICTIM_BEG {0},
329  _VICTIM_END {N - 1},
330  _MAX_STEALS {(N + 1) << 1},
331  _MAX_YIELDS {100},
332  _workers {N},
333  _notifier {Notifier(N)} {
334 
335  if(N == 0) {
336  TF_THROW("no cpu workers to execute taskflows");
337  }
338 
339  for(int i=0; i<NUM_DOMAINS; ++i) {
340  _num_actives[i].store(0, std::memory_order_relaxed);
341  _num_thieves[i].store(0, std::memory_order_relaxed);
342  }
343 
344  _spawn(N, HOST);
345 
346  // instantiate the default observer if requested
347  _instantiate_tfprof();
348 }
349 #endif
350 
351 // Destructor
352 inline Executor::~Executor() {
353 
354  // wait for all topologies to complete
355  wait_for_all();
356 
357  // shut down the scheduler
358  _done = true;
359 
360  for(int i=0; i<NUM_DOMAINS; ++i) {
361  _notifier[i].notify(true);
362  }
363 
364  for(auto& t : _threads){
365  t.join();
366  }
367 
368 #ifdef TF_ENABLE_CUDA
369  // clean up the cuda streams
370  for(size_t i=0; i<_cuda_devices.size(); ++i) {
371  cudaScopedDevice ctx(i);
372  for(size_t m=0; m<_cuda_devices[i].streams.size(); ++m) {
373  cudaStreamDestroy(_cuda_devices[i].streams[m]);
374  }
375  }
376 #endif
377 
378  // flush the default observer
379  _flush_tfprof();
380 }
381 
382 // Procedure: _instantiate_tfprof
383 inline void Executor::_instantiate_tfprof() {
384  // TF_OBSERVER_TYPE
385  _tfprof = get_env("TF_ENABLE_PROFILER").empty() ?
386  nullptr : make_observer<TFProfObserver>().get();
387 }
388 
389 // Procedure: _flush_tfprof
390 inline void Executor::_flush_tfprof() {
391  if(_tfprof) {
392  std::ostringstream fpath;
393  fpath << get_env("TF_ENABLE_PROFILER") << _tfprof->_uuid << ".tfp";
394  std::ofstream ofs(fpath.str());
395  _tfprof->dump(ofs);
396  }
397 }
398 
399 // Function: num_workers
400 inline size_t Executor::num_workers() const {
401  return _workers.size();
402 }
403 
404 // Function: num_domains
405 inline size_t Executor::num_domains() const {
406  return NUM_DOMAINS;
407 }
408 
409 // Function: num_topologies
410 inline size_t Executor::num_topologies() const {
411  return _num_topologies;
412 }
413 
414 // Function: _per_thread
415 inline Executor::PerThread& Executor::_per_thread() const {
416  thread_local PerThread pt;
417  return pt;
418 }
419 
420 // Function: this_worker_id
421 inline int Executor::this_worker_id() const {
422  auto worker = _per_thread().worker;
423  return worker ? static_cast<int>(worker->id) : -1;
424 }
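// Illustrative sketch: this_worker_id() returns -1 on a thread that does not
// belong to this executor, and the worker's id (0..num_workers()-1) inside a
// running task. The function worker_id_demo is hypothetical.

#include <cassert>
#include <taskflow/taskflow.hpp>

void worker_id_demo() {
  tf::Executor executor(2);
  assert(executor.this_worker_id() == -1);      // caller is not a worker

  tf::Taskflow taskflow;
  taskflow.emplace([&] () {
    int id = executor.this_worker_id();         // valid worker id inside a task
    assert(id >= 0 && id < static_cast<int>(executor.num_workers()));
  });
  executor.run(taskflow).wait();
}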
425 
426 // Procedure: _spawn
427 inline void Executor::_spawn(size_t N, Domain d) {
428 
429  auto id = _threads.size();
430 
431  _id_offset[d] = id;
432 
433  for(size_t i=0; i<N; ++i, ++id) {
434 
435  _workers[id].id = id;
436  _workers[id].victim = id;
437  _workers[id].domain = d;
438  _workers[id].executor = this;
439  _workers[id].waiter = &_notifier[d]._waiters[i];
440 
441  _threads.emplace_back([this] (Worker& w) -> void {
442 
443  PerThread& pt = _per_thread();
444  pt.worker = &w;
445 
446  Node* t = nullptr;
447 
448  // must use 1 as condition instead of !done
449  while(1) {
450 
451  // execute the tasks.
452  _exploit_task(w, t);
453 
454  // wait for tasks
455  if(_wait_for_task(w, t) == false) {
456  break;
457  }
458  }
459 
460  }, std::ref(_workers[id]));
461  }
462 
463 }
464 
465 // Function: _explore_task
466 inline void Executor::_explore_task(Worker& w, Node*& t) {
467 
468  //assert(_workers[w].wsq.empty());
469  assert(!t);
470 
471  const auto d = w.domain;
472 
473  size_t num_steals = 0;
474  size_t num_yields = 0;
475 
476  std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END);
477 
478  //while(!_done) {
479  //
480  // size_t vtm = rdvtm(w.rdgen);
481  //
482  // t = (vtm == w.id) ? _wsq[d].steal() : _workers[vtm].wsq[d].steal();
483 
484  // if(t) {
485  // break;
486  // }
487 
488  // if(num_steal++ > _MAX_STEALS) {
489  // std::this_thread::yield();
490  // if(num_yields++ > _MAX_YIELDS) {
491  // break;
492  // }
493  // }
494  //}
495 
496  do {
497  t = (w.id == w.victim) ? _wsq[d].steal() : _workers[w.victim].wsq[d].steal();
498 
499  if(t) {
500  break;
501  }
502 
503  if(num_steals++ > _MAX_STEALS) {
504  std::this_thread::yield();
505  if(num_yields++ > _MAX_YIELDS) {
506  break;
507  }
508  }
509 
510  w.victim = rdvtm(w.rdgen);
511  } while(!_done);
512 
513 }
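// The loop above implements a bounded, randomized work-stealing backoff:
// repeatedly steal from a random victim, yield after _MAX_STEALS failed
// attempts, and give up after _MAX_YIELDS yields so the caller can fall back
// to blocking. A stand-alone sketch of the same backoff; explore and
// steal_from are hypothetical (steal_from returns std::optional<int>):

#include <optional>
#include <random>
#include <thread>

template <typename Steal>
std::optional<int> explore(Steal&& steal_from, size_t num_queues,
                           size_t max_steals, size_t max_yields) {
  std::mt19937 rng {std::random_device{}()};
  std::uniform_int_distribution<size_t> pick(0, num_queues - 1);
  size_t steals = 0, yields = 0;
  size_t victim = pick(rng);
  do {
    if(auto task = steal_from(victim)) {   // try the current victim
      return task;                         // found work
    }
    if(steals++ > max_steals) {
      std::this_thread::yield();           // back off
      if(yields++ > max_yields) {
        break;                             // give up; caller will block
      }
    }
    victim = pick(rng);                    // pick another victim
  } while(true);
  return std::nullopt;
}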
514 
515 // Procedure: _exploit_task
516 inline void Executor::_exploit_task(Worker& w, Node*& t) {
517 
518  assert(!w.cache);
519 
520  if(t) {
521 
522  const auto d = w.domain;
523 
524  if(_num_actives[d].fetch_add(1) == 0 && _num_thieves[d] == 0) {
525  _notifier[d].notify(false);
526  }
527 
528  auto tpg = t->_topology;
529  auto par = t->_parent;
530  auto exe = size_t{1};
531 
532  do {
533  _invoke(w, t);
534 
535  if(w.cache) {
536  t = w.cache;
537  w.cache = nullptr;
538  }
539  else {
540  t = w.wsq[d].pop();
541  if(t) {
542  // We only increment the counter when popping a task from the wsq
543  // (NOT including cache!)
544  if(t->_parent == par) {
545  exe++;
546  }
547  // joined subflow
548  else {
549  if(par == nullptr) {
550  // still have tasks so the topology join counter can't be zero
551  t->_topology->_join_counter.fetch_sub(exe);
552  }
553  else {
554  auto ret = par->_join_counter.fetch_sub(exe);
555  if(ret == exe) {
556  if(par->domain() == d) {
557  w.wsq[d].push(par);
558  }
559  else {
560  _schedule(par, false);
561  }
562  }
563  }
564  exe = 1;
565  par = t->_parent;
566  }
567  }
568  else {
569  // If no more local tasks!
570  if(par == nullptr) {
571  if(tpg->_join_counter.fetch_sub(exe) == exe) {
572  // TODO: Store tpg in local variable not in w
573  _tear_down_topology(&tpg);
574  if(tpg != nullptr) {
575  t = w.wsq[d].pop();
576  if(t) {
577  exe = 1;
578  }
579  }
580  }
581  }
582  else {
583  if(par->_join_counter.fetch_sub(exe) == exe) {
584  if(par->domain() == d) {
585  t = par;
586  par = par->_parent;
587  exe = 1;
588  }
589  else {
590  _schedule(par, false);
591  }
592  }
593  }
594  }
595  }
596  } while(t);
597 
598  --_num_actives[d];
599  }
600 }
601 
602 // Function: _wait_for_task
603 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
604 
605  const auto d = worker.domain;
606 
607  wait_for_task:
608 
609  assert(!t);
610 
611  ++_num_thieves[d];
612 
613  explore_task:
614 
615  _explore_task(worker, t);
616 
617  if(t) {
618  if(_num_thieves[d].fetch_sub(1) == 1) {
619  _notifier[d].notify(false);
620  }
621  return true;
622  }
623 
624  _notifier[d].prepare_wait(worker.waiter);
625 
626  //if(auto vtm = _find_victim(me); vtm != _workers.size()) {
627  if(!_wsq[d].empty()) {
628 
629  _notifier[d].cancel_wait(worker.waiter);
630  //t = (vtm == me) ? _wsq.steal() : _workers[vtm].wsq.steal();
631 
632  t = _wsq[d].steal();
633  if(t) {
634  if(_num_thieves[d].fetch_sub(1) == 1) {
635  _notifier[d].notify(false);
636  }
637  return true;
638  }
639  else {
640  worker.victim = worker.id;
641  goto explore_task;
642  }
643  }
644 
645  if(_done) {
646  _notifier[d].cancel_wait(worker.waiter);
647  for(int i=0; i<NUM_DOMAINS; ++i) {
648  _notifier[i].notify(true);
649  }
650  --_num_thieves[d];
651  return false;
652  }
653 
654  if(_num_thieves[d].fetch_sub(1) == 1) {
655  if(_num_actives[d]) {
656  _notifier[d].cancel_wait(worker.waiter);
657  goto wait_for_task;
658  }
659  // check the worker queues of this domain again
660  for(auto& w : _workers) {
661  if(!w.wsq[d].empty()) {
662  worker.victim = w.id;
663  _notifier[d].cancel_wait(worker.waiter);
664  goto wait_for_task;
665  }
666  }
667  }
668 
669  // Now I really need to relinquish myself to others
670  _notifier[d].commit_wait(worker.waiter);
671 
672  return true;
673 }
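// The prepare_wait/cancel_wait/commit_wait calls above form a two-phase
// blocking protocol: a thief first announces its intent to sleep, re-checks
// the shared queue and the done flag, and only then commits to sleeping, so a
// notification issued in between is never lost. A condition-variable sketch
// of the same recheck-before-sleep idea; SleepGate is hypothetical and is not
// how Notifier is actually implemented:

#include <condition_variable>
#include <mutex>

class SleepGate {
 public:
  // block until has_work() is true or notify() is called
  template <typename HasWork>
  void wait(HasWork&& has_work) {
    std::unique_lock<std::mutex> lock(_mutex);                 // "prepare_wait"
    if(has_work()) {
      return;                                                  // "cancel_wait"
    }
    _cv.wait(lock, [&] { return has_work() || _notified; });   // "commit_wait"
    _notified = false;
  }

  void notify() {
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _notified = true;
    }
    _cv.notify_one();
  }

 private:
  std::condition_variable _cv;
  std::mutex _mutex;
  bool _notified {false};
};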
674 
675 // Function: make_observer
676 template<typename Observer, typename... Args>
677 std::shared_ptr<Observer> Executor::make_observer(Args&&... args) {
678 
679  static_assert(
681  "Observer must be derived from ObserverInterface"
682  );
683 
684  // use a local variable to mimic the constructor
685  auto ptr = std::make_shared<Observer>(std::forward<Args>(args)...);
686 
687  ptr->set_up(_workers.size());
688 
689  _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
690 
691  return ptr;
692 }
693 
694 // Procedure: remove_observer
695 template <typename Observer>
696 void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
697 
698  static_assert(
700  "Observer must be derived from ObserverInterface"
701  );
702 
703  _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
704 }
705 
706 // Function: num_observers
707 inline size_t Executor::num_observers() const {
708  return _observers.size();
709 }
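// Illustrative sketch: a custom observer plugged in through make_observer and
// remove_observer above. It assumes ObserverInterface (declared in
// observer.hpp) exposes the set_up/on_entry/on_exit hooks that
// _observer_prologue and _observer_epilogue later in this file invoke;
// CountingObserver itself is hypothetical.

#include <atomic>
#include <taskflow/taskflow.hpp>

struct CountingObserver : public tf::ObserverInterface {
  void set_up(size_t num_workers) override { (void)num_workers; }
  void on_entry(size_t worker_id, tf::TaskView tv) override {
    (void)worker_id; (void)tv;
  }
  void on_exit(size_t worker_id, tf::TaskView tv) override {
    (void)worker_id; (void)tv;
    ++num_tasks;                  // one task finished
  }
  std::atomic<size_t> num_tasks {0};
};

// usage:
//   auto obs = executor.make_observer<CountingObserver>();
//   executor.run(taskflow).wait();
//   // obs->num_tasks now holds the number of executed tasks
//   executor.remove_observer(obs);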
710 
711 // Procedure: _schedule
712 // The main procedure to schedule a given task node.
713 // Each task node has two types of tasks - regular and subflow.
714 inline void Executor::_schedule(Node* node, bool bypass_hint) {
715 
716  //assert(_workers.size() != 0);
717 
718  const auto d = node->domain();
719 
720  // caller is a worker to this pool
721  auto worker = _per_thread().worker;
722 
723  if(worker != nullptr && worker->executor == this) {
724  if(bypass_hint) {
725  assert(!worker->cache);
726  worker->cache = node;
727  }
728  else {
729  worker->wsq[d].push(node);
730  if(worker->domain != d) {
731  if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
732  _notifier[d].notify(false);
733  }
734  }
735  }
736  return;
737  }
738 
739  // other threads
740  {
741  std::lock_guard<std::mutex> lock(_wsq_mutex);
742  _wsq[d].push(node);
743  }
744 
745  _notifier[d].notify(false);
746 }
747 
748 // Procedure: _schedule
749 // The main procedure to schedule a set of task nodes.
750 // Each task node has two types of tasks - regular and subflow.
751 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
752 
753  //assert(_workers.size() != 0);
754 
755  // We need to capture the node count to avoid accessing the nodes
756  // vector after the parent topology is removed!
757  const auto num_nodes = nodes.size();
758 
759  if(num_nodes == 0) {
760  return;
761  }
762 
763  // worker thread
764  auto worker = _per_thread().worker;
765 
766  // task counts
767  size_t tcount[NUM_DOMAINS] = {0};
768 
769  if(worker != nullptr && worker->executor == this) {
770  for(size_t i=0; i<num_nodes; ++i) {
771  const auto d = nodes[i]->domain();
772  worker->wsq[d].push(nodes[i]);
773  tcount[d]++;
774  }
775 
776  for(int d=0; d<NUM_DOMAINS; ++d) {
777  if(tcount[d] && d != worker->domain) {
778  if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
779  _notifier[d].notify_n(tcount[d]);
780  }
781  }
782  }
783 
784  return;
785  }
786 
787  // other threads
788  {
789  std::lock_guard<std::mutex> lock(_wsq_mutex);
790  for(size_t k=0; k<num_nodes; ++k) {
791  const auto d = nodes[k]->domain();
792  _wsq[d].push(nodes[k]);
793  tcount[d]++;
794  }
795  }
796 
797  for(int d=0; d<NUM_DOMAINS; ++d) {
798  _notifier[d].notify_n(tcount[d]);
799  }
800 }
801 
802 
803 // Procedure: _invoke
804 inline void Executor::_invoke(Worker& worker, Node* node) {
805 
806  //assert(_workers.size() != 0);
807 
808  // Here we need to fetch the num_successors first to avoid the invalid memory
809  // access caused by topology clear.
810  const auto num_successors = node->num_successors();
811 
812  // acquire the parent flow counter
813  auto& c = (node->_parent) ? node->_parent->_join_counter :
814  node->_topology->_join_counter;
815 
816  // switch is faster than nested if-else due to jump table
817  switch(node->_handle.index()) {
818  // static task
819  case Node::STATIC_WORK:{
820  _invoke_static_work(worker, node);
821  }
822  break;
823 
824  // module task
825  case Node::MODULE_WORK: {
826  bool first_time = !node->_has_state(Node::SPAWNED);
827  bool emptiness = false;
828  _invoke_module_work(worker, node, emptiness);
829  if(first_time && !emptiness) {
830  return;
831  }
832  }
833  break;
834 
835  // dynamic task
836  case Node::DYNAMIC_WORK: {
837  // Need to create a subflow if it is the first time entering here
838  if(!node->_has_state(Node::SPAWNED)) {
839  bool join = false;
840  _invoke_dynamic_work(worker, node, join);
841  if(join) {
842  return;
843  }
844  }
845  }
846  break;
847 
848  // condition task
849  case Node::CONDITION_WORK: {
850  _invoke_condition_work(worker, node);
851  return ;
852  } // no need to add a break here due to the immediate return
853 
854  // cudaflow task
855 #ifdef TF_ENABLE_CUDA
856  case Node::CUDAFLOW_WORK: {
857  _invoke_cudaflow_work(worker, node);
858  }
859  break;
860 #endif
861 
862  // monostate
863  default:
864  break;
865  }
866 
867 
868  // We MUST recover the dependency since a subflow may contain
869  // a condition node that loops back (cyclic).
870  // This must be done before scheduling the successors; otherwise it may cause
871  // a race condition on _dependents.
872  if(node->_has_state(Node::BRANCH)) {
873  // If this is a case node, we need to deduct condition predecessors
874  node->_join_counter = node->num_strong_dependents();
875  }
876  else {
877  node->_join_counter = node->num_dependents();
878  }
879 
880  node->_unset_state(Node::SPAWNED);
881 
882  // At this point, the node storage might be destructed.
883  Node* cache {nullptr};
884 
885  for(size_t i=0; i<num_successors; ++i) {
886  if(--(node->_successors[i]->_join_counter) == 0) {
887  if(node->_successors[i]->domain() != worker.domain) {
888  c.fetch_add(1);
889  _schedule(node->_successors[i], false);
890  }
891  else {
892  if(cache) {
893  c.fetch_add(1);
894  _schedule(cache, false);
895  }
896  cache = node->_successors[i];
897  }
898  }
899  }
900 
901  if(cache) {
902  _schedule(cache, true);
903  }
904 }
905 
906 // Procedure: _observer_prologue
907 inline void Executor::_observer_prologue(Worker& worker, Node* node) {
908  for(auto& observer : _observers) {
909  observer->on_entry(worker.id, TaskView(node));
910  }
911 }
912 
913 // Procedure: _observer_epilogue
914 inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
915  for(auto& observer : _observers) {
916  observer->on_exit(worker.id, TaskView(node));
917  }
918 }
919 
920 // Procedure: _invoke_static_work
921 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
922  _observer_prologue(worker, node);
923  nstd::get<Node::StaticWork>(node->_handle).work();
924  _observer_epilogue(worker, node);
925 }
926 
927 // Procedure: _invoke_dynamic_work
928 inline void Executor::_invoke_dynamic_work(Worker& worker, Node* node, bool& join) {
929 
930  //assert(!node->_has_state(Node::SPAWNED));
931 
932  _observer_prologue(worker, node);
933 
934  auto& handle = nstd::get<Node::DynamicWork>(node->_handle);
935 
936  handle.subgraph.clear();
937  Subflow fb(handle.subgraph);
938 
939  handle.work(fb);
940 
941  node->_set_state(Node::SPAWNED);
942 
943  if(!handle.subgraph.empty()) {
944 
945  PassiveVector<Node*> src;
946 
947  for(auto n : handle.subgraph._nodes) {
948 
949  n->_topology = node->_topology;
950  n->_set_up_join_counter();
951 
952  if(!fb.detached()) {
953  n->_parent = node;
954  }
955 
956  if(n->num_dependents() == 0) {
957  src.push_back(n);
958  }
959  }
960 
961  join = fb.joined();
962 
963  if(!join) { // Detach mode
964  node->_topology->_join_counter.fetch_add(src.size());
965  }
966  else { // Join mode (spawned nodes need a second-round execution)
967  node->_join_counter.fetch_add(src.size());
968 
969  node->_parent ? node->_parent->_join_counter.fetch_add(1) :
970  node->_topology->_join_counter.fetch_add(1);
971  }
972 
973  _schedule(src);
974  }
975 
976  _observer_epilogue(worker, node);
977 }
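// Illustrative sketch: a dynamic task receives a tf::Subflow and spawns
// children at runtime; by default the subflow joins, which is the join path
// handled above. subflow_demo and the child lambdas are hypothetical.

#include <taskflow/taskflow.hpp>

void subflow_demo(tf::Executor& executor) {
  tf::Taskflow taskflow;

  taskflow.emplace([] (tf::Subflow& sf) {
    auto c1 = sf.emplace([] () { /* child work 1 */ });
    auto c2 = sf.emplace([] () { /* child work 2 */ });
    c1.precede(c2);
    // joined by default; calling sf.detach() instead lets the children run
    // independently of the parent (the detach path above).
  });

  executor.run(taskflow).wait();
}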
978 
979 // Procedure: _invoke_condition_work
980 inline void Executor::_invoke_condition_work(Worker& worker, Node* node) {
981 
982  _observer_prologue(worker, node);
983 
984  if(node->_has_state(Node::BRANCH)) {
985  node->_join_counter = node->num_strong_dependents();
986  }
987  else {
988  node->_join_counter = node->num_dependents();
989  }
990 
991  auto id = nstd::get<Node::ConditionWork>(node->_handle).work();
992 
993  if(id >= 0 && static_cast<size_t>(id) < node->num_successors()) {
994  auto s = node->_successors[id];
995  s->_join_counter.store(0);
996 
997  if(s->domain() == worker.domain) {
998  _schedule(s, true);
999  }
1000  else {
1001  node->_parent ? node->_parent->_join_counter.fetch_add(1) :
1002  node->_topology->_join_counter.fetch_add(1);
1003  _schedule(s, false);
1004  }
1005  }
1006 
1007  _observer_epilogue(worker, node);
1008 }
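// Illustrative sketch: a condition task returns the index of the successor to
// execute next, which is exactly how _invoke_condition_work above selects one
// successor and zeroes its join counter. The do-while loop below runs body
// three times before reaching done; condition_demo and the counter i are
// hypothetical.

#include <taskflow/taskflow.hpp>

void condition_demo(tf::Executor& executor) {
  tf::Taskflow taskflow;
  int i = 0;

  auto init = taskflow.emplace([&] () { i = 0; });
  auto body = taskflow.emplace([&] () { ++i; });
  auto cond = taskflow.emplace([&] () { return i < 3 ? 0 : 1; });
  auto done = taskflow.emplace([&] () { /* i == 3 here */ });

  init.precede(body);
  body.precede(cond);
  cond.precede(body, done);   // successor 0 loops back, successor 1 exits

  executor.run(taskflow).wait();
}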
1009 
1010 #ifdef TF_ENABLE_CUDA
1011 // Procedure: _invoke_cudaflow_work
1012 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
1013  _observer_prologue(worker, node);
1014  _invoke_cudaflow_work_impl(worker, node);
1015  _observer_epilogue(worker, node);
1016 }
1017 
1018 // Procedure: _invoke_cudaflow_work_impl
1019 inline void Executor::_invoke_cudaflow_work_impl(Worker& w, Node* node) {
1020 
1021  assert(w.domain == node->domain());
1022 
1023  auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
1024 
1025  h.graph.clear();
1026 
1027  cudaFlow cf(h.graph, [repeat=1] () mutable { return repeat-- == 0; });
1028 
1029  h.work(cf);
1030 
1031  if(h.graph.empty()) {
1032  return;
1033  }
1034 
1035  // transforms cudaFlow to a native cudaGraph under the specified device
1036  // and launches the graph through a given or an internal device stream
1037  const int d = cf._device;
1038 
1039  cudaScopedDevice ctx(d);
1040 
1041  auto s = cf._stream ? *(cf._stream) :
1042  _cuda_devices[d].streams[w.id - _id_offset[w.domain]];
1043 
1044  h.graph._make_native_graph();
1045 
1046  cudaGraphExec_t exec;
1047 
1048  TF_CHECK_CUDA(
1049  cudaGraphInstantiate(&exec, h.graph._native_handle, nullptr, nullptr, 0),
1050  "failed to create an executable cudaGraph"
1051  );
1052 
1053  while(!cf._predicate()) {
1054  TF_CHECK_CUDA(
1055  cudaGraphLaunch(exec, s), "failed to launch cudaGraph on stream ", s
1056  );
1057 
1058  TF_CHECK_CUDA(
1059  cudaStreamSynchronize(s), "failed to synchronize stream ", s
1060  );
1061  }
1062 
1063  TF_CHECK_CUDA(
1064  cudaGraphExecDestroy(exec), "failed to destroy an executable cudaGraph"
1065  );
1066 }
1067 #endif
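// Illustrative sketch (requires TF_ENABLE_CUDA): a cudaflow task receives a
// tf::cudaFlow that _invoke_cudaflow_work_impl above lowers to a native
// cudaGraph and launches on a per-worker stream. The saxpy kernel,
// cudaflow_demo, and the device pointers dx/dy are hypothetical, and the
// cf.kernel(grid, block, shared_memory, kernel, args...) call is an assumed
// cudaFlow interface, not something defined in this file.

#ifdef TF_ENABLE_CUDA

__global__ void saxpy(int n, float a, const float* x, float* y) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if(i < n) {
    y[i] = a * x[i] + y[i];
  }
}

void cudaflow_demo(tf::Executor& executor, int n, float* dx, float* dy) {
  tf::Taskflow taskflow;

  taskflow.emplace([=] (tf::cudaFlow& cf) {
    // one kernel node inside the cudaFlow; dx and dy are device pointers
    cf.kernel((n + 255) / 256, 256, 0, saxpy, n, 2.0f, dx, dy);
  });

  executor.run(taskflow).wait();
}

#endif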
1068 
1069 // Procedure: _invoke_module_work
1070 inline void Executor::_invoke_module_work(Worker& worker, Node* node, bool& ept) {
1071 
1072  // second time to enter this context
1073  if(node->_has_state(Node::SPAWNED)) {
1074  return;
1075  }
1076 
1077  _observer_prologue(worker, node);
1078 
1079  // first time to enter this context
1080  node->_set_state(Node::SPAWNED);
1081 
1082  auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
1083 
1084  if(module->empty()) {
1085  ept = true;
1086  return;
1087  }
1088 
1089  PassiveVector<Node*> src;
1090 
1091  for(auto n: module->_graph._nodes) {
1092 
1093  n->_topology = node->_topology;
1094  n->_parent = node;
1095  n->_set_up_join_counter();
1096 
1097  if(n->num_dependents() == 0) {
1098  src.push_back(n);
1099  }
1100  }
1101 
1102  node->_join_counter.fetch_add(src.size());
1103 
1104  if(node->_parent == nullptr) {
1105  node->_topology->_join_counter.fetch_add(1);
1106  }
1107  else {
1108  node->_parent->_join_counter.fetch_add(1);
1109  }
1110 
1111  // src can't be empty (banned outside)
1112  _schedule(src);
1113 
1114  _observer_epilogue(worker, node);
1115 }
1116 
1117 // Function: run
1118 inline std::future<void> Executor::run(Taskflow& f) {
1119  return run_n(f, 1, [](){});
1120 }
1121 
1122 // Function: run
1123 template <typename C>
1124 std::future<void> Executor::run(Taskflow& f, C&& c) {
1125  return run_n(f, 1, std::forward<C>(c));
1126 }
1127 
1128 // Function: run_n
1129 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
1130  return run_n(f, repeat, [](){});
1131 }
1132 
1133 // Function: run_n
1134 template <typename C>
1135 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
1136  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
1137 }
1138 
1139 // Function: run_until
1140 template<typename P>
1141 std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
1142  return run_until(f, std::forward<P>(pred), [](){});
1143 }
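// Illustrative sketch: Executor's run family queues topologies on the same
// taskflow; run_variants_demo and the callbacks are hypothetical.

#include <taskflow/taskflow.hpp>

void run_variants_demo(tf::Executor& executor, tf::Taskflow& taskflow) {
  executor.run(taskflow);                                 // run once
  executor.run(taskflow, [] () { /* done callback */ });  // once, then callback
  executor.run_n(taskflow, 4);                            // run four times

  int rounds = 0;
  executor.run_until(taskflow, [&] () { return ++rounds == 3; });  // until pred

  executor.wait_for_all();   // block until every queued run has finished
}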
1144 
1145 // Function: _set_up_topology
1146 inline void Executor::_set_up_topology(Topology* tpg) {
1147 
1148  tpg->_sources.clear();
1149 
1150  // scan each node in the graph and build up the links
1151  for(auto node : tpg->_taskflow._graph._nodes) {
1152 
1153  node->_topology = tpg;
1154  node->_clear_state();
1155 
1156  if(node->num_dependents() == 0) {
1157  tpg->_sources.push_back(node);
1158  }
1159 
1160  node->_set_up_join_counter();
1161  }
1162 
1163  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
1164 }
1165 
1166 // Function: _tear_down_topology
1167 inline void Executor::_tear_down_topology(Topology** tpg) {
1168 
1169  auto &f = (*tpg)->_taskflow;
1170 
1171  //assert(&tpg == &(f._topologies.front()));
1172 
1173  // case 1: we still need to run the topology again
1174  if(! (*tpg)->_pred() ) {
1175  //tpg->_recover_num_sinks();
1176 
1177  assert((*tpg)->_join_counter == 0);
1178  (*tpg)->_join_counter = (*tpg)->_sources.size();
1179 
1180  _schedule((*tpg)->_sources);
1181  }
1182  // case 2: the final run of this topology
1183  else {
1184 
1185  if((*tpg)->_call != nullptr) {
1186  (*tpg)->_call();
1187  }
1188 
1189  f._mtx.lock();
1190 
1191  // If there is another run queued (interleaved while the lock was released)
1192  if(f._topologies.size() > 1) {
1193 
1194  assert((*tpg)->_join_counter == 0);
1195 
1196  // Set the promise
1197  (*tpg)->_promise.set_value();
1198  f._topologies.pop_front();
1199  f._mtx.unlock();
1200 
1201  // decrement the topology count; since this is not the last one, we don't notify
1202  _decrement_topology();
1203 
1204  *tpg = &(f._topologies.front());
1205 
1206  _set_up_topology(*tpg);
1207  _schedule((*tpg)->_sources);
1208 
1209  //f._topologies.front()._bind(f._graph);
1210  //*tpg = &(f._topologies.front());
1211 
1212  //assert(f._topologies.front()._join_counter == 0);
1213 
1214  //f._topologies.front()._join_counter = f._topologies.front()._sources.size();
1215 
1216  //_schedule(f._topologies.front()._sources);
1217  }
1218  else {
1219  assert(f._topologies.size() == 1);
1220 
1221  // Need to back up the promise first here because the taskflow might be
1222  // destroyed before this function returns
1223  auto p {std::move((*tpg)->_promise)};
1224 
1225  f._topologies.pop_front();
1226 
1227  f._mtx.unlock();
1228 
1229  // We set the promise at the end in case the taskflow is destroyed before then
1230  p.set_value();
1231 
1232  _decrement_topology_and_notify();
1233 
1234  // Reset topology so caller can stop execution
1235  *tpg = nullptr;
1236  }
1237  }
1238 }
1239 
1240 // Function: run_until
1241 template <typename P, typename C>
1242 std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
1243 
1244  _increment_topology();
1245 
1246  // Special case of predicate
1247  if(f.empty() || pred()) {
1248  std::promise<void> promise;
1249  promise.set_value();
1250  _decrement_topology_and_notify();
1251  return promise.get_future();
1252  }
1253 
1254  // Multi-threaded execution.
1255  bool run_now {false};
1256  Topology* tpg;
1257  std::future<void> future;
1258 
1259  {
1260  std::lock_guard<std::mutex> lock(f._mtx);
1261 
1262  // create a topology for this run
1263  //tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1264  f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1265  tpg = &(f._topologies.back());
1266  future = tpg->_promise.get_future();
1267 
1268  if(f._topologies.size() == 1) {
1269  run_now = true;
1270  //tpg->_bind(f._graph);
1271  //_schedule(tpg->_sources);
1272  }
1273  }
1274 
1275  // Notice that calling _schedule here may cause the topology to be removed
1276  // before this function returns.
1277  if(run_now) {
1278  _set_up_topology(tpg);
1279  _schedule(tpg->_sources);
1280  }
1281 
1282  return future;
1283 }
1284 
1285 // Procedure: _increment_topology
1286 inline void Executor::_increment_topology() {
1287  std::lock_guard<std::mutex> lock(_topology_mutex);
1288  ++_num_topologies;
1289 }
1290 
1291 // Procedure: _decrement_topology_and_notify
1292 inline void Executor::_decrement_topology_and_notify() {
1293  std::lock_guard<std::mutex> lock(_topology_mutex);
1294  if(--_num_topologies == 0) {
1295  _topology_cv.notify_all();
1296  }
1297 }
1298 
1299 // Procedure: _decrement_topology
1300 inline void Executor::_decrement_topology() {
1301  std::lock_guard<std::mutex> lock(_topology_mutex);
1302  --_num_topologies;
1303 }
1304 
1305 // Procedure: wait_for_all
1306 inline void Executor::wait_for_all() {
1307  std::unique_lock<std::mutex> lock(_topology_mutex);
1308  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1309 }
1310 
1311 } // end of namespace tf -----------------------------------------------------
1312 
1313 