Taskflow  2.4-master-branch
flow_builder.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
13 class FlowBuilder {
14 
15  friend class Task;
16 
17  public:
18 
24  FlowBuilder(Graph& graph);
25 
35  template <typename C>
36  std::enable_if_t<is_static_task_v<C>, Task> emplace(C&& callable);
37 
47  template <typename C>
48  std::enable_if_t<is_dynamic_task_v<C>, Task> emplace(C&& callable);
49 
59  template <typename C>
60  std::enable_if_t<is_condition_task_v<C>, Task> emplace(C&& callable);
61 
62 #ifdef TF_ENABLE_CUDA
63 
72  template <typename C>
73  std::enable_if_t<is_cudaflow_task_v<C>, Task> emplace(C&& callable);
74 #endif
75 
85  template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>* = nullptr>
86  auto emplace(C&&... callables);
87 
94  Task composed_of(Taskflow& taskflow);
95 
117  template <typename I, typename C>
118  std::pair<Task, Task> parallel_for(I beg, I end, C&& callable, size_t chunk=1);
119 
137  template <
138  typename I,
139  typename C,
140  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>* = nullptr
141  >
143  I beg, I end, I step, C&& callable, size_t chunk = 1
144  );
145 
163  template <
164  typename I,
165  typename C,
166  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>* = nullptr
167  >
169  I beg, I end, I step, C&& callable, size_t chunk = 1
170  );
171 
189  template <typename I, typename T, typename B>
190  std::pair<Task, Task> reduce(I beg, I end, T& result, B&& bop);
191 
207  template <typename I, typename T>
208  std::pair<Task, Task> reduce_min(I beg, I end, T& result);
209 
225  template <typename I, typename T>
226  std::pair<Task, Task> reduce_max(I beg, I end, T& result);
227 
249  template <typename I, typename T, typename B, typename U>
250  std::pair<Task, Task> transform_reduce(I beg, I end, T& result, B&& bop, U&& uop);
251 
276  template <typename I, typename T, typename B, typename P, typename U>
278  I beg, I end, T& result, B&& bop1, P&& bop2, U&& uop
279  );
280 
286  Task placeholder();
287 
294  void precede(Task A, Task B);
295 
301  void linearize(std::vector<Task>& tasks);
302 
309 
316  void broadcast(Task A, std::vector<Task>& others);
317 
325 
332  void succeed(std::vector<Task>& others, Task A);
333 
340  void succeed(std::initializer_list<Task> others, Task A);
341 
342  private:
343 
344  Graph& _graph;
345 
346  template <typename L>
347  void _linearize(L&);
348 };
349 
350 // Constructor
351 inline FlowBuilder::FlowBuilder(Graph& graph) :
352  _graph {graph} {
353 }
354 
355 // Function: emplace
356 template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
357 auto FlowBuilder::emplace(C&&... cs) {
358  return std::make_tuple(emplace(std::forward<C>(cs))...);
359 }
360 
361 // Function: emplace
362 // emplaces a static task
363 template <typename C>
364 std::enable_if_t<is_static_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
365  auto n = _graph.emplace_back(
366  nstd::in_place_type_t<Node::StaticWork>{}, std::forward<C>(c)
367  );
368  return Task(n);
369 }
370 
371 // Function: emplace
372 // emplaces a dynamic task
373 template <typename C>
374 std::enable_if_t<is_dynamic_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
375  auto n = _graph.emplace_back(
376  nstd::in_place_type_t<Node::DynamicWork>{}, std::forward<C>(c)
377  );
378  return Task(n);
379 }
380 
381 // Function: emplace
382 // emplaces a condition task
383 template <typename C>
384 std::enable_if_t<is_condition_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
385  auto n = _graph.emplace_back(
386  nstd::in_place_type_t<Node::ConditionWork>{}, std::forward<C>(c)
387  );
388  return Task(n);
389 }
390 
#ifdef TF_ENABLE_CUDA
// Function: emplace
// Appends a node holding cudaFlow work built from the callable and returns
// a handle to it. Only compiled when TF_ENABLE_CUDA is defined.
template <typename C>
std::enable_if_t<is_cudaflow_task_v<C>, Task> FlowBuilder::emplace(C&& callable) {
  using work_tag = nstd::in_place_type_t<Node::cudaFlowWork>;
  auto node = _graph.emplace_back(work_tag{}, std::forward<C>(callable));
  return Task(node);
}
#endif
402 
403 // Function: composed_of
405  auto node = _graph.emplace_back(
406  nstd::in_place_type_t<Node::ModuleWork>{}, &taskflow
407  );
408  return Task(node);
409 }
410 
411 // Procedure: precede
412 inline void FlowBuilder::precede(Task from, Task to) {
413  from._node->_precede(to._node);
414 }
415 
416 // Procedure: broadcast
418  for(auto to : tos) {
419  from.precede(to);
420  }
421 }
422 
423 // Procedure: broadcast
425  for(auto to : tos) {
426  from.precede(to);
427  }
428 }
429 
430 // Function: succeed
431 inline void FlowBuilder::succeed(std::vector<Task>& froms, Task to) {
432  for(auto from : froms) {
433  to.succeed(from);
434  }
435 }
436 
437 // Function: succeed
439  for(auto from : froms) {
440  to.succeed(from);
441  }
442 }
443 
444 // Function: placeholder
446  auto node = _graph.emplace_back();
447  return Task(node);
448 }
449 
450 // Function: parallel_for
451 template <typename I, typename C>
453  I beg, I end, C&& c, size_t chunk
454 ){
455 
456  //using category = typename std::iterator_traits<I>::iterator_category;
457 
458  auto S = placeholder();
459  auto T = placeholder();
460 
461  // default partition equals to the worker count
462  if(chunk == 0) {
463  chunk = 1;
464  }
465 
466  size_t remain = std::distance(beg, end);
467 
468  while(beg != end) {
469 
470  auto e = beg;
471 
472  auto x = std::min(remain, chunk);
473  std::advance(e, x);
474  remain -= x;
475 
476  // Create a task
477  auto task = emplace([beg, e, c] () mutable {
478  std::for_each(beg, e, c);
479  });
480 
481  S.precede(task);
482  task.precede(T);
483 
484  // adjust the pointer
485  beg = e;
486  }
487 
488  // special case
489  if(S.num_successors() == 0) {
490  S.precede(T);
491  }
492 
493  return std::make_pair(S, T);
494 }
495 
496 // Function: parallel_for
497 template <
498  typename I,
499  typename C,
500  std::enable_if_t<std::is_integral<std::decay_t<I>>::value, void>*
501 >
502 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
503 
504  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
505  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
506  }
507 
508  // source and target
509  auto source = placeholder();
510  auto target = placeholder();
511 
512  if(chunk == 0) {
513  chunk = 1;
514  }
515 
516  // positive case
517  if(beg < end) {
518  while(beg != end) {
519  auto o = static_cast<I>(chunk) * s;
520  auto e = std::min(beg + o, end);
521  auto task = emplace([=] () mutable {
522  for(auto i=beg; i<e; i+=s) {
523  c(i);
524  }
525  });
526  source.precede(task);
527  task.precede(target);
528  beg = e;
529  }
530  }
531  // negative case
532  else if(beg > end) {
533  while(beg != end) {
534  auto o = static_cast<I>(chunk) * s;
535  auto e = std::max(beg + o, end);
536  auto task = emplace([=] () mutable {
537  for(auto i=beg; i>e; i+=s) {
538  c(i);
539  }
540  });
541  source.precede(task);
542  task.precede(target);
543  beg = e;
544  }
545  }
546 
547  if(source.num_successors() == 0) {
548  source.precede(target);
549  }
550 
551  return std::make_pair(source, target);
552 }
553 
554 // Function: parallel_for
555 template <typename I, typename C,
556  std::enable_if_t<std::is_floating_point<std::decay_t<I>>::value, void>*
557 >
558 std::pair<Task, Task> FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t chunk) {
559 
560  if((s == 0) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
561  TF_THROW("invalid range [", beg, ", ", end, ") with step size ", s);
562  }
563 
564  // source and target
565  auto source = placeholder();
566  auto target = placeholder();
567 
568  if(chunk == 0) {
569  chunk = 1;
570  }
571 
572  // positive case
573  if(beg < end) {
574  size_t N=0;
575  I b = beg;
576  for(I e=beg; e<end; e+=s) {
577  if(++N == chunk) {
578  auto task = emplace([=] () mutable {
579  for(size_t i=0; i<N; ++i, b+=s) {
580  c(b);
581  }
582  });
583  source.precede(task);
584  task.precede(target);
585  N = 0;
586  b = e;
587  }
588  }
589 
590  if(N) {
591  auto task = emplace([=] () mutable {
592  for(size_t i=0; i<N; ++i, b+=s) {
593  c(b);
594  }
595  });
596  source.precede(task);
597  task.precede(target);
598  }
599  }
600  else if(beg > end) {
601  size_t N=0;
602  I b = beg;
603  for(I e=beg; e>end; e+=s) {
604  if(++N == chunk) {
605  auto task = emplace([=] () mutable {
606  for(size_t i=0; i<N; ++i, b+=s) {
607  c(b);
608  }
609  });
610  source.precede(task);
611  task.precede(target);
612  N = 0;
613  b = e;
614  }
615  }
616 
617  if(N) {
618  auto task = emplace([=] () mutable {
619  for(size_t i=0; i<N; ++i, b+=s) {
620  c(b);
621  }
622  });
623  source.precede(task);
624  task.precede(target);
625  }
626  }
627 
628  if(source.num_successors() == 0) {
629  source.precede(target);
630  }
631 
632  return std::make_pair(source, target);
633 }
634 
635 // Function: reduce_min
636 // Find the minimum element over a range of items.
637 template <typename I, typename T>
639  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
640  return std::min(l, r);
641  });
642 }
643 
644 // Function: reduce_max
645 // Find the maximum element over a range of items.
646 template <typename I, typename T>
648  return reduce(beg, end, result, [] (const auto& l, const auto& r) {
649  return std::max(l, r);
650  });
651 }
652 
653 // Function: transform_reduce
654 template <typename I, typename T, typename B, typename U>
656  I beg, I end, T& result, B&& bop, U&& uop
657 ) {
658 
659  //using category = typename std::iterator_traits<I>::iterator_category;
660 
661  // Even partition
662  size_t d = std::distance(beg, end);
663  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
664  size_t g = std::max((d + w - 1) / w, size_t{2});
665 
666  auto source = placeholder();
667  auto target = placeholder();
668 
669  //std::vector<std::future<T>> futures;
670  auto g_results = std::make_unique<T[]>(w);
671  size_t id {0};
672 
673  size_t remain = d;
674 
675  while(beg != end) {
676 
677  auto e = beg;
678 
679  size_t x = std::min(remain, g);
680  std::advance(e, x);
681  remain -= x;
682 
683  // Create a task
684  auto task = emplace([beg, e, bop, uop, res=&(g_results[id])] () mutable {
685  *res = uop(*beg);
686  for(++beg; beg != e; ++beg) {
687  *res = bop(std::move(*res), uop(*beg));
688  }
689  });
690 
691  source.precede(task);
692  task.precede(target);
693 
694  // adjust the pointer
695  beg = e;
696  id ++;
697  }
698 
699  // target synchronizer
700  target.work([&result, bop, res=make_moc(std::move(g_results)), w=id] () {
701  for(auto i=0u; i<w; i++) {
702  result = bop(std::move(result), res.object[i]);
703  }
704  });
705 
706  return std::make_pair(source, target);
707 }
708 
709 // Function: transform_reduce
710 template <typename I, typename T, typename B, typename P, typename U>
712  I beg, I end, T& result, B&& bop, P&& pop, U&& uop
713 ) {
714 
715  //using category = typename std::iterator_traits<I>::iterator_category;
716 
717  // Even partition
718  size_t d = std::distance(beg, end);
719  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
720  size_t g = std::max((d + w - 1) / w, size_t{2});
721 
722  auto source = placeholder();
723  auto target = placeholder();
724 
725  auto g_results = std::make_unique<T[]>(w);
726 
727  size_t id {0};
728  size_t remain = d;
729 
730  while(beg != end) {
731 
732  auto e = beg;
733 
734  size_t x = std::min(remain, g);
735  std::advance(e, x);
736  remain -= x;
737 
738  // Create a task
739  auto task = emplace([beg, e, uop, pop, res= &g_results[id]] () mutable {
740  *res = uop(*beg);
741  for(++beg; beg != e; ++beg) {
742  *res = pop(std::move(*res), *beg);
743  }
744  });
745  source.precede(task);
746  task.precede(target);
747 
748  // adjust the pointer
749  beg = e;
750  id ++;
751  }
752 
753  // target synchronizer
754  target.work([&result, bop, g_results=make_moc(std::move(g_results)), w=id] () {
755  for(auto i=0u; i<w; i++) {
756  result = bop(std::move(result), std::move(g_results.object[i]));
757  }
758  });
759 
760  return std::make_pair(source, target);
761 }
762 
763 // Procedure: _linearize
764 template <typename L>
765 void FlowBuilder::_linearize(L& keys) {
766 
767  auto itr = keys.begin();
768  auto end = keys.end();
769 
770  if(itr == end) {
771  return;
772  }
773 
774  auto nxt = itr;
775 
776  for(++nxt; nxt != end; ++nxt, ++itr) {
777  itr->_node->_precede(nxt->_node);
778  }
779 }
780 
781 // Procedure: linearize
783  _linearize(keys);
784 }
785 
786 // Procedure: linearize
788  _linearize(keys);
789 }
790 
791 // Proceduer: reduce
792 template <typename I, typename T, typename B>
793 std::pair<Task, Task> FlowBuilder::reduce(I beg, I end, T& result, B&& op) {
794 
795  //using category = typename std::iterator_traits<I>::iterator_category;
796 
797  size_t d = std::distance(beg, end);
798  size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
799  size_t g = std::max((d + w - 1) / w, size_t{2});
800 
801  auto source = placeholder();
802  auto target = placeholder();
803 
804  //T* g_results = static_cast<T*>(malloc(sizeof(T)*w));
805  auto g_results = std::make_unique<T[]>(w);
806  //std::vector<std::future<T>> futures;
807 
808  size_t id {0};
809  size_t remain = d;
810 
811  while(beg != end) {
812 
813  auto e = beg;
814 
815  size_t x = std::min(remain, g);
816  std::advance(e, x);
817  remain -= x;
818 
819  // Create a task
820  //auto [task, future] = emplace([beg, e, op] () mutable {
821  auto task = emplace([beg, e, op, res = &g_results[id]] () mutable {
822  *res = *beg;
823  for(++beg; beg != e; ++beg) {
824  *res = op(std::move(*res), *beg);
825  }
826  //auto init = *beg;
827  //for(++beg; beg != e; ++beg) {
828  // init = op(std::move(init), *beg);
829  //}
830  //return init;
831  });
832  source.precede(task);
833  task.precede(target);
834  //futures.push_back(std::move(future));
835 
836  // adjust the pointer
837  beg = e;
838  id ++;
839  }
840 
841  // target synchronizer
842  //target.work([&result, futures=MoC{std::move(futures)}, op] () {
843  // for(auto& fu : futures.object) {
844  // result = op(std::move(result), fu.get());
845  // }
846  //});
847  target.work([g_results=make_moc(std::move(g_results)), &result, op, w=id] () {
848  for(auto i=0u; i<w; i++) {
849  result = op(std::move(result), g_results.object[i]);
850  }
851  });
852 
853  return std::make_pair(source, target);
854 }
855 
856 // ----------------------------------------------------------------------------
857 
864 class Subflow : public FlowBuilder {
865 
866  public:
867 
871  template <typename... Args>
872  Subflow(Args&&... args);
873 
877  void join();
878 
882  void detach();
883 
887  bool detached() const;
888 
892  bool joined() const;
893 
894  private:
895 
896  bool _detached {false};
897 };
898 
899 // Constructor
900 template <typename... Args>
901 Subflow::Subflow(Args&&... args) :
902  FlowBuilder {std::forward<Args>(args)...} {
903 }
904 
905 // Procedure: join
906 inline void Subflow::join() {
907  _detached = false;
908 }
909 
910 // Procedure: detach
911 inline void Subflow::detach() {
912  _detached = true;
913 }
914 
915 // Function: detached
916 inline bool Subflow::detached() const {
917  return _detached;
918 }
919 
920 // Function: joined
921 inline bool Subflow::joined() const {
922  return !_detached;
923 }
924 
925 
926 // ----------------------------------------------------------------------------
927 // Legacy code
928 // ----------------------------------------------------------------------------
929 
930 using SubflowBuilder = Subflow;
931 
932 } // end of namespace tf. ---------------------------------------------------
933 
934 
void linearize(std::vector< Task > &tasks)
adds adjacent dependency links to a linear list of tasks
Definition: flow_builder.hpp:782
void broadcast(Task A, std::vector< Task > &others)
adds dependency links from one task A to many tasks
Definition: flow_builder.hpp:417
std::pair< Task, Task > transform_reduce(I beg, I end, T &result, B &&bop, U &&uop)
constructs a task dependency graph of parallel transformation and reduction
Definition: flow_builder.hpp:655
T end(T... args)
Definition: error.hpp:9
void succeed(std::vector< Task > &others, Task A)
adds dependency links from many tasks to one task A
Definition: flow_builder.hpp:431
T hardware_concurrency(T... args)
Task placeholder()
creates an empty task
Definition: flow_builder.hpp:445
Subflow(Args &&... args)
constructs a subflow builder object
Definition: flow_builder.hpp:901
void detach()
enables the subflow to detach from its parent task
Definition: flow_builder.hpp:911
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:916
std::pair< Task, Task > reduce_max(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::max
Definition: flow_builder.hpp:647
Task & succeed(Ts &&... tasks)
adds precedence links from other tasks to this
Definition: task.hpp:362
std::enable_if_t< is_static_task_v< C >, Task > emplace(C &&callable)
creates a static task from a given callable object
Definition: flow_builder.hpp:364
std::pair< Task, Task > parallel_for(I beg, I end, C &&callable, size_t chunk=1)
constructs a task dependency graph of range-based parallel_for
Definition: flow_builder.hpp:452
Task composed_of(Taskflow &taskflow)
creates a module task from a taskflow
Definition: flow_builder.hpp:404
void precede(Task A, Task B)
adds a dependency link from task A to task B
Definition: flow_builder.hpp:412
T make_pair(T... args)
tf::Taskflow
main entry to create a task dependency graph
Definition: taskflow.hpp:18
FlowBuilder(Graph &graph)
constructs a flow builder object
Definition: flow_builder.hpp:351
tf::FlowBuilder
building methods of a task dependency graph
Definition: flow_builder.hpp:13
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:921
tf::Task
handle to a node in a task dependency graph
Definition: task.hpp:113
Task & precede(Ts &&... tasks)
adds precedence links from this to other tasks
Definition: task.hpp:339
std::pair< Task, Task > reduce(I beg, I end, T &result, B &&bop)
construct a task dependency graph of parallel reduction
Definition: flow_builder.hpp:793
tf::Subflow
building methods of a subflow graph in dynamic tasking
Definition: flow_builder.hpp:864
void join()
enables the subflow to join its parent task
Definition: flow_builder.hpp:906
std::pair< Task, Task > reduce_min(I beg, I end, T &result)
constructs a task dependency graph of parallel reduction through std::min
Definition: flow_builder.hpp:638