Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/walker.py: 100%
253 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
"""
The module implements a generic walker through a given zserio object tree.
"""
4import functools
5import re
6import typing
8from zserio.exception import PythonRuntimeException
9from zserio.typeinfo import TypeAttribute, MemberInfo, MemberAttribute
class WalkObserver:
    """
    Interface of the observer notified by the walker while it traverses a zserio object tree.

    Every method of this base class raises NotImplementedError, implementations are expected
    to override all of them.
    """

    def begin_root(self, compound: typing.Any) -> None:
        """
        Notification that the walk of the root compound zserio object is about to start.

        :param compound: Root compound zserio object.
        """
        raise NotImplementedError()

    def end_root(self, compound: typing.Any) -> None:
        """
        Notification that the walk of the root compound zserio object has just finished.

        :param compound: Root compound zserio object.
        """
        raise NotImplementedError()

    def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """
        Notification that an array is about to be walked.

        Arrays which are None (i.e. non-present optionals) are not reported here, visit_value
        is called with None for them instead!

        :param array: Zserio array.
        :param member_info: Array member info.
        """
        raise NotImplementedError()

    def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """
        Notification that the walk of an array has just finished.

        :param array: Zserio array.
        :param member_info: Array member info.
        """
        raise NotImplementedError()

    def begin_compound(self, compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> None:
        """
        Notification that a compound field object is about to be walked.

        Compounds which are None (i.e. uninitialized or non-present optionals) are not reported here,
        visit_value is called for them instead!

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the element in an array or None when the compound is not in an array.
        """
        raise NotImplementedError()

    def end_compound(self, compound: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> None:
        """
        Notification that the walk of a compound object has just finished.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the element in an array or None when the compound is not in an array.
        """
        raise NotImplementedError()

    def visit_value(self, value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> None:
        """
        Notification that a simple value (or an unset compound or array, i.e. None) has been reached.

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the element in an array or None when the value is not in an array.
        """
        raise NotImplementedError()
class WalkFilter:
    """
    Interface of the filter consulted by the walker to decide where the walking continues.

    Every method of this base class raises NotImplementedError, implementations are expected
    to override all of them.
    """

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Consulted before an array is walked.

        Arrays which are None (i.e. non-present optionals) are not reported here, before_value
        is called with None for them instead!

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to the array.
        """
        raise NotImplementedError()

    def after_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Consulted after an array.

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """
        raise NotImplementedError()

    def before_compound(self, compound: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """
        Consulted before a compound object is walked.

        Uninitialized compounds (i.e. None) are not reported here, before_value is called for them instead!

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the element in an array or None when the compound is not in an array.

        :returns: True when the walking should continue into the compound object, False otherwise.
        """
        raise NotImplementedError()

    def after_compound(self, compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """
        Consulted after a compound object.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the element in an array or None when the compound is not in an array.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """
        raise NotImplementedError()

    def before_value(self, value: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """
        Consulted before a simple value (or an unset compound or array, i.e. None) is visited.

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the element in an array or None when the value is not in an array.

        :returns: True when the walking should continue to the simple value, False otherwise.
        """
        raise NotImplementedError()

    def after_value(self, value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """
        Consulted after a simple value (or an unset compound or array, i.e. None).

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the element in an array or None when the value is not in an array.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """
        raise NotImplementedError()
class Walker:
    """
    Traverses zserio objects using their generated type info (see -withTypeInfoCode) and reports
    everything it reaches to a walk observer, while a walk filter decides where to continue.
    """

    def __init__(self, walk_observer: WalkObserver, walk_filter: typing.Optional[WalkFilter] = None) -> None:
        """
        Constructor.

        :param walk_observer: Observer to notify during walking.
        :param walk_filter: Walk filter to consult, DefaultWalkFilter is used when None is given.
        """
        self._walk_observer = walk_observer
        if walk_filter is None:
            walk_filter = DefaultWalkFilter()
        self._walk_filter = walk_filter

    def walk(self, zserio_object: typing.Any) -> None:
        """
        Walks the given zserio compound object, which must provide the type_info method
        (see -withTypeInfoCode option).

        :param zserio_object: Zserio object to walk.

        :raises PythonRuntimeException: When the object has no type info or is not a compound type.
        """
        if not hasattr(zserio_object, "type_info"):
            raise PythonRuntimeException("Walker: Type info must be enabled"
                                         " (see zserio option -withTypeInfoCode)!")

        root_type_info = zserio_object.type_info()
        # only types which have fields can be walked as a root
        if TypeAttribute.FIELDS not in root_type_info.attributes:
            raise PythonRuntimeException("Walker: Root object '" + root_type_info.schema_name +
                                         "' is not a compound type!")

        self._walk_observer.begin_root(zserio_object)
        self._walk_fields(zserio_object, root_type_info)
        self._walk_observer.end_root(zserio_object)

    def _walk_fields(self, zserio_object, type_info) -> None:
        # Walks fields of the given compound: all fields of a structure (until the filter asks to return
        # to the parent) or just the currently selected field of a union or choice.
        fields = type_info.attributes[TypeAttribute.FIELDS]

        if TypeAttribute.SELECTOR not in type_info.attributes:
            # structure
            for field in fields:
                field_value = getattr(zserio_object, field.attributes[MemberAttribute.PROPERTY_NAME])
                if not self._walk_field(field_value, field):
                    return
            return

        # union or choice
        choice_tag = zserio_object.choice_tag
        if choice_tag != zserio_object.UNDEFINED_CHOICE:
            selected_field = fields[choice_tag]
            selected_value = getattr(zserio_object, selected_field.attributes[MemberAttribute.PROPERTY_NAME])
            self._walk_field(selected_value, selected_field)
        # else: uninitialized or empty branch

    def _walk_field(self, zserio_object: typing.Any, member_info: MemberInfo) -> bool:
        # Walks a single field, returns False when the walking shall return to the parent.
        is_present_array = (zserio_object is not None and
                            MemberAttribute.ARRAY_LENGTH in member_info.attributes)
        if not is_present_array:
            # simple value, compound or None (which covers also non-present arrays)
            return self._walk_field_value(zserio_object, member_info)

        if self._walk_filter.before_array(zserio_object, member_info):
            self._walk_observer.begin_array(zserio_object, member_info)
            for element_index, element in enumerate(zserio_object):
                if not self._walk_field_value(element, member_info, element_index):
                    break
            self._walk_observer.end_array(zserio_object, member_info)

        # the after callback is consulted even when the array itself was filtered out
        return self._walk_filter.after_array(zserio_object, member_info)

    def _walk_field_value(self, zserio_object: typing.Any, member_info: MemberInfo,
                          element_index: typing.Optional[int] = None) -> bool:
        # Walks a single value (either a field or an array element), returns False when the walking
        # shall return to the parent.
        value_type_info = member_info.type_info
        is_present_compound = (zserio_object is not None and
                               TypeAttribute.FIELDS in value_type_info.attributes)
        if not is_present_compound:
            # simple value or None
            if self._walk_filter.before_value(zserio_object, member_info, element_index):
                self._walk_observer.visit_value(zserio_object, member_info, element_index)
            return self._walk_filter.after_value(zserio_object, member_info, element_index)

        if self._walk_filter.before_compound(zserio_object, member_info, element_index):
            self._walk_observer.begin_compound(zserio_object, member_info, element_index)
            self._walk_fields(zserio_object, value_type_info)
            self._walk_observer.end_compound(zserio_object, member_info, element_index)

        # the after callback is consulted even when the compound itself was filtered out
        return self._walk_filter.after_compound(zserio_object, member_info, element_index)
class DefaultWalkObserver(WalkObserver):
    """
    Walk observer with empty callbacks, i.e. every notification is silently ignored.
    """

    def begin_root(self, _root: typing.Any) -> None:
        """ Does nothing. """

    def end_root(self, _root: typing.Any) -> None:
        """ Does nothing. """

    def begin_array(self, _array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """ Does nothing. """

    def end_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> None:
        """ Does nothing. """

    def begin_compound(self, _compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> None:
        """ Does nothing. """

    def end_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> None:
        """ Does nothing. """

    def visit_value(self, _value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> None:
        """ Does nothing. """
class DefaultWalkFilter(WalkFilter):
    """
    Walk filter which lets everything through, i.e. every callback returns True.
    """

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Always allows to walk into the array. """
        return True

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Always continues to the next sibling. """
        return True

    def before_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                        _element_index: typing.Optional[int] = None) -> bool:
        """ Always allows to walk into the compound. """
        return True

    def after_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                       _element_index: typing.Optional[int] = None) -> bool:
        """ Always continues to the next sibling. """
        return True

    def before_value(self, _value: typing.Any, _member_info: MemberInfo,
                     _element_index: typing.Optional[int] = None) -> bool:
        """ Always allows to visit the value. """
        return True

    def after_value(self, _value: typing.Any, _member_info: MemberInfo,
                    _element_index: typing.Optional[int] = None) -> bool:
        """ Always continues to the next sibling. """
        return True
class DepthWalkFilter(WalkFilter):
    """
    Walk filter which restricts the walking to the given maximum depth.

    The filter keeps a depth counter which starts at 1, is incremented before each array or compound
    and decremented after it.
    """

    def __init__(self, max_depth: int):
        """
        Constructor.

        :param max_depth: Maximum depth to walk to.
        """
        self._max_depth = max_depth
        self._depth = 1

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Enters one level deeper, allows the array only when the depth limit is not exceeded. """
        return self._enter_depth_level()

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Leaves the level of the array and continues to the next sibling. """
        return self._leave_depth_level()

    def before_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                        _element_index: typing.Optional[int] = None) -> bool:
        """ Enters one level deeper, allows the compound only when the depth limit is not exceeded. """
        return self._enter_depth_level()

    def after_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                       _element_index: typing.Optional[int] = None) -> bool:
        """ Leaves the level of the compound and continues to the next sibling. """
        return self._leave_depth_level()

    def before_value(self, _value: typing.Any, _member_info: MemberInfo,
                     _element_index: typing.Optional[int] = None) -> bool:
        """ Allows the value only when the current depth does not exceed the limit. """
        # values do not open a new level, so the counter is only compared here
        return self._depth <= self._max_depth

    def after_value(self, _value: typing.Any, _member_info: MemberInfo,
                    _element_index: typing.Optional[int] = None) -> bool:
        """ Always continues to the next sibling. """
        return True

    def _enter_depth_level(self) -> bool:
        # the decision is made for the depth before entering, the counter is incremented in any case
        # because the matching after_* callback always decrements it again
        depth_before_entering = self._depth
        self._depth = depth_before_entering + 1
        return depth_before_entering <= self._max_depth

    def _leave_depth_level(self) -> bool:
        self._depth -= 1
        return True
class RegexWalkFilter(WalkFilter):
    """
    Walk filter which restricts the walking to paths matching the given regex.

    The path is built from names of the fields within the root object joined by '.', the root object
    itself is therefore not a part of the path.

    Array elements get their index appended to the field name, so the path of the first element
    of the array "arrayField" is e.g. "compound.arrayField[0]".

    The regex is applied using the match method of the compiled pattern.
    """

    def __init__(self, path_regex: str) -> None:
        """
        Constructor.

        :param path_regex: Path regex to use for filtering.
        """
        self._current_path: typing.List[str] = []
        self._path_regex = re.compile(path_regex)

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Allows the array when either its own path or the path of anything within its elements matches.
        """
        array_name = member_info.schema_name
        self._current_path.append(array_name)
        if self._path_regex.match(self._get_current_path()) is not None:
            return True  # the array itself matches

        # otherwise continue into the array only when a match is found for at least one element
        # (note that array is never None)
        for index, element in enumerate(array):
            self._current_path[-1] = array_name + f"[{index}]"
            if self._match_subtree(element, member_info):
                # note that the last path component keeps the element index in this case
                return True

        # nothing matched, remove the index of the last element from the path
        self._current_path[-1] = array_name
        return False

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Removes the array from the current path and continues to the next sibling. """
        self._current_path.pop()
        return True

    def before_compound(self, compound: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """
        Allows the compound when either its own path or any path within its subtree matches.
        """
        self._append_path(member_info, element_index)
        if self._path_regex.match(self._get_current_path()) is not None:
            return True  # the compound itself matches

        return self._match_subtree(compound, member_info)

    def after_compound(self, _compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """ Removes the compound from the current path and continues to the next sibling. """
        self._pop_path(member_info, element_index)
        return True

    def before_value(self, value: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """ Allows the value when its path (or a path within it, see _match_subtree) matches. """
        self._append_path(member_info, element_index)
        return self._match_subtree(value, member_info)

    def after_value(self, _value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """ Removes the value from the current path and continues to the next sibling. """
        self._pop_path(member_info, element_index)
        return True

    def _match_subtree(self, member: typing.Any, member_info: MemberInfo) -> bool:
        # Checks whether the current path matches (simple value or None) or whether any path
        # within the subtree of the given member matches (not None compound).
        is_present_compound = (member is not None and
                               TypeAttribute.FIELDS in member_info.type_info.attributes)
        if not is_present_compound:
            # try to match a simple value or None compound
            return self._path_regex.match(self._get_current_path()) is not None

        # walk the subtree by a nested walker which works on its own copy of the current path
        subtree_filter = self._SubtreeRegexFilter(list(self._current_path), self._path_regex)
        Walker(DefaultWalkObserver(), subtree_filter).walk(member)
        return subtree_filter.matches()

    def _get_current_path(self):
        return self._get_current_path_impl(self._current_path)

    def _append_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
        self._append_path_impl(self._current_path, member_info, element_index)

    def _pop_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
        self._pop_path_impl(self._current_path, member_info, element_index)

    @staticmethod
    def _get_current_path_impl(current_path: typing.List[str]) -> str:
        return ".".join(current_path)

    @staticmethod
    def _append_path_impl(current_path: typing.List[str], member_info: MemberInfo,
                          element_index: typing.Optional[int]) -> None:
        if element_index is not None:
            # array element: its array is already the last path component, just add the index
            current_path[-1] = member_info.schema_name + f"[{element_index}]"
        else:
            current_path.append(member_info.schema_name)

    @staticmethod
    def _pop_path_impl(current_path: typing.List[str], member_info: MemberInfo,
                       element_index: typing.Optional[int]) -> None:
        if element_index is not None:
            # array element: keep the array in the path, just remove the index
            current_path[-1] = member_info.schema_name
        else:
            current_path.pop()

    class _SubtreeRegexFilter(WalkFilter):
        """
        Helper filter which walks the whole subtree and stops the walking as soon as a match is found.
        It is used to find out whether any path within the subtree matches the given regex.
        """

        def __init__(self, current_path: typing.List[str], path_regex: typing.Pattern) -> None:
            self._current_path = current_path
            self._path_regex = path_regex
            self._matches = False

        def matches(self) -> bool:
            """
            Gets the result of the subtree walk.

            :returns: True when the subtree contains a matching value, False otherwise.
            """
            return self._matches

        def before_array(self, _array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
            self._current_path.append(member_info.schema_name)
            self._update_matches()

            # terminate when the match is already found (note that array is never None here)
            return not self._matches

        def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
            self._current_path.pop()
            return not self._matches  # terminate when the match is already found

        def before_compound(self, _compound: typing.Any, member_info: MemberInfo,
                            element_index: typing.Optional[int] = None) -> bool:
            self._append_path(member_info, element_index)
            self._update_matches()

            # terminate when the match is already found (note that compound is never None here)
            return not self._matches

        def after_compound(self, _compound: typing.Any, member_info: MemberInfo,
                           element_index: typing.Optional[int] = None) -> bool:
            self._pop_path(member_info, element_index)
            return not self._matches  # terminate when the match is already found

        def before_value(self, _value: typing.Any, member_info: MemberInfo,
                         element_index: typing.Optional[int] = None) -> bool:
            self._append_path(member_info, element_index)
            self._update_matches()

            return not self._matches  # terminate when the match is already found

        def after_value(self, _value: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
            self._pop_path(member_info, element_index)
            return not self._matches  # terminate when the match is already found

        def _update_matches(self) -> None:
            # remembers whether the current path matches the regex
            self._matches = self._path_regex.match(self._get_current_path()) is not None

        def _get_current_path(self) -> str:
            return RegexWalkFilter._get_current_path_impl(self._current_path)

        def _append_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
            RegexWalkFilter._append_path_impl(self._current_path, member_info, element_index)

        def _pop_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
            RegexWalkFilter._pop_path_impl(self._current_path, member_info, element_index)
class ArrayLengthWalkFilter(WalkFilter):
    """
    Walk filter which restricts the walking of arrays to the given maximum number of elements.

    Arrays themselves and everything which is not an array element are always allowed.
    """

    def __init__(self, max_array_length: int):
        """
        Constructor.

        :param max_array_length: Maximum array length to walk to.
        """
        self._max_array_length = max_array_length

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Always allows to walk into the array. """
        return True

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """ Always continues to the next sibling. """
        return True

    def before_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """ Allows the compound unless it is an array element beyond the maximum array length. """
        return self._filter_array_element(element_index)

    def after_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """ Continues to the next sibling unless it is an array element beyond the maximum array length. """
        return self._filter_array_element(element_index)

    def before_value(self, _value: typing.Any, _member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """ Allows the value unless it is an array element beyond the maximum array length. """
        return self._filter_array_element(element_index)

    def after_value(self, _value: typing.Any, _member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """ Continues to the next sibling unless it is an array element beyond the maximum array length. """
        return self._filter_array_element(element_index)

    def _filter_array_element(self, element_index: typing.Optional[int]) -> bool:
        if element_index is None:
            # not an array element, nothing to limit
            return True
        return element_index < self._max_array_length
class AndWalkFilter(WalkFilter):
    """
    Walk filter which composes several filters together.

    The filters are called one after another and their results are combined using logical and.
    Note that all filters are always called, even when the result is already known to be False.
    """

    def __init__(self, walk_filters: typing.List[WalkFilter]) -> None:
        """
        Constructor.

        :param walk_filters: List of filters to use in composition.
        """
        self._walk_filters = walk_filters

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """ Combines results of before_array of all the filters. """
        return self._apply_filters(lambda walk_filter: walk_filter.before_array(array, member_info))

    def after_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """ Combines results of after_array of all the filters. """
        return self._apply_filters(lambda walk_filter: walk_filter.after_array(array, member_info))

    def before_compound(self, compound: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """ Combines results of before_compound of all the filters. """
        return self._apply_filters(
            lambda walk_filter: walk_filter.before_compound(compound, member_info, element_index)
        )

    def after_compound(self, compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """ Combines results of after_compound of all the filters. """
        return self._apply_filters(
            lambda walk_filter: walk_filter.after_compound(compound, member_info, element_index)
        )

    def before_value(self, value: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """ Combines results of before_value of all the filters. """
        return self._apply_filters(
            lambda walk_filter: walk_filter.before_value(value, member_info, element_index)
        )

    def after_value(self, value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """ Combines results of after_value of all the filters. """
        return self._apply_filters(
            lambda walk_filter: walk_filter.after_value(value, member_info, element_index)
        )

    def _apply_filters(self, method):
        # Calls the given method on each filter (no short-circuit, the filters may keep an internal
        # state which must be updated on every callback) and folds the results by logical and.
        combined = True
        for walk_filter in self._walk_filters:
            outcome = method(walk_filter)
            combined = combined and outcome
        return combined