transwarp
transwarp.h
1 /// @mainpage transwarp is a header-only C++ library for task concurrency
2 /// @details https://github.com/bloomen/transwarp
3 /// @version 2.2.2
4 /// @author Christian Blume, Guan Wang
5 /// @date 2019
6 /// @copyright MIT http://www.opensource.org/licenses/mit-license.php
7 #pragma once
8 #include <algorithm>
9 #ifndef TRANSWARP_CPP11
10 #include <any>
11 #endif
12 #include <array>
13 #include <atomic>
14 #include <chrono>
15 #include <cstddef>
16 #include <cstdint>
17 #include <functional>
18 #include <future>
19 #include <map>
20 #include <memory>
21 #include <mutex>
22 #ifndef TRANSWARP_CPP11
23 #include <optional>
24 #endif
25 #include <queue>
26 #include <stdexcept>
27 #include <string>
28 #include <thread>
29 #include <tuple>
30 #include <type_traits>
31 #include <unordered_map>
32 #include <utility>
33 #include <vector>
34 
35 
36 #ifdef TRANSWARP_MINIMUM_TASK_SIZE
37 
38 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
39 #define TRANSWARP_DISABLE_TASK_CUSTOM_DATA
40 #endif
41 
42 #ifndef TRANSWARP_DISABLE_TASK_NAME
43 #define TRANSWARP_DISABLE_TASK_NAME
44 #endif
45 
46 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
47 #define TRANSWARP_DISABLE_TASK_PRIORITY
48 #endif
49 
50 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
51 #define TRANSWARP_DISABLE_TASK_REFCOUNT
52 #endif
53 
54 #ifndef TRANSWARP_DISABLE_TASK_TIME
55 #define TRANSWARP_DISABLE_TASK_TIME
56 #endif
57 
58 #endif
59 
60 
61 /// The transwarp namespace
62 namespace transwarp {
63 
64 
65 #ifdef TRANSWARP_CPP11
/// Small optional-like value class: either empty or holding a std::string
class option_str {
public:
    /// Creates an empty instance (conversion to bool yields false)
    option_str() {}

    /// Creates an instance that holds str (conversion to bool yields true)
    option_str(std::string str)
    : str_{std::move(str)},
      valid_{true}
    {}

    // copying and moving use the compiler-generated member-wise versions
    option_str(const option_str&) = default;
    option_str& operator=(const option_str&) = default;
    option_str(option_str&&) = default;
    option_str& operator=(option_str&&) = default;

    /// True if a string was supplied on construction
    operator bool() const noexcept {
        return valid_;
    }

    /// The stored string; an empty string if nothing was supplied
    const std::string& operator*() const noexcept {
        return str_;
    }

private:
    std::string str_;     // the held text
    bool valid_ = false;  // set only by the std::string constructor
};
94 
95 /// Detail namespace for internal functionality only
96 namespace detail {
97 
/// Interface for the type-specific operations on a value that is only
/// reachable through a void pointer (data storage of a type-erased object)
class storage {
public:
    virtual ~storage() = default;

    /// Returns a new storage object
    virtual std::unique_ptr<storage> clone() const = 0;

    /// Disposes of the value that data points to
    virtual void destroy(void* data) const noexcept = 0;

    /// Writes to dest a pointer obtained from the value behind src
    virtual void copy(const void* src, void*& dest) const = 0;

    /// Hands the value over from src to dest
    virtual void move(void*& src, void*& dest) const noexcept = 0;
};
107 
108 /// Used to handle data storage for a type-erased object
109 template<typename T>
110 class storage_impl : public transwarp::detail::storage {
111 public:
112  std::unique_ptr<transwarp::detail::storage> clone() const override {
113  return std::unique_ptr<transwarp::detail::storage>(new storage_impl);
114  }
115  void destroy(void* data) const noexcept override {
116  delete reinterpret_cast<T*>(data);
117  }
118  void copy(const void* src, void*& dest) const override {
119  dest = new T(*reinterpret_cast<const T*>(src));
120  }
121  void move(void*& src, void*& dest) const noexcept override {
122  dest = src;
123  src = nullptr;
124  }
125 };
126 
127 } // detail
128 
129 /// A simple value class that can hold any value
130 class any_data {
131 public:
132  any_data()
133  : data_(nullptr)
134  {}
135 
136  template<typename T>
137  any_data(T&& value)
138  : storage_(new transwarp::detail::storage_impl<typename std::decay<T>::type>),
139  data_(new typename std::decay<T>::type(std::forward<T>(value)))
140  {}
141 
142  any_data(const any_data& other)
143  : storage_(other.storage_ ? other.storage_->clone() : nullptr)
144  {
145  if (other.data_) {
146  storage_->copy(other.data_, data_);
147  } else {
148  data_ = nullptr;
149  }
150  }
151 
152  any_data& operator=(const any_data& other) {
153  if (this != &other) {
154  if (storage_) {
155  storage_->destroy(data_);
156  }
157  storage_ = other.storage_ ? other.storage_->clone() : nullptr;
158  if (other.data_) {
159  storage_->copy(other.data_, data_);
160  } else {
161  data_ = nullptr;
162  }
163  }
164  return *this;
165  }
166 
167  any_data(any_data&& other)
168  : storage_(std::move(other.storage_))
169  {
170  if (other.data_) {
171  storage_->move(other.data_, data_);
172  } else {
173  data_ = nullptr;
174  }
175  }
176 
177  any_data& operator=(any_data&& other) {
178  if (this != &other) {
179  if (storage_) {
180  storage_->destroy(data_);
181  }
182  storage_ = std::move(other.storage_);
183  if (other.data_) {
184  storage_->move(other.data_, data_);
185  } else {
186  data_ = nullptr;
187  }
188  }
189  return *this;
190  }
191 
192  ~any_data() {
193  if (data_) {
194  storage_->destroy(data_);
195  }
196  }
197 
198  bool has_value() const noexcept {
199  return data_ != nullptr;
200  }
201 
202  template<typename T>
203  const T& get() const {
204  return *reinterpret_cast<const T*>(data_);
205  }
206 
207 private:
208  std::unique_ptr<transwarp::detail::storage> storage_;
209  void* data_;
210 };
211 
/// String parameter type of the C++11 build: a reference to a constant std::string
using str_view = const std::string&;
213 #else
// Without TRANSWARP_CPP11 the vocabulary types map directly onto the standard library
using any_data = std::any;                      ///< Holds a value of any type
using option_str = std::optional<std::string>;  ///< Optionally holds a string
using str_view = std::string_view;              ///< String parameter type
217 #endif
218 
219 
/// Enumerates the task types, i.e. what a task's functor receives from its parents
enum class task_type {
    /// The task has no parents
    root,
    /// The functor accepts the futures of all parents
    accept,
    /// The functor accepts the future of the first parent that becomes ready
    accept_any,
    /// The functor consumes the results of all parents
    consume,
    /// The functor consumes the result of the first parent that becomes ready
    consume_any,
    /// The functor takes no arguments but all parents are waited for
    wait,
    /// The functor takes no arguments but the first parent to finish is waited for
    wait_any,
};
230 
231 
/// Base class of the transwarp exception types
class transwarp_error : public std::runtime_error {
public:
    /// message is passed on unchanged to std::runtime_error and returned by what()
    explicit transwarp_error(const std::string& message)
    : std::runtime_error(message)
    {}
};
239 
240 
241 /// Exception thrown when a task is canceled
243 public:
244  explicit task_canceled(const std::string& task_repr)
245  : transwarp::transwarp_error{"Task canceled: " + task_repr}
246  {}
247 };
248 
249 
250 /// Exception thrown when a task was destroyed prematurely
252 public:
253  explicit task_destroyed(const std::string& task_repr)
254  : transwarp::transwarp_error{"Task destroyed: " + task_repr}
255  {}
256 };
257 
258 
259 /// Exception thrown when an invalid parameter was passed to a function
261 public:
262  explicit invalid_parameter(const std::string& parameter)
263  : transwarp::transwarp_error{"Invalid parameter: " + parameter}
264  {}
265 };
266 
267 
268 /// Exception thrown when a task is used in unintended ways
270 public:
271  explicit control_error(const std::string& message)
272  : transwarp::transwarp_error{"Control error: " + message}
273  {}
274 };
275 
276 
277 /// The root type. Used for tag dispatch
278 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
279 constexpr transwarp::root_type root{}; ///< The root task tag
280 
281 /// The accept type. Used for tag dispatch
282 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
283 constexpr transwarp::accept_type accept{}; ///< The accept task tag
284 
285 /// The accept_any type. Used for tag dispatch
286 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
287 constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag
288 
289 /// The consume type. Used for tag dispatch
290 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
291 constexpr transwarp::consume_type consume{}; ///< The consume task tag
292 
293 /// The consume_any type. Used for tag dispatch
294 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
295 constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag
296 
297 /// The wait type. Used for tag dispatch
298 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
299 constexpr transwarp::wait_type wait{}; ///< The wait task tag
300 
301 /// The wait_any type. Used for tag dispatch
302 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
303 constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag
304 
305 
306 class itask;
307 
308 
309 /// Detail namespace for internal functionality only
310 namespace detail {
311 
312 struct visit_visitor;
313 struct unvisit_visitor;
314 struct final_visitor;
315 struct schedule_visitor;
316 struct parent_visitor;
318 
319 } // detail
320 
321 
322 /// The executor interface used to perform custom task execution
323 class executor {
324 public:
325  virtual ~executor() = default;
326 
327  /// Returns the name of the executor
328  virtual std::string name() const = 0;
329 
330  /// Runs a task which is wrapped by the given functor. The functor only
331  /// captures one shared pointer and can hence be copied at low cost.
332  /// task represents the task that the functor belongs to.
333  /// This function is only ever called on the thread of the caller to schedule().
334  /// The implementer needs to ensure that this never throws exceptions
335  virtual void execute(const std::function<void()>& functor, transwarp::itask& task) = 0;
336 };
337 
338 
/// Task events that can be subscribed to through the listener interface.
/// Each entry notes the thread on which handle_event is called.
enum class event_type {
    /// Just before a task is scheduled (thread of the caller to schedule())
    before_scheduled,
    /// Just after the task's future was changed (thread that changed the future)
    after_future_changed,
    /// Just before a task starts running (thread the task is run on)
    before_started,
    /// Just before a task's functor is invoked (thread the task is run on)
    before_invoked,
    /// Just after a task has finished running (thread the task is run on)
    after_finished,
    /// Just after a task was canceled (thread the task is run on)
    after_canceled,
    /// Just after a task has satisfied all its children with results (thread where the last child is satisfied)
    after_satisfied,
    /// Just after custom data was assigned (thread the custom data was set on)
    after_custom_data_set,
    /// Last enumerator; its value equals the number of enumerators preceding it
    count,
};
351 
352 
353 /// The listener interface to listen to events raised by tasks
354 class listener {
355 public:
356  virtual ~listener() = default;
357 
358  /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type).
359  /// The implementer needs to ensure that this never throws exceptions.
360  virtual void handle_event(transwarp::event_type event, transwarp::itask& task) = 0;
361 };
362 
363 
364 /// An edge between two tasks
365 class edge {
366 public:
368  : parent_(&parent), child_(&child)
369  {}
370 
371  // default copy/move semantics
372  edge(const edge&) = default;
373  edge& operator=(const edge&) = default;
374  edge(edge&&) = default;
375  edge& operator=(edge&&) = default;
376 
377  /// Returns the parent task
378  const transwarp::itask& parent() const noexcept {
379  return *parent_;
380  }
381 
382  /// Returns the parent task
383  transwarp::itask& parent() noexcept {
384  return *parent_;
385  }
386 
387  /// Returns the child task
388  const transwarp::itask& child() const noexcept {
389  return *child_;
390  }
391 
392  /// Returns the child task
393  transwarp::itask& child() noexcept {
394  return *child_;
395  }
396 
397 private:
398  transwarp::itask* parent_;
399  transwarp::itask* child_;
400 };
401 
402 
403 class timer;
404 class releaser;
405 
406 /// An interface for the task class
407 class itask : public std::enable_shared_from_this<itask> {
408 public:
409  virtual ~itask() = default;
410 
411  virtual void finalize() = 0;
412  virtual std::size_t id() const noexcept = 0;
413  virtual std::size_t level() const noexcept = 0;
414  virtual transwarp::task_type type() const noexcept = 0;
415  virtual const transwarp::option_str& name() const noexcept = 0;
416  virtual std::shared_ptr<transwarp::executor> executor() const noexcept = 0;
417  virtual std::int64_t priority() const noexcept = 0;
418  virtual const transwarp::any_data& custom_data() const noexcept = 0;
419  virtual bool canceled() const noexcept = 0;
420  virtual std::int64_t avg_idletime_us() const noexcept = 0;
421  virtual std::int64_t avg_waittime_us() const noexcept = 0;
422  virtual std::int64_t avg_runtime_us() const noexcept = 0;
423  virtual void set_executor(std::shared_ptr<transwarp::executor> executor) = 0;
424  virtual void set_executor_all(std::shared_ptr<transwarp::executor> executor) = 0;
425  virtual void remove_executor() = 0;
426  virtual void remove_executor_all() = 0;
427  virtual void set_priority(std::int64_t priority) = 0;
428  virtual void set_priority_all(std::int64_t priority) = 0;
429  virtual void reset_priority() = 0;
430  virtual void reset_priority_all() = 0;
431  virtual void set_custom_data(transwarp::any_data custom_data) = 0;
432  virtual void set_custom_data_all(transwarp::any_data custom_data) = 0;
433  virtual void remove_custom_data() = 0;
434  virtual void remove_custom_data_all() = 0;
435  virtual void add_listener(std::shared_ptr<transwarp::listener> listener) = 0;
436  virtual void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
437  virtual void add_listener_all(std::shared_ptr<transwarp::listener> listener) = 0;
438  virtual void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) = 0;
439  virtual void remove_listener(const std::shared_ptr<transwarp::listener>& listener) = 0;
440  virtual void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
441  virtual void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) = 0;
442  virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) = 0;
443  virtual void remove_listeners() = 0;
444  virtual void remove_listeners(transwarp::event_type event) = 0;
445  virtual void remove_listeners_all() = 0;
446  virtual void remove_listeners_all(transwarp::event_type event) = 0;
447  virtual void schedule() = 0;
448  virtual void schedule(transwarp::executor& executor) = 0;
449  virtual void schedule(bool reset) = 0;
450  virtual void schedule(transwarp::executor& executor, bool reset) = 0;
451  virtual void schedule_all() = 0;
452  virtual void schedule_all(transwarp::executor& executor) = 0;
453  virtual void schedule_all(bool reset_all) = 0;
454  virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0;
455  virtual void set_exception(std::exception_ptr exception) = 0;
456  virtual bool was_scheduled() const noexcept = 0;
457  virtual void wait() const = 0;
458  virtual bool is_ready() const = 0;
459  virtual bool has_result() const = 0;
460  virtual void reset() = 0;
461  virtual void reset_all() = 0;
462  virtual void cancel(bool enabled) noexcept = 0;
463  virtual void cancel_all(bool enabled) noexcept = 0;
464  virtual std::vector<itask*> parents() const = 0;
465  virtual const std::vector<itask*>& tasks() = 0;
466  virtual std::vector<transwarp::edge> edges() = 0;
467 
468 protected:
469  virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0;
470 
471 private:
472  friend struct transwarp::detail::visit_visitor;
474  friend struct transwarp::detail::final_visitor;
476  friend struct transwarp::detail::parent_visitor;
477  friend class transwarp::timer;
478  friend class transwarp::releaser;
480 
481  virtual void visit(const std::function<void(itask&)>& visitor) = 0;
482  virtual void unvisit() noexcept = 0;
483  virtual void set_id(std::size_t id) noexcept = 0;
484  virtual void set_level(std::size_t level) noexcept = 0;
485  virtual void set_type(transwarp::task_type type) noexcept = 0;
486  virtual void set_name(transwarp::option_str name) noexcept = 0;
487  virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
488  virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
489  virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
490  virtual void increment_childcount() noexcept = 0;
491  virtual void decrement_refcount() = 0;
492  virtual void reset_future() = 0;
493 };
494 
495 
496 /// String conversion for the task_type enumeration
497 inline
498 std::string to_string(const transwarp::task_type& type) {
499  switch (type) {
500  case transwarp::task_type::root: return "root";
501  case transwarp::task_type::accept: return "accept";
502  case transwarp::task_type::accept_any: return "accept_any";
503  case transwarp::task_type::consume: return "consume";
504  case transwarp::task_type::consume_any: return "consume_any";
505  case transwarp::task_type::wait: return "wait";
506  case transwarp::task_type::wait_any: return "wait_any";
507  }
508  throw transwarp::invalid_parameter{"task type"};
509 }
510 
511 
512 /// String conversion for the itask class
513 inline
514 std::string to_string(const transwarp::itask& task, transwarp::str_view separator="\n") {
515  std::string s;
516  s += '"';
517  const transwarp::option_str& name = task.name();
518  if (name) {
519  s += std::string{"<"} + *name + std::string{">"} + separator.data();
520  }
521  s += transwarp::to_string(task.type());
522  s += std::string{" id="} + std::to_string(task.id());
523  s += std::string{" lev="} + std::to_string(task.level());
524  const std::shared_ptr<transwarp::executor> exec = task.executor();
525  if (exec) {
526  s += separator.data() + std::string{"<"} + exec->name() + std::string{">"};
527  }
528  const std::int64_t avg_idletime_us = task.avg_idletime_us();
529  if (avg_idletime_us >= 0) {
530  s += separator.data() + std::string{"avg-idle-us="} + std::to_string(avg_idletime_us);
531  }
532  const std::int64_t avg_waittime_us = task.avg_waittime_us();
533  if (avg_waittime_us >= 0) {
534  s += separator.data() + std::string{"avg-wait-us="} + std::to_string(avg_waittime_us);
535  }
536  const std::int64_t avg_runtime_us = task.avg_runtime_us();
537  if (avg_runtime_us >= 0) {
538  s += separator.data() + std::string{"avg-run-us="} + std::to_string(avg_runtime_us);
539  }
540  return s + '"';
541 }
542 
543 
544 /// String conversion for the edge class
545 inline
546 std::string to_string(const transwarp::edge& edge, transwarp::str_view separator="\n") {
547  return transwarp::to_string(edge.parent(), separator) + std::string{" -> "} + transwarp::to_string(edge.child(), separator);
548 }
549 
550 
551 /// Creates a dot-style string from the given edges
552 inline
553 std::string to_string(const std::vector<transwarp::edge>& edges, transwarp::str_view separator="\n") {
554  std::string dot = std::string{"digraph {"} + separator.data();
555  for (const transwarp::edge& edge : edges) {
556  dot += transwarp::to_string(edge, separator) + separator.data();
557  }
558  dot += std::string{"}"};
559  return dot;
560 }
561 
562 
/// Strips the reference and then a top-level const from T.
/// Unlike std::decay, arrays and functions are not converted to pointers
/// and volatile is kept.
template<typename T>
struct decay {
private:
    using referred = typename std::remove_reference<T>::type;
public:
    using type = typename std::remove_const<referred>::type;
};
568 
569 
/// Yields the type that get() returns when called on a std::shared_future<T>
template<typename T>
struct result {
private:
    using future_t = std::shared_future<T>;
public:
    using type = decltype(std::declval<future_t>().get());
};
575 
576 
577 /// Detail namespace for internal functionality only
578 namespace detail {
579 
580 /// Clones a task
581 template<typename TaskType>
582 std::shared_ptr<TaskType> clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<TaskType>& t) {
583  const auto original_task = std::static_pointer_cast<transwarp::itask>(t);
584  const auto task_cache_it = task_cache.find(original_task);
585  if (task_cache_it != task_cache.cend()) {
586  return std::static_pointer_cast<TaskType>(task_cache_it->second);
587  } else {
588  auto cloned_task = t->clone_impl(task_cache);
589  task_cache[original_task] = cloned_task;
590  return cloned_task;
591  }
592 }
593 
594 } // detail
595 
596 
597 /// The task class
598 template<typename ResultType>
599 class task : public transwarp::itask {
600 public:
601  using result_type = ResultType;
602 
603  virtual ~task() = default;
604 
605  std::shared_ptr<task> clone() const {
606  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
607  return clone_impl(task_cache);
608  }
609 
610  virtual void set_value(const typename transwarp::decay<result_type>::type& value) = 0;
611  virtual void set_value(typename transwarp::decay<result_type>::type&& value) = 0;
612  virtual std::shared_future<result_type> future() const noexcept = 0;
613  virtual typename transwarp::result<result_type>::type get() const = 0;
614 
615 private:
616  template<typename T>
617  friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
618 
619  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
620 };
621 
622 /// The task class (reference result type)
623 template<typename ResultType>
624 class task<ResultType&> : public transwarp::itask {
625 public:
626  using result_type = ResultType&;
627 
628  virtual ~task() = default;
629 
630  std::shared_ptr<task> clone() const {
631  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
632  return clone_impl(task_cache);
633  }
634 
635  virtual void set_value(typename transwarp::decay<result_type>::type& value) = 0;
636  virtual std::shared_future<result_type> future() const noexcept = 0;
637  virtual typename transwarp::result<result_type>::type get() const = 0;
638 
639 private:
640  template<typename T>
641  friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
642 
643  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
644 };
645 
646 /// The task class (void result type)
647 template<>
648 class task<void> : public transwarp::itask {
649 public:
650  using result_type = void;
651 
652  virtual ~task() = default;
653 
654  std::shared_ptr<task> clone() const {
655  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
656  return clone_impl(task_cache);
657  }
658 
659  virtual void set_value() = 0;
660  virtual std::shared_future<result_type> future() const noexcept = 0;
661  virtual result_type get() const = 0;
662 
663 private:
664  template<typename T>
665  friend std::shared_ptr<T> transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const std::shared_ptr<T>& t);
666 
667  virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const = 0;
668 };
669 
670 
/// Detail namespace for internal functionality only
namespace detail {

// Forward declaration of a helper template with a single bool parameter.
// NOTE(review): the declared name was missing after `template<bool>`; it is
// restored here from the upstream transwarp sources -- confirm against the
// project's own copy of this header.
template<bool>
struct assign_task_if_impl;

} // detail
678 
679 
680 /// A base class for a user-defined functor that needs access to the associated
681 /// task or a cancel point to stop a task while it's running
682 class functor {
683 public:
684 
685  virtual ~functor() = default;
686 
687 protected:
688 
689  /// The associated task (only to be called after the task was constructed)
690  const transwarp::itask& transwarp_task() const noexcept {
691  return *transwarp_task_;
692  }
693 
694  /// The associated task (only to be called after the task was constructed)
696  return *transwarp_task_;
697  }
698 
699  /// If the associated task is canceled then this will throw transwarp::task_canceled
700  /// which will stop the task while it's running (only to be called after the task was constructed)
701  void transwarp_cancel_point() const {
702  if (transwarp_task_->canceled()) {
703  throw transwarp::task_canceled{std::to_string(transwarp_task_->id())};
704  }
705  }
706 
707 private:
708  template<bool>
710 
711  transwarp::itask* transwarp_task_{};
712 };
713 
714 
715 /// Detail namespace for internal functionality only
716 namespace detail {
717 
718 
719 /// A simple thread pool used to execute tasks in parallel
720 class thread_pool {
721 public:
722 
723  explicit thread_pool(const std::size_t n_threads,
724  std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
725  : on_thread_started_{std::move(on_thread_started)}
726  {
727  if (n_threads == 0) {
728  throw transwarp::invalid_parameter{"number of threads"};
729  }
730  for (std::size_t i = 0; i < n_threads; ++i) {
731  std::thread thread;
732  try {
733  thread = std::thread(&thread_pool::worker, this, i);
734  } catch (...) {
735  shutdown();
736  throw;
737  }
738  try {
739  threads_.push_back(std::move(thread));
740  } catch (...) {
741  shutdown();
742  thread.join();
743  throw;
744  }
745  }
746  }
747 
748  // delete copy/move semantics
749  thread_pool(const thread_pool&) = delete;
750  thread_pool& operator=(const thread_pool&) = delete;
751  thread_pool(thread_pool&&) = delete;
752  thread_pool& operator=(thread_pool&&) = delete;
753 
754  ~thread_pool() {
755  shutdown();
756  }
757 
758  void push(const std::function<void()>& functor) {
759  {
760  std::lock_guard<std::mutex> lock{mutex_};
761  functors_.push(functor);
762  }
763  cond_var_.notify_one();
764  }
765 
766 private:
767 
768  void worker(const std::size_t index) {
769  if (on_thread_started_) {
770  on_thread_started_(index);
771  }
772  for (;;) {
773  std::function<void()> functor;
774  {
775  std::unique_lock<std::mutex> lock{mutex_};
776  cond_var_.wait(lock, [this]{
777  return done_ || !functors_.empty();
778  });
779  if (done_ && functors_.empty()) {
780  break;
781  }
782  functor = std::move(functors_.front());
783  functors_.pop();
784  }
785  functor();
786  }
787  }
788 
789  void shutdown() {
790  {
791  std::lock_guard<std::mutex> lock{mutex_};
792  done_ = true;
793  }
794  cond_var_.notify_all();
795  for (std::thread& thread : threads_) {
796  thread.join();
797  }
798  threads_.clear();
799  }
800 
801  bool done_ = false;
802  std::function<void(std::size_t)> on_thread_started_;
803  std::vector<std::thread> threads_;
804  std::queue<std::function<void()>> functors_;
805  std::condition_variable cond_var_;
806  std::mutex mutex_;
807 };
808 
809 
810 #ifdef TRANSWARP_CPP11
811 template<std::size_t...> struct indices {};
812 
813 template<std::size_t...> struct construct_range;
814 
815 template<std::size_t end, std::size_t idx, std::size_t... i>
816 struct construct_range<end, idx, i...> : construct_range<end, idx + 1, i..., idx> {};
817 
818 template<std::size_t end, std::size_t... i>
819 struct construct_range<end, end, i...> {
820  using type = transwarp::detail::indices<i...>;
821 };
822 
823 template<std::size_t b, std::size_t e>
824 struct index_range {
825  using type = typename transwarp::detail::construct_range<e, b>::type;
826 };
827 
828 template<typename Functor, typename Tuple>
829 void call_with_each_index(transwarp::detail::indices<>, Functor&&, Tuple&&) {}
830 
831 template<std::size_t i, std::size_t... j, typename Functor, typename Tuple>
832 void call_with_each_index(transwarp::detail::indices<i, j...>, Functor&& f, Tuple&& t) {
833  f(std::get<i>(t));
834  transwarp::detail::call_with_each_index(transwarp::detail::indices<j...>{}, std::forward<Functor>(f), std::forward<Tuple>(t));
835 }
836 #endif
837 
/// Calls f once with every element of the tuple t, from first to last
template<typename Functor, typename Tuple>
void apply_to_each(Functor&& f, Tuple&& t) {
#ifdef TRANSWARP_CPP11
    using tuple_t = typename std::decay<Tuple>::type;
    using index_t = typename transwarp::detail::index_range<0, std::tuple_size<tuple_t>::value>::type;
    transwarp::detail::call_with_each_index(index_t{}, std::forward<Functor>(f), std::forward<Tuple>(t));
#else
    const auto call_for_all = [&f](auto&&... element) {
        // fold over the comma operator: one call per element, left to right
        (std::forward<Functor>(f)(std::forward<decltype(element)>(element)), ...);
    };
    std::apply(call_for_all, std::forward<Tuple>(t));
#endif
}
848 
/// Calls f with every element of the constant vector v, in order
template<typename Functor, typename ElementType>
void apply_to_each(Functor&& f, const std::vector<ElementType>& v) {
    std::for_each(v.cbegin(), v.cend(), std::forward<Functor>(f));
}

/// Calls f with every element of the vector v, in order; f may modify the elements
template<typename Functor, typename ElementType>
void apply_to_each(Functor&& f, std::vector<ElementType>& v) {
    std::for_each(std::begin(v), std::end(v), std::forward<Functor>(f));
}
858 
859 
860 template<int offset, typename... ParentResults>
862  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
863  std::get<offset>(target) = std::get<offset>(source)->future();
865  }
866 };
867 
868 template<typename... ParentResults>
869 struct assign_futures_impl<-1, ParentResults...> {
870  static void work(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&, std::tuple<std::shared_future<ParentResults>...>&) {}
871 };
872 
873 /// Returns the futures from the given tuple of tasks
874 template<typename... ParentResults>
875 std::tuple<std::shared_future<ParentResults>...> get_futures(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& input) {
876  std::tuple<std::shared_future<ParentResults>...> result;
877  assign_futures_impl<static_cast<int>(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result);
878  return result;
879 }
880 
881 /// Returns the futures from the given vector of tasks
882 template<typename ParentResultType>
883 std::vector<std::shared_future<ParentResultType>> get_futures(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& input) {
884  std::vector<std::shared_future<ParentResultType>> result;
885  result.reserve(input.size());
886  for (const std::shared_ptr<transwarp::task<ParentResultType>>& task : input) {
887  result.emplace_back(task->future());
888  }
889  return result;
890 }
891 
892 
893 /// Runs the task with the given arguments, hence, invoking the task's functor
894 template<typename Result, typename Task, typename... Args>
895 Result run_task(std::size_t task_id, const std::weak_ptr<Task>& task, Args&&... args) {
896  const std::shared_ptr<Task> t = task.lock();
897  if (!t) {
898  throw transwarp::task_destroyed{std::to_string(task_id)};
899  }
900  if (t->canceled()) {
901  throw transwarp::task_canceled{std::to_string(task_id)};
902  }
904  return (*t->functor_)(std::forward<Args>(args)...);
905 }
906 
907 
/// Function object that waits on the future of the task it is called with
struct wait_for_all_functor {
    /// p is a (smart) pointer to a task; returns once the task's future is ready
    template<typename T>
    void operator()(const T& p) const {
        p->future().wait();
    }
};
914 
915 /// Waits for all parents to finish
916 template<typename... ParentResults>
917 void wait_for_all(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
918  transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents);
919 }
920 
921 /// Waits for all parents to finish
922 template<typename ParentResultType>
923 void wait_for_all(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
924  transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents);
925 }
926 
927 
/// End of the recursion: no candidate is left, so a value-initialized Parent is returned
template<typename Parent>
Parent wait_for_any_impl() {
    return Parent{};
}
932 
933 template<typename Parent, typename ParentResult, typename... ParentResults>
934 Parent wait_for_any_impl(const std::shared_ptr<transwarp::task<ParentResult>>& parent, const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
935  const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
936  if (status == std::future_status::ready) {
937  return parent;
938  }
939  return transwarp::detail::wait_for_any_impl<Parent>(parents...);
940 }
941 
942 /// Waits for the first parent to finish
943 template<typename Parent, typename... ParentResults>
944 Parent wait_for_any(const std::shared_ptr<transwarp::task<ParentResults>>& ...parents) {
945  for (;;) {
946  Parent parent = transwarp::detail::wait_for_any_impl<Parent>(parents...);
947  if (parent) {
948  return parent;
949  }
950  }
951 }
952 
953 
954 /// Waits for the first parent to finish
955 template<typename ParentResultType>
956 std::shared_ptr<transwarp::task<ParentResultType>> wait_for_any(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
957  for (;;) {
958  for (const std::shared_ptr<transwarp::task<ParentResultType>>& parent : parents) {
959  const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
960  if (status == std::future_status::ready) {
961  return parent;
962  }
963  }
964  }
965 }
966 
967 
968 template<typename OneResult>
970  explicit cancel_all_but_one_functor(const std::shared_ptr<transwarp::task<OneResult>>& one) noexcept
971  : one_(one) {}
972 
973  template<typename T>
974  void operator()(const T& parent) const {
975  if (one_ != parent) {
976  parent->cancel(true);
977  }
978  }
979 
980  const std::shared_ptr<transwarp::task<OneResult>>& one_;
981 };
982 
983 /// Cancels all tasks but one
984 template<typename OneResult, typename... ParentResults>
985 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
986  transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor<OneResult>{one}, parents);
987 }
988 
989 /// Cancels all tasks but one
990 template<typename OneResult, typename ParentResultType>
991 void cancel_all_but_one(const std::shared_ptr<transwarp::task<OneResult>>& one, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
992  transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor<OneResult>{one}, parents);
993 }
994 
995 
/// Functor that calls decrement_refcount() on the given task pointer.
/// The struct's opening line was missing here; the name is the one used by decrement_refcount.
struct decrement_refcount_functor {
    template<typename T>
    void operator()(const T& task) const {
        task->decrement_refcount();
    }
};
1002 
1003 /// Decrements the refcount of all parents
1004 template<typename... ParentResults>
1005 void decrement_refcount(const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1006  transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents);
1007 }
1008 
1009 /// Decrements the refcount of all parents
1010 template<typename ParentResultType>
1011 void decrement_refcount(const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1012  transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents);
1013 }
1014 
1015 
1016 template<typename TaskType, bool done, int total, int... n>
1017 struct call_impl {
1018  template<typename Result, typename Task, typename... ParentResults>
1019  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1020  return call_impl<TaskType, total == 1 + static_cast<int>(sizeof...(n)), total, n..., static_cast<int>(sizeof...(n))>::template
1021  work<Result>(task_id, task, parents);
1022  }
1023 };
1024 
1025 template<typename TaskType>
1027 
1028 template<int total, int... n>
1029 struct call_impl<transwarp::root_type, true, total, n...> {
1030  template<typename Result, typename Task, typename... ParentResults>
1031  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>&) {
1032  return transwarp::detail::run_task<Result>(task_id, task);
1033  }
1034 };
1035 
// Vector-parent variant for a task that takes no parent input: the parents are ignored
// and the functor is run without arguments.
// NOTE(review): the struct line that should follow 'template<>' is not present in this
// copy (embedded numbering skips 1037) - restore it from the upstream header.
1036 template<>
1038  template<typename Result, typename Task, typename ParentResultType>
1039  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>&) {
1040  return transwarp::detail::run_task<Result>(task_id, task);
1041  }
1042 };
1043 
// Accept tasks with tuple parents: the functor receives one shared future per parent.
1044 template<int total, int... n>
1045 struct call_impl<transwarp::accept_type, true, total, n...> {
1046  template<typename Result, typename Task, typename... ParentResults>
1047  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
// NOTE(review): embedded numbering skips 1048 and 1050 around the next statement, so
// statements may be missing from this copy - compare against the upstream header.
1049  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1051  return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures)...);
1052  }
1053 };
1054 
// Vector-parent variant that hands the whole vector of parent futures to the functor.
// NOTE(review): the struct line after 'template<>' is not present in this copy, and the
// embedded numbering also skips 1059 and 1061 inside the body - verify against upstream.
1055 template<>
1057  template<typename Result, typename Task, typename ParentResultType>
1058  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1060  std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1062  return transwarp::detail::run_task<Result>(task_id, task, std::move(futures));
1063  }
1064 };
1065 
// Accept-any tasks with tuple parents: waits for the first ready parent and passes
// that parent's future to the functor.
1066 template<int total, int... n>
1067 struct call_impl<transwarp::accept_any_type, true, total, n...> {
1068  template<typename Result, typename Task, typename... ParentResults>
1069  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1070  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
1071  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
// NOTE(review): embedded numbering skips 1072 and 1074 around the next statement, so
// statements may be missing from this copy - compare against the upstream header.
1073  auto future = parent->future();
1075  return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
1076  }
1077 };
1078 
// Vector-parent variant that waits for the first ready parent and passes that
// parent's future to the functor.
// NOTE(review): the struct line after 'template<>' is not present in this copy, and the
// embedded numbering also skips 1084 and 1086 inside the body - verify against upstream.
1079 template<>
1081  template<typename Result, typename Task, typename ParentResultType>
1082  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1083  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1085  auto future = parent->future();
1087  return transwarp::detail::run_task<Result>(task_id, task, std::move(future));
1088  }
1089 };
1090 
// Consume tasks with tuple parents: the functor receives the result of every parent,
// obtained through get() on each parent future (which rethrows a stored exception).
1091 template<int total, int... n>
1092 struct call_impl<transwarp::consume_type, true, total, n...> {
1093  template<typename Result, typename Task, typename... ParentResults>
1094  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
// NOTE(review): embedded numbering skips 1095 and 1097 around the next statement, so
// statements may be missing from this copy - compare against the upstream header.
1096  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1098  return transwarp::detail::run_task<Result>(task_id, task, std::get<n>(futures).get()...);
1099  }
1100 };
1101 
// Vector-parent variant that collects the result of every parent future into a vector
// and moves that vector into the functor.
// NOTE(review): the struct line after 'template<>' is not present in this copy, and the
// embedded numbering also skips 1106 and 1108 inside the body - verify against upstream.
1102 template<>
1104  template<typename Result, typename Task, typename ParentResultType>
1105  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1107  const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1109  std::vector<ParentResultType> results;
1110  results.reserve(futures.size());
1111  for (const std::shared_future<ParentResultType>& future : futures) {
1112  results.emplace_back(future.get());
1113  }
1114  return transwarp::detail::run_task<Result>(task_id, task, std::move(results));
1115  }
1116 };
1117 
// Consume-any tasks with tuple parents: waits for the first ready parent and passes
// that parent's result (future.get()) to the functor.
1118 template<int total, int... n>
1119 struct call_impl<transwarp::consume_any_type, true, total, n...> {
1120  template<typename Result, typename Task, typename... ParentResults>
1121  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1122  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; /// Use first type as reference
1123  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
// NOTE(review): embedded numbering skips 1124 and 1126 around the next statement, so
// statements may be missing from this copy - compare against the upstream header.
1125  const auto future = parent->future();
1127  return transwarp::detail::run_task<Result>(task_id, task, future.get());
1128  }
1129 };
1130 
// Vector-parent variant that waits for the first ready parent and passes that
// parent's result (future.get()) to the functor.
// NOTE(review): the struct line after 'template<>' is not present in this copy, and the
// embedded numbering also skips 1136 and 1138 inside the body - verify against upstream.
1131 template<>
1133  template<typename Result, typename Task, typename ParentResultType>
1134  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1135  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1137  const auto future = parent->future();
1139  return transwarp::detail::run_task<Result>(task_id, task, future.get());
1140  }
1141 };
1142 
/// Functor that calls get() on the given shared future and discards the value.
/// get() rethrows an exception stored in the future, which is why the wait tasks use it.
/// The struct's opening line was missing here; the name is the one used by those tasks.
struct future_get_functor {
    template<typename T>
    void operator()(const std::shared_future<T>& f) const {
        f.get();
    }
};
1149 
// Wait tasks with tuple parents: get() is called on every parent future before the
// functor is run without arguments.
1150 template<int total, int... n>
1151 struct call_impl<transwarp::wait_type, true, total, n...> {
1152  template<typename Result, typename Task, typename... ParentResults>
1153  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
// NOTE(review): embedded numbering skips 1154 and 1156 around the next statement, so
// statements may be missing from this copy - compare against the upstream header.
1155  const std::tuple<std::shared_future<ParentResults>...> futures = transwarp::detail::get_futures(parents);
1157  transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated
1158  return transwarp::detail::run_task<Result>(task_id, task);
1159  }
1160 };
1161 
// Vector-parent variant that calls get() on every parent future before the functor
// is run without arguments.
// NOTE(review): the struct line after 'template<>' is not present in this copy, and the
// embedded numbering also skips 1166 and 1168 inside the body - verify against upstream.
1162 template<>
1164  template<typename Result, typename Task, typename ParentResultType>
1165  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1167  const std::vector<std::shared_future<ParentResultType>> futures = transwarp::detail::get_futures(parents);
1169  transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated
1170  return transwarp::detail::run_task<Result>(task_id, task);
1171  }
1172 };
1173 
// Wait-any tasks with tuple parents: waits for the first ready parent, calls get() on
// its future and then runs the functor without arguments.
1174 template<int total, int... n>
1175 struct call_impl<transwarp::wait_any_type, true, total, n...> {
1176  template<typename Result, typename Task, typename... ParentResults>
1177  static Result work(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1178  using parent_t = typename std::remove_reference<decltype(std::get<0>(parents))>::type; // Use first type as reference
1179  parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(parents)...);
// NOTE(review): embedded numbering skips 1180 and 1182 around the next statement, so
// statements may be missing from this copy - compare against the upstream header.
1181  const auto future = parent->future();
1183  future.get(); // Ensures that exceptions are propagated
1184  return transwarp::detail::run_task<Result>(task_id, task);
1185  }
1186 };
1187 
// Vector-parent variant that waits for the first ready parent, calls get() on its
// future and then runs the functor without arguments.
// NOTE(review): the struct line after 'template<>' is not present in this copy, and the
// embedded numbering also skips 1193 and 1195 inside the body - verify against upstream.
1188 template<>
1190  template<typename Result, typename Task, typename ParentResultType>
1191  static Result work(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
1192  std::shared_ptr<transwarp::task<ParentResultType>> parent = transwarp::detail::wait_for_any(parents);
1194  const auto future = parent->future();
1196  future.get(); // Ensures that exceptions are propagated
1197  return transwarp::detail::run_task<Result>(task_id, task);
1198  }
1199 };
1200 
1201 /// Calls the functor of the given task with the results from the tuple of parents.
1202 /// Throws transwarp::task_canceled if the task is canceled.
1203 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1204 template<typename TaskType, typename Result, typename Task, typename... ParentResults>
1205 Result call(std::size_t task_id, const Task& task, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& parents) {
1206  constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
1208  work<Result>(task_id, task, parents);
1209 }
1210 
1211 /// Calls the functor of the given task with the results from the vector of parents.
1212 /// Throws transwarp::task_canceled if the task is canceled.
1213 /// Throws transwarp::task_destroyed in case the task was destroyed prematurely.
1214 template<typename TaskType, typename Result, typename Task, typename ParentResultType>
1215 Result call(std::size_t task_id, const Task& task, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& parents) {
// NOTE(review): embedded numbering skips 1216; the line naming the class template whose
// 'work' is invoked (and presumably the 'return') is not present in this copy - restore
// it from the upstream header.
1217  work<Result>(task_id, task, parents);
1218 }
1219 
1220 
1221 template<typename Functor>
1223  explicit call_with_each_functor(const Functor& f) noexcept
1224  : f_(f) {}
1225 
1226  template<typename T>
1227  void operator()(const T& task) const {
1228  if (!task) {
1229  throw transwarp::invalid_parameter{"task pointer"};
1230  }
1231  f_(*task);
1232  }
1233 
1234  const Functor& f_;
1235 };
1236 
1237 /// Calls the functor with every element in the tuple
1238 template<typename Functor, typename... ParentResults>
1239 void call_with_each(const Functor& f, const std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>& t) {
1240  transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor<Functor>{f}, t);
1241 }
1242 
1243 /// Calls the functor with every element in the vector
1244 template<typename Functor, typename ParentResultType>
1245 void call_with_each(const Functor& f, const std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>& v) {
1246  transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor<Functor>{f}, v);
1247 }
1248 
1249 
1250 /// Sets level of a task and increments the child count
1252  explicit parent_visitor(transwarp::itask& task) noexcept
1253  : task_(task) {}
1254 
1255  void operator()(transwarp::itask& task) const {
1256  if (task_.level() <= task.level()) {
1257  // A child's level is always larger than any of its parents' levels
1258  task_.set_level(task.level() + 1);
1259  }
1260  task.increment_childcount();
1261  }
1262 
1263  transwarp::itask& task_;
1264 };
1265 
1266 /// Applies final bookkeeping to the task and collects the task
1268  explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
1269  : tasks_(tasks) {}
1270 
1271  void operator()(transwarp::itask& task) noexcept {
1272  tasks_.push_back(&task);
1273  task.set_id(id_++);
1274  }
1275 
1276  std::vector<transwarp::itask*>& tasks_;
1277  std::size_t id_ = 0;
1278 };
1279 
1280 /// Generates edges
1282  explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
1283  : edges_(edges) {}
1284 
1285  void operator()(transwarp::itask& task) {
1286  for (transwarp::itask* parent : task.parents()) {
1287  edges_.emplace_back(*parent, task);
1288  }
1289  }
1290 
1291  std::vector<transwarp::edge>& edges_;
1292 };
1293 
1294 /// Schedules using the given executor
1296  schedule_visitor(bool reset, transwarp::executor* executor) noexcept
1297  : reset_(reset), executor_(executor) {}
1298 
1299  void operator()(transwarp::itask& task) {
1300  task.schedule_impl(reset_, executor_);
1301  }
1302 
1303  bool reset_;
1304  transwarp::executor* executor_;
1305 };
1306 
1307 /// Resets the given task
// Call operator of a visitor that forwards to task.reset().
// NOTE(review): the line that opens this visitor's struct is not present in this copy
// (embedded numbering skips 1308) - restore it from the upstream header.
1309 
1310  void operator()(transwarp::itask& task) const {
1311  task.reset();
1312  }
1313 };
1314 
1315 /// Cancels or resumes the given task
1317  explicit cancel_visitor(bool enabled) noexcept
1318  : enabled_{enabled} {}
1319 
1320  void operator()(transwarp::itask& task) const noexcept {
1321  task.cancel(enabled_);
1322  }
1323 
1324  bool enabled_;
1325 };
1326 
1327 /// Assigns an executor to the given task
1329  explicit set_executor_visitor(std::shared_ptr<transwarp::executor> executor) noexcept
1330  : executor_{std::move(executor)} {}
1331 
1332  void operator()(transwarp::itask& task) const noexcept {
1333  task.set_executor(executor_);
1334  }
1335 
1336  std::shared_ptr<transwarp::executor> executor_;
1337 };
1338 
1339 /// Removes the executor from the given task
// Call operator of a visitor that forwards to task.remove_executor().
// NOTE(review): the line that opens this visitor's struct is not present in this copy
// (embedded numbering skips 1340) - restore it from the upstream header.
1341 
1342  void operator()(transwarp::itask& task) const noexcept {
1343  task.remove_executor();
1344  }
1345 };
1346 
1347 /// Assigns a priority to the given task
1349  explicit set_priority_visitor(std::int64_t priority) noexcept
1350  : priority_{priority} {}
1351 
1352  void operator()(transwarp::itask& task) const noexcept {
1353  task.set_priority(priority_);
1354  }
1355 
1356  std::int64_t priority_;
1357 };
1358 
1359 /// Resets the priority of the given task
// Call operator of a visitor that forwards to task.reset_priority().
// NOTE(review): the line that opens this visitor's struct is not present in this copy
// (embedded numbering skips 1360) - restore it from the upstream header.
1361 
1362  void operator()(transwarp::itask& task) const noexcept {
1363  task.reset_priority();
1364  }
1365 };
1366 
1367 /// Assigns custom data to the given task
1369  explicit set_custom_data_visitor(transwarp::any_data custom_data) noexcept
1370  : custom_data_{std::move(custom_data)} {}
1371 
1372  void operator()(transwarp::itask& task) const noexcept {
1373  task.set_custom_data(custom_data_);
1374  }
1375 
1376  transwarp::any_data custom_data_;
1377 };
1378 
1379 /// Removes custom data from the given task
// Call operator of a visitor that forwards to task.remove_custom_data().
// NOTE(review): the line that opens this visitor's struct is not present in this copy
// (embedded numbering skips 1380) - restore it from the upstream header.
1381 
1382  void operator()(transwarp::itask& task) const noexcept {
1383  task.remove_custom_data();
1384  }
1385 };
1386 
1387 /// Pushes the given task into the vector of tasks
1389  explicit push_task_visitor(std::vector<transwarp::itask*>& tasks)
1390  : tasks_(tasks) {}
1391 
1392  void operator()(transwarp::itask& task) {
1393  tasks_.push_back(&task);
1394  }
1395 
1396  std::vector<transwarp::itask*>& tasks_;
1397 };
1398 
1399 /// Adds a new listener to the given task
1401  explicit add_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1402  : listener_(std::move(listener))
1403  {}
1404 
1405  void operator()(transwarp::itask& task) {
1406  task.add_listener(listener_);
1407  }
1408 
1409  std::shared_ptr<transwarp::listener> listener_;
1410 };
1411 
1412 /// Adds a new listener per event type to the given task
1414  add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1415  : event_(event), listener_(std::move(listener))
1416  {}
1417 
1418  void operator()(transwarp::itask& task) {
1419  task.add_listener(event_, listener_);
1420  }
1421 
1422  transwarp::event_type event_;
1423  std::shared_ptr<transwarp::listener> listener_;
1424 };
1425 
1426 /// Removes a listener from the given task
1428  explicit remove_listener_visitor(std::shared_ptr<transwarp::listener> listener)
1429  : listener_(std::move(listener))
1430  {}
1431 
1432  void operator()(transwarp::itask& task) {
1433  task.remove_listener(listener_);
1434  }
1435 
1436  std::shared_ptr<transwarp::listener> listener_;
1437 };
1438 
1439 /// Removes a listener per event type from the given task
1441  remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener)
1442  : event_(event), listener_(std::move(listener))
1443  {}
1444 
1445  void operator()(transwarp::itask& task) {
1446  task.remove_listener(event_, listener_);
1447  }
1448 
1449  transwarp::event_type event_;
1450  std::shared_ptr<transwarp::listener> listener_;
1451 };
1452 
1453 /// Removes all listeners from the given task
// Call operator of a visitor that forwards to task.remove_listeners().
// NOTE(review): the line that opens this visitor's struct is not present in this copy
// (embedded numbering skips 1454) - restore it from the upstream header.
1455 
1456  void operator()(transwarp::itask& task) {
1457  task.remove_listeners();
1458  }
1459 
1460 };
1461 
1462 /// Removes all listeners per event type from the given task
// Visitor that stores an event type and forwards it to task.remove_listeners(event_).
// NOTE(review): both the struct's opening line and the constructor signature are not
// present in this copy (embedded numbering skips 1463-1464); only the constructor's
// initializer list is left below - restore the two lines from the upstream header.
1465  : event_(event)
1466  {}
1467 
1468  void operator()(transwarp::itask& task) {
1469  task.remove_listeners(event_);
1470  }
1471 
1472  transwarp::event_type event_;
1473 };
1474 
1475 /// Visits the given task using the visitor given in the constructor
1477  explicit visit_visitor(const std::function<void(transwarp::itask&)>& visitor) noexcept
1478  : visitor_(visitor) {}
1479 
1480  void operator()(transwarp::itask& task) const {
1481  task.visit(visitor_);
1482  }
1483 
1484  const std::function<void(transwarp::itask&)>& visitor_;
1485 };
1486 
1487 /// Unvisits the given task
// Call operator of a visitor that forwards to task.unvisit().
// NOTE(review): the line that opens this visitor's struct is not present in this copy
// (embedded numbering skips 1488) - restore it from the upstream header.
1489 
1490  void operator()(transwarp::itask& task) const noexcept {
1491  task.unvisit();
1492  }
1493 };
1494 
1495 /// Determines the result type of the Functor dispatching on the task type
1496 template<typename TaskType, typename Functor, typename... ParentResults>
1498  static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1499  std::is_same<TaskType, transwarp::accept_type>::value ||
1500  std::is_same<TaskType, transwarp::accept_any_type>::value ||
1501  std::is_same<TaskType, transwarp::consume_type>::value ||
1502  std::is_same<TaskType, transwarp::consume_any_type>::value ||
1503  std::is_same<TaskType, transwarp::wait_type>::value ||
1504  std::is_same<TaskType, transwarp::wait_any_type>::value,
1505  "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1506 };
1507 
/// Root tasks: 'type' is the result of calling the functor without arguments.
/// Any parent result type triggers the static_assert.
1508 template<typename Functor, typename... ParentResults>
1509 struct functor_result<transwarp::root_type, Functor, ParentResults...> {
1510  static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks");
1511  using type = decltype(std::declval<Functor>()());
1512 };
1513 
/// Accept tasks: 'type' is the result of calling the functor with one
/// std::shared_future per parent result type. At least one parent is required.
1514 template<typename Functor, typename... ParentResults>
1515 struct functor_result<transwarp::accept_type, Functor, ParentResults...> {
1516  static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent");
1517  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1518 };
1519 
/// Accept tasks with vector parents: 'type' is the result of calling the functor
/// with a vector of shared futures of the parent result type.
1520 template<typename Functor, typename ParentResultType>
1521 struct functor_result<transwarp::accept_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1522  using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1523 };
1524 
/// Accept-any tasks: 'type' is the result of calling the functor with a single
/// std::shared_future of the first parent result type. At least one parent is required.
1525 template<typename Functor, typename... ParentResults>
1526 struct functor_result<transwarp::accept_any_type, Functor, ParentResults...> {
1527  static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent");
1528  using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1529  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1530 };
1531 
/// Accept-any tasks with vector parents: 'type' is the result of calling the functor
/// with a single std::shared_future of the parent result type.
1532 template<typename Functor, typename ParentResultType>
1533 struct functor_result<transwarp::accept_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1534  using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1535 };
1536 
/// Consume tasks: 'type' is the result of calling the functor with one value per
/// parent result type. At least one parent is required.
1537 template<typename Functor, typename... ParentResults>
1538 struct functor_result<transwarp::consume_type, Functor, ParentResults...> {
1539  static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent");
1540  using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1541 };
1542 
/// Consume tasks with vector parents: 'type' is the result of calling the functor
/// with a vector of parent results.
1543 template<typename Functor, typename ParentResultType>
1544 struct functor_result<transwarp::consume_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1545  using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1546 };
1547 
/// Consume-any tasks: 'type' is the result of calling the functor with a single value
/// of the first parent result type. At least one parent is required.
1548 template<typename Functor, typename... ParentResults>
1549 struct functor_result<transwarp::consume_any_type, Functor, ParentResults...> {
1550  static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent");
1551  using arg_t = typename std::tuple_element<0, std::tuple<ParentResults...>>::type; // Using first type as reference
1552  using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1553 };
1554 
/// Consume-any tasks with vector parents: 'type' is the result of calling the functor
/// with a single value of the parent result type.
1555 template<typename Functor, typename ParentResultType>
1556 struct functor_result<transwarp::consume_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1557  using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1558 };
1559 
/// Wait tasks: 'type' is the result of calling the functor without arguments.
/// At least one parent is required.
1560 template<typename Functor, typename... ParentResults>
1561 struct functor_result<transwarp::wait_type, Functor, ParentResults...> {
1562  static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent");
1563  using type = decltype(std::declval<Functor>()());
1564 };
1565 
/// Wait tasks with vector parents: 'type' is the result of calling the functor
/// without arguments.
1566 template<typename Functor, typename ParentResultType>
1567 struct functor_result<transwarp::wait_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1568  using type = decltype(std::declval<Functor>()());
1569 };
1570 
/// Wait-any tasks: 'type' is the result of calling the functor without arguments.
/// At least one parent is required.
1571 template<typename Functor, typename... ParentResults>
1572 struct functor_result<transwarp::wait_any_type, Functor, ParentResults...> {
1573  static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent");
1574  using type = decltype(std::declval<Functor>()());
1575 };
1576 
/// Wait-any tasks with vector parents: 'type' is the result of calling the functor
/// without arguments.
1577 template<typename Functor, typename ParentResultType>
1578 struct functor_result<transwarp::wait_any_type, Functor, std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1579  using type = decltype(std::declval<Functor>()());
1580 };
1581 
1582 
/// Helper selected by a compile-time flag; only the two specializations for
/// false and true below are defined.
1583 template<bool is_transwarp_functor>
1584 struct assign_task_if_impl;
1585 
1586 template<>
1587 struct assign_task_if_impl<false> {
1588  template<typename Functor>
1589  void operator()(Functor&, transwarp::itask&) const noexcept {}
1590 };
1591 
1592 template<>
1593 struct assign_task_if_impl<true> {
1594  template<typename Functor>
1595  void operator()(Functor& functor, transwarp::itask& task) const noexcept {
1596  functor.transwarp_task_ = &task;
1597  }
1598 };
1599 
1600 /// Assigns the task to the given functor if the functor is a subclass of transwarp::functor
1601 template<typename Functor>
1602 void assign_task_if(Functor& functor, transwarp::itask& task) noexcept {
1604 }
1605 
1606 
/// Creates a future that is already ready and holds the given value.
/// @param value Value that is forwarded into the shared state
/// @return A shared future of ResultType holding the value
template<typename ResultType, typename Value>
std::shared_future<ResultType> make_future_with_value(Value&& value) {
    std::promise<ResultType> fulfilled;
    fulfilled.set_value(std::forward<Value>(value));
    std::shared_future<ResultType> ready = fulfilled.get_future();
    return ready;
}
1614 
/// Creates a void future that is already ready
inline
std::shared_future<void> make_ready_future() {
    std::promise<void> fulfilled;
    fulfilled.set_value();
    std::shared_future<void> ready = fulfilled.get_future();
    return ready;
}
1622 
1623 /// Returns a ready future with the given exception as its state
1624 template<typename ResultType>
1625 std::shared_future<ResultType> make_future_with_exception(std::exception_ptr exception) {
1626  if (!exception) {
1627  throw transwarp::invalid_parameter{"exception pointer"};
1628  }
1629  std::promise<ResultType> promise;
1630  promise.set_exception(exception);
1631  return promise.get_future();
1632 }
1633 
1634 
1636  explicit clone_task_functor(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) noexcept
1637  : task_cache_(task_cache) {}
1638 
1639  template<typename T>
1640  void operator()(T& t) {
1641  t = transwarp::detail::clone_task(task_cache_, t);
1642  }
1643 
1644  std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache_;
1645 };
1646 
1647 
1649  explicit push_task_functor(std::vector<transwarp::itask*>& tasks) noexcept
1650  : tasks_(tasks) {}
1651 
1652  template<typename T>
1653  void operator()(T& t) {
1654  tasks_.push_back(t.get());
1655  }
1656 
1657  std::vector<transwarp::itask*>& tasks_;
1658 };
1659 
1660 
1661 /// Determines the type of the parents
1662 template<typename... ParentResults>
1663 struct parents {
1664  using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1665  static std::size_t size(const type&) {
1666  return std::tuple_size<type>::value;
1667  }
1668  static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1669  type cloned = obj;
1670  transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned);
1671  return cloned;
1672  }
1673  static std::vector<transwarp::itask*> tasks(const type& parents) {
1674  std::vector<transwarp::itask*> tasks;
1675  transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents);
1676  return tasks;
1677  }
1678 };
1679 
1680 /// Determines the type of the parents. Specialization for vector parents
1681 template<typename ParentResultType>
1682 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1683  using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1684  static std::size_t size(const type& obj) {
1685  return obj.size();
1686  }
1687  static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache, const type& obj) {
1688  type cloned = obj;
1689  transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned);
1690  return cloned;
1691  }
1692  static std::vector<transwarp::itask*> tasks(const type& parents) {
1693  std::vector<transwarp::itask*> tasks;
1694  transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents);
1695  return tasks;
1696  }
1697 };
1698 
1699 
1700 template<typename ResultType, typename TaskType>
1702 protected:
1703 
1704  template<typename Task, typename Parents>
1705  void call(std::size_t task_id,
1706  const std::weak_ptr<Task>& task,
1707  const Parents& parents) {
1708  promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id, task, parents));
1709  }
1710 
1711  std::promise<ResultType> promise_;
1712 };
1713 
1714 template<typename TaskType>
1715 class base_runner<void, TaskType> {
1716 protected:
1717 
1718  template<typename Task, typename Parents>
1719  void call(std::size_t task_id,
1720  const std::weak_ptr<Task>& task,
1721  const Parents& parents) {
1722  transwarp::detail::call<TaskType, void>(task_id, task, parents);
1723  promise_.set_value();
1724  }
1725 
1726  std::promise<void> promise_;
1727 };
1728 
1729 /// A callable to run a task given its parents
1730 template<typename ResultType, typename TaskType, typename Task, typename Parents>
1731 class runner : public transwarp::detail::base_runner<ResultType, TaskType> {
1732 public:
1733 
1734  runner(std::size_t task_id,
1735  const std::weak_ptr<Task>& task,
1736  const typename transwarp::decay<Parents>::type& parents)
1737  : task_id_(task_id),
1738  task_(task),
1739  parents_(parents)
1740  {}
1741 
1742  std::future<ResultType> future() {
1743  return this->promise_.get_future();
1744  }
1745 
1746  void operator()() {
1747  if (const std::shared_ptr<Task> t = task_.lock()) {
1748  t->raise_event(transwarp::event_type::before_started);
1749  }
1750  try {
1751  this->call(task_id_, task_, parents_);
1752  } catch (const transwarp::task_canceled&) {
1753  this->promise_.set_exception(std::current_exception());
1754  if (const std::shared_ptr<Task> t = task_.lock()) {
1755  t->raise_event(transwarp::event_type::after_canceled);
1756  }
1757  } catch (...) {
1758  this->promise_.set_exception(std::current_exception());
1759  }
1760  if (const std::shared_ptr<Task> t = task_.lock()) {
1761  t->raise_event(transwarp::event_type::after_finished);
1762  }
1763  }
1764 
1765 private:
1766  const std::size_t task_id_;
1767  const std::weak_ptr<Task> task_;
1768  const typename transwarp::decay<Parents>::type parents_;
1769 };
1770 
1771 
1772 /// A simple circular buffer (FIFO).
1773 /// ValueType must support default construction. The buffer lets you push
1774 /// new values onto the back and pop old values off the front.
1775 template<typename ValueType>
1777 public:
1778 
1779  static_assert(std::is_default_constructible<ValueType>::value, "ValueType must be default constructible");
1780 
1781  using value_type = ValueType;
1782 
1783  /// Constructs a circular buffer with a given fixed capacity
1784  explicit
1786  : data_(capacity)
1787  {
1788  if (capacity < 1) {
1789  throw transwarp::invalid_parameter{"capacity"};
1790  }
1791  }
1792 
1793  // delete copy/move semantics
1794  circular_buffer(const circular_buffer&) = delete;
1795  circular_buffer& operator=(const circular_buffer&) = delete;
1796  circular_buffer(circular_buffer&& other) = delete;
1797  circular_buffer& operator=(circular_buffer&&) = delete;
1798 
1799  /// Pushes a new value onto the end of the buffer. If that exceeds the capacity
1800  /// of the buffer then the oldest value gets dropped (the one at the front).
1801  template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1802  void push(T&& value) {
1803  data_[end_] = std::forward<T>(value);
1804  increment();
1805  }
1806 
1807  /// Returns the value at the front of the buffer (the oldest value).
1808  /// This is undefined if the buffer is empty
1809  const value_type& front() const {
1810  return data_[front_];
1811  }
1812 
1813  /// Removes the value at the front of the buffer (the oldest value)
1814  void pop() {
1815  if (!empty()) {
1816  data_[front_] = ValueType{};
1817  decrement();
1818  }
1819  }
1820 
1821  /// Returns the capacity of the buffer
1822  std::size_t capacity() const {
1823  return data_.size();
1824  }
1825 
1826  /// Returns the number of populated values of the buffer. Its maximum value
1827  /// equals the capacity of the buffer
1828  std::size_t size() const {
1829  return size_;
1830  }
1831 
1832  /// Returns whether the buffer is empty
1833  bool empty() const {
1834  return size_ == 0;
1835  }
1836 
1837  /// Returns whether the buffer is full
1838  bool full() const {
1839  return size_ == data_.size();
1840  }
1841 
1842  /// Swaps this buffer with the given buffer
1843  void swap(circular_buffer& buffer) {
1844  std::swap(end_, buffer.end_);
1845  std::swap(front_, buffer.front_);
1846  std::swap(size_, buffer.size_);
1847  std::swap(data_, buffer.data_);
1848  }
1849 
1850 private:
1851 
1852  void increment_or_wrap(std::size_t& value) const {
1853  if (value == data_.size() - 1) {
1854  value = 0;
1855  } else {
1856  ++value;
1857  }
1858  }
1859 
1860  void increment() {
1861  increment_or_wrap(end_);
1862  if (full()) {
1863  increment_or_wrap(front_);
1864  } else {
1865  ++size_;
1866  }
1867  }
1868 
1869  void decrement() {
1870  increment_or_wrap(front_);
1871  --size_;
1872  }
1873 
1874  std::size_t end_{};
1875  std::size_t front_{};
1876  std::size_t size_{};
1877  std::vector<value_type> data_;
1878 };
1879 
1880 
/// A minimal busy-waiting lock built on std::atomic_flag. Provides
/// lock() and unlock() so it can be used with std::lock_guard
class spinlock {
public:

    /// Spins until the flag could be set by this caller
    void lock() noexcept {
        for (;;) {
            const bool was_locked = locked_.test_and_set(std::memory_order_acquire);
            if (!was_locked) {
                return;
            }
        }
    }

    /// Clears the flag so that another caller can acquire the lock
    void unlock() noexcept {
        locked_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
};
1895 
1896 
1897 } // detail
1898 
1899 
/// A functor that does nothing
1902  void operator()() const noexcept {}
1903 };
1904 
1905 /// An object to use in places where a no-op functor is required
1906 constexpr no_op_functor no_op{};
1907 
1908 
1909 /// Executor for sequential execution. Runs functors sequentially on the same thread
1911 public:
1912 
1913  sequential() = default;
1914 
1915  // delete copy/move semantics
1916  sequential(const sequential&) = delete;
1917  sequential& operator=(const sequential&) = delete;
1918  sequential(sequential&&) = delete;
1919  sequential& operator=(sequential&&) = delete;
1920 
1921  /// Returns the name of the executor
1922  std::string name() const override {
1923  return "transwarp::sequential";
1924  }
1925 
1926  /// Runs the functor on the current thread
1927  void execute(const std::function<void()>& functor, transwarp::itask&) override {
1928  functor();
1929  }
1930 };
1931 
1932 
1933 /// Executor for parallel execution. Uses a simple thread pool
1935 public:
1936 
1937  explicit parallel(const std::size_t n_threads,
1938  std::function<void(std::size_t thread_index)> on_thread_started = nullptr)
1939  : pool_{n_threads, std::move(on_thread_started)}
1940  {}
1941 
1942  // delete copy/move semantics
1943  parallel(const parallel&) = delete;
1944  parallel& operator=(const parallel&) = delete;
1945  parallel(parallel&&) = delete;
1946  parallel& operator=(parallel&&) = delete;
1947 
1948  /// Returns the name of the executor
1949  std::string name() const override {
1950  return "transwarp::parallel";
1951  }
1952 
1953  /// Pushes the functor into the thread pool for asynchronous execution
1954  void execute(const std::function<void()>& functor, transwarp::itask&) override {
1955  pool_.push(functor);
1956  }
1957 
1958 private:
1960 };
1961 
1962 
1963 /// Detail namespace for internal functionality only
1964 namespace detail {
1965 
1966 const transwarp::option_str nullopt_string;
1967 const transwarp::any_data any_empty;
1968 
1969 
1970 template<typename ResultType, bool is_void>
1972 
/// Specialization selected when the second template argument is true
template<typename ResultType>
struct make_future_functor<ResultType, true> {
 template<typename Future, typename OtherFuture>
 void operator()(Future& future, const OtherFuture& other) const {
 // Calls get() on the other future; nothing is done with a return value
 other.get();
 // NOTE(review): `future` is never assigned in the visible lines, unlike the
 // specialization for `false` below -- confirm against the complete file
 }
};
1981 
1982 template<typename ResultType>
1983 struct make_future_functor<ResultType, false> {
1984  template<typename Future, typename OtherFuture>
1985  void operator()(Future& future, const OtherFuture& other) const {
1986  future = transwarp::detail::make_future_with_value<ResultType>(other.get());
1987  }
1988 };
1989 
1990 
1991 /// Common task functionality shared across `task_impl` and `value_task`
1992 template<typename ResultType>
1993 class task_common : public transwarp::task<ResultType> {
1994 public:
1995  /// The result type of this task
1996  using result_type = ResultType;
1997 
1998  /// The task's id
1999  std::size_t id() const noexcept override {
2000  return id_;
2001  }
2002 
2003  /// The optional task name
2004  const transwarp::option_str& name() const noexcept override {
2005 #ifndef TRANSWARP_DISABLE_TASK_NAME
2006  return name_;
2007 #else
2008  return transwarp::detail::nullopt_string;
2009 #endif
2010  }
2011 
2012  /// The task priority (defaults to 0)
2013  std::int64_t priority() const noexcept override {
2014 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2015  return priority_;
2016 #else
2017  return 0;
2018 #endif
2019  }
2020 
2021  /// The custom task data (may not hold a value)
2022  const transwarp::any_data& custom_data() const noexcept override {
2023 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2024  return custom_data_;
2025 #else
2026  return transwarp::detail::any_empty;
2027 #endif
2028  }
2029 
2030  /// Sets a task priority (defaults to 0). transwarp will not directly use this.
2031  /// This is only useful if something else is using the priority (e.g. a custom executor)
2032  void set_priority(std::int64_t priority) override {
2033 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2035  priority_ = priority;
2036 #else
2037  (void)priority;
2038 #endif
2039  }
2040 
2041  /// Resets the task priority to 0
2042  void reset_priority() override {
2043 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2045  priority_ = 0;
2046 #endif
2047  }
2048 
2049  /// Assigns custom data to this task. transwarp will not directly use this.
2050  /// This is only useful if something else is using this custom data (e.g. a custom executor)
2051  void set_custom_data(transwarp::any_data custom_data) override {
2052 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2054  if (!custom_data.has_value()) {
2055  throw transwarp::invalid_parameter{"custom data"};
2056  }
2057  custom_data_ = std::move(custom_data);
2059 #else
2060  (void)custom_data;
2061 #endif
2062  }
2063 
2064  /// Removes custom data from this task
2065  void remove_custom_data() override {
2066 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2068  custom_data_ = {};
2070 #endif
2071  }
2072 
2073  /// Returns the future associated to the underlying execution
2074  std::shared_future<result_type> future() const noexcept override {
2075  return future_;
2076  }
2077 
2078  /// Adds a new listener for all event types
2079  void add_listener(std::shared_ptr<transwarp::listener> listener) override {
2082  ensure_listeners_object();
2083  for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2084  (*listeners_)[static_cast<transwarp::event_type>(i)].push_back(listener);
2085  }
2086  }
2087 
2088  /// Adds a new listener for the given event type only
2089  void add_listener(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
2092  ensure_listeners_object();
2093  (*listeners_)[event].push_back(std::move(listener));
2094  }
2095 
2096  /// Removes the listener for all event types
2097  void remove_listener(const std::shared_ptr<transwarp::listener>& listener) override {
2100  if (!listeners_) {
2101  return;
2102  }
2103  for (int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2104  auto listeners_pair = listeners_->find(static_cast<transwarp::event_type>(i));
2105  if (listeners_pair != listeners_->end()) {
2106  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2107  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
2108  }
2109  }
2110  }
2111 
2112  /// Removes the listener for the given event type only
2113  void remove_listener(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
2116  if (!listeners_) {
2117  return;
2118  }
2119  auto listeners_pair = listeners_->find(event);
2120  if (listeners_pair != listeners_->end()) {
2121  std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2122  l.erase(std::remove(l.begin(), l.end(), listener), l.end());
2123  }
2124  }
2125 
2126  /// Removes all listeners
2127  void remove_listeners() override {
2129  if (!listeners_) {
2130  return;
2131  }
2132  listeners_->clear();
2133  }
2134 
2135  /// Removes all listeners for the given event type
2138  if (!listeners_) {
2139  return;
2140  }
2141  auto listeners_pair = listeners_->find(event);
2142  if (listeners_pair != listeners_->end()) {
2143  listeners_pair->second.clear();
2144  }
2145  }
2146 
2147 protected:
2148 
2149  using listeners_t = std::map<transwarp::event_type, std::vector<std::shared_ptr<transwarp::listener>>>;
2150  using tasks_t = std::vector<transwarp::itask*>;
2151 
2152  /// Checks if the task is currently running and throws transwarp::control_error if it is
2154  if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
2155  throw transwarp::control_error{"task currently running: " + transwarp::to_string(*this, " ")};
2156  }
2157  }
2158 
2159  /// Raises the given event to all listeners
2161  if (!listeners_) {
2162  return;
2163  }
2164  auto listeners_pair = listeners_->find(event);
2165  if (listeners_pair != listeners_->end()) {
2166  for (const std::shared_ptr<transwarp::listener>& listener : listeners_pair->second) {
2167  listener->handle_event(event, *this);
2168  }
2169  }
2170  }
2171 
2172  /// Check for non-null listener pointer
2173  void check_listener(const std::shared_ptr<transwarp::listener>& listener) const {
2174  if (!listener) {
2175  throw transwarp::invalid_parameter{"listener pointer"};
2176  }
2177  }
2178 
2179  void ensure_listeners_object() {
2180  if (!listeners_) {
2181  listeners_.reset(new listeners_t);
2182  }
2183  }
2184 
2185  /// Assigns the given id
2186  void set_id(std::size_t id) noexcept override {
2187  id_ = id;
2188  }
2189 
2190  /// Assigns the given name
2191  void set_name(transwarp::option_str name) noexcept override {
2192 #ifndef TRANSWARP_DISABLE_TASK_NAME
2193  name_ = std::move(name);
2194 #else
2195  (void)name;
2196 #endif
2197  }
2198 
2199  void copy_from(const task_common& task) {
2200  id_ = task.id_;
2201 #ifndef TRANSWARP_DISABLE_TASK_NAME
2202  name_ = task.name_;
2203 #endif
2204 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2205  priority_ = task.priority_;
2206 #endif
2207 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2208  custom_data_ = task.custom_data_;
2209 #endif
2210  if (task.has_result()) {
2211  try {
2213  } catch (...) {
2214  future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
2215  }
2216  }
2217  visited_ = task.visited_;
2218  if (task.listeners_) {
2219  listeners_.reset(new listeners_t(*task.listeners_));
2220  }
2221  }
2222 
2223  std::size_t id_ = 0;
2224 #ifndef TRANSWARP_DISABLE_TASK_NAME
2225  transwarp::option_str name_;
2226 #endif
2227 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2228  std::int64_t priority_ = 0;
2229 #endif
2230 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2231  transwarp::any_data custom_data_;
2232 #endif
2233  std::shared_future<result_type> future_;
2234  bool visited_ = false;
2235  std::unique_ptr<listeners_t> listeners_;
2236  std::unique_ptr<tasks_t> tasks_;
2237 };
2238 
2239 
2240 /// The base task class that contains the functionality that can be used
2241 /// with all result types (void and non-void).
2242 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2244 public:
2245  /// The task type
2246  using task_type = TaskType;
2247 
2248  /// The result type of this task
2249  using result_type = ResultType;
2250 
2251  /// Can be called to explicitly finalize this task making this task
2252  /// the terminal task in the graph. This is also done implicitly when
2253  /// calling, e.g., any of the *_all methods. It should normally not be
2254  /// necessary to call this method directly
2255  void finalize() override {
2256  if (!this->tasks_) {
2257  this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t);
2258  visit(transwarp::detail::final_visitor{*this->tasks_});
2259  unvisit();
2260  auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) {
2261  const std::size_t l_level = l->level();
2262  const std::size_t l_id = l->id();
2263  const std::size_t r_level = r->level();
2264  const std::size_t r_id = r->id();
2265  return std::tie(l_level, l_id) < std::tie(r_level, r_id);
2266  };
2267  std::sort(this->tasks_->begin(), this->tasks_->end(), compare);
2268  }
2269  }
2270 
2271  /// The task's level
2272  std::size_t level() const noexcept override {
2273  return level_;
2274  }
2275 
2276  /// The task's type
2277  transwarp::task_type type() const noexcept override {
2278  return type_;
2279  }
2280 
2281  /// The task's executor (may be null)
2282  std::shared_ptr<transwarp::executor> executor() const noexcept override {
2283  return executor_;
2284  }
2285 
2286  /// Returns whether the associated task is canceled
2287  bool canceled() const noexcept override {
2288  return canceled_.load();
2289  }
2290 
2291  /// Returns the average idletime in microseconds (-1 if never set)
2292  std::int64_t avg_idletime_us() const noexcept override {
2293 #ifndef TRANSWARP_DISABLE_TASK_TIME
2294  return avg_idletime_us_.load();
2295 #else
2296  return -1;
2297 #endif
2298  }
2299 
2300  /// Returns the average waittime in microseconds (-1 if never set)
2301  std::int64_t avg_waittime_us() const noexcept override {
2302 #ifndef TRANSWARP_DISABLE_TASK_TIME
2303  return avg_waittime_us_.load();
2304 #else
2305  return -1;
2306 #endif
2307  }
2308 
2309  /// Returns the average runtime in microseconds (-1 if never set)
2310  std::int64_t avg_runtime_us() const noexcept override {
2311 #ifndef TRANSWARP_DISABLE_TASK_TIME
2312  return avg_runtime_us_.load();
2313 #else
2314  return -1;
2315 #endif
2316  }
2317 
2318  /// Assigns an executor to this task which takes precedence over
2319  /// the executor provided in schedule() or schedule_all()
2320  void set_executor(std::shared_ptr<transwarp::executor> executor) override {
2321  this->ensure_task_not_running();
2322  if (!executor) {
2323  throw transwarp::invalid_parameter{"executor pointer"};
2324  }
2325  executor_ = std::move(executor);
2326  }
2327 
 /// Assigns an executor to all tasks which takes precedence over
 /// the executor provided in schedule() or schedule_all()
 void set_executor_all(std::shared_ptr<transwarp::executor> executor) override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2335 
2336  /// Removes the executor from this task
2337  void remove_executor() override {
2338  this->ensure_task_not_running();
2339  executor_.reset();
2340  }
2341 
 /// Removes the executor from all tasks
 void remove_executor_all() override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2348 
2349  /// Schedules this task for execution on the caller thread.
2350  /// The task-specific executor gets precedence if it exists.
2351  /// This overload will reset the underlying future.
2352  void schedule() override {
2353  this->ensure_task_not_running();
2354  this->schedule_impl(true);
2355  }
2356 
2357  /// Schedules this task for execution on the caller thread.
2358  /// The task-specific executor gets precedence if it exists.
2359  /// reset denotes whether schedule should reset the underlying
2360  /// future and schedule even if the future is already valid.
2361  void schedule(bool reset) override {
2362  this->ensure_task_not_running();
2363  this->schedule_impl(reset);
2364  }
2365 
2366  /// Schedules this task for execution using the provided executor.
2367  /// The task-specific executor gets precedence if it exists.
2368  /// This overload will reset the underlying future.
2370  this->ensure_task_not_running();
2371  this->schedule_impl(true, &executor);
2372  }
2373 
2374  /// Schedules this task for execution using the provided executor.
2375  /// The task-specific executor gets precedence if it exists.
2376  /// reset denotes whether schedule should reset the underlying
2377  /// future and schedule even if the future is already valid.
2378  void schedule(transwarp::executor& executor, bool reset) override {
2379  this->ensure_task_not_running();
2380  this->schedule_impl(reset, &executor);
2381  }
2382 
2383  /// Schedules all tasks in the graph for execution on the caller thread.
2384  /// The task-specific executors get precedence if they exist.
2385  /// This overload will reset the underlying futures.
2386  void schedule_all() override {
2387  this->ensure_task_not_running();
2388  schedule_all_impl(true);
2389  }
2390 
2391  /// Schedules all tasks in the graph for execution using the provided executor.
2392  /// The task-specific executors get precedence if they exist.
2393  /// This overload will reset the underlying futures.
2395  this->ensure_task_not_running();
2396  schedule_all_impl(true, &executor);
2397  }
2398 
2399  /// Schedules all tasks in the graph for execution on the caller thread.
2400  /// The task-specific executors get precedence if they exist.
2401  /// reset_all denotes whether schedule_all should reset the underlying
2402  /// futures and schedule even if the futures are already present.
2403  void schedule_all(bool reset_all) override {
2404  this->ensure_task_not_running();
2405  schedule_all_impl(reset_all);
2406  }
2407 
2408  /// Schedules all tasks in the graph for execution using the provided executor.
2409  /// The task-specific executors get precedence if they exist.
2410  /// reset_all denotes whether schedule_all should reset the underlying
2411  /// futures and schedule even if the futures are already present.
2413  this->ensure_task_not_running();
2414  schedule_all_impl(reset_all, &executor);
2415  }
2416 
2417  /// Assigns an exception to this task. Scheduling will have no effect after an exception
2418  /// has been set. Calling reset() will remove the exception and re-enable scheduling.
2419  void set_exception(std::exception_ptr exception) override {
2420  this->ensure_task_not_running();
2421  this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2422  schedule_mode_ = false;
2424  }
2425 
2426  /// Returns whether the task was scheduled and not reset afterwards.
2427  /// This means that the underlying future is valid
2428  bool was_scheduled() const noexcept override {
2429  return this->future_.valid();
2430  }
2431 
2432  /// Waits for the task to complete. Should only be called if was_scheduled()
2433  /// is true, throws transwarp::control_error otherwise
2434  void wait() const override {
2436  this->future_.wait();
2437  }
2438 
2439  /// Returns whether the task has finished processing. Should only be called
2440  /// if was_scheduled() is true, throws transwarp::control_error otherwise
2441  bool is_ready() const override {
2443  return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2444  }
2445 
2446  /// Returns whether this task contains a result
2447  bool has_result() const noexcept override {
2448  return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2449  }
2450 
2451  /// Resets this task
2452  void reset() override {
2453  this->ensure_task_not_running();
2454  this->future_ = std::shared_future<result_type>{};
2455  cancel(false);
2456  schedule_mode_ = true;
2457 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2458  refcount_ = childcount_;
2459 #endif
2461  }
2462 
 /// Resets all tasks in the graph
 void reset_all() override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2469 
2470  /// If enabled then this task is canceled which will
2471  /// throw transwarp::task_canceled when retrieving the task result.
2472  /// Passing false is equivalent to resume.
2473  void cancel(bool enabled) noexcept override {
2474  canceled_ = enabled;
2475  }
2476 
2477  /// If enabled then all pending tasks in the graph are canceled which will
2478  /// throw transwarp::task_canceled when retrieving the task result.
2479  /// Passing false is equivalent to resume.
2480  void cancel_all(bool enabled) noexcept override {
2481  transwarp::detail::cancel_visitor visitor{enabled};
2482  visit_all(visitor);
2483  }
2484 
 /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
 /// This is only useful if something else is using the priority (e.g. a custom executor)
 void set_priority_all(std::int64_t priority) override {
#ifndef TRANSWARP_DISABLE_TASK_PRIORITY
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
#else
 // Priorities are compiled out; silence the unused-parameter warning
 (void)priority;
#endif
 }
2496 
 /// Resets the priority of all tasks to 0
 void reset_priority_all() override {
#ifndef TRANSWARP_DISABLE_TASK_PRIORITY
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
#endif
 }
2505 
 /// Assigns custom data to all tasks. transwarp will not directly use this.
 /// This is only useful if something else is using this custom data (e.g. a custom executor)
 void set_custom_data_all(transwarp::any_data custom_data) override {
#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
#else
 // Custom data is compiled out; silence the unused-parameter warning
 (void)custom_data;
#endif
 }
2517 
 /// Removes custom data from all tasks
 void remove_custom_data_all() override {
#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
#endif
 }
2526 
 /// Adds a new listener for all event types and for all parents
 void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2533 
 /// Adds a new listener for the given event type only and for all parents
 void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2540 
 /// Removes the listener for all event types and for all parents
 void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2547 
 /// Removes the listener for the given event type only and for all parents
 void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2554 
 /// Removes all listeners and for all parents
 void remove_listeners_all() override {
 this->ensure_task_not_running();
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method -- confirm its definition against the complete file
 visit_all(visitor);
 }
2561 
2562  /// Removes all listeners for the given event type and for all parents
2564  this->ensure_task_not_running();
2566  visit_all(visitor);
2567  }
2568 
 /// Returns the task's parents (may be empty)
 std::vector<transwarp::itask*> parents() const override {
 // NOTE(review): no return statement is present in the visible lines of this
 // non-void method -- confirm against the complete file
 }
2573 
2574  /// Returns all tasks in the graph in breadth order
2575  const std::vector<transwarp::itask*>& tasks() override {
2576  finalize();
2577  return *this->tasks_;
2578  }
2579 
 /// Returns all edges in the graph. This is mainly for visualizing
 /// the tasks and their interdependencies. Pass the result into transwarp::to_string
 /// to retrieve a dot-style graph representation for easy viewing.
 std::vector<transwarp::edge> edges() override {
 std::vector<transwarp::edge> edges;
 // NOTE(review): `visitor` is not declared in the visible lines of this
 // method, and nothing visible fills `edges` -- confirm against the complete file
 visit_all(visitor);
 return edges;
 }
2589 
2590 protected:
2591 
2592  task_impl_base() {}
2593 
2594  template<typename F>
2595  task_impl_base(F&& functor, std::shared_ptr<transwarp::task<ParentResults>>... parents)
2596  : functor_(new Functor(std::forward<F>(functor))),
2597  parents_(std::move(parents)...)
2598  {
2599  init();
2600  }
2601 
 /// Constructs the task from a functor and a vector of parent tasks,
 /// then runs init()
 template<typename F, typename P>
 task_impl_base(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
 : functor_(new Functor(std::forward<F>(functor))),
 parents_(std::move(parents))
 {
 init();
 // NOTE(review): the body of this branch is empty in the visible lines,
 // so an empty parent vector has no effect here -- confirm against the complete file
 if (parents_.empty()) {
 }
 }
2612 
2613  void init() {
2614  set_type(task_type::value);
2615  transwarp::detail::assign_task_if(*functor_, *this);
2617  }
2618 
2619  template<typename R, typename Y, typename T, typename P>
2620  friend class transwarp::detail::runner;
2621 
2622  template<typename R, typename T, typename... A>
2623  friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr<T>&, A&&...);
2624 
2625  /// Assigns the given level
2626  void set_level(std::size_t level) noexcept override {
2627  level_ = level;
2628  }
2629 
2630  /// Assigns the given type
2631  void set_type(transwarp::task_type type) noexcept override {
2632  type_ = type;
2633  }
2634 
2635  /// Assigns the given idletime
2636  void set_avg_idletime_us(std::int64_t idletime) noexcept override {
2637 #ifndef TRANSWARP_DISABLE_TASK_TIME
2638  avg_idletime_us_ = idletime;
2639 #else
2640  (void)idletime;
2641 #endif
2642  }
2643 
2644  /// Assigns the given waittime
2645  void set_avg_waittime_us(std::int64_t waittime) noexcept override {
2646 #ifndef TRANSWARP_DISABLE_TASK_TIME
2647  avg_waittime_us_ = waittime;
2648 #else
2649  (void)waittime;
2650 #endif
2651  }
2652 
2653  /// Assigns the given runtime
2654  void set_avg_runtime_us(std::int64_t runtime) noexcept override {
2655 #ifndef TRANSWARP_DISABLE_TASK_TIME
2656  avg_runtime_us_ = runtime;
2657 #else
2658  (void)runtime;
2659 #endif
2660  }
2661 
2662  /// Checks if the task was scheduled and throws transwarp::control_error if it's not
2664  if (!this->future_.valid()) {
2665  throw transwarp::control_error{"task was not scheduled: " + transwarp::to_string(*this, " ")};
2666  }
2667  }
2668 
2669  /// Schedules this task for execution using the provided executor.
2670  /// The task-specific executor gets precedence if it exists.
2671  /// Runs the task on the same thread as the caller if neither the global
2672  /// nor the task-specific executor is found.
2673  void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override {
2674  if (schedule_mode_ && (reset || !this->future_.valid())) {
2675  if (reset) {
2676  cancel(false);
2677  }
2678 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2679  refcount_ = childcount_;
2680 #endif
2681  std::weak_ptr<task_impl_base> self = std::static_pointer_cast<task_impl_base>(this->shared_from_this());
2682  using runner_t = transwarp::detail::runner<result_type, task_type, task_impl_base, decltype(parents_)>;
2683  std::shared_ptr<runner_t> runner = std::shared_ptr<runner_t>(new runner_t(this->id(), self, parents_));
2685  this->future_ = runner->future();
2687  if (this->executor_) {
2688  this->executor_->execute([runner]{ (*runner)(); }, *this);
2689  } else if (executor) {
2690  executor->execute([runner]{ (*runner)(); }, *this);
2691  } else {
2692  (*runner)();
2693  }
2694  }
2695  }
2696 
2697  /// Schedules all tasks in the graph for execution using the provided executor.
2698  /// The task-specific executors get precedence if they exist.
2699  /// Runs tasks on the same thread as the caller if neither the global
2700  /// nor a task-specific executor is found.
2703  visit_all(visitor);
2704  }
2705 
 /// Visits each task in a depth-first traversal
 void visit(const std::function<void(transwarp::itask&)>& visitor) override {
 // Tasks already marked as visited are skipped
 if (!this->visited_) {
 // NOTE(review): the visible lines only apply the visitor to this task; no
 // traversal of other tasks is visible here -- confirm against the complete file
 visitor(*this);
 this->visited_ = true;
 }
 }
2714 
 /// Traverses through each task and marks them as not visited.
 void unvisit() noexcept override {
 if (this->visited_) {
 // NOTE(review): the visible lines only clear the flag of this task; no
 // traversal of other tasks is visible here -- confirm against the complete file
 this->visited_ = false;
 }
 }
2722 
2723  /// Visits all tasks
2724  template<typename Visitor>
2725  void visit_all(Visitor& visitor) {
2726  finalize();
2727  for (transwarp::itask* t : *this->tasks_) {
2728  visitor(*t);
2729  }
2730  }
2731 
2732  void increment_childcount() noexcept override {
2733 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2734  ++childcount_;
2735 #endif
2736  }
2737 
 /// Decrements the reference count by one. Compiled to a no-op if
 /// TRANSWARP_DISABLE_TASK_REFCOUNT is defined
 void decrement_refcount() override {
#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
 // NOTE(review): the branch taken when the count reaches zero is empty in
 // the visible lines -- confirm against the complete file
 if (--refcount_ == 0) {
 }
#endif
 }
2745 
2746  void reset_future() override {
2747  this->future_ = std::shared_future<result_type>{};
2749  }
2750 
2751  std::size_t level_ = 0;
2753  std::shared_ptr<transwarp::executor> executor_;
2754  std::atomic<bool> canceled_{false};
2755  bool schedule_mode_ = true;
2756 #ifndef TRANSWARP_DISABLE_TASK_TIME
2757  std::atomic<std::int64_t> avg_idletime_us_{-1};
2758  std::atomic<std::int64_t> avg_waittime_us_{-1};
2759  std::atomic<std::int64_t> avg_runtime_us_{-1};
2760 #endif
2761  std::unique_ptr<Functor> functor_;
2762  typename transwarp::detail::parents<ParentResults...>::type parents_;
2763 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2764  std::size_t childcount_ = 0;
2765  std::atomic<std::size_t> refcount_{0};
2766 #endif
2767 };
2768 
2769 
2770 /// A task proxy
2771 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2772 class task_impl_proxy : public transwarp::detail::task_impl_base<ResultType, TaskType, Functor, ParentResults...> {
2773 public:
2774  /// The task type
2775  using task_type = TaskType;
2776 
2777  /// The result type of this task
2778  using result_type = ResultType;
2779 
2780  /// Assigns a value to this task. Scheduling will have no effect after a value
2781  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2782  void set_value(const typename transwarp::decay<result_type>::type& value) override {
2783  this->ensure_task_not_running();
2784  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2785  this->schedule_mode_ = false;
2787  }
2788 
2789  /// Assigns a value to this task. Scheduling will have no effect after a value
2790  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2791  void set_value(typename transwarp::decay<result_type>::type&& value) override {
2792  this->ensure_task_not_running();
2793  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2794  this->schedule_mode_ = false;
2796  }
2797 
2798  /// Returns the result of this task. Throws any exceptions that the underlying
2799  /// functor throws. Should only be called if was_scheduled() is true,
2800  /// throws transwarp::control_error otherwise
2801  typename transwarp::result<result_type>::type get() const override {
2802  this->ensure_task_was_scheduled();
2803  return this->future_.get();
2804  }
2805 
2806 protected:
2807 
2808  task_impl_proxy() = default;
2809 
2810  template<typename F>
2812  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2813  {}
2814 
2815  template<typename F, typename P>
2816  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2817  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2818  {}
2819 
2820 };
2821 
2822 /// A task proxy for reference result type.
2823 template<typename ResultType, typename TaskType, typename Functor, typename... ParentResults>
2824 class task_impl_proxy<ResultType&, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<ResultType&, TaskType, Functor, ParentResults...> {
2825 public:
2826  /// The task type
2827  using task_type = TaskType;
2828 
2829  /// The result type of this task
2830  using result_type = ResultType&;
2831 
2832  /// Assigns a value to this task. Scheduling will have no effect after a value
2833  /// has been set. Calling reset() will remove the value and re-enable scheduling.
2834  void set_value(typename transwarp::decay<result_type>::type& value) override {
2835  this->ensure_task_not_running();
2836  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2837  this->schedule_mode_ = false;
2839  }
2840 
2841  /// Returns the result of this task. Throws any exceptions that the underlying
2842  /// functor throws. Should only be called if was_scheduled() is true,
2843  /// throws transwarp::control_error otherwise
2844  typename transwarp::result<result_type>::type get() const override {
2845  this->ensure_task_was_scheduled();
2846  return this->future_.get();
2847  }
2848 
2849 protected:
2850 
2851  task_impl_proxy() = default;
2852 
2853  template<typename F>
2855  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2856  {}
2857 
2858  template<typename F, typename P>
2859  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2860  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2861  {}
2862 
2863 };
2864 
2865 /// A task proxy for void result type.
2866 template<typename TaskType, typename Functor, typename... ParentResults>
2867 class task_impl_proxy<void, TaskType, Functor, ParentResults...> : public transwarp::detail::task_impl_base<void, TaskType, Functor, ParentResults...> {
2868 public:
2869  /// The task type
2870  using task_type = TaskType;
2871 
2872  /// The result type of this task
2873  using result_type = void;
2874 
2875  /// Assigns a value to this task. Scheduling will have no effect after a call
2876  /// to this. Calling reset() will reset this and re-enable scheduling.
2877  void set_value() override {
2878  this->ensure_task_not_running();
2879  this->future_ = transwarp::detail::make_ready_future();
2880  this->schedule_mode_ = false;
2882  }
2883 
2884  /// Blocks until the task finishes. Throws any exceptions that the underlying
2885  /// functor throws. Should only be called if was_scheduled() is true,
2886  /// throws transwarp::control_error otherwise
2887  void get() const override {
2888  this->ensure_task_was_scheduled();
2889  this->future_.get();
2890  }
2891 
2892 protected:
2893 
2894  task_impl_proxy() = default;
2895 
2896  template<typename F>
2898  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2899  {}
2900 
2901  template<typename F, typename P>
2902  task_impl_proxy(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2903  : transwarp::detail::task_impl_base<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2904  {}
2905 
2906 };
2907 
2908 } // detail
2909 
2910 
2911 /// A task representing a piece of work given by functor and parent tasks.
2912 /// By connecting tasks a directed acyclic graph is built.
2913 /// Tasks should be created using the make_task factory functions.
2914 template<typename TaskType, typename Functor, typename... ParentResults>
2915 class task_impl : public transwarp::detail::task_impl_proxy<typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type, TaskType, Functor, ParentResults...> {
2916 public:
2917  /// The task type
2918  using task_type = TaskType;
2919 
2920  /// The result type of this task
2921  using result_type = typename transwarp::detail::functor_result<TaskType, Functor, ParentResults...>::type;
2922 
2923  /// A task is defined by functor and parent tasks.
2924  /// Note: Don't use this constructor directly, use transwarp::make_task
2925  template<typename F>
2927  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents)...)
2928  {}
2929 
2930  /// A task is defined by functor and parent tasks.
2931  /// Note: Don't use this constructor directly, use transwarp::make_task
2932  template<typename F, typename P>
2933  task_impl(F&& functor, std::vector<std::shared_ptr<transwarp::task<P>>> parents)
2934  : transwarp::detail::task_impl_proxy<result_type, task_type, Functor, ParentResults...>(std::forward<F>(functor), std::move(parents))
2935  {}
2936 
2937  // delete copy/move semantics
2938  task_impl(const task_impl&) = delete;
2939  task_impl& operator=(const task_impl&) = delete;
2940  task_impl(task_impl&&) = delete;
2941  task_impl& operator=(task_impl&&) = delete;
2942 
2943  /// Gives this task a name and returns a ptr to itself
2944  std::shared_ptr<task_impl> named(std::string name) {
2945 #ifndef TRANSWARP_DISABLE_TASK_NAME
2946 #ifdef TRANSWARP_CPP11
2947  this->set_name(transwarp::option_str{std::move(name)});
2948 #else
2949  this->set_name(std::make_optional(std::move(name)));
2950 #endif
2951 #else
2952  (void)name;
2953 #endif
2954  return std::static_pointer_cast<task_impl>(this->shared_from_this());
2955  }
2956 
2957  /// Creates a continuation to this task
2958  template<typename TaskType_, typename Functor_>
2959  auto then(TaskType_, Functor_&& functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
2961  return std::shared_ptr<task_t>(new task_t(std::forward<Functor_>(functor), std::static_pointer_cast<task_impl>(this->shared_from_this())));
2962  }
2963 
2964  /// Clones this task and casts the result to a ptr to task_impl
2965  std::shared_ptr<task_impl> clone_cast() const {
2966  return std::static_pointer_cast<task_impl>(this->clone());
2967  }
2968 
2969 private:
2970 
2971  task_impl() = default;
2972 
2973  std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) const override {
2974  auto t = std::shared_ptr<task_impl>{new task_impl};
2975  t->copy_from(*this);
2976  t->level_ = this->level_;
2977  t->type_ = this->type_;
2978  t->executor_ = this->executor_;
2979  t->canceled_ = this->canceled_.load();
2980  t->schedule_mode_ = this->schedule_mode_;
2981 #ifndef TRANSWARP_DISABLE_TASK_TIME
2982  t->avg_idletime_us_ = this->avg_idletime_us_.load();
2983  t->avg_waittime_us_ = this->avg_waittime_us_.load();
2984  t->avg_runtime_us_ = this->avg_runtime_us_.load();
2985 #endif
2986  t->functor_.reset(new Functor(*this->functor_));
2987  t->parents_ = transwarp::detail::parents<ParentResults...>::clone(task_cache, this->parents_);
2988  t->executor_ = this->executor_;
2989 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2990  t->childcount_ = this->childcount_;
2991 #endif
2992  return t;
2993  }
2994 
2995 };
2996 
2997 
2998 /// A value task that stores a single value and doesn't require scheduling.
2999 /// Value tasks should be created using the make_value_task factory functions.
3000 template<typename ResultType>
3001 class value_task : public transwarp::detail::task_common<ResultType> {
3002 public:
3003  /// The task type
3005 
3006  /// The result type of this task
3007  using result_type = ResultType;
3008 
3009  /// A value task is defined by a given value.
3010  /// Note: Don't use this constructor directly, use transwarp::make_value_task
3011  template<typename T>
3012  value_task(T&& value)
3013  {
3014  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value));
3015  this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t{this});
3016  }
3017 
3018  // delete copy/move semantics
3019  value_task(const value_task&) = delete;
3020  value_task& operator=(const value_task&) = delete;
3021  value_task(value_task&&) = delete;
3022  value_task& operator=(value_task&&) = delete;
3023 
3024  /// Gives this task a name and returns a ptr to itself
3025  std::shared_ptr<value_task> named(std::string name) {
3026 #ifndef TRANSWARP_DISABLE_TASK_NAME
3027 #ifdef TRANSWARP_CPP11
3028  this->set_name(transwarp::option_str{std::move(name)});
3029 #else
3030  this->set_name(std::make_optional(std::move(name)));
3031 #endif
3032 #else
3033  (void)name;
3034 #endif
3035  return std::static_pointer_cast<value_task>(this->shared_from_this());
3036  }
3037 
3038  /// Creates a continuation to this task
3039  template<typename TaskType_, typename Functor_>
3040  auto then(TaskType_, Functor_&& functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
3042  return std::shared_ptr<task_t>(new task_t(std::forward<Functor_>(functor), std::static_pointer_cast<value_task>(this->shared_from_this())));
3043  }
3044 
3045  /// Clones this task and casts the result to a ptr to value_task
3046  std::shared_ptr<value_task> clone_cast() const {
3047  return std::static_pointer_cast<value_task>(this->clone());
3048  }
3049 
3050  /// Nothing to be done to finalize a value task
3051  void finalize() override {}
3052 
3053  /// The task's level
3054  std::size_t level() const noexcept override {
3055  return 0;
3056  }
3057 
3058  /// The task's type
3059  transwarp::task_type type() const noexcept override {
3061  }
3062 
3063  /// Value tasks don't have executors as they don't run
3064  std::shared_ptr<transwarp::executor> executor() const noexcept override {
3065  return nullptr;
3066  }
3067 
3068  /// Value tasks cannot be canceled
3069  bool canceled() const noexcept override {
3070  return false;
3071  }
3072 
3073  /// Returns -1 as value tasks don't run
3074  std::int64_t avg_idletime_us() const noexcept override {
3075  return -1;
3076  }
3077 
3078  /// Returns -1 as value tasks don't run
3079  std::int64_t avg_waittime_us() const noexcept override {
3080  return -1;
3081  }
3082 
3083  /// Returns -1 as value tasks don't run
3084  std::int64_t avg_runtime_us() const noexcept override {
3085  return -1;
3086  }
3087 
3088  /// No-op because a value task never runs
3089  void set_executor(std::shared_ptr<transwarp::executor>) override {}
3090 
3091  /// No-op because a value task never runs and doesn't have parents
3092  void set_executor_all(std::shared_ptr<transwarp::executor>) override {}
3093 
3094  /// No-op because a value task never runs
3095  void remove_executor() override {}
3096 
3097  /// No-op because a value task never runs and doesn't have parents
3098  void remove_executor_all() override {}
3099 
3100  /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this.
3101  /// This is only useful if something else is using the priority
3102  void set_priority_all(std::int64_t priority) override {
3103 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3104  this->set_priority(priority);
3105 #else
3106  (void)priority;
3107 #endif
3108  }
3109 
3110  /// Resets the priority of all tasks to 0
3111  void reset_priority_all() override {
3112 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3113  this->reset_priority();
3114 #endif
3115  }
3116 
3117  /// Assigns custom data to all tasks. transwarp will not directly use this.
3118  /// This is only useful if something else is using this custom data
3119  void set_custom_data_all(transwarp::any_data custom_data) override {
3120 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3121  this->set_custom_data(std::move(custom_data));
3122 #else
3123  (void)custom_data;
3124 #endif
3125  }
3126 
3127  /// Removes custom data from all tasks
3128  void remove_custom_data_all() override {
3129 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3130  this->remove_custom_data();
3131 #endif
3132  }
3133 
3134  /// No-op because a value task never runs
3135  void schedule() override {}
3136 
3137  /// No-op because a value task never runs
3138  void schedule(transwarp::executor&) override {}
3139 
3140  /// No-op because a value task never runs
3141  void schedule(bool) override {}
3142 
3143  /// No-op because a value task never runs
3144  void schedule(transwarp::executor&, bool) override {}
3145 
3146  /// No-op because a value task never runs and doesn't have parents
3147  void schedule_all() override {}
3148 
3149  /// No-op because a value task never runs and doesn't have parents
3151 
3152  /// No-op because a value task never runs and doesn't have parents
3153  void schedule_all(bool) override {}
3154 
3155  /// No-op because a value task never runs and doesn't have parents
3156  void schedule_all(transwarp::executor&, bool) override {}
3157 
3158  /// Assigns a value to this task
3159  void set_value(const typename transwarp::decay<result_type>::type& value) override {
3160  this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
3162  }
3163 
3164  /// Assigns a value to this task
3165  void set_value(typename transwarp::decay<result_type>::type&& value) override {
3166  this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
3168  }
3169 
3170  /// Assigns an exception to this task
3171  void set_exception(std::exception_ptr exception) override {
3172  this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
3174  }
3175 
3176  /// Returns the value of this task. Throws an exception if this task has an exception assigned to it
3177  typename transwarp::result<result_type>::type get() const override {
3178  return this->future_.get();
3179  }
3180 
3181  /// Returns true because a value task is scheduled once on construction
3182  bool was_scheduled() const noexcept override {
3183  return true;
3184  }
3185 
3186  /// No-op because a value task never runs
3187  void wait() const override {}
3188 
3189  /// Returns true because a value task is always ready
3190  bool is_ready() const override {
3191  return true;
3192  }
3193 
3194  /// Returns true because a value task always contains a result
3195  bool has_result() const noexcept override {
3196  return true;
3197  }
3198 
3199  /// No-op because a value task never runs
3200  void reset() override {}
3201 
3202  /// No-op because a value task never runs and doesn't have parents
3203  void reset_all() override {}
3204 
3205  /// No-op because a value task never runs
3206  void cancel(bool) noexcept override {}
3207 
3208  /// No-op because a value task never runs and doesn't have parents
3209  void cancel_all(bool) noexcept override {}
3210 
3211  /// Adds a new listener for all event types and for all parents
3212  void add_listener_all(std::shared_ptr<transwarp::listener> listener) override {
3213  this->add_listener(listener);
3214  }
3215 
3216  /// Adds a new listener for the given event type only and for all parents
3217  void add_listener_all(transwarp::event_type event, std::shared_ptr<transwarp::listener> listener) override {
3218  this->add_listener(event, listener);
3219  }
3220 
3221  /// Removes the listener for all event types and for all parents
3222  void remove_listener_all(const std::shared_ptr<transwarp::listener>& listener) override {
3223  this->remove_listener(listener);
3224  }
3225 
3226  /// Removes the listener for the given event type only and for all parents
3227  void remove_listener_all(transwarp::event_type event, const std::shared_ptr<transwarp::listener>& listener) override {
3228  this->remove_listener(event, listener);
3229  }
3230 
3231  /// Removes all listeners and for all parents
3232  void remove_listeners_all() override {
3233  this->remove_listeners();
3234  }
3235 
3236  /// Removes all listeners for the given event type and for all parents
3238  this->remove_listeners(event);
3239  }
3240 
3241  /// Empty because a value task doesn't have parents
3242  std::vector<transwarp::itask*> parents() const override {
3243  return {};
3244  }
3245 
3246  /// Returns all tasks in the graph in breadth order
3247  const std::vector<transwarp::itask*>& tasks() override {
3248  return *this->tasks_;
3249  }
3250 
3251  /// Returns empty edges because a value task doesn't have parents
3252  std::vector<transwarp::edge> edges() override {
3253  return {};
3254  }
3255 
3256 private:
3257 
3258  value_task()
3259  {
3260  this->tasks_.reset(new typename transwarp::detail::task_common<result_type>::tasks_t{this});
3261  }
3262 
3263  std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&) const override {
3264  auto t = std::shared_ptr<value_task>(new value_task);
3265  t->copy_from(*this);
3266  return t;
3267  }
3268 
3269  /// No-op as value tasks are always at level 0
3270  void set_level(std::size_t) noexcept override {}
3271 
3272  /// No-op as value tasks are always root tasks
3273  void set_type(transwarp::task_type) noexcept override {}
3274 
3275  /// No-op as value tasks don't run
3276  void set_avg_idletime_us(std::int64_t) noexcept override {}
3277 
3278  /// No-op as value tasks don't run
3279  void set_avg_waittime_us(std::int64_t) noexcept override {}
3280 
3281  /// No-op as value tasks don't run
3282  void set_avg_runtime_us(std::int64_t) noexcept override {}
3283 
3284  /// No-op because a value task never runs
3285  void schedule_impl(bool, transwarp::executor*) override {}
3286 
3287  /// Visits this task
3288  void visit(const std::function<void(transwarp::itask&)>& visitor) override {
3289  if (!this->visited_) {
3290  visitor(*this);
3291  this->visited_ = true;
3292  }
3293  }
3294 
3295  /// Marks this task as not visited
3296  void unvisit() noexcept override {
3297  this->visited_ = false;
3298  }
3299 
3300  void increment_childcount() noexcept override {}
3301 
3302  void decrement_refcount() override {}
3303 
3304  void reset_future() override {}
3305 
3306 };
3307 
3308 
3309 /// A factory function to create a new task
3310 template<typename TaskType, typename Functor, typename... Parents>
3311 auto make_task(TaskType, Functor&& functor, std::shared_ptr<Parents>... parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>> {
3312  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, typename Parents::result_type...>;
3313  return std::shared_ptr<task_t>(new task_t(std::forward<Functor>(functor), std::move(parents)...));
3314 }
3315 
3316 
3317 /// A factory function to create a new task with vector parents
3318 template<typename TaskType, typename Functor, typename ParentType>
3319 auto make_task(TaskType, Functor&& functor, std::vector<ParentType> parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>> {
3320  using task_t = transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>;
3321  return std::shared_ptr<task_t>(new task_t(std::forward<Functor>(functor), std::move(parents)));
3322 }
3323 
3324 
3325 /// A factory function to create a new value task
3326 template<typename Value>
3327 auto make_value_task(Value&& value) -> std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>> {
3329  return std::shared_ptr<task_t>(new task_t(std::forward<Value>(value)));
3330 }
3331 
3332 
3333 /// A function similar to std::for_each but returning a transwarp task for
3334 /// deferred, possibly asynchronous execution. This function creates a graph
3335 /// with std::distance(first, last) root tasks
3336 template<typename InputIt, typename UnaryOperation>
3337 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3338  const auto distance = std::distance(first, last);
3339  if (distance <= 0) {
3340  throw transwarp::invalid_parameter{"first or last"};
3341  }
3342  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3343  tasks.reserve(static_cast<std::size_t>(distance));
3344  for (; first != last; ++first) {
3345  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); }));
3346  }
3348 }
3349 
3350 /// A function similar to std::for_each but returning a transwarp task for
3351 /// deferred, possibly asynchronous execution. This function creates a graph
3352 /// with std::distance(first, last) root tasks.
3353 /// Overload for automatic scheduling by passing an executor.
3354 template<typename InputIt, typename UnaryOperation>
3355 auto for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3356  auto task = transwarp::for_each(first, last, unary_op);
3357  task->schedule_all(executor);
3358  return task;
3359 }
3360 
3361 
3362 /// A function similar to std::transform but returning a transwarp task for
3363 /// deferred, possibly asynchronous execution. This function creates a graph
3364 /// with std::distance(first1, last1) root tasks
3365 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3366 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3367  const auto distance = std::distance(first1, last1);
3368  if (distance <= 0) {
3369  throw transwarp::invalid_parameter{"first1 or last1"};
3370  }
3371  std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3372  tasks.reserve(static_cast<std::size_t>(distance));
3373  for (; first1 != last1; ++first1, ++d_first) {
3374  tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); }));
3375  }
3377 }
3378 
3379 /// A function similar to std::transform but returning a transwarp task for
3380 /// deferred, possibly asynchronous execution. This function creates a graph
3381 /// with std::distance(first1, last1) root tasks.
3382 /// Overload for automatic scheduling by passing an executor.
3383 template<typename InputIt, typename OutputIt, typename UnaryOperation>
3384 auto transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3385  auto task = transwarp::transform(first1, last1, d_first, unary_op);
3386  task->schedule_all(executor);
3387  return task;
3388 }
3389 
3390 
3391 /// A task pool that allows running multiple instances of the same task in parallel.
3392 template<typename ResultType>
3393 class task_pool {
3394 public:
3395 
3396  /// Constructs a task pool
3398  std::size_t minimum_size,
3399  std::size_t maximum_size)
3400  : task_(std::move(task)),
3401  minimum_(minimum_size),
3402  maximum_(maximum_size),
3403  finished_(maximum_size)
3404  {
3405  if (minimum_ < 1) {
3406  throw transwarp::invalid_parameter{"minimum size"};
3407  }
3408  if (minimum_ > maximum_) {
3409  throw transwarp::invalid_parameter{"minimum or maximum size"};
3410  }
3411  task_->add_listener(transwarp::event_type::after_finished, listener_);
3412  for (std::size_t i=0; i<minimum_; ++i) {
3413  idle_.push(task_->clone());
3414  }
3415  }
3416 
3417  /// Constructs a task pool with reasonable defaults for minimum and maximum
3418  explicit
3420  : task_pool(std::move(task), 32, 65536)
3421  {}
3422 
3423  // delete copy/move semantics
3424  task_pool(const task_pool&) = delete;
3425  task_pool& operator=(const task_pool&) = delete;
3426  task_pool(task_pool&&) = delete;
3427  task_pool& operator=(task_pool&&) = delete;
3428 
3429  /// Returns the next idle task.
3430  /// If there are no idle tasks then it will attempt to double the
3431  /// pool size. If that fails then it will return a nullptr. On successful
3432  /// retrieval of an idle task the function will mark that task as busy.
3433  std::shared_ptr<transwarp::task<ResultType>> next_task(bool maybe_resize=true) {
3434  const transwarp::itask* finished_task{};
3435  {
3436  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3437  if (!finished_.empty()) {
3438  finished_task = finished_.front(); finished_.pop();
3439  }
3440  }
3441 
3442  std::shared_ptr<transwarp::task<ResultType>> task;
3443  if (finished_task) {
3444  task = busy_.find(finished_task)->second;
3445  } else {
3446  if (maybe_resize && idle_.empty()) {
3447  resize(size() * 2); // double pool size
3448  }
3449  if (idle_.empty()) {
3450  return nullptr;
3451  }
3452  task = idle_.front(); idle_.pop();
3453  busy_.emplace(task.get(), task);
3454  }
3455 
3456  auto future = task->future();
3457  if (future.valid()) {
3458  future.wait(); // will return immediately
3459  }
3460  return task;
3461  }
3462 
3463  /// Just like next_task() but waits for a task to become available.
3464  /// The returned task will always be a valid pointer
3465  std::shared_ptr<transwarp::task<ResultType>> wait_for_next_task(bool maybe_resize=true) {
3466  for (;;) {
3467  std::shared_ptr<transwarp::task<ResultType>> g = next_task(maybe_resize);
3468  if (g) {
3469  return g;
3470  }
3471  }
3472  }
3473 
3474  /// Returns the current total size of the pool (sum of idle and busy tasks)
3475  std::size_t size() const {
3476  return idle_.size() + busy_.size();
3477  }
3478 
3479  /// Returns the minimum size of the pool
3480  std::size_t minimum_size() const {
3481  return minimum_;
3482  }
3483 
3484  /// Returns the maximum size of the pool
3485  std::size_t maximum_size() const {
3486  return maximum_;
3487  }
3488 
3489  /// Returns the number of idle tasks in the pool
3490  std::size_t idle_count() const {
3491  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3492  return idle_.size() + finished_.size();
3493  }
3494 
3495  /// Returns the number of busy tasks in the pool
3496  std::size_t busy_count() const {
3497  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3498  return busy_.size() - finished_.size();
3499  }
3500 
3501  /// Resizes the task pool to the given new size if possible
3502  void resize(std::size_t new_size) {
3503  reclaim();
3504  if (new_size > size()) { // grow
3505  const std::size_t count = new_size - size();
3506  for (std::size_t i=0; i<count; ++i) {
3507  if (size() == maximum_) {
3508  break;
3509  }
3510  idle_.push(task_->clone());
3511  }
3512  } else if (new_size < size()) { // shrink
3513  const std::size_t count = size() - new_size;
3514  for (std::size_t i=0; i<count; ++i) {
3515  if (idle_.empty() || size() == minimum_) {
3516  break;
3517  }
3518  idle_.pop();
3519  }
3520  }
3521  }
3522 
3523  /// Reclaims finished tasks by marking them as idle again
3524  void reclaim() {
3525  decltype(finished_) finished{finished_.capacity()};
3526  {
3527  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3528  finished_.swap(finished);
3529  }
3530  while (!finished.empty()) {
3531  const transwarp::itask* task = finished.front(); finished.pop();
3532  const auto it = busy_.find(task);
3533  idle_.push(it->second);
3534  busy_.erase(it);
3535  }
3536  }
3537 
3538 private:
3539 
3540  class finished_listener : public transwarp::listener {
3541  public:
3542 
3543  explicit
3544  finished_listener(task_pool<ResultType>& pool)
3545  : pool_(pool)
3546  {}
3547 
3548  // Called on a potentially high-priority thread
3549  void handle_event(transwarp::event_type, transwarp::itask& task) override {
3550  std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3551  pool_.finished_.push(static_cast<const transwarp::itask*>(&task));
3552  }
3553 
3554  private:
3555  task_pool<ResultType>& pool_;
3556  };
3557 
3558  std::shared_ptr<transwarp::task<ResultType>> task_;
3559  std::size_t minimum_;
3560  std::size_t maximum_;
3561  mutable transwarp::detail::spinlock spinlock_; // protecting finished_
3563  std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
3564  std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3565  std::shared_ptr<transwarp::listener> listener_{new finished_listener(*this)};
3566 };
3567 
3568 
3569 /// A timer that tracks the average idle, wait, and run time of each task it listens to.
3570 /// - idle = time between scheduling and starting the task (executor dependent)
3571 /// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish
3572 /// - run = time between invoking and finishing the task's computations
3573 class timer : public transwarp::listener {
3574 public:
3575  timer() = default;
3576 
3577  // delete copy/move semantics
3578  timer(const timer&) = delete;
3579  timer& operator=(const timer&) = delete;
3580  timer(timer&&) = delete;
3581  timer& operator=(timer&&) = delete;
3582 
3583  /// Performs the actual timing and populates the task's timing members
3585  switch (event) {
3587  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3588  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3589  auto& track = tracks_[&task];
3590  track.startidle = now;
3591  }
3592  break;
3594  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3595  track_idletime(task, now);
3596  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3597  auto& track = tracks_[&task];
3598  track.startwait = now;
3599  }
3600  break;
3602  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3603  track_waittime(task, now);
3604  }
3605  break;
3607  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3608  track_waittime(task, now);
3609  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3610  auto& track = tracks_[&task];
3611  track.running = true;
3612  track.startrun = now;
3613  }
3614  break;
3616  const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3617  track_runtime(task, now);
3618  }
3619  break;
3620  default: break;
3621  }
3622  }
3623 
3624  /// Resets all timing information
3625  void reset() {
3626  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3627  tracks_.clear();
3628  }
3629 
3630 private:
3631 
3632  void track_idletime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3633  std::int64_t avg_idletime_us;
3634  {
3635  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3636  auto& track = tracks_[&task];
3637  track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3638  ++track.idlecount;
3639  avg_idletime_us = static_cast<std::int64_t>(track.idletime / track.idlecount);
3640  }
3641  task.set_avg_idletime_us(avg_idletime_us);
3642  }
3643 
3644  void track_waittime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3645  std::int64_t avg_waittime_us;
3646  {
3647  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3648  auto& track = tracks_[&task];
3649  track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3650  ++track.waitcount;
3651  avg_waittime_us = static_cast<std::int64_t>(track.waittime / track.waitcount);
3652  }
3653  task.set_avg_waittime_us(avg_waittime_us);
3654  }
3655 
3656  void track_runtime(transwarp::itask& task, const std::chrono::time_point<std::chrono::steady_clock>& now) {
3657  std::int64_t avg_runtime_us;
3658  {
3659  std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3660  auto& track = tracks_[&task];
3661  if (!track.running) {
3662  return;
3663  }
3664  track.running = false;
3665  track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3666  ++track.runcount;
3667  avg_runtime_us = static_cast<std::int64_t>(track.runtime / track.runcount);
3668  }
3669  task.set_avg_runtime_us(avg_runtime_us);
3670  }
3671 
3672  struct track {
3673  bool running = false;
3674  std::chrono::time_point<std::chrono::steady_clock> startidle;
3675  std::chrono::time_point<std::chrono::steady_clock> startwait;
3676  std::chrono::time_point<std::chrono::steady_clock> startrun;
3677  std::chrono::microseconds::rep idletime = 0;
3678  std::chrono::microseconds::rep idlecount = 0;
3679  std::chrono::microseconds::rep waittime = 0;
3680  std::chrono::microseconds::rep waitcount = 0;
3681  std::chrono::microseconds::rep runtime = 0;
3682  std::chrono::microseconds::rep runcount = 0;
3683  };
3684 
3685  transwarp::detail::spinlock spinlock_; // protecting tracks_
3686  std::unordered_map<const transwarp::itask*, track> tracks_;
3687 };
3688 
3689 
3690 /// The releaser will release a task's future when the task's `after_satisfied`
3691 /// event was received which happens when all children received the task's result.
3692 /// The releaser should be used in cases where the task's result is only needed
3693 /// for consumption by its children and can then be discarded.
3695 public:
3696  releaser() = default;
3697 
3698  /// The executor gives control over where a task's future is released
3699  explicit releaser(std::shared_ptr<transwarp::executor> executor)
3700  : executor_(std::move(executor))
3701  {}
3702 
3703  // delete copy/move semantics
3704  releaser(const releaser&) = delete;
3705  releaser& operator=(const releaser&) = delete;
3706  releaser(releaser&&) = delete;
3707  releaser& operator=(releaser&&) = delete;
3708 
3711  if (executor_) {
3712  executor_->execute([&task]{ task.reset_future(); }, task);
3713  } else {
3714  task.reset_future();
3715  }
3716  }
3717  }
3718 
3719 private:
3720  std::shared_ptr<transwarp::executor> executor_;
3721 };
3722 
3723 
3724 } // transwarp
transwarp::consume_type
The consume type. Used for tag dispatch.
Definition: transwarp.h:290
transwarp::value_task::schedule
void schedule(transwarp::executor &, bool) override
No-op because a value task never runs.
Definition: transwarp.h:3144
transwarp::detail::task_common::remove_custom_data
void remove_custom_data() override
Removes custom data from this task.
Definition: transwarp.h:2065
transwarp::task_impl::result_type
typename transwarp::detail::functor_result< TaskType, Functor, ParentResults... >::type result_type
The result type of this task.
Definition: transwarp.h:2921
transwarp::detail::task_common::add_listener
void add_listener(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types.
Definition: transwarp.h:2079
transwarp::task_type::accept_any
@ accept_any
The task's functor accepts the first parent future that becomes ready.
transwarp::detail::call_impl< transwarp::consume_any_type, true, total, n... >::work
static Result work(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &parents)
Definition: transwarp.h:1121
transwarp::detail::task_impl_base::set_executor_all
void set_executor_all(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to all tasks which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:2330
transwarp::event_type::after_custom_data_set
@ after_custom_data_set
Just after custom data was assigned (handle_event called on thread that custom data was set on)
transwarp::value_task::set_exception
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task.
Definition: transwarp.h:3171
transwarp::detail::reset_priority_visitor
Resets the priority of the given task.
Definition: transwarp.h:1360
transwarp::detail::task_impl_base::schedule_all
void schedule_all(transwarp::executor &executor, bool reset_all) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2412
transwarp::detail::edges_visitor
Generates edges.
Definition: transwarp.h:1281
transwarp::detail::task_common::ensure_task_not_running
void ensure_task_not_running() const
Checks if the task is currently running and throws transwarp::control_error if it is.
Definition: transwarp.h:2153
transwarp::task_type::consume
@ consume
The task's functor consumes all parent results.
transwarp::detail::task_impl_base::schedule
void schedule(transwarp::executor &executor) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2369
transwarp::task_pool::idle_count
std::size_t idle_count() const
Returns the number of idle tasks in the pool.
Definition: transwarp.h:3490
transwarp::detail::make_ready_future
std::shared_future< void > make_ready_future()
Returns a ready future.
Definition: transwarp.h:1617
transwarp::detail::task_common::add_listener
void add_listener(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only.
Definition: transwarp.h:2089
transwarp::executor
The executor interface used to perform custom task execution.
Definition: transwarp.h:323
transwarp::value_task::has_result
bool has_result() const noexcept override
Returns true because a value task always contains a result.
Definition: transwarp.h:3195
transwarp::task_pool::task_pool
task_pool(std::shared_ptr< transwarp::task< ResultType >> task, std::size_t minimum_size, std::size_t maximum_size)
Constructs a task pool.
Definition: transwarp.h:3397
transwarp::value_task::remove_listener_all
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:3222
transwarp::detail::call
Result call(std::size_t task_id, const Task &task, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &parents)
Calls the functor of the given task with the results from the tuple of parents. Throws transwarp::tas...
Definition: transwarp.h:1205
transwarp::detail::task_impl_base::cancel_all
void cancel_all(bool enabled) noexcept override
If enabled then all pending tasks in the graph are canceled which will throw transwarp::task_canceled...
Definition: transwarp.h:2480
transwarp::value_task::add_listener_all
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:3212
transwarp::detail::task_impl_proxy::set_value
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set....
Definition: transwarp.h:2782
transwarp::consume_any
constexpr transwarp::consume_any_type consume_any
The consume_any task tag.
Definition: transwarp.h:295
transwarp::detail::decrement_refcount_functor
Definition: transwarp.h:996
transwarp::value_task::schedule_all
void schedule_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3147
transwarp::functor
A base class for a user-defined functor that needs access to the associated task or a cancel point to...
Definition: transwarp.h:682
transwarp::root
constexpr transwarp::root_type root
The root task tag.
Definition: transwarp.h:279
transwarp::detail::task_common::reset_priority
void reset_priority() override
Resets the task priority to 0.
Definition: transwarp.h:2042
transwarp::detail::circular_buffer
A simple circular buffer (FIFO). ValueType must support default construction. The buffer lets you pus...
Definition: transwarp.h:1776
transwarp::task_destroyed
Exception thrown when a task was destroyed prematurely.
Definition: transwarp.h:251
transwarp::detail::task_impl_base::remove_listener_all
void remove_listener_all(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types and for all parents.
Definition: transwarp.h:2542
transwarp::detail::thread_pool
A simple thread pool used to execute tasks in parallel.
Definition: transwarp.h:720
transwarp::detail::task_impl_proxy< void, TaskType, Functor, ParentResults... >::get
void get() const override
Blocks until the task finishes. Throws any exceptions that the underlying functor throws....
Definition: transwarp.h:2887
transwarp::detail::task_impl_base::avg_waittime_us
std::int64_t avg_waittime_us() const noexcept override
Returns the average waittime in microseconds (-1 if never set)
Definition: transwarp.h:2301
transwarp::task_impl::task_type
TaskType task_type
The task type.
Definition: transwarp.h:2918
transwarp::value_task::set_custom_data_all
void set_custom_data_all(transwarp::any_data custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:3119
transwarp::value_task::edges
std::vector< transwarp::edge > edges() override
Returns empty edges because a value task doesn't have parents.
Definition: transwarp.h:3252
transwarp::detail::task_impl_base::remove_listeners_all
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:2563
transwarp::value_task::schedule_all
void schedule_all(bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3153
transwarp::task_pool::reclaim
void reclaim()
Reclaims finished tasks by marking them as idle again.
Definition: transwarp.h:3524
transwarp::task_impl::clone_cast
std::shared_ptr< task_impl > clone_cast() const
Clones this task and casts the result to a ptr to task_impl.
Definition: transwarp.h:2965
transwarp::detail::add_listener_per_event_visitor
Adds a new listener per event type to the given task.
Definition: transwarp.h:1413
transwarp::detail::task_impl_base::reset_priority_all
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:2498
transwarp::detail::call_impl
Definition: transwarp.h:1017
transwarp::event_type::before_started
@ before_started
Just before a task starts running (handle_event called on thread that task is run on)
transwarp::detail::wait_for_all_functor
Definition: transwarp.h:908
transwarp::detail::task_impl_base::set_custom_data_all
void set_custom_data_all(transwarp::any_data custom_data) override
Assigns custom data to all tasks. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2508
transwarp::detail::push_task_visitor
Pushes the given task into the vector of tasks.
Definition: transwarp.h:1388
transwarp
The transwarp namespace.
Definition: transwarp.h:62
transwarp::task_type::wait
@ wait
The task's functor takes no arguments but waits for all parents to finish.
transwarp::detail::task_impl_base::add_listener_all
void add_listener_all(std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for all event types and for all parents.
Definition: transwarp.h:2528
transwarp::value_task::avg_idletime_us
std::int64_t avg_idletime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:3074
transwarp::value_task::level
std::size_t level() const noexcept override
The task's level.
Definition: transwarp.h:3054
transwarp::detail::functor_result
Determines the result type of the Functor dispatching on the task type.
Definition: transwarp.h:1497
transwarp::detail::circular_buffer::full
bool full() const
Returns whether the buffer is full.
Definition: transwarp.h:1838
transwarp::event_type::after_finished
@ after_finished
Just after a task has finished running (handle_event called on thread that task is run on)
transwarp::value_task::remove_listeners_all
void remove_listeners_all(transwarp::event_type event) override
Removes all listeners for the given event type and for all parents.
Definition: transwarp.h:3237
transwarp::detail::task_common
Common task functionality shared across task_impl and value_task
Definition: transwarp.h:1993
transwarp::value_task::get
transwarp::result< result_type >::type get() const override
Returns the value of this task. Throws an exception if this task has an exception assigned to it.
Definition: transwarp.h:3177
transwarp::detail::task_impl_base::cancel
void cancel(bool enabled) noexcept override
If enabled then this task is canceled which will throw transwarp::task_canceled when retrieving the t...
Definition: transwarp.h:2473
transwarp::detail::task_impl_proxy::get
transwarp::result< result_type >::type get() const override
Returns the result of this task. Throws any exceptions that the underlying functor throws....
Definition: transwarp.h:2801
transwarp::accept_any
constexpr transwarp::accept_any_type accept_any
The accept_any task tag.
Definition: transwarp.h:287
transwarp::value_task::set_executor_all
void set_executor_all(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3092
transwarp::functor::transwarp_task
transwarp::itask & transwarp_task() noexcept
The associated task (only to be called after the task was constructed)
Definition: transwarp.h:695
transwarp::detail::task_impl_base::has_result
bool has_result() const noexcept override
Returns whether this task contains a result.
Definition: transwarp.h:2447
transwarp::detail::task_impl_base::remove_listeners_all
void remove_listeners_all() override
Removes all listeners for all event types and for all parents.
Definition: transwarp.h:2556
transwarp::value_task::schedule
void schedule() override
No-op because a value task never runs.
Definition: transwarp.h:3135
transwarp::detail::task_impl_base::set_priority_all
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this....
Definition: transwarp.h:2487
transwarp::detail::reset_visitor
Resets the given task.
Definition: transwarp.h:1308
transwarp::value_task::reset_priority_all
void reset_priority_all() override
Resets the priority of all tasks to 0.
Definition: transwarp.h:3111
transwarp::detail::task_common::id
std::size_t id() const noexcept override
The task's id.
Definition: transwarp.h:1999
transwarp::task_type::accept
@ accept
The task's functor accepts all parent futures.
transwarp::detail::remove_listener_per_event_visitor
Removes a listener per event type from the given task.
Definition: transwarp.h:1440
transwarp::detail::task_common::custom_data
const transwarp::any_data & custom_data() const noexcept override
The custom task data (may not hold a value)
Definition: transwarp.h:2022
transwarp::detail::add_listener_visitor
Adds a new listener to the given task.
Definition: transwarp.h:1400
transwarp::releaser::handle_event
void handle_event(const transwarp::event_type event, transwarp::itask &task) override
This may be called from arbitrary threads depending on the event type (see transwarp::event_type)....
Definition: transwarp.h:3709
transwarp::detail::task_impl_base::set_level
void set_level(std::size_t level) noexcept override
Assigns the given level.
Definition: transwarp.h:2626
transwarp::accept
constexpr transwarp::accept_type accept
The accept task tag.
Definition: transwarp.h:283
transwarp::detail::task_common::remove_listener
void remove_listener(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only.
Definition: transwarp.h:2113
transwarp::detail::task_impl_base::tasks
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:2575
transwarp::releaser
The releaser will release a task's future when the task's after_satisfied event was received which ha...
Definition: transwarp.h:3694
transwarp::detail::clone_task_functor
Definition: transwarp.h:1635
transwarp::detail::task_impl_base::unvisit
void unvisit() noexcept override
Traverses through each task and marks them as not visited.
Definition: transwarp.h:2716
transwarp::task_type::consume_any
@ consume_any
The task's functor consumes the first parent result that becomes ready.
transwarp::detail::task_common::check_listener
void check_listener(const std::shared_ptr< transwarp::listener > &listener) const
Check for non-null listener pointer.
Definition: transwarp.h:2173
transwarp::listener
The listener interface to listen to events raised by tasks.
Definition: transwarp.h:354
transwarp::detail::task_impl_base::set_avg_idletime_us
void set_avg_idletime_us(std::int64_t idletime) noexcept override
Assigns the given idletime.
Definition: transwarp.h:2636
transwarp::no_op
constexpr no_op_functor no_op
An object to use in places where a no-op functor is required.
Definition: transwarp.h:1906
transwarp::detail::task_impl_proxy< void, TaskType, Functor, ParentResults... >::task_type
TaskType task_type
The task type.
Definition: transwarp.h:2870
transwarp::detail::parents
Determines the type of the parents.
Definition: transwarp.h:1663
transwarp::detail::call_impl_vector
Definition: transwarp.h:1026
transwarp::value_task
A value task that stores a single value and doesn't require scheduling. Value tasks should be created...
Definition: transwarp.h:3001
transwarp::make_value_task
auto make_value_task(Value &&value) -> std::shared_ptr< transwarp::value_task< typename transwarp::decay< Value >::type >>
A factory function to create a new value task.
Definition: transwarp.h:3327
transwarp::value_task::is_ready
bool is_ready() const override
Returns true because a value task is always ready.
Definition: transwarp.h:3190
transwarp::detail::task_common::remove_listeners
void remove_listeners(transwarp::event_type event) override
Removes all listeners for the given event type.
Definition: transwarp.h:2136
transwarp::accept_type
The accept type. Used for tag dispatch.
Definition: transwarp.h:282
transwarp::event_type
event_type
The task events that can be subscribed to using the listener interface.
Definition: transwarp.h:340
transwarp::detail::wait_for_all
void wait_for_all(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &parents)
Waits for all parents to finish.
Definition: transwarp.h:917
transwarp::value_task::then
auto then(TaskType_, Functor_ &&functor) -> std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type >>
Creates a continuation to this task.
Definition: transwarp.h:3040
transwarp::detail::task_common::raise_event
void raise_event(transwarp::event_type event)
Raises the given event to all listeners.
Definition: transwarp.h:2160
transwarp::detail::task_common::set_name
void set_name(transwarp::option_str name) noexcept override
Assigns the given name.
Definition: transwarp.h:2191
transwarp::detail::visit_visitor
Visits the given task using the visitor given in the constructor.
Definition: transwarp.h:1476
transwarp::detail::task_impl_proxy< ResultType &, TaskType, Functor, ParentResults... >::set_value
void set_value(typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set....
Definition: transwarp.h:2834
transwarp::detail::remove_executor_visitor
Removes the executor from the given task.
Definition: transwarp.h:1340
transwarp::task_pool::next_task
std::shared_ptr< transwarp::task< ResultType > > next_task(bool maybe_resize=true)
Returns the next idle task. If there are no idle tasks then it will attempt to double the pool size....
Definition: transwarp.h:3433
transwarp::detail::task_impl_base::edges
std::vector< transwarp::edge > edges() override
Returns all edges in the graph. This is mainly for visualizing the tasks and their interdependencies....
Definition: transwarp.h:2583
transwarp::wait_any_type
The wait_any type. Used for tag dispatch.
Definition: transwarp.h:302
transwarp::edge::child
const transwarp::itask & child() const noexcept
Returns the child task.
Definition: transwarp.h:388
transwarp::event_type::after_satisfied
@ after_satisfied
Just after a task has satisfied all its children with results (handle_event called on thread where th...
transwarp::event_type::after_future_changed
@ after_future_changed
Just after the task's future was changed (handle_event called on thread that changed the task's futur...
transwarp::value_task::reset_all
void reset_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3203
transwarp::releaser::releaser
releaser(std::shared_ptr< transwarp::executor > executor)
The executor gives control over where a task's future is released.
Definition: transwarp.h:3699
transwarp::task_pool::minimum_size
std::size_t minimum_size() const
Returns the minimum size of the pool.
Definition: transwarp.h:3480
transwarp::detail::cancel_all_but_one_functor
Definition: transwarp.h:969
transwarp::no_op_functor
A functor that does nothing.
Definition: transwarp.h:1901
transwarp::value_task::remove_executor_all
void remove_executor_all() override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3098
transwarp::task_impl::then
auto then(TaskType_, Functor_ &&functor) -> std::shared_ptr< transwarp::task_impl< TaskType_, typename std::decay< Functor_ >::type, result_type >>
Creates a continuation to this task.
Definition: transwarp.h:2959
transwarp::detail::task_impl_proxy::result_type
ResultType result_type
The result type of this task.
Definition: transwarp.h:2778
transwarp::control_error
Exception thrown when a task is used in unintended ways.
Definition: transwarp.h:269
transwarp::value_task::reset
void reset() override
No-op because a value task never runs.
Definition: transwarp.h:3200
transwarp::task_type::wait_any
@ wait_any
The task's functor takes no arguments but waits for the first parent to finish.
transwarp::value_task::finalize
void finalize() override
Nothing to be done to finalize a value task.
Definition: transwarp.h:3051
transwarp::detail::task_common::remove_listeners
void remove_listeners() override
Removes all listeners.
Definition: transwarp.h:2127
transwarp::value_task::executor
std::shared_ptr< transwarp::executor > executor() const noexcept override
Value tasks don't have executors as they don't run.
Definition: transwarp.h:3064
transwarp::edge::child
transwarp::itask & child() noexcept
Returns the child task.
Definition: transwarp.h:393
transwarp::detail::assign_futures_impl
Definition: transwarp.h:861
transwarp::detail::task_impl_base::add_listener_all
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only and for all parents.
Definition: transwarp.h:2535
transwarp::detail::circular_buffer::swap
void swap(circular_buffer &buffer)
Swaps this buffer with the given buffer.
Definition: transwarp.h:1843
transwarp::value_task::remove_custom_data_all
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:3128
transwarp::value_task::was_scheduled
bool was_scheduled() const noexcept override
Returns true because a value task is scheduled once on construction.
Definition: transwarp.h:3182
transwarp::detail::task_impl_base
The base task class that contains the functionality that can be used with all result types (void and ...
Definition: transwarp.h:2243
transwarp::task_pool::size
std::size_t size() const
Returns the current total size of the pool (sum of idle and busy tasks)
Definition: transwarp.h:3475
transwarp::detail::task_impl_base::avg_runtime_us
std::int64_t avg_runtime_us() const noexcept override
Returns the average runtime in microseconds (-1 if never set)
Definition: transwarp.h:2310
transwarp::detail::task_common::priority
std::int64_t priority() const noexcept override
The task priority (defaults to 0)
Definition: transwarp.h:2013
transwarp::detail::get_futures
std::tuple< std::shared_future< ParentResults >... > get_futures(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &input)
Returns the futures from the given tuple of tasks.
Definition: transwarp.h:875
transwarp::detail::set_executor_visitor
Assigns an executor to the given task.
Definition: transwarp.h:1328
transwarp::detail::task_impl_base::set_exception
void set_exception(std::exception_ptr exception) override
Assigns an exception to this task. Scheduling will have no effect after an exception has been set....
Definition: transwarp.h:2419
transwarp::detail::task_impl_base::set_executor
void set_executor(std::shared_ptr< transwarp::executor > executor) override
Assigns an executor to this task which takes precedence over the executor provided in schedule() or s...
Definition: transwarp.h:2320
transwarp::detail::task_impl_base::wait
void wait() const override
Waits for the task to complete. Should only be called if was_scheduled() is true, throws transwarp::c...
Definition: transwarp.h:2434
transwarp::detail::unvisit_visitor
Unvisits the given task.
Definition: transwarp.h:1488
transwarp::value_task::canceled
bool canceled() const noexcept override
Value tasks cannot be canceled.
Definition: transwarp.h:3069
transwarp::detail::remove_listeners_visitor
Removes all listeners from the given task.
Definition: transwarp.h:1454
transwarp::detail::make_future_with_value
std::shared_future< ResultType > make_future_with_value(Value &&value)
Returns a ready future with the given value as its state.
Definition: transwarp.h:1609
transwarp::for_each
auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr< transwarp::task_impl< transwarp::wait_type, transwarp::no_op_functor, std::vector< std::shared_ptr< transwarp::task< void >>>>>
A function similar to std::for_each but returning a transwarp task for deferred, possibly asynchronou...
Definition: transwarp.h:3337
transwarp::task_pool::busy_count
std::size_t busy_count() const
Returns the number of busy tasks in the pool.
Definition: transwarp.h:3496
transwarp::value_task::clone_cast
std::shared_ptr< value_task > clone_cast() const
Clones this task and casts the result to a ptr to value_task.
Definition: transwarp.h:3046
transwarp::detail::task_impl_base::executor
std::shared_ptr< transwarp::executor > executor() const noexcept override
The task's executor (may be null)
Definition: transwarp.h:2282
transwarp::detail::task_impl_base::schedule_all_impl
void schedule_all_impl(bool reset_all, transwarp::executor *executor=nullptr)
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2701
transwarp::edge
An edge between two tasks.
Definition: transwarp.h:365
transwarp::detail::task_impl_base::reset
void reset() override
Resets this task.
Definition: transwarp.h:2452
transwarp::detail::task_impl_base::level
std::size_t level() const noexcept override
The task's level.
Definition: transwarp.h:2272
transwarp::value_task::schedule
void schedule(transwarp::executor &) override
No-op because a value task never runs.
Definition: transwarp.h:3138
transwarp::task_type
task_type
The possible task types.
Definition: transwarp.h:221
transwarp::task_pool::task_pool
task_pool(std::shared_ptr< transwarp::task< ResultType >> task)
Constructs a task pool with reasonable defaults for minimum and maximum.
Definition: transwarp.h:3419
transwarp::detail::run_task
Result run_task(std::size_t task_id, const std::weak_ptr< Task > &task, Args &&... args)
Runs the task with the given arguments, hence, invoking the task's functor.
Definition: transwarp.h:895
transwarp::detail::remove_listeners_per_event_visitor
Removes all listeners per event type from the given task.
Definition: transwarp.h:1463
transwarp::detail::circular_buffer::circular_buffer
circular_buffer(std::size_t capacity)
Constructs a circular buffer with a given fixed capacity.
Definition: transwarp.h:1785
transwarp::detail::call_with_each_functor
Definition: transwarp.h:1222
transwarp::make_task
auto make_task(TaskType, Functor &&functor, std::shared_ptr< Parents >... parents) -> std::shared_ptr< transwarp::task_impl< TaskType, typename std::decay< Functor >::type, typename Parents::result_type... >>
A factory function to create a new task.
Definition: transwarp.h:3311
transwarp::event_type::before_scheduled
@ before_scheduled
Just before a task is scheduled (handle_event called on thread of caller to schedule())
transwarp::value_task::set_value
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task.
Definition: transwarp.h:3165
transwarp::detail::task_common::future
std::shared_future< result_type > future() const noexcept override
Returns the future associated to the underlying execution.
Definition: transwarp.h:2074
transwarp::value_task::cancel
void cancel(bool) noexcept override
No-op because a value task never runs.
Definition: transwarp.h:3206
transwarp::detail::task_impl_base::schedule
void schedule(bool reset) override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:2361
transwarp::value_task::set_value
void set_value(const typename transwarp::decay< result_type >::type &value) override
Assigns a value to this task.
Definition: transwarp.h:3159
transwarp::detail::task_impl_base::result_type
ResultType result_type
The result type of this task.
Definition: transwarp.h:2249
transwarp::detail::future_get_functor
Definition: transwarp.h:1143
transwarp::value_task::remove_executor
void remove_executor() override
No-op because a value task never runs.
Definition: transwarp.h:3095
transwarp::sequential
Executor for sequential execution. Runs functors sequentially on the same thread.
Definition: transwarp.h:1910
transwarp::value_task::add_listener_all
void add_listener_all(transwarp::event_type event, std::shared_ptr< transwarp::listener > listener) override
Adds a new listener for the given event type only, to this task and to all parents.
Definition: transwarp.h:3217
transwarp::detail::circular_buffer::empty
bool empty() const
Returns whether the buffer is empty.
Definition: transwarp.h:1833
transwarp::value_task::set_executor
void set_executor(std::shared_ptr< transwarp::executor >) override
No-op because a value task never runs.
Definition: transwarp.h:3089
transwarp::detail::task_impl_base::remove_executor_all
void remove_executor_all() override
Removes the executor from all tasks.
Definition: transwarp.h:2343
transwarp::value_task::set_priority_all
void set_priority_all(std::int64_t priority) override
Sets a priority to all tasks (defaults to 0). transwarp will not directly use this....
Definition: transwarp.h:3102
transwarp::detail::runner
A callable to run a task given its parents.
Definition: transwarp.h:1731
transwarp::edge::parent
transwarp::itask & parent() noexcept
Returns the parent task.
Definition: transwarp.h:383
transwarp::value_task::tasks
const std::vector< transwarp::itask * > & tasks() override
Returns all tasks in the graph in breadth order.
Definition: transwarp.h:3247
transwarp::detail::task_impl_base::parents
std::vector< transwarp::itask * > parents() const override
Returns the task's parents (may be empty)
Definition: transwarp.h:2570
transwarp::itask
An interface for the task class.
Definition: transwarp.h:407
transwarp::detail::cancel_all_but_one
void cancel_all_but_one(const std::shared_ptr< transwarp::task< OneResult >> &one, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &parents)
Cancels all tasks but one.
Definition: transwarp.h:985
transwarp::detail::task_impl_base::canceled
bool canceled() const noexcept override
Returns whether the associated task is canceled.
Definition: transwarp.h:2287
transwarp::detail::task_impl_base< ResultType, TaskType, Functor, ParentResults... >::task_type
TaskType task_type
The task type.
Definition: transwarp.h:2246
transwarp::detail::task_impl_base::avg_idletime_us
std::int64_t avg_idletime_us() const noexcept override
Returns the average idletime in microseconds (-1 if never set)
Definition: transwarp.h:2292
transwarp::detail::clone_task
std::shared_ptr< TaskType > clone_task(std::unordered_map< std::shared_ptr< transwarp::itask >, std::shared_ptr< transwarp::itask >> &task_cache, const std::shared_ptr< TaskType > &t)
Clones a task.
Definition: transwarp.h:582
transwarp::accept_any_type
The accept_any type. Used for tag dispatch.
Definition: transwarp.h:286
transwarp::detail::task_impl_proxy< void, TaskType, Functor, ParentResults... >::set_value
void set_value() override
Assigns a value to this task. Scheduling will have no effect after a call to this....
Definition: transwarp.h:2877
transwarp::detail::task_common::set_priority
void set_priority(std::int64_t priority) override
Sets a task priority (defaults to 0). transwarp will not directly use this. This is only useful if so...
Definition: transwarp.h:2032
transwarp::task_impl::task_impl
task_impl(F &&functor, std::vector< std::shared_ptr< transwarp::task< P >>> parents)
A task is defined by functor and parent tasks. Note: Don't use this constructor directly,...
Definition: transwarp.h:2933
transwarp::sequential::execute
void execute(const std::function< void()> &functor, transwarp::itask &) override
Runs the functor on the current thread.
Definition: transwarp.h:1927
transwarp::root_type
The root type. Used for tag dispatch.
Definition: transwarp.h:278
transwarp::detail::circular_buffer::front
const value_type & front() const
Returns the value at the front of the buffer (the oldest value). This is undefined if the buffer is e...
Definition: transwarp.h:1809
transwarp::detail::final_visitor
Applies final bookkeeping to the task and collects the task.
Definition: transwarp.h:1267
transwarp::detail::base_runner
Definition: transwarp.h:1701
transwarp::task_impl
A task representing a piece of work given by functor and parent tasks. By connecting tasks a directed...
Definition: transwarp.h:2915
transwarp::wait_type
The wait type. Used for tag dispatch.
Definition: transwarp.h:298
transwarp::value_task::value_task
value_task(T &&value)
A value task is defined by a given value. Note: Don't use this constructor directly,...
Definition: transwarp.h:3012
transwarp::detail::task_impl_base::schedule_impl
void schedule_impl(bool reset, transwarp::executor *executor=nullptr) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2673
transwarp::executor::execute
virtual void execute(const std::function< void()> &functor, transwarp::itask &task)=0
Runs a task which is wrapped by the given functor. The functor only captures one shared pointer and c...
transwarp::value_task::named
std::shared_ptr< value_task > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:3025
transwarp::detail::set_custom_data_visitor
Assigns custom data to the given task.
Definition: transwarp.h:1368
transwarp::task_pool::resize
void resize(std::size_t new_size)
Resizes the task pool to the given new size if possible.
Definition: transwarp.h:3502
transwarp::timer::handle_event
void handle_event(const transwarp::event_type event, transwarp::itask &task) override
Performs the actual timing and populates the task's timing members.
Definition: transwarp.h:3584
transwarp::detail::task_common::remove_listener
void remove_listener(const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for all event types.
Definition: transwarp.h:2097
transwarp::detail::push_task_functor
Definition: transwarp.h:1648
transwarp::parallel::name
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1949
transwarp::result
Returns the result type of a std::shared_future<T>
Definition: transwarp.h:572
transwarp::wait
constexpr transwarp::wait_type wait
The wait task tag.
Definition: transwarp.h:299
transwarp::functor::transwarp_task
const transwarp::itask & transwarp_task() const noexcept
The associated task (only to be called after the task was constructed)
Definition: transwarp.h:690
transwarp::detail::task_impl_base::finalize
void finalize() override
Can be called to explicitly finalize this task making this task the terminal task in the graph....
Definition: transwarp.h:2255
transwarp::decay
Removes reference and const from a type.
Definition: transwarp.h:565
transwarp::detail::task_impl_base::ensure_task_was_scheduled
void ensure_task_was_scheduled() const
Checks if the task was scheduled and throws transwarp::control_error if it's not.
Definition: transwarp.h:2663
transwarp::value_task::avg_waittime_us
std::int64_t avg_waittime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:3079
transwarp::task_pool
A task pool that allows running multiple instances of the same task in parallel.
Definition: transwarp.h:3393
transwarp::detail::task_impl_base::schedule
void schedule() override
Schedules this task for execution on the caller thread. The task-specific executor gets precedence if...
Definition: transwarp.h:2352
transwarp::detail::circular_buffer::pop
void pop()
Removes the value at the front of the buffer (the oldest value)
Definition: transwarp.h:1814
transwarp::value_task::remove_listeners_all
void remove_listeners_all() override
Removes all listeners from this task and from all parents.
Definition: transwarp.h:3232
transwarp::task_pool::maximum_size
std::size_t maximum_size() const
Returns the maximum size of the pool.
Definition: transwarp.h:3485
transwarp::detail::task_impl_proxy::set_value
void set_value(typename transwarp::decay< result_type >::type &&value) override
Assigns a value to this task. Scheduling will have no effect after a value has been set....
Definition: transwarp.h:2791
transwarp::task_canceled
Exception thrown when a task is canceled.
Definition: transwarp.h:242
transwarp::detail::wait_for_any
Parent wait_for_any(const std::shared_ptr< transwarp::task< ParentResults >> &...parents)
Waits for the first parent to finish.
Definition: transwarp.h:944
transwarp::invalid_parameter
Exception thrown when an invalid parameter was passed to a function.
Definition: transwarp.h:260
transwarp::detail::task_impl_proxy< ResultType &, TaskType, Functor, ParentResults... >::task_type
TaskType task_type
The task type.
Definition: transwarp.h:2827
transwarp::edge::parent
const transwarp::itask & parent() const noexcept
Returns the parent task.
Definition: transwarp.h:378
transwarp::detail::task_impl_base::remove_custom_data_all
void remove_custom_data_all() override
Removes custom data from all tasks.
Definition: transwarp.h:2519
transwarp::functor::transwarp_cancel_point
void transwarp_cancel_point() const
If the associated task is canceled then this will throw transwarp::task_canceled which will stop the ...
Definition: transwarp.h:701
transwarp::listener::handle_event
virtual void handle_event(transwarp::event_type event, transwarp::itask &task)=0
This may be called from arbitrary threads depending on the event type (see transwarp::event_type)....
transwarp::detail::set_priority_visitor
Assigns a priority to the given task.
Definition: transwarp.h:1348
transwarp::detail::call_with_each
void call_with_each(const Functor &f, const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &t)
Calls the functor with every element in the tuple.
Definition: transwarp.h:1239
transwarp::sequential::name
std::string name() const override
Returns the name of the executor.
Definition: transwarp.h:1922
transwarp::detail::task_common::set_id
void set_id(std::size_t id) noexcept override
Assigns the given id.
Definition: transwarp.h:2186
transwarp::transform
auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr< transwarp::task_impl< transwarp::wait_type, transwarp::no_op_functor, std::vector< std::shared_ptr< transwarp::task< void >>>>>
A function similar to std::transform but returning a transwarp task for deferred, possibly asynchrono...
Definition: transwarp.h:3366
transwarp::detail::remove_listener_visitor
Removes a listener from the given task.
Definition: transwarp.h:1427
transwarp::detail::task_impl_base::visit_all
void visit_all(Visitor &visitor)
Visits all tasks.
Definition: transwarp.h:2725
transwarp::detail::task_impl_base::set_avg_waittime_us
void set_avg_waittime_us(std::int64_t waittime) noexcept override
Assigns the given waittime.
Definition: transwarp.h:2645
transwarp::detail::task_impl_base::type
transwarp::task_type type() const noexcept override
The task's type.
Definition: transwarp.h:2277
transwarp::event_type::after_canceled
@ after_canceled
Just after a task was canceled (handle_event called on thread that task is run on)
transwarp::detail::task_impl_base::set_avg_runtime_us
void set_avg_runtime_us(std::int64_t runtime) noexcept override
Assigns the given runtime.
Definition: transwarp.h:2654
transwarp::timer
A timer that tracks the average idle, wait, and run time of each task it listens to.
Definition: transwarp.h:3573
transwarp::task_type::root
@ root
The task has no parents.
transwarp::to_string
std::string to_string(const transwarp::task_type &type)
String conversion for the task_type enumeration.
Definition: transwarp.h:498
transwarp::task_impl::named
std::shared_ptr< task_impl > named(std::string name)
Gives this task a name and returns a ptr to itself.
Definition: transwarp.h:2944
transwarp::detail::task_impl_base::reset_all
void reset_all() override
Resets all tasks in the graph.
Definition: transwarp.h:2464
transwarp::detail::remove_custom_data_visitor
Removes custom data from the given task.
Definition: transwarp.h:1380
transwarp::detail::circular_buffer::capacity
std::size_t capacity() const
Returns the capacity of the buffer.
Definition: transwarp.h:1822
transwarp::event_type::before_invoked
@ before_invoked
Just before a task's functor is invoked (handle_event called on thread that task is run on)
transwarp::detail::circular_buffer::size
std::size_t size() const
Returns the number of populated values of the buffer. Its maximum value equals the capacity of the bu...
Definition: transwarp.h:1828
transwarp::detail::circular_buffer::push
void push(T &&value)
Pushes a new value onto the end of the buffer. If that exceeds the capacity of the buffer then the ol...
Definition: transwarp.h:1802
transwarp::detail::task_impl_proxy
A task proxy.
Definition: transwarp.h:2772
transwarp::detail::task_impl_proxy< ResultType &, TaskType, Functor, ParentResults... >::get
transwarp::result< result_type >::type get() const override
Returns the result of this task. Throws any exceptions that the underlying functor throws....
Definition: transwarp.h:2844
transwarp::detail::task_impl_base::is_ready
bool is_ready() const override
Returns whether the task has finished processing. Should only be called if was_scheduled() is true,...
Definition: transwarp.h:2441
transwarp::detail::task_impl_base::visit
void visit(const std::function< void(transwarp::itask &)> &visitor) override
Visits each task in a depth-first traversal.
Definition: transwarp.h:2707
transwarp::value_task::schedule
void schedule(bool) override
No-op because a value task never runs.
Definition: transwarp.h:3141
transwarp::consume_any_type
The consume_any type. Used for tag dispatch.
Definition: transwarp.h:294
transwarp::value_task::wait
void wait() const override
No-op because a value task never runs.
Definition: transwarp.h:3187
transwarp::detail::assign_task_if
void assign_task_if(Functor &functor, transwarp::itask &task) noexcept
Assigns the task to the given functor if the functor is a subclass of transwarp::functor.
Definition: transwarp.h:1602
transwarp::detail::cancel_visitor
Cancels or resumes the given task.
Definition: transwarp.h:1316
transwarp::detail::task_impl_base::remove_executor
void remove_executor() override
Removes the executor from this task.
Definition: transwarp.h:2337
transwarp::parallel
Executor for parallel execution. Uses a simple thread pool.
Definition: transwarp.h:1934
transwarp::detail::make_future_with_exception
std::shared_future< ResultType > make_future_with_exception(std::exception_ptr exception)
Returns a ready future with the given exception as its state.
Definition: transwarp.h:1625
transwarp::value_task::schedule_all
void schedule_all(transwarp::executor &, bool) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3156
transwarp::transwarp_error
Base class for exceptions.
Definition: transwarp.h:233
transwarp::value_task::cancel_all
void cancel_all(bool) noexcept override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3209
transwarp::detail::spinlock
Definition: transwarp.h:1881
transwarp::parallel::execute
void execute(const std::function< void()> &functor, transwarp::itask &) override
Pushes the functor into the thread pool for asynchronous execution.
Definition: transwarp.h:1954
transwarp::value_task::remove_listener_all
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only, from this task and from all parents.
Definition: transwarp.h:3227
transwarp::detail::task_impl_base::was_scheduled
bool was_scheduled() const noexcept override
Returns whether the task was scheduled and not reset afterwards. This means that the underlying futur...
Definition: transwarp.h:2428
transwarp::detail::task_common::name
const transwarp::option_str & name() const noexcept override
The optional task name.
Definition: transwarp.h:2004
transwarp::detail::task_impl_base::schedule_all
void schedule_all() override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:2386
transwarp::task_pool::wait_for_next_task
std::shared_ptr< transwarp::task< ResultType > > wait_for_next_task(bool maybe_resize=true)
Just like next_task() but waits for a task to become available. The returned task will always be a va...
Definition: transwarp.h:3465
transwarp::detail::task_impl_base::set_type
void set_type(transwarp::task_type type) noexcept override
Assigns the given type.
Definition: transwarp.h:2631
transwarp::detail::decrement_refcount
void decrement_refcount(const std::tuple< std::shared_ptr< transwarp::task< ParentResults >>... > &parents)
Decrements the refcount of all parents.
Definition: transwarp.h:1005
transwarp::detail::make_future_functor
Definition: transwarp.h:1971
transwarp::detail::task_impl_base::remove_listener_all
void remove_listener_all(transwarp::event_type event, const std::shared_ptr< transwarp::listener > &listener) override
Removes the listener for the given event type only, from this task and from all parents.
Definition: transwarp.h:2549
transwarp::value_task::type
transwarp::task_type type() const noexcept override
The task's type.
Definition: transwarp.h:3059
transwarp::detail::parent_visitor
Sets level of a task and increments the child count.
Definition: transwarp.h:1251
transwarp::consume
constexpr transwarp::consume_type consume
The consume task tag.
Definition: transwarp.h:291
transwarp::wait_any
constexpr transwarp::wait_any_type wait_any
The wait_any task tag.
Definition: transwarp.h:303
transwarp::task_impl::task_impl
task_impl(F &&functor, std::shared_ptr< transwarp::task< ParentResults >>... parents)
A task is defined by functor and parent tasks. Note: Don't use this constructor directly,...
Definition: transwarp.h:2926
transwarp::detail::assign_task_if_impl
Definition: transwarp.h:675
transwarp::value_task::parents
std::vector< transwarp::itask * > parents() const override
Empty because a value task doesn't have parents.
Definition: transwarp.h:3242
transwarp::executor::name
virtual std::string name() const =0
Returns the name of the executor.
transwarp::task
The task class.
Definition: transwarp.h:599
transwarp::timer::reset
void reset()
Resets all timing information.
Definition: transwarp.h:3625
transwarp::detail::task_impl_base::schedule_all
void schedule_all(transwarp::executor &executor) override
Schedules all tasks in the graph for execution using the provided executor. The task-specific executo...
Definition: transwarp.h:2394
transwarp::value_task::avg_runtime_us
std::int64_t avg_runtime_us() const noexcept override
Returns -1 as value tasks don't run.
Definition: transwarp.h:3084
transwarp::detail::task_common::set_custom_data
void set_custom_data(transwarp::any_data custom_data) override
Assigns custom data to this task. transwarp will not directly use this. This is only useful if someth...
Definition: transwarp.h:2051
transwarp::detail::schedule_visitor
Schedules using the given executor.
Definition: transwarp.h:1295
transwarp::value_task::schedule_all
void schedule_all(transwarp::executor &) override
No-op because a value task never runs and doesn't have parents.
Definition: transwarp.h:3150
transwarp::detail::task_impl_base::schedule_all
void schedule_all(bool reset_all) override
Schedules all tasks in the graph for execution on the caller thread. The task-specific executors get ...
Definition: transwarp.h:2403
transwarp::detail::task_impl_base::schedule
void schedule(transwarp::executor &executor, bool reset) override
Schedules this task for execution using the provided executor. The task-specific executor gets preced...
Definition: transwarp.h:2378