Taskflow  2.7.0
executor.hpp
1 #pragma once
2 
3 #include "tsq.hpp"
4 #include "notifier.hpp"
5 #include "observer.hpp"
6 #include "taskflow.hpp"
7 
8 namespace tf {
9 
10 
11 // ----------------------------------------------------------------------------
12 // Executor Definition
13 // ----------------------------------------------------------------------------
14 
15 
24 class Executor {
25 
26  friend class Subflow;
27  friend class cudaFlow;
28 
29  struct Worker {
30  size_t id;
31  size_t vtm;
32  Domain domain;
33  Executor* executor;
34  Notifier::Waiter* waiter;
35  std::mt19937 rdgen { std::random_device{}() };
36  TaskQueue<Node*> wsq[NUM_DOMAINS];
37  };
38 
39  struct PerThread {
40  Worker* worker {nullptr};
41  };
42 
43 #ifdef TF_ENABLE_CUDA
44  struct cudaDevice {
45  std::vector<cudaStream_t> streams;
46  };
47 #endif
48 
49  public:
50 
51 #ifdef TF_ENABLE_CUDA
52 
55  explicit Executor(
56  size_t N = std::thread::hardware_concurrency(),
57  size_t M = cuda_num_devices()
58  );
59 #else
60 
63  explicit Executor(size_t N = std::thread::hardware_concurrency());
64 #endif
65 
69  ~Executor();
70 
78  std::future<void> run(Taskflow& taskflow);
79 
88  template<typename C>
89  std::future<void> run(Taskflow& taskflow, C&& callable);
90 
99  std::future<void> run_n(Taskflow& taskflow, size_t N);
100 
110  template<typename C>
111  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
112 
122  template<typename P>
123  std::future<void> run_until(Taskflow& taskflow, P&& pred);
124 
135  template<typename P, typename C>
136  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
137 
141  void wait_for_all();
142 
146  size_t num_workers() const;
147 
154  size_t num_topologies() const;
155 
162  size_t num_domains() const;
163 
170  int this_worker_id() const;
171 
183  template <typename F, typename... ArgsT>
184  std::enable_if_t<
185  !std::is_same<typename function_traits<F>::return_type, void>::value,
186  std::future<typename function_traits<F>::return_type>
187  >
188  async(F&& f, ArgsT&&... args);
189 
201  template <typename F, typename... ArgsT>
202  std::enable_if_t<
203  std::is_same<typename function_traits<F>::return_type, void>::value,
204  std::future<void>
205  >
206  async(F&& f, ArgsT&&... args);
207 
220  template <typename Observer, typename... Args>
221  std::shared_ptr<Observer> make_observer(Args&&... args);
222 
226  template <typename Observer>
227  void remove_observer(std::shared_ptr<Observer> observer);
228 
232  size_t num_observers() const;
233 
234  private:
235 
236  const size_t _VICTIM_BEG;
237  const size_t _VICTIM_END;
238  const size_t _MAX_STEALS;
239  const size_t _MAX_YIELDS;
240 
241  std::condition_variable _topology_cv;
242  std::mutex _topology_mutex;
243  std::mutex _wsq_mutex;
244 
245  size_t _num_topologies {0};
246 
247  std::vector<Worker> _workers;
248  std::vector<std::thread> _threads;
249 
250 #ifdef TF_ENABLE_CUDA
251  std::vector<cudaDevice> _cuda_devices;
252 #endif
253 
254  Notifier _notifier[NUM_DOMAINS];
255 
256  TaskQueue<Node*> _wsq[NUM_DOMAINS];
257 
258  size_t _id_offset[NUM_DOMAINS] = {0};
259 
260  std::atomic<size_t> _num_actives[NUM_DOMAINS];
261  std::atomic<size_t> _num_thieves[NUM_DOMAINS];
262  std::atomic<bool> _done {0};
263 
264  std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
265 
266  TFProfObserver* _tfprof;
267 
268  PerThread& _per_thread() const;
269 
270  bool _wait_for_task(Worker&, Node*&);
271 
272  void _instantiate_tfprof();
273  void _flush_tfprof();
274  void _observer_prologue(Worker&, Node*);
275  void _observer_epilogue(Worker&, Node*);
276  void _spawn(size_t, Domain);
277  void _worker_loop(Worker&);
278  void _exploit_task(Worker&, Node*&);
279  void _explore_task(Worker&, Node*&);
280  void _schedule(Node*);
281  void _schedule(PassiveVector<Node*>&);
282  void _invoke(Worker&, Node*);
283  void _invoke_static_work(Worker&, Node*);
284  void _invoke_dynamic_work(Worker&, Node*);
285  void _invoke_dynamic_work_internal(Worker&, Node*, Graph&, bool);
286  void _invoke_dynamic_work_external(Node*, Graph&, bool);
287  void _invoke_condition_work(Worker&, Node*, int&);
288  void _invoke_module_work(Worker&, Node*);
289  void _invoke_async_work(Worker&, Node*);
290  void _set_up_topology(Topology*);
291  void _tear_down_topology(Topology*);
292  void _increment_topology();
293  void _decrement_topology();
294  void _decrement_topology_and_notify();
295 
296 #ifdef TF_ENABLE_CUDA
297  void _invoke_cudaflow_work(Worker&, Node*);
298 
299  template <typename P>
300  void _invoke_cudaflow_work_internal(Worker&, cudaFlow&, P&&);
301 
302  template <typename P>
303  void _invoke_cudaflow_work_external(cudaFlow&, P&&);
304 #endif
305 };
306 
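// A minimal usage sketch of the Executor API declared above (assuming the
// public Taskflow 2.7 headers, e.g., <taskflow/taskflow.hpp>, are included):
//
//   tf::Executor executor;     // defaults to std::thread::hardware_concurrency() CPU workers
//   tf::Taskflow taskflow;
//
//   tf::Task A = taskflow.emplace([] () { std::printf("A\n"); });
//   tf::Task B = taskflow.emplace([] () { std::printf("B\n"); });
//   A.precede(B);              // A runs before B
//
//   executor.run(taskflow).wait();   // submit the graph and block until this run completes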
307 
308 #ifdef TF_ENABLE_CUDA
309 // Constructor
310 inline Executor::Executor(size_t N, size_t M) :
311  _VICTIM_BEG {0},
312  _VICTIM_END {N + M - 1},
313  _MAX_STEALS {(N + M + 1) << 1},
314  _MAX_YIELDS {100},
315  _workers {N + M},
316  _cuda_devices {cuda_num_devices()},
317  _notifier {Notifier(N), Notifier(M)} {
318 
319  if(N == 0) {
320  TF_THROW("no cpu workers to execute taskflows");
321  }
322 
323  if(M == 0) {
324  TF_THROW("no gpu workers to execute cudaflows");
325  }
326 
327  for(int i=0; i<NUM_DOMAINS; ++i) {
328  _num_actives[i].store(0, std::memory_order_relaxed);
329  _num_thieves[i].store(0, std::memory_order_relaxed);
330  }
331 
332  // create a per-worker stream on each cuda device
333  for(size_t i=0; i<_cuda_devices.size(); ++i) {
334  _cuda_devices[i].streams.resize(M);
335  cudaScopedDevice ctx(i);
336  for(size_t m=0; m<M; ++m) {
337  TF_CHECK_CUDA(
338  cudaStreamCreate(&(_cuda_devices[i].streams[m])),
339  "failed to create a cudaStream for worker ", m, " on device ", i
340  );
341  }
342  }
343 
344  _spawn(N, HOST);
345  _spawn(M, CUDA);
346 
347  // initiate the observer if requested
348  _instantiate_tfprof();
349 }
350 
351 #else
352 // Constructor
353 inline Executor::Executor(size_t N) :
354  _VICTIM_BEG {0},
355  _VICTIM_END {N - 1},
356  _MAX_STEALS {(N + 1) << 1},
357  _MAX_YIELDS {100},
358  _workers {N},
359  _notifier {Notifier(N)} {
360 
361  if(N == 0) {
362  TF_THROW("no cpu workers to execute taskflows");
363  }
364 
365  for(int i=0; i<NUM_DOMAINS; ++i) {
366  _num_actives[i].store(0, std::memory_order_relaxed);
367  _num_thieves[i].store(0, std::memory_order_relaxed);
368  }
369 
370  _spawn(N, HOST);
371 
372  // instantiate the default observer if requested
373  _instantiate_tfprof();
374 }
375 #endif
376 
377 // Destructor
378 inline Executor::~Executor() {
379 
380  // wait for all topologies to complete
381  wait_for_all();
382 
383  // shut down the scheduler
384  _done = true;
385 
386  for(int i=0; i<NUM_DOMAINS; ++i) {
387  _notifier[i].notify(true);
388  }
389 
390  for(auto& t : _threads){
391  t.join();
392  }
393 
394 #ifdef TF_ENABLE_CUDA
395  // clean up the cuda streams
396  for(size_t i=0; i<_cuda_devices.size(); ++i) {
397  cudaScopedDevice ctx(i);
398  for(size_t m=0; m<_cuda_devices[i].streams.size(); ++m) {
399  cudaStreamDestroy(_cuda_devices[i].streams[m]);
400  }
401  }
402 #endif
403 
404  // flush the default observer
405  _flush_tfprof();
406 }
407 
408 // Procedure: _instantiate_tfprof
409 inline void Executor::_instantiate_tfprof() {
410  // TF_OBSERVER_TYPE
411  _tfprof = get_env("TF_ENABLE_PROFILER").empty() ?
412  nullptr : make_observer<TFProfObserver>().get();
413 }
414 
415 // Procedure: _flush_tfprof
416 inline void Executor::_flush_tfprof() {
417  if(_tfprof) {
418  std::ostringstream fpath;
419  fpath << get_env("TF_ENABLE_PROFILER") << _tfprof->_uuid << ".tfp";
420  std::ofstream ofs(fpath.str());
421  _tfprof->dump(ofs);
422  }
423 }
424 
425 // Function: num_workers
426 inline size_t Executor::num_workers() const {
427  return _workers.size();
428 }
429 
430 // Function: num_domains
431 inline size_t Executor::num_domains() const {
432  return NUM_DOMAINS;
433 }
434 
435 // Function: num_topologies
436 inline size_t Executor::num_topologies() const {
437  return _num_topologies;
438 }
439 
440 // Function: async
441 template <typename F, typename... ArgsT>
442 std::enable_if_t<
443  !std::is_same<typename function_traits<F>::return_type, void>::value,
444  std::future<typename function_traits<F>::return_type>
445 >
446 Executor::async(F&& f, ArgsT&&... args) {
447 
448  _increment_topology();
449 
450  using R = typename function_traits<F>::return_type;
451 
452  std::promise<R> p;
453 
454  auto fu = p.get_future();
455 
456  auto node = Graph::_node_pool().animate(
457  nstd::in_place_type_t<Node::AsyncWork>{},
458  [p=make_moc(std::move(p)), f=std::forward<F>(f), args...] () {
459  p.object.set_value(f(args...));
460  }
461  );
462 
463  _schedule(node);
464 
465  return fu;
466 }
467 
468 // Function: async
469 template <typename F, typename... ArgsT>
470 std::enable_if_t<
471  std::is_same<typename function_traits<F>::return_type, void>::value,
472  std::future<void>
473 >
474 Executor::async(F&& f, ArgsT&&... args) {
475 
476  _increment_topology();
477 
478  std::promise<void> p;
479 
480  auto fu = p.get_future();
481 
482  auto node = Graph::_node_pool().animate(
483  nstd::in_place_type_t<Node::AsyncWork>{},
484  [p=make_moc(std::move(p)), f=std::forward<F>(f), args...] () {
485  f(args...);
486  p.object.set_value();
487  }
488  );
489 
490  _schedule(node);
491 
492  return fu;
493 }
494 
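// A minimal sketch of the two async overloads defined above: the non-void
// overload returns std::future<R>, the void overload returns std::future<void>.
//
//   tf::Executor executor;
//   std::future<int>  fi = executor.async([] (int x) { return x + 1; }, 41);
//   std::future<void> fv = executor.async([] () { /* side effects only */ });
//   assert(fi.get() == 42);
//   fv.get();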
495 // Function: _per_thread
496 inline Executor::PerThread& Executor::_per_thread() const {
497  thread_local PerThread pt;
498  return pt;
499 }
500 
501 // Function: this_worker_id
502 inline int Executor::this_worker_id() const {
503  auto worker = _per_thread().worker;
504  return worker ? static_cast<int>(worker->id) : -1;
505 }
506 
507 // Procedure: _spawn
508 inline void Executor::_spawn(size_t N, Domain d) {
509 
510  auto id = _threads.size();
511 
512  _id_offset[d] = id;
513 
514  for(size_t i=0; i<N; ++i, ++id) {
515 
516  _workers[id].id = id;
517  _workers[id].vtm = id;
518  _workers[id].domain = d;
519  _workers[id].executor = this;
520  _workers[id].waiter = &_notifier[d]._waiters[i];
521 
522  _threads.emplace_back([this] (Worker& w) -> void {
523 
524  PerThread& pt = _per_thread();
525  pt.worker = &w;
526 
527  Node* t = nullptr;
528 
529  // must use 1 as condition instead of !done
530  while(1) {
531 
532  // execute the tasks.
533  _exploit_task(w, t);
534 
535  // wait for tasks
536  if(_wait_for_task(w, t) == false) {
537  break;
538  }
539  }
540 
541  }, std::ref(_workers[id]));
542  }
543 
544 }
545 
546 // Function: _explore_task
547 inline void Executor::_explore_task(Worker& w, Node*& t) {
548 
549  //assert(_workers[w].wsq.empty());
550  assert(!t);
551 
552  const auto d = w.domain;
553 
554  size_t num_steals = 0;
555  size_t num_yields = 0;
556 
557  std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END);
558 
559  //while(!_done) {
560  //
561  // size_t vtm = rdvtm(w.rdgen);
562  //
563  // t = (vtm == w.id) ? _wsq[d].steal() : _workers[vtm].wsq[d].steal();
564 
565  // if(t) {
566  // break;
567  // }
568 
569  // if(num_steal++ > _MAX_STEALS) {
570  // std::this_thread::yield();
571  // if(num_yields++ > _MAX_YIELDS) {
572  // break;
573  // }
574  // }
575  //}
576 
577  do {
578  t = (w.id == w.vtm) ? _wsq[d].steal() : _workers[w.vtm].wsq[d].steal();
579 
580  if(t) {
581  break;
582  }
583 
584  if(num_steals++ > _MAX_STEALS) {
585  std::this_thread::yield();
586  if(num_yields++ > _MAX_YIELDS) {
587  break;
588  }
589  }
590 
591  w.vtm = rdvtm(w.rdgen);
592  } while(!_done);
593 
594 }
595 
596 // Procedure: _exploit_task
597 inline void Executor::_exploit_task(Worker& w, Node*& t) {
598 
599  if(t) {
600 
601  const auto d = w.domain;
602 
603  if(_num_actives[d].fetch_add(1) == 0 && _num_thieves[d] == 0) {
604  _notifier[d].notify(false);
605  }
606 
607  while(t) {
608  _invoke(w, t);
609 
610  //if(t->_parent == nullptr) {
611  // if(t->_topology->_join_counter.fetch_sub(1) == 1) {
612  // _tear_down_topology(t->_topology);
613  // }
614  //}
615  //else { // joined subflow
616  // t->_parent->_join_counter.fetch_sub(1);
617  //}
618 
619  t = w.wsq[d].pop();
620  }
621 
622  --_num_actives[d];
623  }
624 }
625 
626 // Function: _wait_for_task
627 inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {
628 
629  const auto d = worker.domain;
630 
631  wait_for_task:
632 
633  assert(!t);
634 
635  ++_num_thieves[d];
636 
637  explore_task:
638 
639  _explore_task(worker, t);
640 
641  if(t) {
642  if(_num_thieves[d].fetch_sub(1) == 1) {
643  _notifier[d].notify(false);
644  }
645  return true;
646  }
647 
648  _notifier[d].prepare_wait(worker.waiter);
649 
650  //if(auto vtm = _find_vtm(me); vtm != _workers.size()) {
651  if(!_wsq[d].empty()) {
652 
653  _notifier[d].cancel_wait(worker.waiter);
654  //t = (vtm == me) ? _wsq.steal() : _workers[vtm].wsq.steal();
655 
656  t = _wsq[d].steal(); // must steal here
657  if(t) {
658  if(_num_thieves[d].fetch_sub(1) == 1) {
659  _notifier[d].notify(false);
660  }
661  return true;
662  }
663  else {
664  worker.vtm = worker.id;
665  goto explore_task;
666  }
667  }
668 
669  if(_done) {
670  _notifier[d].cancel_wait(worker.waiter);
671  for(int i=0; i<NUM_DOMAINS; ++i) {
672  _notifier[i].notify(true);
673  }
674  --_num_thieves[d];
675  return false;
676  }
677 
678  if(_num_thieves[d].fetch_sub(1) == 1) {
679  if(_num_actives[d]) {
680  _notifier[d].cancel_wait(worker.waiter);
681  goto wait_for_task;
682  }
683  // check all domain queues again
684  for(auto& w : _workers) {
685  if(!w.wsq[d].empty()) {
686  worker.vtm = w.id;
687  _notifier[d].cancel_wait(worker.waiter);
688  goto wait_for_task;
689  }
690  }
691  }
692 
693  // Now I really need to relinquish myself to others
694  _notifier[d].commit_wait(worker.waiter);
695 
696  return true;
697 }
698 
699 // Function: make_observer
700 template<typename Observer, typename... Args>
701 std::shared_ptr<Observer> Executor::make_observer(Args&&... args) {
702 
703  static_assert(
704  std::is_base_of<ObserverInterface, Observer>::value,
705  "Observer must be derived from ObserverInterface"
706  );
707 
708  // use a local variable to mimic the constructor
709  auto ptr = std::make_shared<Observer>(std::forward<Args>(args)...);
710 
711  ptr->set_up(_workers.size());
712 
713  _observers.emplace(std::static_pointer_cast<ObserverInterface>(ptr));
714 
715  return ptr;
716 }
717 
718 // Procedure: remove_observer
719 template <typename Observer>
720 void Executor::remove_observer(std::shared_ptr<Observer> ptr) {
721 
722  static_assert(
723  std::is_base_of<ObserverInterface, Observer>::value,
724  "Observer must be derived from ObserverInterface"
725  );
726 
727  _observers.erase(std::static_pointer_cast<ObserverInterface>(ptr));
728 }
729 
730 // Function: num_observers
731 inline size_t Executor::num_observers() const {
732  return _observers.size();
733 }
734 
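// A minimal observer sketch, assuming only the ObserverInterface hooks that
// _observer_prologue/_observer_epilogue invoke above (set_up, on_entry, on_exit):
//
//   struct MyObserver : tf::ObserverInterface {
//     void set_up(size_t num_workers) override { std::printf("%zu workers\n", num_workers); }
//     void on_entry(size_t worker_id, tf::TaskView tv) override { /* a task begins */ }
//     void on_exit (size_t worker_id, tf::TaskView tv) override { /* a task ends   */ }
//   };
//
//   auto obs = executor.make_observer<MyObserver>();
//   executor.run(taskflow).wait();
//   executor.remove_observer(std::move(obs));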
735 // Procedure: _schedule
736 // The main procedure to schedule a given task node.
737 // Each task node has two types of tasks - regular and subflow.
738 inline void Executor::_schedule(Node* node) {
739 
740  //assert(_workers.size() != 0);
741 
742  const auto d = node->domain();
743 
744  // caller is a worker of this pool
745  auto worker = _per_thread().worker;
746 
747  if(worker != nullptr && worker->executor == this) {
748  worker->wsq[d].push(node);
749  if(worker->domain != d) {
750  if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
751  _notifier[d].notify(false);
752  }
753  }
754  return;
755  }
756 
757  // other threads
758  {
759  std::lock_guard<std::mutex> lock(_wsq_mutex);
760  _wsq[d].push(node);
761  }
762 
763  _notifier[d].notify(false);
764 }
765 
766 // Procedure: _schedule
767 // The main procedure to schedule a set of task nodes.
768 // Each task node has two types of tasks - regular and subflow.
769 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
770 
771  //assert(_workers.size() != 0);
772 
773  // We need to capture the node count to avoid accessing the nodes
774  // vector while the parent topology is removed!
775  const auto num_nodes = nodes.size();
776 
777  if(num_nodes == 0) {
778  return;
779  }
780 
781  // worker thread
782  auto worker = _per_thread().worker;
783 
784  // task counts
785  size_t tcount[NUM_DOMAINS] = {0};
786 
787  if(worker != nullptr && worker->executor == this) {
788  for(size_t i=0; i<num_nodes; ++i) {
789  const auto d = nodes[i]->domain();
790  worker->wsq[d].push(nodes[i]);
791  tcount[d]++;
792  }
793 
794  for(int d=0; d<NUM_DOMAINS; ++d) {
795  if(tcount[d] && d != worker->domain) {
796  if(_num_actives[d] == 0 && _num_thieves[d] == 0) {
797  _notifier[d].notify_n(tcount[d]);
798  }
799  }
800  }
801 
802  return;
803  }
804 
805  // other threads
806  {
807  std::lock_guard<std::mutex> lock(_wsq_mutex);
808  for(size_t k=0; k<num_nodes; ++k) {
809  const auto d = nodes[k]->domain();
810  _wsq[d].push(nodes[k]);
811  tcount[d]++;
812  }
813  }
814 
815  for(int d=0; d<NUM_DOMAINS; ++d) {
816  _notifier[d].notify_n(tcount[d]);
817  }
818 }
819 
820 
821 // Procedure: _invoke
822 inline void Executor::_invoke(Worker& worker, Node* node) {
823 
824  //assert(_workers.size() != 0);
825 
826  // Here we need to fetch the num_successors first to avoid the invalid memory
827  // access caused by topology clear.
828  const auto num_successors = node->num_successors();
829 
830  // task type
831  auto type = node->_handle.index();
832 
833  // condition task
834  int cond = -1;
835 
836  // switch is faster than nested if-else due to jump table
837  switch(type) {
838  // static task
839  case Node::STATIC_WORK:{
840  _invoke_static_work(worker, node);
841  }
842  break;
843 
844  // dynamic task
845  case Node::DYNAMIC_WORK: {
846  _invoke_dynamic_work(worker, node);
847  }
848  break;
849 
850  // condition task
851  case Node::CONDITION_WORK: {
852  _invoke_condition_work(worker, node, cond);
853  }
854  break;
855  //} // no need to add a break here due to the immediate return
856 
857  // module task
858  case Node::MODULE_WORK: {
859  _invoke_module_work(worker, node);
860  }
861  break;
862 
863  // async task
864  case Node::ASYNC_WORK: {
865  _invoke_async_work(worker, node);
866  _decrement_topology_and_notify();
867  return ;
868  }
869  break;
870 
871  // cudaflow task
872 #ifdef TF_ENABLE_CUDA
873  case Node::CUDAFLOW_WORK: {
874  _invoke_cudaflow_work(worker, node);
875  }
876  break;
877 #endif
878 
879  // monostate
880  default:
881  break;
882  }
883 
884  // We MUST recover the dependency since the graph may have cycles.
885  // This must be done before scheduling the successors, otherwise this might cause
886  // a race condition on the _dependents
887  if(node->_has_state(Node::BRANCHED)) {
888  node->_join_counter = node->num_strong_dependents();
889  }
890  else {
891  node->_join_counter = node->num_dependents();
892  }
893 
894  // acquire the parent flow counter
895  auto& c = (node->_parent) ? node->_parent->_join_counter :
896  node->_topology->_join_counter;
897 
898  // At this point, the node storage might be destructed (to be verified)
899  // case 1: non-condition task
900  if(type != Node::CONDITION_WORK) {
901  for(size_t i=0; i<num_successors; ++i) {
902  if(--(node->_successors[i]->_join_counter) == 0) {
903  c.fetch_add(1);
904  _schedule(node->_successors[i]);
905  }
906  }
907  }
908  // case 2: condition task
909  else {
910  if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
911  auto s = node->_successors[cond];
912  s->_join_counter.store(0); // seems redundant but just for invariant
913  c.fetch_add(1);
914  _schedule(s);
915  }
916  }
917 
918  // tear down topology if the node is the last one
919  if(node->_parent == nullptr) {
920  if(node->_topology->_join_counter.fetch_sub(1) == 1) {
921  _tear_down_topology(node->_topology);
922  }
923  }
924  else { // joined subflow
925  node->_parent->_join_counter.fetch_sub(1);
926  }
927 }
928 
929 // Procedure: _observer_prologue
930 inline void Executor::_observer_prologue(Worker& worker, Node* node) {
931  for(auto& observer : _observers) {
932  observer->on_entry(worker.id, TaskView(node));
933  }
934 }
935 
936 // Procedure: _observer_epilogue
937 inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
938  for(auto& observer : _observers) {
939  observer->on_exit(worker.id, TaskView(node));
940  }
941 }
942 
943 // Procedure: _invoke_static_work
944 inline void Executor::_invoke_static_work(Worker& worker, Node* node) {
945  _observer_prologue(worker, node);
946  nstd::get<Node::StaticWork>(node->_handle).work();
947  _observer_epilogue(worker, node);
948 }
949 
950 // Procedure: _invoke_dynamic_work
951 inline void Executor::_invoke_dynamic_work(Worker& w, Node* node) {
952 
953  _observer_prologue(w, node);
954 
955  auto& handle = nstd::get<Node::DynamicWork>(node->_handle);
956 
957  handle.subgraph.clear();
958 
959  Subflow sf(*this, node, handle.subgraph);
960 
961  handle.work(sf);
962 
963  if(sf._joinable) {
964  _invoke_dynamic_work_internal(w, node, handle.subgraph, false);
965  }
966 
967  // TODO
968  _observer_epilogue(w, node);
969 }
970 
971 // Procedure: _invoke_dynamic_work_external
972 inline void Executor::_invoke_dynamic_work_external(Node*p, Graph& g, bool detach) {
973 
974  auto worker = _per_thread().worker;
975 
976  assert(worker && worker->executor == this);
977 
978  _invoke_dynamic_work_internal(*worker, p, g, detach);
979 }
980 
981 // Procedure: _invoke_dynamic_work_internal
982 inline void Executor::_invoke_dynamic_work_internal(
983  Worker& w, Node* p, Graph& g, bool detach
984 ) {
985 
986  assert(p);
987 
988  if(g.empty()) return;
989 
990  PassiveVector<Node*> src;
991 
992  for(auto n : g._nodes) {
993 
994  n->_topology = p->_topology;
995  n->_set_up_join_counter();
996 
997  if(detach) {
998  n->_parent = nullptr;
999  n->_set_state(Node::DETACHED);
1000  }
1001  else {
1002  n->_parent = p;
1003  }
1004 
1005  if(n->num_dependents() == 0) {
1006  src.push_back(n);
1007  }
1008  }
1009 
1010  // detach here
1011  if(detach) {
1012 
1013  {
1014  std::lock_guard<std::mutex> lock(p->_topology->_taskflow._mtx);
1015  p->_topology->_taskflow._graph.merge(std::move(g));
1016  }
1017 
1018  p->_topology->_join_counter.fetch_add(src.size());
1019  _schedule(src);
1020  }
1021  // join here
1022  else {
1023  p->_join_counter.fetch_add(src.size());
1024  _schedule(src);
1025  Node* t = nullptr;
1026 
1027  std::uniform_int_distribution<size_t> rdvtm(_VICTIM_BEG, _VICTIM_END);
1028 
1029  while(p->_join_counter != 0) {
1030 
1031  t = w.wsq[w.domain].pop();
1032 
1033  exploit:
1034 
1035  if(t) {
1036  _invoke(w, t);
1037  //if(t->_parent == nullptr) {
1038  // if(t->_topology->_join_counter.fetch_sub(1) == 1) {
1039  // _tear_down_topology(t->_topology);
1040  // }
1041  //}
1042  //else { // joined subflow
1043  // t->_parent->_join_counter.fetch_sub(1);
1044  //}
1045  }
1046  else {
1047 
1048  explore:
1049  t = (w.id == w.vtm) ? _wsq[w.domain].steal() :
1050  _workers[w.vtm].wsq[w.domain].steal();
1051  if(t) {
1052  goto exploit;
1053  }
1054  else if(p->_join_counter != 0){
1055  std::this_thread::yield();
1056  w.vtm = rdvtm(w.rdgen);
1057  goto explore;
1058  }
1059  else {
1060  break;
1061  }
1062  }
1063  }
1064  }
1065 }
1066 
1067 // Procedure: _invoke_condition_work
1068 inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& cond) {
1069 
1070  _observer_prologue(worker, node);
1071 
1072  cond = nstd::get<Node::ConditionWork>(node->_handle).work();
1073 
1074  _observer_epilogue(worker, node);
1075 }
1076 
1077 #ifdef TF_ENABLE_CUDA
1078 // Procedure: _invoke_cudaflow_work
1079 inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
1080 
1081  _observer_prologue(worker, node);
1082 
1083  assert(worker.domain == node->domain());
1084 
1085  // create a cudaflow
1086  auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);
1087 
1088  h.graph.clear();
1089 
1090  cudaFlow cf(*this, h.graph);
1091 
1092  h.work(cf);
1093 
1094  // join the cudaflow
1095  if(cf._joinable) {
1096  _invoke_cudaflow_work_internal(
1097  worker, cf, [repeat=1] () mutable { return repeat-- == 0; }
1098  );
1099  cf._joinable = false;
1100  }
1101 
1102  _observer_epilogue(worker, node);
1103 }
1104 
1105 // Procedure: _invoke_cudaflow_work_internal
1106 template <typename P>
1107 void Executor::_invoke_cudaflow_work_internal(
1108  Worker& w, cudaFlow& cf, P&& predicate
1109 ) {
1110 
1111  if(cf.empty()) {
1112  return;
1113  }
1114 
1115  // by default, we stick with device 0
1116  auto d = (cf._device == -1) ? 0 : cf._device;
1117 
1118  cudaScopedDevice ctx(d);
1119 
1120  auto s = _cuda_devices[d].streams[w.id - _id_offset[w.domain]];
1121 
1122  // transforms cudaFlow to a native cudaGraph under the specified device
1123  // and launches the graph through a given or an internal device stream
1124  // TODO: need to leverage cudaGraphExecUpdate for changes between
1125  // successive offload calls; right now, we assume the graph
1126  // is not changed (only update parameter is allowed)
1127  cf._graph._create_native_graph();
1128 
1129  while(!predicate()) {
1130 
1131  TF_CHECK_CUDA(
1132  cudaGraphLaunch(cf._graph._native_handle.image, s),
1133  "failed to launch cudaFlow on device ", d
1134  );
1135 
1136  TF_CHECK_CUDA(
1137  cudaStreamSynchronize(s),
1138  "failed to synchronize cudaFlow on device ", d
1139  );
1140  }
1141 
1142  cf._graph._destroy_native_graph();
1143 }
1144 
1145 // Procedure: _invoke_cudaflow_work_external
1146 template <typename P>
1147 void Executor::_invoke_cudaflow_work_external(cudaFlow& cf, P&& predicate) {
1148 
1149  auto w = _per_thread().worker;
1150 
1151  assert(w && w->executor == this);
1152 
1153  _invoke_cudaflow_work_internal(*w, cf, std::forward<P>(predicate));
1154 }
1155 #endif
1156 
1157 // Procedure: _invoke_module_work
1158 inline void Executor::_invoke_module_work(Worker& w, Node* node) {
1159 
1160  _observer_prologue(w, node);
1161 
1162  auto module = nstd::get<Node::ModuleWork>(node->_handle).module;
1163 
1164  _invoke_dynamic_work_internal(w, node, module->_graph, false);
1165 
1166  _observer_epilogue(w, node);
1167 }
1168 
1169 // Procedure: _invoke_async_work
1170 inline void Executor::_invoke_async_work(Worker& w, Node* node) {
1171  _observer_prologue(w, node);
1172 
1173  nstd::get<Node::AsyncWork>(node->_handle).work();
1174 
1175  _observer_epilogue(w, node);
1176 
1177  // recycle the node
1178  Graph::_node_pool().recycle(node);
1179 }
1180 
1181 // Function: run
1182 inline std::future<void> Executor::run(Taskflow& f) {
1183  return run_n(f, 1, [](){});
1184 }
1185 
1186 // Function: run
1187 template <typename C>
1188 std::future<void> Executor::run(Taskflow& f, C&& c) {
1189  return run_n(f, 1, std::forward<C>(c));
1190 }
1191 
1192 // Function: run_n
1193 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
1194  return run_n(f, repeat, [](){});
1195 }
1196 
1197 // Function: run_n
1198 template <typename C>
1199 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
1200  return run_until(
1201  f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
1202  );
1203 }
1204 
1205 // Function: run_until
1206 template<typename P>
1207 std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
1208  return run_until(f, std::forward<P>(pred), [](){});
1209 }
1210 
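// A minimal sketch of the run family declared in Executor and defined above
// (the run_until overload with a callback is defined further below):
//
//   executor.run(taskflow);                                        // run once
//   executor.run_n(taskflow, 4);                                   // run four times
//   executor.run_n(taskflow, 4, [] () { std::printf("done\n"); }); // run four times, then invoke the callback
//   executor.run_until(taskflow, [i=0] () mutable { return ++i == 3; });  // re-run until the predicate returns true
//
//   executor.wait_for_all();   // block until every submitted run has finished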
1211 // Function: _set_up_topology
1212 inline void Executor::_set_up_topology(Topology* tpg) {
1213 
1214  tpg->_sources.clear();
1215  tpg->_taskflow._graph.clear_detached();
1216 
1217  // scan each node in the graph and build up the links
1218  for(auto node : tpg->_taskflow._graph._nodes) {
1219 
1220  node->_topology = tpg;
1221  node->_clear_state();
1222 
1223  if(node->num_dependents() == 0) {
1224  tpg->_sources.push_back(node);
1225  }
1226 
1227  node->_set_up_join_counter();
1228  }
1229 
1230  tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);
1231 }
1232 
1233 // Function: _tear_down_topology
1234 inline void Executor::_tear_down_topology(Topology* tpg) {
1235 
1236  auto &f = tpg->_taskflow;
1237 
1238  //assert(&tpg == &(f._topologies.front()));
1239 
1240  // case 1: we still need to run the topology again
1241  if(! tpg->_pred() ) {
1242  //tpg->_recover_num_sinks();
1243 
1244  assert(tpg->_join_counter == 0);
1245  tpg->_join_counter = tpg->_sources.size();
1246 
1247  _schedule(tpg->_sources);
1248  }
1249  // case 2: the final run of this topology
1250  else {
1251 
1252  if(tpg->_call != nullptr) {
1253  tpg->_call();
1254  }
1255 
1256  f._mtx.lock();
1257 
1258  // If there is another run (interleave between lock)
1259  if(f._topologies.size() > 1) {
1260 
1261  assert(tpg->_join_counter == 0);
1262 
1263  // Set the promise
1264  tpg->_promise.set_value();
1265  f._topologies.pop_front();
1266  f._mtx.unlock();
1267 
1268  // decrement the topology but since this is not the last we don't notify
1269  _decrement_topology();
1270 
1271  tpg = &(f._topologies.front());
1272 
1273  _set_up_topology(tpg);
1274  _schedule(tpg->_sources);
1275 
1276  //f._topologies.front()._bind(f._graph);
1277  //*tpg = &(f._topologies.front());
1278 
1279  //assert(f._topologies.front()._join_counter == 0);
1280 
1281  //f._topologies.front()._join_counter = f._topologies.front()._sources.size();
1282 
1283  //_schedule(f._topologies.front()._sources);
1284  }
1285  else {
1286  assert(f._topologies.size() == 1);
1287 
1288  // Need to back up the promise first here because the taskflow might be
1289  // destroyed before this function leaves
1290  auto p {std::move(tpg->_promise)};
1291 
1292  // Back up lambda capture in case it has the topology pointer, to avoid it releasing on
1293  // pop_front ahead of _mtx.unlock & _promise.set_value. Released safely when leaving scope.
1294  auto bc{ std::move( tpg->_call ) };
1295 
1296  f._topologies.pop_front();
1297 
1298  f._mtx.unlock();
1299 
1300  // We set the promise at the end in case the taskflow is destroyed before this function leaves
1301  p.set_value();
1302 
1303  _decrement_topology_and_notify();
1304  }
1305  }
1306 }
1307 
1308 // Function: run_until
1309 template <typename P, typename C>
1310 std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
1311 
1312  _increment_topology();
1313 
1314  // Special case of predicate
1315  if(f.empty() || pred()) {
1316  std::promise<void> promise;
1317  promise.set_value();
1318  _decrement_topology_and_notify();
1319  return promise.get_future();
1320  }
1321 
1322  // Multi-threaded execution.
1323  bool run_now {false};
1324  Topology* tpg;
1325  std::future<void> future;
1326 
1327  {
1328  std::lock_guard<std::mutex> lock(f._mtx);
1329 
1330  // create a topology for this run
1331  //tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1332  f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c));
1333  tpg = &(f._topologies.back());
1334  future = tpg->_promise.get_future();
1335 
1336  if(f._topologies.size() == 1) {
1337  run_now = true;
1338  //tpg->_bind(f._graph);
1339  //_schedule(tpg->_sources);
1340  }
1341  }
1342 
1343  // Note that calling _schedule here may cause the topology to be removed
1344  // before this function leaves.
1345  if(run_now) {
1346  _set_up_topology(tpg);
1347  _schedule(tpg->_sources);
1348  }
1349 
1350  return future;
1351 }
1352 
1353 // Procedure: _increment_topology
1354 inline void Executor::_increment_topology() {
1355  std::lock_guard<std::mutex> lock(_topology_mutex);
1356  ++_num_topologies;
1357 }
1358 
1359 // Procedure: _decrement_topology_and_notify
1360 inline void Executor::_decrement_topology_and_notify() {
1361  std::lock_guard<std::mutex> lock(_topology_mutex);
1362  if(--_num_topologies == 0) {
1363  _topology_cv.notify_all();
1364  }
1365 }
1366 
1367 // Procedure: _decrement_topology
1368 inline void Executor::_decrement_topology() {
1369  std::lock_guard<std::mutex> lock(_topology_mutex);
1370  --_num_topologies;
1371 }
1372 
1373 // Procedure: wait_for_all
1374 inline void Executor::wait_for_all() {
1375  std::unique_lock<std::mutex> lock(_topology_mutex);
1376  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1377 }
1378 
1379 // ----------------------------------------------------------------------------
1380 // Subflow Definition
1381 // ----------------------------------------------------------------------------
1382 
1383 inline void Subflow::join() {
1384 
1385  if(!_joinable) {
1386  TF_THROW("subflow not joinable");
1387  }
1388 
1389  _executor._invoke_dynamic_work_external(_parent, _graph, false);
1390  _joinable = false;
1391 }
1392 
1393 inline void Subflow::detach() {
1394 
1395  if(!_joinable) {
1396  TF_THROW("subflow already joined or detached");
1397  }
1398 
1399  _executor._invoke_dynamic_work_external(_parent, _graph, true);
1400  _joinable = false;
1401 }
1402 
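// A minimal subflow sketch: a task that takes tf::Subflow& spawns a dynamic
// graph; per _invoke_dynamic_work above, a subflow that is neither joined nor
// detached is joined automatically when the task returns.
//
//   taskflow.emplace([] (tf::Subflow& sf) {
//     tf::Task X = sf.emplace([] () { std::printf("X\n"); });
//     tf::Task Y = sf.emplace([] () { std::printf("Y\n"); });
//     X.precede(Y);
//     sf.join();       // or sf.detach() to run the subgraph independently of the parent
//   });
//   executor.run(taskflow).wait();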
1403 // ----------------------------------------------------------------------------
1404 // cudaFlow
1405 // ----------------------------------------------------------------------------
1406 
1407 #ifdef TF_ENABLE_CUDA
1408 
1409 // Procedure: offload_until
1410 template <typename P>
1411 void cudaFlow::offload_until(P&& predicate) {
1412 
1413  if(!_joinable) {
1414  TF_THROW("cudaFlow already joined");
1415  }
1416 
1417  _executor._invoke_cudaflow_work_external(*this, std::forward<P>(predicate));
1418 }
1419 
1420 // Procedure: offload_n
1421 inline void cudaFlow::offload_n(size_t n) {
1422  offload_until([repeat=n] () mutable { return repeat-- == 0; });
1423 }
1424 
1425 // Procedure: offload
1426 inline void cudaFlow::offload() {
1427  offload_until([repeat=1] () mutable { return repeat-- == 0; });
1428 }
1429 
1430 // Procedure: join_until
1431 template <typename P>
1432 void cudaFlow::join_until(P&& predicate) {
1433 
1434  if(!_joinable) {
1435  TF_THROW("cudaFlow already joined");
1436  }
1437 
1438  _executor._invoke_cudaflow_work_external(*this, std::forward<P>(predicate));
1439  _joinable = false;
1440 }
1441 
1442 // Procedure: join_n
1443 inline void cudaFlow::join_n(size_t n) {
1444  join_until([repeat=n] () mutable { return repeat-- == 0; });
1445 }
1446 
1447 // Procedure: join
1448 inline void cudaFlow::join() {
1449  join_until([repeat=1] () mutable { return repeat-- == 0; });
1450 }
1451 
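// A minimal cudaFlow sketch (meaningful only when TF_ENABLE_CUDA is defined).
// The GPU tasks below are placeholders; offload/join follow the definitions
// above, and a cudaFlow that is never offloaded or joined is joined once
// automatically in _invoke_cudaflow_work.
//
//   taskflow.emplace([] (tf::cudaFlow& cf) {
//     // build the GPU graph here, e.g., kernel/memcpy tasks on cf
//     cf.offload_n(2);   // launch the captured graph twice, synchronizing after each launch
//     cf.join();         // final launch; the cudaFlow is no longer joinable afterwards
//   });
//   executor.run(taskflow).wait();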
1452 
1453 #endif
1454 
1455 } // end of namespace tf -----------------------------------------------------
1456 
1457 
1458 
1459 
1460 
1461 
1462 
1463 
1464 
1465 
1466 