Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/walker.py: 100%

253 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-12-13 15:12 +0000

1""" 

2The module implements generic walker through given zserio object tree. 

3""" 

4import functools 

5import re 

6import typing 

7 

8from zserio.exception import PythonRuntimeException 

9from zserio.typeinfo import TypeAttribute, MemberInfo, MemberAttribute 

10 

class WalkObserver:
    """
    Interface of the callbacks which the walker invokes while it goes through a zserio object tree.

    Every method defined here raises NotImplementedError, so a concrete observer has to override
    each callback which can be invoked on it.
    """

    def begin_root(self, compound: typing.Any) -> None:
        """
        Notifies that the walking of the root compound zserio object is about to start.

        :param compound: Root compound zserio object.
        """
        raise NotImplementedError()

    def end_root(self, compound: typing.Any) -> None:
        """
        Notifies that the root compound zserio object has just been walked.

        :param compound: Root compound zserio object.
        """
        raise NotImplementedError()

    def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """
        Notifies that an array starts.

        None arrays (i.e. non-present optionals) are not reported here, visit_value is called
        with None for them instead!

        :param array: Zserio array.
        :param member_info: Array member info.
        """
        raise NotImplementedError()

    def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """
        Notifies that an array ends.

        :param array: Zserio array.
        :param member_info: Array member info.
        """
        raise NotImplementedError()

    def begin_compound(self, compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> None:
        """
        Notifies that a compound field object starts.

        None compounds (i.e. uninitialized or optionals) are not reported here, visit_value is called
        for them instead!

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not in an array.
        """
        raise NotImplementedError()

    def end_compound(self, compound: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> None:
        """
        Notifies that a compound field object has just been walked.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not in an array.
        """
        raise NotImplementedError()

    def visit_value(self, value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> None:
        """
        Notifies that a simple value (or an unset compound or array - i.e. None) has been reached.

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not in an array.
        """
        raise NotImplementedError()

88 

class WalkFilter:
    """
    Interface of the filters which decide where the walker is allowed to go.

    Every method defined here raises NotImplementedError, so a concrete filter has to override
    each callback which can be invoked on it.
    """

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Asked before an array is walked.

        None arrays (i.e. non-present optionals) are not reported here, before_value is called
        with None for them instead!

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to the array.
        """
        raise NotImplementedError()

    def after_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Asked after an array.

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """
        raise NotImplementedError()

    def before_compound(self, compound: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """
        Asked before a compound object is walked.

        Uninitialized compounds (i.e. None) are not reported here, before_value is called
        for them instead!

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not in an array.

        :returns: True when the walking should continue into the compound object, False otherwise.
        """
        raise NotImplementedError()

    def after_compound(self, compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """
        Asked after a compound object.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not in an array.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """
        raise NotImplementedError()

    def before_value(self, value: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """
        Asked before a simple value (or an unset compound or array - i.e. None) is visited.

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not in an array.

        :returns: True when the walking should continue to the simple value, False otherwise.
        """
        raise NotImplementedError()

    def after_value(self, value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """
        Asked after a simple value (or an unset compound or array - i.e. None).

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not in an array.

        :returns: True when the walking should continue to a next sibling, False to return to the parent.
        """
        raise NotImplementedError()

172 

class Walker:
    """
    Goes through zserio objects with the help of the generated type info (see -withTypeInfoCode).
    """

    def __init__(self, walk_observer: WalkObserver, walk_filter: typing.Optional[WalkFilter] = None) -> None:
        """
        Constructor.

        :param walk_observer: Observer which is notified during walking.
        :param walk_filter: Walk filter to use, DefaultWalkFilter is created when None is given.
        """

        if walk_filter is None:
            walk_filter = DefaultWalkFilter()

        self._walk_observer = walk_observer
        self._walk_filter = walk_filter

    def walk(self, zserio_object: typing.Any) -> None:
        """
        Walks the given zserio compound object, which must be generated with type_info
        (see -withTypeInfoCode options).

        :param zserio_object: Zserio object to walk.

        :raises PythonRuntimeException: When the object has no type_info or is not a compound type.
        """

        if not hasattr(zserio_object, "type_info"):
            raise PythonRuntimeException("Walker: Type info must be enabled"
                                         " (see zserio option -withTypeInfoCode)!")

        root_type_info = zserio_object.type_info()
        if TypeAttribute.FIELDS not in root_type_info.attributes:
            raise PythonRuntimeException("Walker: Root object '" + root_type_info.schema_name +
                                         "' is not a compound type!")

        self._walk_observer.begin_root(zserio_object)
        self._walk_fields(zserio_object, root_type_info)
        self._walk_observer.end_root(zserio_object)

    def _walk_fields(self, zserio_object: typing.Any, type_info: typing.Any) -> None:
        """
        Walks the fields of the given compound object.

        :param zserio_object: Compound zserio object whose fields are walked.
        :param type_info: Type info of the compound object.
        """

        fields = type_info.attributes[TypeAttribute.FIELDS]
        if TypeAttribute.SELECTOR in type_info.attributes:
            # union or choice: only the field selected by the choice tag is walked
            choice_tag = zserio_object.choice_tag
            if choice_tag != zserio_object.UNDEFINED_CHOICE:
                selected_field = fields[choice_tag]
                property_name = selected_field.attributes[MemberAttribute.PROPERTY_NAME]
                self._walk_field(getattr(zserio_object, property_name), selected_field)
            # else: uninitialized or empty branch
            return

        # structure: fields are walked one by one until the filter asks to return to the parent
        for field in fields:
            field_value = getattr(zserio_object, field.attributes[MemberAttribute.PROPERTY_NAME])
            if not self._walk_field(field_value, field):
                break

    def _walk_field(self, zserio_object: typing.Any, member_info: MemberInfo) -> bool:
        """
        Walks a single field, which is either an array or a single value.

        :param zserio_object: Value of the field.
        :param member_info: Member info of the field.

        :returns: Result of the filter's after_* callback, i.e. True to continue to a next sibling.
        """

        is_array = zserio_object is not None and MemberAttribute.ARRAY_LENGTH in member_info.attributes
        if not is_array:
            return self._walk_field_value(zserio_object, member_info)

        if self._walk_filter.before_array(zserio_object, member_info):
            self._walk_observer.begin_array(zserio_object, member_info)
            for index, element in enumerate(zserio_object):
                if not self._walk_field_value(element, member_info, index):
                    break
            self._walk_observer.end_array(zserio_object, member_info)

        # after_array is called even when the array has been filtered out
        return self._walk_filter.after_array(zserio_object, member_info)

    def _walk_field_value(self, zserio_object: typing.Any, member_info: MemberInfo,
                          element_index: typing.Optional[int] = None) -> bool:
        """
        Walks a single value, which is either a compound object or a simple (or None) value.

        :param zserio_object: Value to walk.
        :param member_info: Member info of the value.
        :param element_index: Index of the value within its array, None when it is not in an array.

        :returns: Result of the filter's after_* callback, i.e. True to continue to a next sibling.
        """

        type_info = member_info.type_info
        if zserio_object is None or TypeAttribute.FIELDS not in type_info.attributes:
            # simple value or None compound
            if self._walk_filter.before_value(zserio_object, member_info, element_index):
                self._walk_observer.visit_value(zserio_object, member_info, element_index)
            return self._walk_filter.after_value(zserio_object, member_info, element_index)

        if self._walk_filter.before_compound(zserio_object, member_info, element_index):
            self._walk_observer.begin_compound(zserio_object, member_info, element_index)
            self._walk_fields(zserio_object, type_info)
            self._walk_observer.end_compound(zserio_object, member_info, element_index)

        # after_compound is called even when the compound has been filtered out
        return self._walk_filter.after_compound(zserio_object, member_info, element_index)

253 

class DefaultWalkObserver(WalkObserver):
    """
    Walk observer whose callbacks are all empty, i.e. it observes nothing.
    """

    def begin_root(self, _root: typing.Any) -> None:
        """Does nothing."""

    def end_root(self, _root: typing.Any) -> None:
        """Does nothing."""

    def begin_array(self, _array: typing.List[typing.Any], member_info: MemberInfo) -> None:
        """Does nothing."""

    def end_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> None:
        """Does nothing."""

    def begin_compound(self, _compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> None:
        """Does nothing."""

    def end_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> None:
        """Does nothing."""

    def visit_value(self, _value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> None:
        """Does nothing."""

282 

class DefaultWalkFilter(WalkFilter):
    """
    Walk filter which lets everything through, every callback returns True.
    """

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always allows to continue to the array."""
        return True

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always allows to continue to a next sibling."""
        return True

    def before_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                        _element_index: typing.Optional[int] = None) -> bool:
        """Always allows to continue into the compound."""
        return True

    def after_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                       _element_index: typing.Optional[int] = None) -> bool:
        """Always allows to continue to a next sibling."""
        return True

    def before_value(self, _value: typing.Any, _member_info: MemberInfo,
                     _element_index: typing.Optional[int] = None) -> bool:
        """Always allows to continue to the value."""
        return True

    def after_value(self, _value: typing.Any, _member_info: MemberInfo,
                    _element_index: typing.Optional[int] = None) -> bool:
        """Always allows to continue to a next sibling."""
        return True

309 

class DepthWalkFilter(WalkFilter):
    """
    Walk filter which lets the walker go only to the given maximum depth.
    """

    def __init__(self, max_depth: int) -> None:
        """
        Constructor.

        :param max_depth: Maximum depth to walk to.
        """

        self._max_depth = max_depth
        self._depth = 1  # the depth counter starts at 1

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Enters a next depth level, True when the array is within the maximum depth."""
        return self._enter_depth_level()

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Leaves the depth level entered in before_array, always True."""
        return self._leave_depth_level()

    def before_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                        _element_index: typing.Optional[int] = None) -> bool:
        """Enters a next depth level, True when the compound is within the maximum depth."""
        return self._enter_depth_level()

    def after_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                       _element_index: typing.Optional[int] = None) -> bool:
        """Leaves the depth level entered in before_compound, always True."""
        return self._leave_depth_level()

    def before_value(self, _value: typing.Any, _member_info: MemberInfo,
                     _element_index: typing.Optional[int] = None) -> bool:
        """Checks the current depth without changing it."""
        return self._depth <= self._max_depth

    def after_value(self, _value: typing.Any, _member_info: MemberInfo,
                    _element_index: typing.Optional[int] = None) -> bool:
        """Always allows to continue to a next sibling."""
        return True

    def _enter_depth_level(self) -> bool:
        # the depth is increased even when the level is refused, _leave_depth_level restores it
        depth_before = self._depth
        self._depth = depth_before + 1
        return depth_before <= self._max_depth

    def _leave_depth_level(self) -> bool:
        self._depth = self._depth - 1
        return True

355 

class RegexWalkFilter(WalkFilter):
    """
    Walk filter which lets the walker go only through paths matching the given regex.

    The path consists of field names joined by '.', starting within the root object, so the root object
    itself is not part of the path.

    Array elements get their index appended to the path so that e.g. "compound.arrayField[0]" will match
    only the first element in the array "arrayField".
    """

    def __init__(self, path_regex: str) -> None:
        """
        Constructor.

        :param path_regex: Path regex to use for filtering.
        """

        self._path_regex = re.compile(path_regex)
        self._current_path: typing.List[str] = []

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """
        Allows the array when either its own path or a path of any of its elements matches.

        :param array: Zserio array.
        :param member_info: Array member info.

        :returns: True when the walking should continue to the array.
        """

        array_name = member_info.schema_name
        self._current_path.append(array_name)
        if self._path_regex.match(self._get_current_path()) is not None:
            return True  # the array itself matches

        # look for a match in each element and continue into the array only if some match is found
        # (note that array is never None)
        for index, element in enumerate(array):
            self._current_path[-1] = array_name + f"[{index}]"
            if self._match_subtree(element, member_info):
                return True

        self._current_path[-1] = array_name  # no element matches, drop the index again
        return False

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Removes the array from the current path, always True."""
        self._current_path.pop()
        return True

    def before_compound(self, compound: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """
        Allows the compound when either its own path or any path within its subtree matches.

        :param compound: Compound zserio object.
        :param member_info: Compound member info.
        :param element_index: Index of the compound within its array, None when it is not in an array.

        :returns: True when the walking should continue into the compound object, False otherwise.
        """

        self._append_path(member_info, element_index)
        if self._path_regex.match(self._get_current_path()) is not None:
            return True  # the compound itself matches

        return self._match_subtree(compound, member_info)

    def after_compound(self, _compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """Removes the compound (or just its index) from the current path, always True."""
        self._pop_path(member_info, element_index)
        return True

    def before_value(self, value: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """
        Allows the value when its path matches.

        :param value: Simple value.
        :param member_info: Member info.
        :param element_index: Index of the value within its array, None when it is not in an array.

        :returns: True when the walking should continue to the simple value, False otherwise.
        """

        self._append_path(member_info, element_index)
        return self._match_subtree(value, member_info)

    def after_value(self, _value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """Removes the value (or just its index) from the current path, always True."""
        self._pop_path(member_info, element_index)
        return True

    def _match_subtree(self, member: typing.Any, member_info: MemberInfo) -> bool:
        is_compound = member is not None and TypeAttribute.FIELDS in member_info.type_info.attributes
        if not is_compound:
            # a simple value or None compound is matched directly by its path
            return self._path_regex.match(self._get_current_path()) is not None

        # a not None compound, walk its subtree with a copy of the path and look for any match
        subtree_filter = self._SubtreeRegexFilter(self._current_path.copy(), self._path_regex)
        Walker(DefaultWalkObserver(), subtree_filter).walk(member)
        return subtree_filter.matches()

    def _get_current_path(self) -> str:
        return self._get_current_path_impl(self._current_path)

    def _append_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
        self._append_path_impl(self._current_path, member_info, element_index)

    def _pop_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
        self._pop_path_impl(self._current_path, member_info, element_index)

    @staticmethod
    def _get_current_path_impl(current_path: typing.List[str]) -> str:
        return ".".join(current_path)

    @staticmethod
    def _append_path_impl(current_path: typing.List[str], member_info: MemberInfo,
                          element_index: typing.Optional[int]) -> None:
        if element_index is not None:
            # array element: the last path item is the array name, replace it by the indexed one
            current_path[-1] = member_info.schema_name + f"[{element_index}]"
            return

        current_path.append(member_info.schema_name)

    @staticmethod
    def _pop_path_impl(current_path: typing.List[str], member_info: MemberInfo,
                       element_index: typing.Optional[int]) -> None:
        if element_index is not None:
            # array element: keep the array name in the path, just remove the index
            current_path[-1] = member_info.schema_name
            return

        current_path.pop()

    class _SubtreeRegexFilter(WalkFilter):
        """
        Walks the whole subtree and stops walking as soon as a match is found. Used to check whether
        any path within the subtree matches the given regex.
        """

        def __init__(self, current_path: typing.List[str], path_regex: typing.Pattern) -> None:
            self._path_regex = path_regex
            self._current_path = current_path
            self._matches = False

        def matches(self) -> bool:
            """
            Returns whether the subtree contains any matching value.

            :returns: True when the subtree contains a matching value, False otherwise.
            """

            return self._matches

        def before_array(self, _array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
            self._current_path.append(member_info.schema_name)

            # (note that array is never None here)
            return self._keep_walking_unless_matched()

        def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
            self._current_path.pop()
            return not self._matches  # terminate when the match is already found

        def before_compound(self, _compound: typing.Any, member_info: MemberInfo,
                            element_index: typing.Optional[int] = None) -> bool:
            self._append_path(member_info, element_index)

            # (note that compound is never None here)
            return self._keep_walking_unless_matched()

        def after_compound(self, _compound: typing.Any, member_info: MemberInfo,
                           element_index: typing.Optional[int] = None) -> bool:
            self._pop_path(member_info, element_index)
            return not self._matches  # terminate when the match is already found

        def before_value(self, _value: typing.Any, member_info: MemberInfo,
                         element_index: typing.Optional[int] = None) -> bool:
            self._append_path(member_info, element_index)
            return self._keep_walking_unless_matched()

        def after_value(self, _value: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
            self._pop_path(member_info, element_index)
            return not self._matches  # terminate when the match is already found

        def _keep_walking_unless_matched(self) -> bool:
            # remember whether the current path matches and terminate when the match is found
            self._matches = self._path_regex.match(self._get_current_path()) is not None
            return not self._matches

        def _get_current_path(self) -> str:
            return RegexWalkFilter._get_current_path_impl(self._current_path)

        def _append_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
            RegexWalkFilter._append_path_impl(self._current_path, member_info, element_index)

        def _pop_path(self, member_info: MemberInfo, element_index: typing.Optional[int]) -> None:
            RegexWalkFilter._pop_path_impl(self._current_path, member_info, element_index)

522 

class ArrayLengthWalkFilter(WalkFilter):
    """
    Walk filter which lets the walker go only to the given maximum array length.
    """

    def __init__(self, max_array_length: int) -> None:
        """
        Constructor.

        :param max_array_length: Maximum array length to walk to.
        """

        self._max_array_length = max_array_length

    def before_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always allows to continue to the array, only its elements are filtered."""
        return True

    def after_array(self, _array: typing.List[typing.Any], _member_info: MemberInfo) -> bool:
        """Always allows to continue to a next sibling."""
        return True

    def before_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """Allows the compound unless it is an array element beyond the maximum array length."""
        return self._filter_array_element(element_index)

    def after_compound(self, _compound: typing.Any, _member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """Allows a next sibling unless the compound is an array element beyond the maximum length."""
        return self._filter_array_element(element_index)

    def before_value(self, _value: typing.Any, _member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """Allows the value unless it is an array element beyond the maximum array length."""
        return self._filter_array_element(element_index)

    def after_value(self, _value: typing.Any, _member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """Allows a next sibling unless the value is an array element beyond the maximum length."""
        return self._filter_array_element(element_index)

    def _filter_array_element(self, element_index: typing.Optional[int]) -> bool:
        # members which are not in an array (index is None) are never filtered out
        return element_index is None or element_index < self._max_array_length

561 

class AndWalkFilter(WalkFilter):
    """
    Walk filter which composes several filters together.

    The filters are called sequentially and logical and is applied on their results.
    Note that all filters are always called.
    """

    def __init__(self, walk_filters: typing.List[WalkFilter]) -> None:
        """
        Constructor.

        :param walk_filters: List of filters to use in composition.
        """

        self._walk_filters = walk_filters

    def before_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """Calls before_array on all the filters and combines their results."""
        return self._apply_filters(lambda walk_filter: walk_filter.before_array(array, member_info))

    def after_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> bool:
        """Calls after_array on all the filters and combines their results."""
        return self._apply_filters(lambda walk_filter: walk_filter.after_array(array, member_info))

    def before_compound(self, compound: typing.Any, member_info: MemberInfo,
                        element_index: typing.Optional[int] = None) -> bool:
        """Calls before_compound on all the filters and combines their results."""
        return self._apply_filters(
            lambda walk_filter: walk_filter.before_compound(compound, member_info, element_index))

    def after_compound(self, compound: typing.Any, member_info: MemberInfo,
                       element_index: typing.Optional[int] = None) -> bool:
        """Calls after_compound on all the filters and combines their results."""
        return self._apply_filters(
            lambda walk_filter: walk_filter.after_compound(compound, member_info, element_index))

    def before_value(self, value: typing.Any, member_info: MemberInfo,
                     element_index: typing.Optional[int] = None) -> bool:
        """Calls before_value on all the filters and combines their results."""
        return self._apply_filters(
            lambda walk_filter: walk_filter.before_value(value, member_info, element_index))

    def after_value(self, value: typing.Any, member_info: MemberInfo,
                    element_index: typing.Optional[int] = None) -> bool:
        """Calls after_value on all the filters and combines their results."""
        return self._apply_filters(
            lambda walk_filter: walk_filter.after_value(value, member_info, element_index))

    def _apply_filters(self, method):
        # all the filters are called first (no short-circuit), their results are and-ed afterwards;
        # True is the result for an empty list of filters
        results = [method(walk_filter) for walk_filter in self._walk_filters]
        return functools.reduce(lambda combined, result: combined and result, results, True)