Taskflow  2.7.0
observer.hpp
1 #pragma once
2 
3 #include "task.hpp"
4 
5 namespace tf {
6 
17 
18  friend class Executor;
19 
20  public:
21 
25  virtual ~ObserverInterface() = default;
26 
31  virtual void set_up(size_t num_workers) = 0;
32 
38  virtual void on_entry(size_t worker_id, TaskView task_view) = 0;
39 
45  virtual void on_exit(size_t worker_id, TaskView task_view) = 0;
46 };
47 
48 // ----------------------------------------------------------------------------
49 // ChromeObserver definition
50 // ----------------------------------------------------------------------------
51 
59 
60  friend class Executor;
61 
62  // data structure to record each task execution
63  struct Segment {
64 
65  std::string name;
66 
69 
70  Segment(
71  const std::string& n,
73  );
74 
75  Segment(
76  const std::string& n,
79  );
80  };
81 
82  // data structure to store the entire execution timeline
83  struct Timeline {
87  };
88 
89  public:
90 
95  inline void dump(std::ostream& ostream) const;
96 
101  inline std::string dump() const;
102 
106  inline void clear();
107 
112  inline size_t num_tasks() const;
113 
114  private:
115 
116  inline void set_up(size_t num_workers) override final;
117  inline void on_entry(size_t worker_id, TaskView task_view) override final;
118  inline void on_exit(size_t worker_id, TaskView task_view) override final;
119 
120  Timeline _timeline;
121 };
122 
123 // constructor: copies n into name and initializes beg from b; end is not
// listed in the initializer list and is therefore default-initialized.
// NOTE(review): the declaration of parameter b (listing line 126) is missing
// from this copy of the file - restore it from the upstream header.
124 inline ChromeObserver::Segment::Segment(
125  const std::string& n,
127 ) :
128  name {n}, beg {b} {
129 }
130 
131 // constructor: copies n into name and initializes beg from b and end from e.
// NOTE(review): the declarations of parameters b and e (listing lines
// 134-135) are missing from this copy of the file - restore them from the
// upstream header.
132 inline ChromeObserver::Segment::Segment(
133  const std::string& n,
136 ) :
137  name {n}, beg {b}, end {e} {
138 }
139 
140 // Procedure: set_up
141 inline void ChromeObserver::set_up(size_t num_workers) {
142  _timeline.segments.resize(num_workers);
143  _timeline.stacks.resize(num_workers);
144 
145  for(size_t w=0; w<num_workers; ++w) {
146  _timeline.segments[w].reserve(32);
147  }
148 
149  _timeline.origin = std::chrono::steady_clock::now();
150 }
151 
152 // Procedure: on_entry
153 inline void ChromeObserver::on_entry(size_t w, TaskView) {
154  _timeline.stacks[w].push(std::chrono::steady_clock::now());
155 }
156 
157 // Procedure: on_exit
158 inline void ChromeObserver::on_exit(size_t w, TaskView tv) {
159  assert(!_timeline.stacks[w].empty());
160 
161  auto beg = _timeline.stacks[w].top();
162  _timeline.stacks[w].pop();
163 
164  _timeline.segments[w].emplace_back(
165  tv.name(), beg, std::chrono::steady_clock::now()
166  );
167 }
168 
169 // Function: clear
170 inline void ChromeObserver::clear() {
171  for(size_t w=0; w<_timeline.segments.size(); ++w) {
172  _timeline.segments[w].clear();
173  while(!_timeline.stacks[w].empty()) {
174  _timeline.stacks[w].pop();
175  }
176  }
177 }
178 
179 // Procedure: dump
// Writes all recorded segments to os as one JSON array followed by a newline.
// Each segment becomes an object with the fields "cat", "name", "ph", "pid"
// and "tid", followed by two time values derived from the segment's begin/end
// times. If nothing was recorded the output is "[]".
// NOTE(review): listing lines 217 and 220 are missing from this copy of the
// file; presumably they emit the keys of the two time values and open the
// duration conversion that the ").count()" lines close - restore them from the
// upstream header.
// NOTE(review): task names are written without JSON escaping, so a name
// containing a double quote or a backslash yields invalid JSON.
180 inline void ChromeObserver::dump(std::ostream& os) const {
181 
182  size_t first;
183 
// find the first worker that recorded at least one segment
184  for(first = 0; first<_timeline.segments.size(); ++first) {
185  if(_timeline.segments[first].size() > 0) {
186  break;
187  }
188  }
189 
190  os << '[';
191 
192  for(size_t w=first; w<_timeline.segments.size(); w++) {
193 
// separate this worker's objects from the previous worker's; workers
// without segments contribute neither objects nor a comma
194  if(w != first && _timeline.segments[w].size() > 0) {
195  os << ',';
196  }
197 
198  for(size_t i=0; i<_timeline.segments[w].size(); i++) {
199 
200  os << '{'
201  << "\"cat\":\"ChromeObserver\",";
202 
203  // name field: unnamed tasks are labelled "<worker>_<index>"
204  os << "\"name\":\"";
205  if(_timeline.segments[w][i].name.empty()) {
206  os << w << '_' << i;
207  }
208  else {
209  os << _timeline.segments[w][i].name;
210  }
211  os << "\",";
212 
213  // segment field: the worker index is used as the thread id; the first time
// value is the begin time relative to the timeline origin, the second is
// end minus begin
214  os << "\"ph\":\"X\","
215  << "\"pid\":1,"
216  << "\"tid\":" << w << ','
218  _timeline.segments[w][i].beg - _timeline.origin
219  ).count() << ','
221  _timeline.segments[w][i].end - _timeline.segments[w][i].beg
222  ).count();
223 
// objects of the same worker are comma-separated; the last one is not
224  if(i != _timeline.segments[w].size() - 1) {
225  os << "},";
226  }
227  else {
228  os << '}';
229  }
230  }
231  }
232  os << "]\n";
233 }
234 
235 // Function: dump
237  std::ostringstream oss;
238  dump(oss);
239  return oss.str();
240 }
241 
242 // Function: num_tasks
243 inline size_t ChromeObserver::num_tasks() const {
244  return std::accumulate(
245  _timeline.segments.begin(), _timeline.segments.end(), size_t{0},
246  [](size_t sum, const auto& exe){
247  return sum + exe.size();
248  }
249  );
250 }
251 
252 // ----------------------------------------------------------------------------
253 // TFProfObserver definition
254 // ----------------------------------------------------------------------------
255 
263 
264  friend class Executor;
265 
266  // data structure to record each task execution
267  struct Segment {
268 
269  std::string name;
270  TaskType type;
271 
274 
275  Segment(
276  const std::string& n,
277  TaskType t,
279  );
280 
281  Segment(
282  const std::string& n,
283  TaskType t,
286  );
287  };
288 
289  // data structure to store the entire execution timeline
290  struct Timeline {
294  };
295 
296  public:
297 
302  inline void dump(std::ostream& ostream) const;
303 
308  inline std::string dump() const;
309 
313  inline void clear();
314 
319  inline size_t num_tasks() const;
320 
321  private:
322 
323  inline void set_up(size_t num_workers) override final;
324  inline void on_entry(size_t worker_id, TaskView task_view) override final;
325  inline void on_exit(size_t worker_id, TaskView task_view) override final;
326 
327  Timeline _timeline;
328 
329  UUID _uuid;
330 };
331 
332 // constructor: copies n into name, t into type and initializes beg from b;
// end is not listed in the initializer list and is therefore
// default-initialized.
// NOTE(review): the declaration of parameter b (listing line 336) is missing
// from this copy of the file - restore it from the upstream header.
333 inline TFProfObserver::Segment::Segment(
334  const std::string& n,
335  TaskType t,
337 ) :
338  name {n}, type {t}, beg {b} {
339 }
340 
341 // constructor: copies n into name, t into type and initializes beg from b
// and end from e.
// NOTE(review): the declarations of parameters b and e (listing lines
// 345-346) are missing from this copy of the file - restore them from the
// upstream header.
342 inline TFProfObserver::Segment::Segment(
343  const std::string& n,
344  TaskType t,
347 ) :
348  name {n}, type {t}, beg {b}, end {e} {
349 }
350 
351 // Procedure: set_up
352 inline void TFProfObserver::set_up(size_t num_workers) {
353 
354  _timeline.segments.resize(num_workers);
355  _timeline.stacks.resize(num_workers);
356 
357  _timeline.origin = std::chrono::steady_clock::now();
358 }
359 
360 // Procedure: on_entry
361 inline void TFProfObserver::on_entry(size_t w, TaskView) {
362  _timeline.stacks[w].push(std::chrono::steady_clock::now());
363 }
364 
365 // Procedure: on_exit
366 inline void TFProfObserver::on_exit(size_t w, TaskView tv) {
367 
368  assert(!_timeline.stacks[w].empty());
369 
370  if(_timeline.stacks.size() > _timeline.segments[w].size()){
371  _timeline.segments[w].resize(_timeline.stacks.size());
372  }
373 
374  auto beg = _timeline.stacks[w].top();
375  _timeline.stacks[w].pop();
376 
377  _timeline.segments[w][_timeline.stacks[w].size()].emplace_back(
378  tv.name(), tv.type(), beg, std::chrono::steady_clock::now()
379  );
380 }
381 
382 // Function: clear
383 inline void TFProfObserver::clear() {
384  for(size_t w=0; w<_timeline.segments.size(); ++w) {
385  for(size_t l=0; l<_timeline.segments[w].size(); ++l) {
386  _timeline.segments[w][l].clear();
387  }
388  while(!_timeline.stacks[w].empty()) {
389  _timeline.stacks[w].pop();
390  }
391  }
392 }
393 
394 // Procedure: dump
// Writes the recorded timeline to os as one JSON object followed by a newline:
// {"executor":<uuid>,"data":[{"worker":w,"level":l,"data":[...]},...]} where
// each inner entry carries a "span" pair, a "name" and a "type".
// Writes "{}" when no worker has any level.
// NOTE(review): listing lines 438 and 441 are missing from this copy of the
// file; presumably they open the duration conversions that the ").count()"
// lines close - restore them from the upstream header.
// NOTE(review): task names are written without JSON escaping, so a name
// containing a double quote or a backslash yields invalid JSON.
395 inline void TFProfObserver::dump(std::ostream& os) const {
396 
397  size_t first;
398 
// find the first worker that owns at least one level
399  for(first = 0; first<_timeline.segments.size(); ++first) {
400  if(_timeline.segments[first].size() > 0) {
401  break;
402  }
403  }
404 
405  // no timeline data to dump
406  if(first == _timeline.segments.size()) {
407  os << "{}\n";
408  return;
409  }
410 
411  os << "{\"executor\":\"" << _uuid << "\",\"data\":[";
412 
// becomes true once the first worker/level object has been written, so that
// every later object is preceded by a comma
413  bool comma = false;
414 
415  for(size_t w=first; w<_timeline.segments.size(); w++) {
416  for(size_t l=0; l<_timeline.segments[w].size(); l++) {
417 
// levels without segments produce no output at all
418  if(_timeline.segments[w][l].empty()) {
419  continue;
420  }
421 
422  if(comma) {
423  os << ',';
424  }
425  else {
426  comma = true;
427  }
428 
429  os << "{\"worker\":" << w << ",\"level\":" << l << ",\"data\":[";
430  for(size_t i=0; i<_timeline.segments[w][l].size(); ++i) {
431 
432  const auto& s = _timeline.segments[w][l][i];
433 
434  if(i) os << ',';
435 
436  // span: begin and end, both relative to the timeline origin
437  os << "{\"span\":["
439  s.beg - _timeline.origin
440  ).count() << ","
442  s.end - _timeline.origin
443  ).count() << "],";
444 
445  // name: unnamed tasks are labelled "<worker>_<index within level>"
// NOTE(review): this label does not include the level, so unnamed tasks on
// different levels of one worker can receive the same label
446  os << "\"name\":\"";
447  if(s.name.empty()) {
448  os << w << '_' << i;
449  }
450  else {
451  os << s.name;
452  }
453  os << "\",";
454 
455  // category "type": "Condition Task",
456  os << "\"type\":\"" << task_type_to_string(s.type) << "\"";
457 
458  os << "}";
459  }
460  os << "]}";
461  }
462  }
463 
464  os << "]}\n";
465 }
466 
467 // Function: dump
469  std::ostringstream oss;
470  dump(oss);
471  return oss.str();
472 }
473 
474 // Function: num_tasks
475 inline size_t TFProfObserver::num_tasks() const {
476  return std::accumulate(
477  _timeline.segments.begin(), _timeline.segments.end(), size_t{0},
478  [](size_t sum, const auto& exe){
479  return sum + exe.size();
480  }
481  );
482 }
483 
484 // ----------------------------------------------------------------------------
485 // Identifier for Each Built-in Observer
486 // ----------------------------------------------------------------------------
487 
// Identifiers of the built-in observers.
enum ObserverType {
  // reported as "tfprof" by observer_type_to_string
  TFPROF = 1,
  // reported as "chrome" by observer_type_to_string
  CHROME = 2,
};
497 
501 inline const char* observer_type_to_string(ObserverType type) {
502  const char* val;
503  switch(type) {
504  case TFPROF: val = "tfprof"; break;
505  case CHROME: val = "chrome"; break;
506  default: val = "undefined"; break;
507  }
508  return val;
509 }
510 
511 // ----------------------------------------------------------------------------
512 // Legacy Alias
513 // ----------------------------------------------------------------------------
// Legacy name for ObserverInterface.
514 using ExecutorObserverInterface = ObserverInterface;
// Legacy name for ChromeObserver.
515 using ExecutorObserver = ChromeObserver;
516 
517 
518 } // end of namespace tf -----------------------------------------------------
519 
520 
void clear()
clear the timeline data
Definition: observer.hpp:170
void clear()
clear the timeline data
Definition: observer.hpp:383
virtual void on_entry(size_t worker_id, TaskView task_view)=0
method to call before a worker thread executes a closure
std::string dump() const
dump the timelines in JSON to a std::string
Definition: observer.hpp:468
T duration_cast(T... args)
T end(T... args)
Definition: error.hpp:9
observer designed based on taskflow board format
Definition: observer.hpp:262
virtual void on_exit(size_t worker_id, TaskView task_view)=0
method to call after a worker thread executed a closure
virtual void set_up(size_t num_workers)=0
constructor-like method to call when the executor observer is fully created
class to access task information from the observer interface
Definition: task.hpp:531
size_t num_tasks() const
get the number of total tasks in the observer
Definition: observer.hpp:475
execution interface for running a taskflow graph
Definition: executor.hpp:24
The interface class for creating an executor observer.
Definition: observer.hpp:16
size_t num_tasks() const
get the number of total tasks in the observer
Definition: observer.hpp:243
virtual ~ObserverInterface()=default
virtual destructor
std::string dump() const
dump the timelines in JSON to a std::string
Definition: observer.hpp:236
observer designed based on chrome tracing format
Definition: observer.hpp:58