9 #ifndef TRANSWARP_CPP11
22 #ifndef TRANSWARP_CPP11
30 #include <type_traits>
31 #include <unordered_map>
36 #ifdef TRANSWARP_MINIMUM_TASK_SIZE
38 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
39 #define TRANSWARP_DISABLE_TASK_CUSTOM_DATA
42 #ifndef TRANSWARP_DISABLE_TASK_NAME
43 #define TRANSWARP_DISABLE_TASK_NAME
46 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
47 #define TRANSWARP_DISABLE_TASK_PRIORITY
50 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
51 #define TRANSWARP_DISABLE_TASK_REFCOUNT
54 #ifndef TRANSWARP_DISABLE_TASK_TIME
55 #define TRANSWARP_DISABLE_TASK_TIME
65 #ifdef TRANSWARP_CPP11
71 option_str(std::string str)
72 : str_(std::move(str)),
77 option_str(
const option_str&) =
default;
78 option_str& operator=(
const option_str&) =
default;
79 option_str(option_str&&) =
default;
80 option_str& operator=(option_str&&) =
default;
82 operator bool()
const noexcept {
86 const std::string& operator*()
const noexcept {
101 virtual ~storage() =
default;
102 virtual std::unique_ptr<storage> clone()
const = 0;
103 virtual void destroy(
void* data)
const noexcept = 0;
104 virtual void copy(
const void* src,
void*& dest)
const = 0;
105 virtual void move(
void*& src,
void*& dest)
const noexcept = 0;
110 class storage_impl :
public transwarp::detail::storage {
112 std::unique_ptr<transwarp::detail::storage> clone()
const override {
113 return std::unique_ptr<transwarp::detail::storage>(
new storage_impl);
115 void destroy(
void* data)
const noexcept
override {
116 delete reinterpret_cast<T*
>(data);
118 void copy(
const void* src,
void*& dest)
const override {
119 dest =
new T(*
reinterpret_cast<const T*
>(src));
121 void move(
void*& src,
void*& dest)
const noexcept
override {
138 : storage_(
new transwarp::detail::storage_impl<
typename std::decay<T>::type>),
139 data_(
new typename std::decay<T>::type(std::forward<T>(value)))
142 any_data(
const any_data& other)
143 : storage_(other.storage_ ? other.storage_->clone() :
nullptr)
146 storage_->copy(other.data_, data_);
152 any_data& operator=(
const any_data& other) {
153 if (
this != &other) {
155 storage_->destroy(data_);
157 storage_ = other.storage_ ? other.storage_->clone() :
nullptr;
159 storage_->copy(other.data_, data_);
167 any_data(any_data&& other)
168 : storage_(std::move(other.storage_))
171 storage_->move(other.data_, data_);
177 any_data& operator=(any_data&& other) {
178 if (
this != &other) {
180 storage_->destroy(data_);
182 storage_ = std::move(other.storage_);
184 storage_->move(other.data_, data_);
194 storage_->destroy(data_);
198 bool has_value()
const noexcept {
199 return data_ !=
nullptr;
203 const T& get()
const {
204 return *
reinterpret_cast<const T*
>(data_);
208 std::unique_ptr<transwarp::detail::storage> storage_;
// Alias used for read-only string parameters: here a const reference to
// std::string.
212 using str_view =
const std::string&;
// Aliases backed by the standard library: any_data is std::any, option_str is
// std::optional<std::string>, and str_view is std::string_view.
// NOTE(review): str_view is aliased twice with different targets, so this
// group and the alias above must sit in different preprocessor branches; the
// #else/#endif lines are not part of this excerpt. Presumably the alias above
// belongs to the TRANSWARP_CPP11 branch and this group to the other one --
// confirm against the full header.
214 using any_data = std::any;
215 using option_str = std::optional<std::string>;
216 using str_view = std::string_view;
236 : std::runtime_error{message}
// Tag types, one per task kind. Each derives from std::integral_constant, so
// the tag exposes the matching transwarp::task_type enumerator as its
// compile-time `value`. A static_assert further down in this file rejects any
// TaskType that is not one of these seven tags.

// Tag carrying task_type::root.
278 struct root_type : std::integral_constant<transwarp::task_type, transwarp::task_type::root> {};
// Tag carrying task_type::accept.
282 struct accept_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept> {};
// Tag carrying task_type::accept_any.
286 struct accept_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::accept_any> {};
// Tag carrying task_type::consume.
290 struct consume_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume> {};
// Tag carrying task_type::consume_any.
294 struct consume_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::consume_any> {};
// Tag carrying task_type::wait.
298 struct wait_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait> {};
// Tag carrying task_type::wait_any.
302 struct wait_any_type : std::integral_constant<transwarp::task_type, transwarp::task_type::wait_any> {};
328 virtual std::string
name()
const = 0;
373 edge& operator=(
const edge&) =
default;
407 class itask :
public std::enable_shared_from_this<itask> {
409 virtual ~
itask() =
default;
411 virtual void finalize() = 0;
412 virtual std::size_t id()
const noexcept = 0;
413 virtual std::size_t level()
const noexcept = 0;
415 virtual const transwarp::option_str& name()
const noexcept = 0;
416 virtual std::shared_ptr<transwarp::executor>
executor()
const noexcept = 0;
417 virtual std::int64_t priority()
const noexcept = 0;
418 virtual const transwarp::any_data& custom_data()
const noexcept = 0;
419 virtual bool canceled()
const noexcept = 0;
420 virtual std::int64_t avg_idletime_us()
const noexcept = 0;
421 virtual std::int64_t avg_waittime_us()
const noexcept = 0;
422 virtual std::int64_t avg_runtime_us()
const noexcept = 0;
423 virtual void set_executor(std::shared_ptr<transwarp::executor>
executor) = 0;
424 virtual void set_executor_all(std::shared_ptr<transwarp::executor>
executor) = 0;
425 virtual void remove_executor() = 0;
426 virtual void remove_executor_all() = 0;
427 virtual void set_priority(std::int64_t priority) = 0;
428 virtual void set_priority_all(std::int64_t priority) = 0;
429 virtual void reset_priority() = 0;
430 virtual void reset_priority_all() = 0;
431 virtual void set_custom_data(transwarp::any_data custom_data) = 0;
432 virtual void set_custom_data_all(transwarp::any_data custom_data) = 0;
433 virtual void remove_custom_data() = 0;
434 virtual void remove_custom_data_all() = 0;
435 virtual void add_listener(std::shared_ptr<transwarp::listener>
listener) = 0;
437 virtual void add_listener_all(std::shared_ptr<transwarp::listener>
listener) = 0;
439 virtual void remove_listener(
const std::shared_ptr<transwarp::listener>&
listener) = 0;
441 virtual void remove_listener_all(
const std::shared_ptr<transwarp::listener>&
listener) = 0;
443 virtual void remove_listeners() = 0;
445 virtual void remove_listeners_all() = 0;
447 virtual void schedule() = 0;
449 virtual void schedule(
bool reset) = 0;
451 virtual void schedule_all() = 0;
453 virtual void schedule_all(
bool reset_all) = 0;
455 virtual void set_exception(std::exception_ptr exception) = 0;
456 virtual bool was_scheduled()
const noexcept = 0;
457 virtual void wait()
const = 0;
458 virtual bool is_ready()
const = 0;
459 virtual bool has_result()
const = 0;
460 virtual void reset() = 0;
461 virtual void reset_all() = 0;
462 virtual void cancel(
bool enabled) noexcept = 0;
463 virtual void cancel_all(
bool enabled) noexcept = 0;
464 virtual std::vector<itask*> parents()
const = 0;
465 virtual const std::vector<itask*>& tasks() = 0;
466 virtual std::vector<transwarp::edge> edges() = 0;
481 virtual void visit(
const std::function<
void(
itask&)>& visitor) = 0;
482 virtual void unvisit() noexcept = 0;
483 virtual void set_id(std::size_t
id) noexcept = 0;
484 virtual void set_level(std::size_t level) noexcept = 0;
486 virtual void set_name(transwarp::option_str name) noexcept = 0;
487 virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0;
488 virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0;
489 virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0;
490 virtual void increment_childcount() noexcept = 0;
492 virtual void reset_future() = 0;
517 const transwarp::option_str& name =
task.name();
519 s += std::string{
"<"} + *name + std::string{
">"} + separator.data();
522 s += std::string{
" id="} + std::to_string(
task.id());
523 s += std::string{
" lev="} + std::to_string(
task.level());
524 const std::shared_ptr<transwarp::executor> exec =
task.executor();
526 s += separator.data() + std::string{
"<"} + exec->name() + std::string{
">"};
528 const std::int64_t avg_idletime_us =
task.avg_idletime_us();
529 if (avg_idletime_us >= 0) {
530 s += separator.data() + std::string{
"avg-idle-us="} + std::to_string(avg_idletime_us);
532 const std::int64_t avg_waittime_us =
task.avg_waittime_us();
533 if (avg_waittime_us >= 0) {
534 s += separator.data() + std::string{
"avg-wait-us="} + std::to_string(avg_waittime_us);
536 const std::int64_t avg_runtime_us =
task.avg_runtime_us();
537 if (avg_runtime_us >= 0) {
538 s += separator.data() + std::string{
"avg-run-us="} + std::to_string(avg_runtime_us);
553 std::string
to_string(
const std::vector<transwarp::edge>& edges, transwarp::str_view separator=
"\n") {
554 std::string dot = std::string{
"digraph {"} + separator.data();
558 dot += std::string{
"}"};
566 using type =
typename std::remove_const<typename std::remove_reference<T>::type>::type;
573 using type = decltype(std::declval<std::shared_future<T>>().get());
581 template<
typename TaskType>
582 std::shared_ptr<TaskType>
clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<TaskType>& t) {
583 const auto original_task = std::static_pointer_cast<transwarp::itask>(t);
584 const auto task_cache_it = task_cache.find(original_task);
585 if (task_cache_it != task_cache.cend()) {
586 return std::static_pointer_cast<TaskType>(task_cache_it->second);
588 auto cloned_task = t->clone_impl(task_cache);
589 task_cache[original_task] = cloned_task;
598 template<
typename ResultType>
601 using result_type = ResultType;
603 virtual ~
task() =
default;
605 std::shared_ptr<task> clone()
const {
606 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
607 return clone_impl(task_cache);
610 virtual void set_value(
const typename transwarp::decay<result_type>::type& value) = 0;
611 virtual void set_value(
typename transwarp::decay<result_type>::type&& value) = 0;
612 virtual std::shared_future<result_type> future()
const noexcept = 0;
613 virtual typename transwarp::result<result_type>::type get()
const = 0;
617 friend std::shared_ptr<T>
transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
619 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
623 template<
typename ResultType>
626 using result_type = ResultType&;
628 virtual ~
task() =
default;
630 std::shared_ptr<task> clone()
const {
631 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
632 return clone_impl(task_cache);
635 virtual void set_value(
typename transwarp::decay<result_type>::type& value) = 0;
636 virtual std::shared_future<result_type> future()
const noexcept = 0;
637 virtual typename transwarp::result<result_type>::type get()
const = 0;
641 friend std::shared_ptr<T>
transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
643 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
650 using result_type = void;
652 virtual ~
task() =
default;
654 std::shared_ptr<task> clone()
const {
655 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>> task_cache;
656 return clone_impl(task_cache);
659 virtual void set_value() = 0;
660 virtual std::shared_future<result_type> future()
const noexcept = 0;
661 virtual result_type get()
const = 0;
665 friend std::shared_ptr<T>
transwarp::detail::clone_task(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const std::shared_ptr<T>& t);
667 virtual std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const = 0;
691 return *transwarp_task_;
696 return *transwarp_task_;
702 if (transwarp_task_->canceled()) {
724 std::function<
void(std::size_t thread_index)> on_thread_started =
nullptr)
725 : on_thread_started_{std::move(on_thread_started)}
727 if (n_threads == 0) {
730 for (std::size_t i = 0; i < n_threads; ++i) {
733 thread = std::thread(&thread_pool::worker,
this, i);
739 threads_.push_back(std::move(thread));
758 void push(
const std::function<
void()>&
functor) {
760 std::lock_guard<std::mutex> lock{mutex_};
763 cond_var_.notify_one();
768 void worker(
const std::size_t index) {
769 if (on_thread_started_) {
770 on_thread_started_(index);
775 std::unique_lock<std::mutex> lock{mutex_};
776 cond_var_.wait(lock, [
this]{
777 return done_ || !functors_.empty();
779 if (done_ && functors_.empty()) {
782 functor = std::move(functors_.front());
791 std::lock_guard<std::mutex> lock{mutex_};
794 cond_var_.notify_all();
795 for (std::thread& thread : threads_) {
802 std::function<void(std::size_t)> on_thread_started_;
803 std::vector<std::thread> threads_;
804 std::queue<std::function<void()>> functors_;
805 std::condition_variable cond_var_;
810 #ifdef TRANSWARP_CPP11
// Empty type whose only content is its pack of std::size_t template
// arguments. It is passed by value to call_with_each_index so that the pack
// can be deduced by overload resolution.
811 template<std::size_t...>
struct indices {};
// Primary template, declared here without a body; the partial specializations
// that follow provide the definitions.
813 template<std::size_t...>
struct construct_range;
// Recursive step: inherits from the instantiation in which idx has been
// appended to the accumulated pack i... and the counter has advanced to
// idx + 1. The recursion stops at the specialization where the counter equals
// `end`, which exposes the accumulated pack as indices<i...>.
// NOTE(review): nothing here stops the recursion when idx starts above end --
// callers are presumably expected to pass idx <= end.
815 template<std::size_t end, std::size_t idx, std::size_t... i>
816 struct construct_range<end, idx, i...> : construct_range<end, idx + 1, i..., idx> {};
818 template<std::size_t end, std::size_t... i>
819 struct construct_range<end, end, i...> {
820 using type = transwarp::detail::indices<i...>;
823 template<std::
size_t b, std::
size_t e>
825 using type =
typename transwarp::detail::construct_range<e, b>::type;
// Overload selected for an empty index list: both the functor and the tuple
// are ignored and nothing is done. The overload that takes indices<i, j...>
// calls call_with_each_index again with indices<j...>, so this is where that
// chain ends once the pack is exhausted.
828 template<
typename Functor,
typename Tuple>
829 void call_with_each_index(transwarp::detail::indices<>, Functor&&, Tuple&&) {}
831 template<std::size_t i, std::size_t... j,
typename Functor,
typename Tuple>
832 void call_with_each_index(transwarp::detail::indices<i, j...>, Functor&& f, Tuple&& t) {
834 transwarp::detail::call_with_each_index(transwarp::detail::indices<j...>{}, std::forward<Functor>(f), std::forward<Tuple>(t));
838 template<
typename Functor,
typename Tuple>
839 void apply_to_each(Functor&& f, Tuple&& t) {
840 #ifdef TRANSWARP_CPP11
841 constexpr std::size_t n = std::tuple_size<typename std::decay<Tuple>::type>::value;
842 using index_t =
typename transwarp::detail::index_range<0, n>::type;
843 transwarp::detail::call_with_each_index(index_t{}, std::forward<Functor>(f), std::forward<Tuple>(t));
845 std::apply([&f](
auto&&... arg){(..., std::forward<Functor>(f)(std::forward<decltype(arg)>(arg)));}, std::forward<Tuple>(t));
849 template<
typename Functor,
typename ElementType>
850 void apply_to_each(Functor&& f,
const std::vector<ElementType>& v) {
851 std::for_each(v.begin(), v.end(), std::forward<Functor>(f));
854 template<
typename Functor,
typename ElementType>
855 void apply_to_each(Functor&& f, std::vector<ElementType>& v) {
856 std::for_each(v.begin(), v.end(), std::forward<Functor>(f));
860 template<
int offset,
typename... ParentResults>
862 static void work(
const std::tuple<std::shared_ptr<
transwarp::task<ParentResults>>...>& source, std::tuple<std::shared_future<ParentResults>...>& target) {
863 std::get<offset>(target) = std::get<offset>(source)->future();
868 template<
typename... ParentResults>
874 template<
typename... ParentResults>
876 std::tuple<std::shared_future<ParentResults>...>
result;
882 template<
typename ParentResultType>
884 std::vector<std::shared_future<ParentResultType>>
result;
885 result.reserve(input.size());
894 template<
typename Result,
typename Task,
typename... Args>
895 Result
run_task(std::size_t task_id,
const std::weak_ptr<Task>&
task, Args&&... args) {
896 const std::shared_ptr<Task> t =
task.lock();
904 return (*t->functor_)(std::forward<Args>(args)...);
910 void operator()(
const T& p)
const {
916 template<
typename... ParentResults>
922 template<
typename ParentResultType>
928 template<
typename Parent>
929 Parent wait_for_any_impl() {
933 template<
typename Parent,
typename ParentResult,
typename... ParentResults>
935 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
936 if (status == std::future_status::ready) {
939 return transwarp::detail::wait_for_any_impl<Parent>(parents...);
943 template<
typename Parent,
typename... ParentResults>
946 Parent parent = transwarp::detail::wait_for_any_impl<Parent>(
parents...);
955 template<
typename ParentResultType>
959 const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1));
960 if (status == std::future_status::ready) {
968 template<
typename OneResult>
974 void operator()(
const T& parent)
const {
975 if (one_ != parent) {
976 parent->cancel(
true);
980 const std::shared_ptr<transwarp::task<OneResult>>& one_;
984 template<
typename OneResult,
typename... ParentResults>
990 template<
typename OneResult,
typename ParentResultType>
998 void operator()(
const T&
task)
const {
999 task->decrement_refcount();
1004 template<
typename... ParentResults>
1010 template<
typename ParentResultType>
1016 template<
typename TaskType,
bool done,
int total,
int... n>
1018 template<
typename Result,
typename Task,
typename... ParentResults>
1025 template<
typename TaskType>
1028 template<
int total,
int... n>
1030 template<
typename Result,
typename Task,
typename... ParentResults>
1032 return transwarp::detail::run_task<Result>(task_id,
task);
1038 template<
typename Result,
typename Task,
typename ParentResultType>
1040 return transwarp::detail::run_task<Result>(task_id,
task);
1044 template<
int total,
int... n>
1046 template<
typename Result,
typename Task,
typename... ParentResults>
1051 return transwarp::detail::run_task<Result>(task_id,
task, std::get<n>(futures)...);
1057 template<
typename Result,
typename Task,
typename ParentResultType>
1062 return transwarp::detail::run_task<Result>(task_id,
task, std::move(futures));
1066 template<
int total,
int... n>
1068 template<
typename Result,
typename Task,
typename... ParentResults>
1070 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
1071 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
1073 auto future = parent->future();
1075 return transwarp::detail::run_task<Result>(task_id,
task, std::move(future));
1081 template<
typename Result,
typename Task,
typename ParentResultType>
1085 auto future = parent->future();
1087 return transwarp::detail::run_task<Result>(task_id,
task, std::move(future));
1091 template<
int total,
int... n>
1093 template<
typename Result,
typename Task,
typename... ParentResults>
1098 return transwarp::detail::run_task<Result>(task_id,
task, std::get<n>(futures).get()...);
1104 template<
typename Result,
typename Task,
typename ParentResultType>
1109 std::vector<ParentResultType> results;
1110 results.reserve(futures.size());
1111 for (
const std::shared_future<ParentResultType>& future : futures) {
1112 results.emplace_back(future.get());
1114 return transwarp::detail::run_task<Result>(task_id,
task, std::move(results));
1118 template<
int total,
int... n>
1120 template<
typename Result,
typename Task,
typename... ParentResults>
1122 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
1123 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
1125 const auto future = parent->future();
1127 return transwarp::detail::run_task<Result>(task_id,
task, future.get());
1133 template<
typename Result,
typename Task,
typename ParentResultType>
1137 const auto future = parent->future();
1139 return transwarp::detail::run_task<Result>(task_id,
task, future.get());
1144 template<
typename T>
1145 void operator()(
const std::shared_future<T>& f)
const {
1150 template<
int total,
int... n>
1152 template<
typename Result,
typename Task,
typename... ParentResults>
1158 return transwarp::detail::run_task<Result>(task_id,
task);
1164 template<
typename Result,
typename Task,
typename ParentResultType>
1170 return transwarp::detail::run_task<Result>(task_id,
task);
1174 template<
int total,
int... n>
1176 template<
typename Result,
typename Task,
typename... ParentResults>
1178 using parent_t =
typename std::remove_reference<decltype(std::get<0>(
parents))>::type;
1179 parent_t parent = transwarp::detail::wait_for_any<parent_t>(std::get<n>(
parents)...);
1181 const auto future = parent->future();
1184 return transwarp::detail::run_task<Result>(task_id,
task);
1190 template<
typename Result,
typename Task,
typename ParentResultType>
1194 const auto future = parent->future();
1197 return transwarp::detail::run_task<Result>(task_id,
task);
1204 template<
typename TaskType,
typename Result,
typename Task,
typename... ParentResults>
1206 constexpr std::size_t n = std::tuple_size<std::tuple<std::shared_future<ParentResults>...>>::value;
1214 template<
typename TaskType,
typename Result,
typename Task,
typename ParentResultType>
1221 template<
typename Functor>
1226 template<
typename T>
1227 void operator()(
const T&
task)
const {
1238 template<
typename Functor,
typename... ParentResults>
1244 template<
typename Functor,
typename ParentResultType>
1256 if (task_.level() <=
task.level()) {
1258 task_.set_level(
task.level() + 1);
1260 task.increment_childcount();
1268 explicit final_visitor(std::vector<transwarp::itask*>& tasks) noexcept
1272 tasks_.push_back(&
task);
1276 std::vector<transwarp::itask*>& tasks_;
1277 std::size_t id_ = 0;
1282 explicit edges_visitor(std::vector<transwarp::edge>& edges) noexcept
1287 edges_.emplace_back(*parent,
task);
1291 std::vector<transwarp::edge>& edges_;
1297 : reset_(reset), executor_(
executor) {}
1300 task.schedule_impl(reset_, executor_);
1318 : enabled_{enabled} {}
1321 task.cancel(enabled_);
1330 : executor_{std::move(
executor)} {}
1333 task.set_executor(executor_);
1336 std::shared_ptr<transwarp::executor> executor_;
1343 task.remove_executor();
1350 : priority_{priority} {}
1353 task.set_priority(priority_);
1356 std::int64_t priority_;
1363 task.reset_priority();
1370 : custom_data_{std::move(custom_data)} {}
1373 task.set_custom_data(custom_data_);
1376 transwarp::any_data custom_data_;
1383 task.remove_custom_data();
1393 tasks_.push_back(&
task);
1396 std::vector<transwarp::itask*>& tasks_;
1406 task.add_listener(listener_);
1409 std::shared_ptr<transwarp::listener> listener_;
1415 : event_(event), listener_(std::move(
listener))
1419 task.add_listener(event_, listener_);
1423 std::shared_ptr<transwarp::listener> listener_;
1433 task.remove_listener(listener_);
1436 std::shared_ptr<transwarp::listener> listener_;
1442 : event_(event), listener_(std::move(
listener))
1446 task.remove_listener(event_, listener_);
1450 std::shared_ptr<transwarp::listener> listener_;
1457 task.remove_listeners();
1469 task.remove_listeners(event_);
1478 : visitor_(visitor) {}
1481 task.visit(visitor_);
1496 template<
typename TaskType,
typename Functor,
typename... ParentResults>
1498 static_assert(std::is_same<TaskType, transwarp::root_type>::value ||
1499 std::is_same<TaskType, transwarp::accept_type>::value ||
1500 std::is_same<TaskType, transwarp::accept_any_type>::value ||
1501 std::is_same<TaskType, transwarp::consume_type>::value ||
1502 std::is_same<TaskType, transwarp::consume_any_type>::value ||
1503 std::is_same<TaskType, transwarp::wait_type>::value ||
1504 std::is_same<TaskType, transwarp::wait_any_type>::value,
1505 "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any");
1508 template<
typename Functor,
typename... ParentResults>
1510 static_assert(
sizeof...(ParentResults) == 0,
"A root task cannot have parent tasks");
1511 using type = decltype(std::declval<Functor>()());
1514 template<
typename Functor,
typename... ParentResults>
1516 static_assert(
sizeof...(ParentResults) > 0,
"An accept task must have at least one parent");
1517 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResults>>()...));
1520 template<
typename Functor,
typename ParentResultType>
1522 using type = decltype(std::declval<Functor>()(std::declval<std::vector<std::shared_future<ParentResultType>>>()));
1525 template<
typename Functor,
typename... ParentResults>
1527 static_assert(
sizeof...(ParentResults) > 0,
"An accept_any task must have at least one parent");
1528 using arg_t =
typename std::tuple_element<0, std::tuple<ParentResults...>>::type;
1529 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<arg_t>>()));
1532 template<
typename Functor,
typename ParentResultType>
1534 using type = decltype(std::declval<Functor>()(std::declval<std::shared_future<ParentResultType>>()));
1537 template<
typename Functor,
typename... ParentResults>
1539 static_assert(
sizeof...(ParentResults) > 0,
"A consume task must have at least one parent");
1540 using type = decltype(std::declval<Functor>()(std::declval<ParentResults>()...));
1543 template<
typename Functor,
typename ParentResultType>
1545 using type = decltype(std::declval<Functor>()(std::declval<std::vector<ParentResultType>>()));
1548 template<
typename Functor,
typename... ParentResults>
1550 static_assert(
sizeof...(ParentResults) > 0,
"A consume_any task must have at least one parent");
1551 using arg_t =
typename std::tuple_element<0, std::tuple<ParentResults...>>::type;
1552 using type = decltype(std::declval<Functor>()(std::declval<arg_t>()));
1555 template<
typename Functor,
typename ParentResultType>
1557 using type = decltype(std::declval<Functor>()(std::declval<ParentResultType>()));
1560 template<
typename Functor,
typename... ParentResults>
1562 static_assert(
sizeof...(ParentResults) > 0,
"A wait task must have at least one parent");
1563 using type = decltype(std::declval<Functor>()());
1566 template<
typename Functor,
typename ParentResultType>
1568 using type = decltype(std::declval<Functor>()());
1571 template<
typename Functor,
typename... ParentResults>
1573 static_assert(
sizeof...(ParentResults) > 0,
"A wait_any task must have at least one parent");
1574 using type = decltype(std::declval<Functor>()());
1577 template<
typename Functor,
typename ParentResultType>
1579 using type = decltype(std::declval<Functor>()());
1583 template<
bool is_transwarp_functor>
1588 template<
typename Functor>
1594 template<
typename Functor>
1601 template<
typename Functor>
1608 template<
typename ResultType,
typename Value>
1610 std::promise<ResultType> promise;
1611 promise.set_value(std::forward<Value>(value));
1612 return promise.get_future();
1618 std::promise<void> promise;
1619 promise.set_value();
1620 return promise.get_future();
1624 template<
typename ResultType>
1629 std::promise<ResultType> promise;
1630 promise.set_exception(exception);
1631 return promise.get_future();
1636 explicit clone_task_functor(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache) noexcept
1637 : task_cache_(task_cache) {}
1639 template<
typename T>
1640 void operator()(T& t) {
1644 std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache_;
1652 template<
typename T>
1653 void operator()(T& t) {
1654 tasks_.push_back(t.get());
1657 std::vector<transwarp::itask*>& tasks_;
1662 template<
typename... ParentResults>
1664 using type = std::tuple<std::shared_ptr<transwarp::task<ParentResults>>...>;
1665 static std::size_t size(
const type&) {
1666 return std::tuple_size<type>::value;
1668 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const type& obj) {
1673 static std::vector<transwarp::itask*> tasks(
const type&
parents) {
1674 std::vector<transwarp::itask*> tasks;
1681 template<
typename ParentResultType>
1682 struct parents<std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>> {
1683 using type = std::vector<std::shared_ptr<transwarp::task<ParentResultType>>>;
1684 static std::size_t size(
const type& obj) {
1687 static type clone(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache,
const type& obj) {
1692 static std::vector<transwarp::itask*> tasks(
const type&
parents) {
1693 std::vector<transwarp::itask*> tasks;
1700 template<
typename ResultType,
typename TaskType>
1704 template<
typename Task,
typename Parents>
1705 void call(std::size_t task_id,
1706 const std::weak_ptr<Task>&
task,
1708 promise_.set_value(transwarp::detail::call<TaskType, ResultType>(task_id,
task,
parents));
1711 std::promise<ResultType> promise_;
1714 template<
typename TaskType>
1718 template<
typename Task,
typename Parents>
1719 void call(std::size_t task_id,
1720 const std::weak_ptr<Task>&
task,
1722 transwarp::detail::call<TaskType, void>(task_id,
task,
parents);
1723 promise_.set_value();
1726 std::promise<void> promise_;
1730 template<
typename ResultType,
typename TaskType,
typename Task,
typename Parents>
1734 runner(std::size_t task_id,
1735 const std::weak_ptr<Task>&
task,
1736 const typename transwarp::decay<Parents>::type&
parents)
1737 : task_id_(task_id),
1742 std::future<ResultType> future() {
1743 return this->promise_.get_future();
1747 if (
const std::shared_ptr<Task> t = task_.lock()) {
1751 this->call(task_id_, task_, parents_);
1753 this->promise_.set_exception(std::current_exception());
1754 if (
const std::shared_ptr<Task> t = task_.lock()) {
1758 this->promise_.set_exception(std::current_exception());
1760 if (
const std::shared_ptr<Task> t = task_.lock()) {
1766 const std::size_t task_id_;
1767 const std::weak_ptr<Task> task_;
1768 const typename transwarp::decay<Parents>::type parents_;
1775 template<
typename ValueType>
1779 static_assert(std::is_default_constructible<ValueType>::value,
"ValueType must be default constructible");
1781 using value_type = ValueType;
1801 template<typename T, typename = typename std::enable_if<std::is_same<typename std::decay<T>::type, value_type>::value>::type>
1803 data_[end_] = std::forward<T>(value);
1810 return data_[front_];
1816 data_[front_] = ValueType{};
1823 return data_.size();
1839 return size_ == data_.size();
1844 std::swap(end_, buffer.end_);
1845 std::swap(front_, buffer.front_);
1846 std::swap(size_, buffer.size_);
1847 std::swap(data_, buffer.data_);
1852 void increment_or_wrap(std::size_t& value)
const {
1853 if (value == data_.size() - 1) {
1861 increment_or_wrap(end_);
1863 increment_or_wrap(front_);
1870 increment_or_wrap(front_);
1875 std::size_t front_{};
1876 std::size_t size_{};
1877 std::vector<value_type> data_;
1884 void lock() noexcept {
1885 while (locked_.test_and_set(std::memory_order_acquire));
1888 void unlock() noexcept {
1889 locked_.clear(std::memory_order_release);
1893 std::atomic_flag locked_ = ATOMIC_FLAG_INIT;
// Call operator taking no arguments with an empty body: invoking it does
// nothing. Declared const and noexcept.
// NOTE(review): the enclosing type is not visible in this excerpt.
1902 void operator()()
const noexcept {}
1922 std::string
name()
const override {
1923 return "transwarp::sequential";
1937 explicit parallel(
const std::size_t n_threads,
1938 std::function<
void(std::size_t thread_index)> on_thread_started =
nullptr)
1939 : pool_{n_threads, std::move(on_thread_started)}
1949 std::string
name()
const override {
1950 return "transwarp::parallel";
// Default-constructed constants of the optional-name and custom-data types.
// Accessors further down return references to them: name() returns
// nullopt_string, and any_empty is returned next to the `return custom_data_;`
// statement.
// NOTE(review): those returns follow `#ifndef TRANSWARP_DISABLE_TASK_NAME` /
// `#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA`; the #else lines are not part
// of this excerpt, so presumably these are the values used when the feature
// is compiled out -- confirm against the full header.
1966 const transwarp::option_str nullopt_string;
1967 const transwarp::any_data any_empty;
1970 template<
typename ResultType,
bool is_
void>
1973 template<
typename ResultType>
1975 template<
typename Future,
typename OtherFuture>
1976 void operator()(Future& future,
const OtherFuture& other)
const {
1982 template<
typename ResultType>
1984 template<
typename Future,
typename OtherFuture>
1985 void operator()(Future& future,
const OtherFuture& other)
const {
1986 future = transwarp::detail::make_future_with_value<ResultType>(other.get());
1992 template<
typename ResultType>
1996 using result_type = ResultType;
1999 std::size_t
id() const noexcept
override {
2004 const transwarp::option_str&
name() const noexcept
override {
2005 #ifndef TRANSWARP_DISABLE_TASK_NAME
2008 return transwarp::detail::nullopt_string;
2014 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2023 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2024 return custom_data_;
2026 return transwarp::detail::any_empty;
2033 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2043 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2052 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2066 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2074 std::shared_future<result_type>
future() const noexcept
override {
2082 ensure_listeners_object();
2083 for (
int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2092 ensure_listeners_object();
2093 (*listeners_)[event].push_back(std::move(
listener));
2103 for (
int i=0; i<static_cast<int>(transwarp::event_type::count); ++i) {
2105 if (listeners_pair != listeners_->end()) {
2106 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2107 l.erase(std::remove(l.begin(), l.end(),
listener), l.end());
2119 auto listeners_pair = listeners_->find(event);
2120 if (listeners_pair != listeners_->end()) {
2121 std::vector<std::shared_ptr<transwarp::listener>>& l = listeners_pair->second;
2122 l.erase(std::remove(l.begin(), l.end(),
listener), l.end());
2132 listeners_->clear();
2141 auto listeners_pair = listeners_->find(event);
2142 if (listeners_pair != listeners_->end()) {
2143 listeners_pair->second.clear();
// Ordered map from an event type to the shared listener pointers registered
// for that event; entries are created on demand via operator[] when a
// listener is added.
2149 using listeners_t = std::map<transwarp::event_type, std::vector<std::shared_ptr<transwarp::listener>>>;
// Sequence of raw itask pointers.
2150 using tasks_t = std::vector<transwarp::itask*>;
2154 if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) {
2164 auto listeners_pair = listeners_->find(event);
2165 if (listeners_pair != listeners_->end()) {
2166 for (
const std::shared_ptr<transwarp::listener>&
listener : listeners_pair->second) {
2179 void ensure_listeners_object() {
2181 listeners_.reset(
new listeners_t);
2186 void set_id(std::size_t
id) noexcept
override {
2192 #ifndef TRANSWARP_DISABLE_TASK_NAME
2193 name_ = std::move(
name);
2201 #ifndef TRANSWARP_DISABLE_TASK_NAME
2204 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2205 priority_ =
task.priority_;
2207 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2208 custom_data_ =
task.custom_data_;
2210 if (
task.has_result()) {
2214 future_ = transwarp::detail::make_future_with_exception<result_type>(std::current_exception());
2217 visited_ = task.visited_;
2218 if (task.listeners_) {
2219 listeners_.reset(
new listeners_t(*task.listeners_));
2223 std::size_t id_ = 0;
2224 #ifndef TRANSWARP_DISABLE_TASK_NAME
2225 transwarp::option_str name_;
2227 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2228 std::int64_t priority_ = 0;
2230 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2231 transwarp::any_data custom_data_;
2233 std::shared_future<result_type> future_;
2234 bool visited_ =
false;
2235 std::unique_ptr<listeners_t> listeners_;
2236 std::unique_ptr<tasks_t> tasks_;
2242 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2249 using result_type = ResultType;
2256 if (!this->tasks_) {
2257 this->tasks_.reset(
new typename transwarp::detail::task_common<result_type>::tasks_t);
2261 const std::size_t l_level = l->level();
2262 const std::size_t l_id = l->id();
2263 const std::size_t r_level = r->level();
2264 const std::size_t r_id = r->id();
2265 return std::tie(l_level, l_id) < std::tie(r_level, r_id);
2267 std::sort(this->tasks_->begin(), this->tasks_->end(), compare);
2272 std::size_t
level() const noexcept
override {
2282 std::shared_ptr<transwarp::executor>
executor() const noexcept
override {
2288 return canceled_.load();
2293 #ifndef TRANSWARP_DISABLE_TASK_TIME
2294 return avg_idletime_us_.load();
2302 #ifndef TRANSWARP_DISABLE_TASK_TIME
2303 return avg_waittime_us_.load();
2311 #ifndef TRANSWARP_DISABLE_TASK_TIME
2312 return avg_runtime_us_.load();
2421 this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
2422 schedule_mode_ =
false;
2429 return this->future_.valid();
2436 this->future_.wait();
2443 return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2448 return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
2454 this->future_ = std::shared_future<result_type>{};
2456 schedule_mode_ =
true;
2457 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2458 refcount_ = childcount_;
2473 void cancel(
bool enabled) noexcept
override {
2474 canceled_ = enabled;
2488 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2499 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
2509 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2520 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
2570 std::vector<transwarp::itask*>
parents()
const override {
2575 const std::vector<transwarp::itask*>&
tasks()
override {
2577 return *this->tasks_;
2583 std::vector<transwarp::edge>
edges()
override {
2584 std::vector<transwarp::edge>
edges;
2594 template<
typename F>
2596 : functor_(new Functor(std::forward<F>(
functor))),
2597 parents_(std::move(
parents)...)
2602 template<
typename F,
typename P>
2604 : functor_(new Functor(std::forward<F>(functor))),
2608 if (parents_.empty()) {
2619 template<
typename R,
typename Y,
typename T,
typename P>
2622 template<
typename R,
typename T,
typename... A>
2637 #ifndef TRANSWARP_DISABLE_TASK_TIME
2638 avg_idletime_us_ = idletime;
2646 #ifndef TRANSWARP_DISABLE_TASK_TIME
2647 avg_waittime_us_ = waittime;
2655 #ifndef TRANSWARP_DISABLE_TASK_TIME
2656 avg_runtime_us_ = runtime;
2664 if (!this->future_.valid()) {
2674 if (schedule_mode_ && (
reset || !this->future_.valid())) {
2678 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2679 refcount_ = childcount_;
2681 std::weak_ptr<task_impl_base>
self = std::static_pointer_cast<task_impl_base>(this->shared_from_this());
2683 std::shared_ptr<runner_t>
runner = std::shared_ptr<runner_t>(
new runner_t(this->
id(),
self, parents_));
2685 this->future_ =
runner->future();
2687 if (this->executor_) {
2688 this->executor_->execute([
runner]{ (*runner)(); }, *
this);
2708 if (!this->visited_) {
2711 this->visited_ =
true;
2717 if (this->visited_) {
2718 this->visited_ =
false;
2724 template<
typename Visitor>
2732 void increment_childcount() noexcept
override {
2733 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2739 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2740 if (--refcount_ == 0) {
2746 void reset_future()
override {
2747 this->future_ = std::shared_future<result_type>{};
2751 std::size_t level_ = 0;
2753 std::shared_ptr<transwarp::executor> executor_;
2754 std::atomic<bool> canceled_{
false};
2755 bool schedule_mode_ =
true;
2756 #ifndef TRANSWARP_DISABLE_TASK_TIME
2757 std::atomic<std::int64_t> avg_idletime_us_{-1};
2758 std::atomic<std::int64_t> avg_waittime_us_{-1};
2759 std::atomic<std::int64_t> avg_runtime_us_{-1};
2761 std::unique_ptr<Functor> functor_;
2763 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2764 std::size_t childcount_ = 0;
2765 std::atomic<std::size_t> refcount_{0};
2771 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2778 using result_type = ResultType;
2782 void set_value(
const typename transwarp::decay<result_type>::type& value)
override {
2784 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2785 this->schedule_mode_ =
false;
2791 void set_value(
typename transwarp::decay<result_type>::type&& value)
override {
2793 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
2794 this->schedule_mode_ =
false;
2801 typename transwarp::result<result_type>::type
get()
const override {
2803 return this->future_.get();
2810 template<
typename F>
2815 template<
typename F,
typename P>
2823 template<
typename ResultType,
typename TaskType,
typename Functor,
typename... ParentResults>
2830 using result_type = ResultType&;
2834 void set_value(
typename transwarp::decay<result_type>::type& value)
override {
2835 this->ensure_task_not_running();
2836 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
2837 this->schedule_mode_ =
false;
2844 typename transwarp::result<result_type>::type
get()
const override {
2845 this->ensure_task_was_scheduled();
2846 return this->future_.get();
2853 template<
typename F>
2858 template<
typename F,
typename P>
2860 :
transwarp::detail::task_impl_base<result_type,
task_type, Functor, ParentResults...>(std::forward<F>(
functor), std::move(parents))
2866 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2873 using result_type = void;
2880 this->schedule_mode_ =
false;
2889 this->future_.get();
2896 template<
typename F>
2901 template<
typename F,
typename P>
2914 template<
typename TaskType,
typename Functor,
typename... ParentResults>
2925 template<
typename F>
2932 template<
typename F,
typename P>
2945 #ifndef TRANSWARP_DISABLE_TASK_NAME
2946 #ifdef TRANSWARP_CPP11
2947 this->
set_name(transwarp::option_str{std::move(
name)});
2954 return std::static_pointer_cast<task_impl>(this->shared_from_this());
2958 template<
typename TaskType_,
typename Functor_>
2959 auto then(TaskType_, Functor_&&
functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type,
result_type>> {
2961 return std::shared_ptr<task_t>(
new task_t(std::forward<Functor_>(
functor), std::static_pointer_cast<task_impl>(this->shared_from_this())));
2966 return std::static_pointer_cast<task_impl>(this->clone());
2973 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>& task_cache)
const override {
2974 auto t = std::shared_ptr<task_impl>{
new task_impl};
2975 t->copy_from(*
this);
2976 t->level_ = this->level_;
2977 t->type_ = this->type_;
2978 t->executor_ = this->executor_;
2979 t->canceled_ = this->canceled_.load();
2980 t->schedule_mode_ = this->schedule_mode_;
2981 #ifndef TRANSWARP_DISABLE_TASK_TIME
2982 t->avg_idletime_us_ = this->avg_idletime_us_.load();
2983 t->avg_waittime_us_ = this->avg_waittime_us_.load();
2984 t->avg_runtime_us_ = this->avg_runtime_us_.load();
2986 t->functor_.reset(
new Functor(*this->functor_));
2988 t->executor_ = this->executor_;
2989 #ifndef TRANSWARP_DISABLE_TASK_REFCOUNT
2990 t->childcount_ = this->childcount_;
3000 template<
typename ResultType>
3007 using result_type = ResultType;
3011 template<
typename T>
3014 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::forward<T>(value));
3015 this->tasks_.reset(
new typename transwarp::detail::task_common<result_type>::tasks_t{
this});
3026 #ifndef TRANSWARP_DISABLE_TASK_NAME
3027 #ifdef TRANSWARP_CPP11
3028 this->
set_name(transwarp::option_str{std::move(
name)});
3035 return std::static_pointer_cast<value_task>(this->shared_from_this());
3039 template<
typename TaskType_,
typename Functor_>
3040 auto then(TaskType_, Functor_&&
functor) -> std::shared_ptr<transwarp::task_impl<TaskType_, typename std::decay<Functor_>::type, result_type>> {
3042 return std::shared_ptr<task_t>(
new task_t(std::forward<Functor_>(
functor), std::static_pointer_cast<value_task>(this->shared_from_this())));
3047 return std::static_pointer_cast<value_task>(this->clone());
3054 std::size_t
level() const noexcept
override {
3064 std::shared_ptr<transwarp::executor>
executor() const noexcept
override {
3103 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3112 #ifndef TRANSWARP_DISABLE_TASK_PRIORITY
3120 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3129 #ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA
3159 void set_value(
const typename transwarp::decay<result_type>::type& value)
override {
3160 this->future_ = transwarp::detail::make_future_with_value<result_type>(value);
3165 void set_value(
typename transwarp::decay<result_type>::type&& value)
override {
3166 this->future_ = transwarp::detail::make_future_with_value<result_type>(std::move(value));
3172 this->future_ = transwarp::detail::make_future_with_exception<result_type>(exception);
3177 typename transwarp::result<result_type>::type
get()
const override {
3178 return this->future_.get();
3242 std::vector<transwarp::itask*>
parents()
const override {
3247 const std::vector<transwarp::itask*>&
tasks()
override {
3248 return *this->tasks_;
3252 std::vector<transwarp::edge>
edges()
override {
3260 this->tasks_.reset(
new typename transwarp::detail::task_common<result_type>::tasks_t{
this});
3263 std::shared_ptr<transwarp::task<result_type>> clone_impl(std::unordered_map<std::shared_ptr<transwarp::itask>, std::shared_ptr<transwarp::itask>>&)
const override {
3264 auto t = std::shared_ptr<value_task>(
new value_task);
3265 t->copy_from(*
this);
// Override that accepts a level and discards it: the parameter is unnamed
// and the body is empty.
3270 void set_level(std::size_t) noexcept
override {}
// Override that accepts an average idle time (microseconds, per the name)
// and discards it: the parameter is unnamed and the body is empty.
3276 void set_avg_idletime_us(std::int64_t) noexcept
override {}
// Override that accepts an average wait time (microseconds, per the name)
// and discards it: the parameter is unnamed and the body is empty.
3279 void set_avg_waittime_us(std::int64_t) noexcept
override {}
// Override that accepts an average run time (microseconds, per the name)
// and discards it: the parameter is unnamed and the body is empty.
3282 void set_avg_runtime_us(std::int64_t) noexcept
override {}
3288 void visit(
const std::function<
void(
transwarp::itask&)>& visitor)
override {
3289 if (!this->visited_) {
3291 this->visited_ =
true;
3296 void unvisit() noexcept
override {
3297 this->visited_ =
false;
// Empty override: calling it changes nothing on this object.
3300 void increment_childcount() noexcept
override {}
// Empty override: calling it changes nothing on this object.
3302 void decrement_refcount()
override {}
// Empty override: the body does nothing, so no member (including any stored
// future) is modified by this call.
3304 void reset_future()
override {}
3310 template<
typename TaskType,
typename Functor,
typename... Parents>
3311 auto make_task(TaskType, Functor&&
functor, std::shared_ptr<Parents>... parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type,
typename Parents::result_type...>> {
3313 return std::shared_ptr<task_t>(
new task_t(std::forward<Functor>(
functor), std::move(parents)...));
3318 template<
typename TaskType,
typename Functor,
typename ParentType>
3319 auto make_task(TaskType, Functor&&
functor, std::vector<ParentType> parents) -> std::shared_ptr<transwarp::task_impl<TaskType, typename std::decay<Functor>::type, std::vector<ParentType>>> {
3321 return std::shared_ptr<task_t>(
new task_t(std::forward<Functor>(
functor), std::move(parents)));
3326 template<
typename Value>
3327 auto make_value_task(Value&& value) -> std::shared_ptr<transwarp::value_task<typename transwarp::decay<Value>::type>> {
3329 return std::shared_ptr<task_t>(
new task_t(std::forward<Value>(value)));
3336 template<
typename InputIt,
typename UnaryOperation>
3337 auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3338 const auto distance = std::distance(first, last);
3339 if (distance <= 0) {
3342 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3343 tasks.reserve(
static_cast<std::size_t
>(distance));
3344 for (; first != last; ++first) {
3354 template<
typename InputIt,
typename UnaryOperation>
3355 auto for_each(
transwarp::executor&
executor, InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3365 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
3366 auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3367 const auto distance = std::distance(first1, last1);
3368 if (distance <= 0) {
3371 std::vector<std::shared_ptr<transwarp::task<void>>> tasks;
3372 tasks.reserve(
static_cast<std::size_t
>(distance));
3373 for (; first1 != last1; ++first1, ++d_first) {
3383 template<
typename InputIt,
typename OutputIt,
typename UnaryOperation>
3384 auto transform(
transwarp::executor&
executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr<transwarp::task_impl<transwarp::wait_type, transwarp::no_op_functor, std::vector<std::shared_ptr<transwarp::task<void>>>>> {
3392 template<
typename ResultType>
3400 : task_(std::move(
task)),
3408 if (minimum_ > maximum_) {
3412 for (std::size_t i=0; i<minimum_; ++i) {
3413 idle_.push(task_->clone());
3433 std::shared_ptr<transwarp::task<ResultType>>
next_task(
bool maybe_resize=
true) {
3436 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3437 if (!finished_.
empty()) {
3438 finished_task = finished_.
front(); finished_.
pop();
3442 std::shared_ptr<transwarp::task<ResultType>>
task;
3443 if (finished_task) {
3444 task = busy_.find(finished_task)->second;
3446 if (maybe_resize && idle_.empty()) {
3449 if (idle_.empty()) {
3452 task = idle_.front(); idle_.pop();
3456 auto future =
task->future();
3457 if (future.valid()) {
3467 std::shared_ptr<transwarp::task<ResultType>> g =
next_task(maybe_resize);
3476 return idle_.size() + busy_.size();
3491 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3492 return idle_.size() + finished_.
size();
3497 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3498 return busy_.size() - finished_.
size();
3504 if (new_size >
size()) {
3505 const std::size_t count = new_size -
size();
3506 for (std::size_t i=0; i<count; ++i) {
3507 if (
size() == maximum_) {
3510 idle_.push(task_->clone());
3512 }
else if (new_size <
size()) {
3513 const std::size_t count =
size() - new_size;
3514 for (std::size_t i=0; i<count; ++i) {
3515 if (idle_.empty() ||
size() == minimum_) {
3525 decltype(finished_) finished{finished_.
capacity()};
3527 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3528 finished_.
swap(finished);
3530 while (!finished.empty()) {
3532 const auto it = busy_.find(
task);
3533 idle_.push(it->second);
3550 std::lock_guard<transwarp::detail::spinlock> lock{pool_.spinlock_};
3555 task_pool<ResultType>& pool_;
3558 std::shared_ptr<transwarp::task<ResultType>> task_;
// Lower size bound: the constructor pushes this many clones into idle_, and
// shrinking stops once size() equals it.
3559 std::size_t minimum_;
// Upper size bound: growing stops once size() equals it. The constructor
// checks for minimum_ > maximum_.
3560 std::size_t maximum_;
// FIFO of task clones; next_task() pops from the front when it takes a task
// from here.
3563 std::queue<std::shared_ptr<transwarp::task<ResultType>>> idle_;
// Tasks looked up by their raw itask pointer; entries found here are handed
// back out by next_task() or pushed onto idle_ when finished tasks are
// processed.
3564 std::unordered_map<const transwarp::itask*, std::shared_ptr<transwarp::task<ResultType>>> busy_;
3565 std::shared_ptr<transwarp::listener> listener_{
new finished_listener(*
this)};
3587 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3588 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3589 auto& track = tracks_[&
task];
3590 track.startidle = now;
3594 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3595 track_idletime(
task, now);
3596 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3597 auto& track = tracks_[&
task];
3598 track.startwait = now;
3602 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3603 track_waittime(
task, now);
3607 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3608 track_waittime(
task, now);
3609 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3610 auto& track = tracks_[&
task];
3611 track.running =
true;
3612 track.startrun = now;
3616 const std::chrono::time_point<std::chrono::steady_clock> now = std::chrono::steady_clock::now();
3617 track_runtime(
task, now);
3626 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3632 void track_idletime(
transwarp::itask&
task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3633 std::int64_t avg_idletime_us;
3635 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3636 auto& track = tracks_[&task];
3637 track.idletime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startidle).count();
3639 avg_idletime_us =
static_cast<std::int64_t
>(track.idletime / track.idlecount);
3641 task.set_avg_idletime_us(avg_idletime_us);
3644 void track_waittime(
transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3645 std::int64_t avg_waittime_us;
3647 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3648 auto& track = tracks_[&task];
3649 track.waittime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startwait).count();
3651 avg_waittime_us =
static_cast<std::int64_t
>(track.waittime / track.waitcount);
3653 task.set_avg_waittime_us(avg_waittime_us);
3656 void track_runtime(
transwarp::itask& task,
const std::chrono::time_point<std::chrono::steady_clock>& now) {
3657 std::int64_t avg_runtime_us;
3659 std::lock_guard<transwarp::detail::spinlock> lock{spinlock_};
3660 auto& track = tracks_[&task];
3661 if (!track.running) {
3664 track.running =
false;
3665 track.runtime += std::chrono::duration_cast<std::chrono::microseconds>(now - track.startrun).count();
3667 avg_runtime_us =
static_cast<std::int64_t
>(track.runtime / track.runcount);
3669 task.set_avg_runtime_us(avg_runtime_us);
// Set to true together with startrun; track_runtime() checks it before
// accumulating and clears it again.
3673 bool running =
false;
// steady_clock time points marking the start of the current idle, wait and
// run phase respectively; the elapsed time is computed as (now - start).
3674 std::chrono::time_point<std::chrono::steady_clock> startidle;
3675 std::chrono::time_point<std::chrono::steady_clock> startwait;
3676 std::chrono::time_point<std::chrono::steady_clock> startrun;
// Accumulated idle time in microseconds, and the divisor used to turn it
// into the average reported via set_avg_idletime_us().
3677 std::chrono::microseconds::rep idletime = 0;
3678 std::chrono::microseconds::rep idlecount = 0;
// Accumulated wait time in microseconds, and the divisor used to turn it
// into the average reported via set_avg_waittime_us().
3679 std::chrono::microseconds::rep waittime = 0;
3680 std::chrono::microseconds::rep waitcount = 0;
// Accumulated run time in microseconds, and the divisor used to turn it
// into the average reported via set_avg_runtime_us().
3681 std::chrono::microseconds::rep runtime = 0;
3682 std::chrono::microseconds::rep runcount = 0;
3686 std::unordered_map<const transwarp::itask*, track> tracks_;
3712 executor_->execute([&
task]{
task.reset_future(); },
task);
3714 task.reset_future();
3720 std::shared_ptr<transwarp::executor> executor_;