Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/json.py: 100%

726 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-12-13 15:12 +0000

1""" 

2The module implements WalkObserver for writing of zserio objects to JSON format. 

3""" 

4 

5import enum 

6import io 

7import json 

8import typing 

9 

10from zserio.bitbuffer import BitBuffer 

11from zserio.creator import ZserioTreeCreator 

12from zserio.exception import PythonRuntimeException 

13from zserio.typeinfo import TypeInfo, RecursiveTypeInfo, TypeAttribute, MemberInfo 

14from zserio.walker import WalkObserver 

15 

16class JsonEnumerableFormat(enum.Enum): 

17 """ 

18 Configuration for writing of enumerable types. 

19 """ 

20 

21 #: Print as JSON integral value. 

22 NUMBER = enum.auto() 

23 #: Print as JSON string according to the following rules: 

24 #: 

25 #: #. Enums 

26 #: 

27 #: * when an exact match with an enumerable item is found, the item name is used - e.g. "FIRST", 

28 #: * when no exact match is found, it's an invalid value, the integral value is converted to string 

29 #: and an appropriate comment is included - e.g. "10 /\* no match \*/". 

30 #: 

31 #: #. Bitmasks 

32 #: 

33 #: * when an exact mach with or-ed bitmask values is found, it's used - e.g. "READ | WRITE", 

34 #: * when no exact match is found, but some or-ed values match, the integral value is converted 

35 #: to string and the or-ed values are included in a comment - e.g. "127 /\* READ | CREATE \*/", 

36 #: * when no match is found at all, the integral value is converted to string and an appropriate 

37 #: comment is included - e.g. "13 /\* no match \*/". 

38 STRING = enum.auto() 

39 

40class JsonWriter(WalkObserver): 

41 """ 

42 Walker observer which dumps zserio objects to JSON format. 

43 """ 

44 

45 def __init__(self, *, text_io: typing.Optional[typing.TextIO] = None, 

46 enumerable_format: JsonEnumerableFormat = JsonEnumerableFormat.STRING, 

47 item_separator: typing.Optional[str] = None, 

48 key_separator: typing.Optional[str] = None, 

49 indent: typing.Union[str, int] = None) -> None: 

50 """ 

51 Constructor. 

52 

53 :param text_io: Optional text stream for JSON output, io.StringIO is used by default. 

54 :param item_separator: Optional item separator, default is ', ' if indent is None, ',' otherwise. 

55 :param key_separator: Optional key separator, default is ': '. 

56 :param enumerable_format: Optional enumerable format to use, default is JsonEnumerableFormat.STRING. 

57 :param indent: String or (non-negative) integer defining the indent. If not None, newlines are inserted. 

58 """ 

59 

60 self._io : typing.TextIO = text_io if text_io else io.StringIO() 

61 self._item_separator : str = item_separator if item_separator else ("," if indent is not None else ", ") 

62 self._key_separator : str = key_separator if key_separator else ": " 

63 self._enumerable_format = enumerable_format 

64 

65 self._indent : typing.Optional[str] = ( 

66 (indent if isinstance(indent, str) else " " * indent) if indent is not None else None 

67 ) 

68 

69 self._is_first = True 

70 self._level = 0 

71 self._json_encoder = JsonEncoder() 

72 

73 def get_io(self) -> typing.TextIO: 

74 """ 

75 Gets the underlying text stream. 

76 

77 :returns: Underlying text steam. 

78 """ 

79 

80 return self._io 

81 

82 def begin_root(self, _compound: typing.Any) -> None: 

83 self._begin_object() 

84 

85 def end_root(self, _compound: typing.Any) -> None: 

86 self._end_object() 

87 

88 def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None: 

89 self._begin_item() 

90 

91 self._write_key(member_info.schema_name) 

92 

93 self._begin_array() 

94 

95 def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None: 

96 self._end_array() 

97 

98 self._end_item() 

99 

100 def begin_compound(self, compound: typing.Any, member_info: MemberInfo, 

101 element_index: typing.Optional[int] = None) -> None: 

102 self._begin_item() 

103 

104 if element_index is None: 

105 self._write_key(member_info.schema_name) 

106 

107 self._begin_object() 

108 

109 def end_compound(self, compound: typing.Any, member_info: MemberInfo, 

110 _element_index: typing.Optional[int] = None) -> None: 

111 self._end_object() 

112 

113 self._end_item() 

114 

115 def visit_value(self, value: typing.Any, member_info: MemberInfo, 

116 element_index: typing.Optional[int] = None) -> None: 

117 self._begin_item() 

118 

119 if element_index is None: 

120 self._write_key(member_info.schema_name) 

121 

122 self._write_value(value, member_info) 

123 

124 self._end_item() 

125 

126 def _begin_item(self): 

127 if not self._is_first: 

128 self._io.write(self._item_separator) 

129 

130 if self._indent is not None: 

131 self._io.write("\n") 

132 if self._indent: 

133 self._io.write(self._indent * self._level) 

134 

135 def _end_item(self): 

136 self._is_first = False 

137 

138 def _begin_object(self): 

139 self._io.write("{") 

140 

141 self._is_first = True 

142 self._level += 1 

143 

144 def _end_object(self): 

145 if self._indent is not None: 

146 self._io.write("\n") 

147 self._level -= 1 

148 if self._indent: 

149 self._io.write(self._indent * self._level) 

150 

151 self._io.write("}") 

152 

153 def _begin_array(self): 

154 self._io.write("[") 

155 

156 self._is_first = True 

157 self._level += 1 

158 

159 def _end_array(self): 

160 if self._indent is not None: 

161 self._io.write("\n") 

162 self._level -= 1 

163 if self._indent: 

164 self._io.write(self._indent * self._level) 

165 

166 self._io.write("]") 

167 

168 def _write_key(self, key: str) -> None: 

169 self._io.write(f"{self._json_encoder.encode_value(key)}{self._key_separator}") 

170 

171 def _write_value(self, value: typing.Any, member_info: MemberInfo) -> None: 

172 if value is None: 

173 self._io.write(self._json_encoder.encode_value(None)) 

174 return 

175 

176 type_info = member_info.type_info 

177 if type_info.schema_name == "extern": 

178 self._write_bitbuffer(value) 

179 elif type_info.schema_name == "bytes": 

180 self._write_bytes(value) 

181 else: 

182 if TypeAttribute.ENUM_ITEMS in type_info.attributes: 

183 if self._enumerable_format == JsonEnumerableFormat.STRING: 

184 self._write_stringified_enum(value, type_info) 

185 else: 

186 self._io.write(self._json_encoder.encode_value(value.value)) 

187 elif TypeAttribute.BITMASK_VALUES in type_info.attributes: 

188 if self._enumerable_format == JsonEnumerableFormat.STRING: 

189 self._write_stringified_bitmask(value, type_info) 

190 else: 

191 self._io.write(self._json_encoder.encode_value(value.value)) 

192 else: 

193 self._io.write(self._json_encoder.encode_value(value)) 

194 

195 def _write_bitbuffer(self, value: BitBuffer) -> None: 

196 self._begin_object() 

197 self._begin_item() 

198 self._write_key("buffer") 

199 self._begin_array() 

200 for byte in value.buffer: 

201 self._begin_item() 

202 self._io.write(self._json_encoder.encode_value(byte)) 

203 self._end_item() 

204 self._end_array() 

205 self._end_item() 

206 self._begin_item() 

207 self._write_key("bitSize") 

208 self._io.write(self._json_encoder.encode_value(value.bitsize)) 

209 self._end_item() 

210 self._end_object() 

211 

212 def _write_bytes(self, value: bytes) -> None: 

213 self._begin_object() 

214 self._begin_item() 

215 self._write_key("buffer") 

216 self._begin_array() 

217 for byte in value: 

218 self._begin_item() 

219 self._io.write(self._json_encoder.encode_value(byte)) 

220 self._end_item() 

221 self._end_array() 

222 self._end_item() 

223 self._end_object() 

224 

225 def _write_stringified_enum(self, value: typing.Any, 

226 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

227 for item in type_info.attributes[TypeAttribute.ENUM_ITEMS]: 

228 if item.py_item == value: 

229 # exact match 

230 self._io.write(self._json_encoder.encode_value(item.schema_name)) 

231 return 

232 

233 #no match 

234 self._io.write(self._json_encoder.encode_value(str(value.value) + " /* no match */")) 

235 

236 def _write_stringified_bitmask(self, value: typing.Any, 

237 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

238 string_value = "" 

239 bitmask_value = value.value 

240 value_check = 0 

241 

242 for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]: 

243 is_zero = item_info.py_item.value == 0 

244 if ((not is_zero and (bitmask_value & item_info.py_item.value == item_info.py_item.value)) or 

245 (is_zero and bitmask_value == 0)): 

246 value_check |= item_info.py_item.value 

247 if string_value: 

248 string_value += " | " 

249 string_value += item_info.schema_name 

250 

251 if not string_value: 

252 # no match 

253 string_value += str(bitmask_value) + " /* no match */" 

254 elif bitmask_value != value_check: 

255 # partial match 

256 string_value = str(bitmask_value) + " /* partial match: " + string_value + " */" 

257 # else exact match 

258 

259 self._io.write(self._json_encoder.encode_value(string_value)) 

260 

261 

262class JsonEncoder: 

263 """ 

264 Converts zserio values to Json string representation. 

265 """ 

266 

267 def __init__(self) -> None: 

268 """ 

269 Constructor. 

270 """ 

271 

272 self._encoder = json.JSONEncoder(ensure_ascii=False) 

273 

274 def encode_value(self, value: typing.Any) -> str: 

275 """ 

276 Encodes value to JSON string representation. 

277 

278 :param value: Value to encode. 

279 

280 :returns: Value encoded to string as a valid JSON value. 

281 """ 

282 

283 return self._encoder.encode(value) 

284 

285class JsonToken(enum.Enum): 

286 """ 

287 Tokens used by Json Tokenizer. 

288 """ 

289 

290 BEGIN_OF_FILE = enum.auto() 

291 END_OF_FILE = enum.auto() 

292 BEGIN_OBJECT = enum.auto() 

293 END_OBJECT = enum.auto() 

294 BEGIN_ARRAY = enum.auto() 

295 END_ARRAY = enum.auto() 

296 KEY_SEPARATOR = enum.auto() 

297 ITEM_SEPARATOR = enum.auto() 

298 VALUE = enum.auto() 

299 

300class JsonParserException(PythonRuntimeException): 

301 """ 

302 Exception used to distinguish exceptions from the JsonParser. 

303 """ 

304 

305class JsonParser: 

306 """ 

307 Json Parser. 

308 

309 Parses the JSON on the fly and calls an observer. 

310 """ 

311 

312 class Observer: 

313 """ 

314 Json parser observer. 

315 """ 

316 

317 def begin_object(self) -> None: 

318 """ 

319 Called when a JSON object begins - i.e. on '{'. 

320 """ 

321 

322 raise NotImplementedError() 

323 

324 def end_object(self) -> None: 

325 """ 

326 Called when a JSON object ends - i.e. on '}'. 

327 """ 

328 

329 raise NotImplementedError() 

330 

331 def begin_array(self) -> None: 

332 """ 

333 Called when a JSON array begins - i.e. on '['. 

334 """ 

335 

336 raise NotImplementedError() 

337 

338 def end_array(self) -> None: 

339 """ 

340 Called when a JSON array ends - i.e. on ']'. 

341 """ 

342 

343 raise NotImplementedError() 

344 

345 def visit_key(self, key: str) -> None: 

346 """ 

347 Called on a JSON key. 

348 

349 :param key: Key value. 

350 """ 

351 

352 raise NotImplementedError() 

353 

354 def visit_value(self, value: typing.Any) -> None: 

355 """ 

356 Called on a JSON value. 

357 

358 :param value: JSON value. 

359 """ 

360 

361 raise NotImplementedError() 

362 

363 def __init__(self, text_io: typing.TextIO, observer: Observer) -> None: 

364 """ 

365 Constructor. 

366 

367 :param text_io: Text stream to parse. 

368 :param observer: Observer to use. 

369 """ 

370 

371 self._tokenizer = JsonTokenizer(text_io) 

372 self._observer = observer 

373 

374 def parse(self) -> bool: 

375 """ 

376 Parses single JSON element from the text stream. 

377 

378 :returns: True when end-of-file is reached, False otherwise (i.e. another JSON element is present). 

379 :raises JsonParserException: When parsing fails. 

380 """ 

381 

382 if self._tokenizer.get_token() == JsonToken.BEGIN_OF_FILE: 

383 self._tokenizer.next() 

384 

385 if self._tokenizer.get_token() == JsonToken.END_OF_FILE: 

386 return True 

387 

388 self._parse_element() 

389 

390 return self._tokenizer.get_token() == JsonToken.END_OF_FILE 

391 

392 def get_line(self) -> int: 

393 """ 

394 Gets current line number. 

395 

396 :returns: Line number. 

397 """ 

398 

399 return self._tokenizer.get_line() 

400 

401 def get_column(self) -> int: 

402 """ 

403 Gets current column number. 

404 

405 :returns: Column number. 

406 """ 

407 

408 return self._tokenizer.get_column() 

409 

410 def _parse_element(self) -> None: 

411 token = self._tokenizer.get_token() 

412 if token == JsonToken.BEGIN_ARRAY: 

413 self._parse_array() 

414 elif token == JsonToken.BEGIN_OBJECT: 

415 self._parse_object() 

416 elif token == JsonToken.VALUE: 

417 self._parse_value() 

418 else: 

419 self._raise_unexpected_token(JsonParser.ELEMENT_TOKENS) 

420 

421 def _parse_array(self) -> None: 

422 self._observer.begin_array() 

423 token = self._tokenizer.next() 

424 

425 if token in JsonParser.ELEMENT_TOKENS: 

426 self._parse_elements() 

427 

428 self._consume_token(JsonToken.END_ARRAY) 

429 self._observer.end_array() 

430 

431 def _parse_elements(self) -> None: 

432 self._parse_element() 

433 while self._tokenizer.get_token() == JsonToken.ITEM_SEPARATOR: 

434 self._tokenizer.next() 

435 self._parse_element() 

436 

437 def _parse_object(self) -> None: 

438 self._observer.begin_object() 

439 token = self._tokenizer.next() 

440 if token == JsonToken.VALUE: 

441 self._parse_members() 

442 

443 self._consume_token(JsonToken.END_OBJECT) 

444 self._observer.end_object() 

445 

446 def _parse_members(self) -> None: 

447 self._parse_member() 

448 while self._tokenizer.get_token() == JsonToken.ITEM_SEPARATOR: 

449 self._tokenizer.next() 

450 self._parse_member() 

451 

452 def _parse_member(self) -> None: 

453 self._check_token(JsonToken.VALUE) 

454 key = self._tokenizer.get_value() 

455 if not isinstance(key, str): 

456 raise JsonParserException(f"JsonParser:{self.get_line()}:{self.get_column()}: " 

457 f"Key must be a string value!") 

458 

459 self._observer.visit_key(key) 

460 self._tokenizer.next() 

461 

462 self._consume_token(JsonToken.KEY_SEPARATOR) 

463 

464 self._parse_element() 

465 

466 def _parse_value(self) -> None: 

467 self._observer.visit_value(self._tokenizer.get_value()) 

468 self._tokenizer.next() 

469 

470 def _consume_token(self, token: JsonToken) -> None: 

471 self._check_token(token) 

472 self._tokenizer.next() 

473 

474 def _check_token(self, token: JsonToken) -> None: 

475 if self._tokenizer.get_token() != token: 

476 self._raise_unexpected_token([token]) 

477 

478 def _raise_unexpected_token(self, expecting: typing.List[JsonToken]) -> None: 

479 msg = (f"JsonParser:{self.get_line()}:{self.get_column()}: " 

480 f"Unexpected token: {self._tokenizer.get_token()}") 

481 if self._tokenizer.get_value() is not None: 

482 msg += f" ('{self._tokenizer.get_value()}')" 

483 if len(expecting) == 1: 

484 msg += f", expecting {expecting[0]}!" 

485 else: 

486 msg += ", expecting one of [" + ", ".join([str(token) for token in expecting]) + "]!" 

487 

488 raise JsonParserException(msg) 

489 

490 ELEMENT_TOKENS = [JsonToken.BEGIN_OBJECT, JsonToken.BEGIN_ARRAY, JsonToken.VALUE] 

491 

492class JsonDecoder: 

493 """ 

494 JSON value decoder. 

495 """ 

496 

497 @staticmethod 

498 def decode_value(content: str, pos: int) -> 'JsonDecoder.Result': 

499 """ 

500 Decodes the JSON value from the string. 

501 

502 :param content: String which contains encoded JSON value. 

503 :param pos: Position from zero in content where the encoded JSON value begins. 

504 

505 :returns: Decoder result object. 

506 """ 

507 

508 if pos >= len(content): 

509 return JsonDecoder.Result.from_failure() 

510 

511 first_char = content[pos] 

512 if first_char == 'n': 

513 return JsonDecoder._decode_literal(content, pos, "null", None) 

514 

515 if first_char == 't': 

516 return JsonDecoder._decode_literal(content, pos, "true", True) 

517 

518 if first_char == 'f': 

519 return JsonDecoder._decode_literal(content, pos, "false", False) 

520 

521 if first_char == 'N': 

522 return JsonDecoder._decode_literal(content, pos, "NaN", float("nan")) 

523 

524 if first_char == 'I': 

525 return JsonDecoder._decode_literal(content, pos, "Infinity", float("inf")) 

526 

527 if first_char == '"': 

528 return JsonDecoder._decode_string(content, pos) 

529 

530 if first_char == '-': 

531 if pos + 1 >= len(content): 

532 return JsonDecoder.Result.from_failure(1) 

533 

534 second_char = content[pos + 1] 

535 if second_char == 'I': 

536 return JsonDecoder._decode_literal(content, pos, "-Infinity", float("-inf")) 

537 

538 return JsonDecoder._decode_number(content, pos) 

539 

540 return JsonDecoder._decode_number(content, pos) 

541 

542 class Result: 

543 """ 

544 Decoder result value. 

545 """ 

546 

547 def __init__(self, success: bool, value: typing.Any, num_read_chars: int): 

548 """ 

549 Constructor. 

550 """ 

551 

552 self._success = success 

553 self._value = value 

554 self._num_read_chars = num_read_chars 

555 

556 @classmethod 

557 def from_failure(cls: typing.Type['JsonDecoder.Result'], num_read_chars:int = 0): 

558 """ 

559 Creates decoder result value in case of failure. 

560 

561 :param num_read_chars: Number of processed characters. 

562 """ 

563 instance = cls(False, None, num_read_chars) 

564 

565 return instance 

566 

567 @classmethod 

568 def from_success(cls: typing.Type['JsonDecoder.Result'], value: typing.Any, num_read_chars:int = 0): 

569 """ 

570 Creates decoder result value in case of success. 

571 

572 :param value: Decoded value. 

573 :param num_read_chars: Number of read characters. 

574 """ 

575 instance = cls(True, value, num_read_chars) 

576 

577 return instance 

578 

579 @property 

580 def success(self) -> bool: 

581 """ 

582 Gets the decoder result. 

583 

584 :returns: True in case of success, otherwise false. 

585 """ 

586 

587 return self._success 

588 

589 @property 

590 def value(self) -> typing.Any: 

591 """ 

592 Gets the decoded JSON value. 

593 

594 :returns: Decoded JSON value or None in case of failure. 

595 """ 

596 

597 return self._value 

598 

599 @property 

600 def num_read_chars(self) -> int: 

601 """ 

602 Gets the number of read characters from the string which contains encoded JSON value. 

603 

604 In case of failure, it returns the number of processed (read) characters. 

605 

606 :returns: Number of read characters. 

607 """ 

608 

609 return self._num_read_chars 

610 

611 @staticmethod 

612 def _decode_literal(content: str, pos: int, text: str, decoded_object) -> 'JsonDecoder.Result': 

613 text_length = len(text) 

614 if pos + text_length > len(content): 

615 return JsonDecoder.Result.from_failure(len(content) - pos) 

616 

617 sub_content = content[pos : pos + text_length] 

618 if sub_content == text: 

619 return JsonDecoder.Result.from_success(decoded_object, text_length) 

620 

621 return JsonDecoder.Result.from_failure(text_length) 

622 

623 @staticmethod 

624 def _decode_string(content: str, pos: int) -> 'JsonDecoder.Result': 

625 decoded_string = "" 

626 end_of_string_pos = pos + 1 # we know that at the beginning is '"' 

627 while True: 

628 if end_of_string_pos >= len(content): 

629 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

630 

631 next_char = content[end_of_string_pos] 

632 end_of_string_pos += 1 

633 if next_char == '\\': 

634 if end_of_string_pos >= len(content): 

635 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

636 

637 next_next_char = content[end_of_string_pos] 

638 end_of_string_pos += 1 

639 if next_next_char in ('\\', '"'): 

640 decoded_string += next_next_char 

641 elif next_next_char == 'b': 

642 decoded_string += '\b' 

643 elif next_next_char == 'f': 

644 decoded_string += '\f' 

645 elif next_next_char == 'n': 

646 decoded_string += '\n' 

647 elif next_next_char == 'r': 

648 decoded_string += '\r' 

649 elif next_next_char == 't': 

650 decoded_string += '\t' 

651 elif next_next_char == 'u': # unicode escape 

652 unicode_escape_len = 4 

653 end_of_string_pos += unicode_escape_len 

654 if end_of_string_pos >= len(content): 

655 return JsonDecoder.Result.from_failure(len(content) - pos) 

656 sub_content = content[end_of_string_pos - unicode_escape_len - 2 : end_of_string_pos] 

657 decoded_unicode = JsonDecoder._decode_unicode_escape(sub_content) 

658 if decoded_unicode is not None: 

659 decoded_string += decoded_unicode 

660 else: 

661 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

662 else: 

663 # unknown escape character, not decoded... 

664 return JsonDecoder.Result.from_failure(end_of_string_pos - pos) 

665 elif next_char == '"': 

666 break 

667 else: 

668 decoded_string += next_char 

669 

670 return JsonDecoder.Result.from_success(decoded_string, end_of_string_pos - pos) 

671 

672 @staticmethod 

673 def _decode_unicode_escape(content: str) -> typing.Optional[str]: 

674 try: 

675 return bytes(content, "ascii").decode("unicode-escape") 

676 except ValueError: 

677 return None 

678 

679 @staticmethod 

680 def _decode_number(content: str, pos: int) -> 'JsonDecoder.Result': 

681 number_content, is_float = JsonDecoder._extract_number(content, pos) 

682 number_length = len(number_content) 

683 if number_length == 0: 

684 return JsonDecoder.Result.from_failure(1) 

685 

686 try: 

687 if is_float: 

688 float_number = float(number_content) 

689 return JsonDecoder.Result.from_success(float_number, number_length) 

690 else: 

691 int_number = int(number_content) 

692 return JsonDecoder.Result.from_success(int_number, number_length) 

693 except ValueError: 

694 return JsonDecoder.Result.from_failure(number_length) 

695 

696 @staticmethod 

697 def _extract_number(content: str, pos: int) -> typing.Tuple[str, bool]: 

698 end_of_number_pos = pos 

699 if content[end_of_number_pos] == '-': # we already know that there is something after '-' 

700 end_of_number_pos += 1 

701 is_float = False 

702 accept_sign = False 

703 while end_of_number_pos < len(content): 

704 next_char = content[end_of_number_pos] 

705 

706 if accept_sign: 

707 accept_sign = False 

708 if next_char in ('+', '-'): 

709 end_of_number_pos += 1 

710 continue 

711 

712 if next_char.isdigit(): 

713 end_of_number_pos += 1 

714 continue 

715 

716 if not is_float and (next_char in ('.', 'e', 'E')): 

717 end_of_number_pos += 1 

718 is_float = True 

719 if next_char in ('e', 'E'): 

720 accept_sign = True 

721 continue 

722 

723 break # pragma: no cover (to satisfy test coverage) 

724 

725 return content[pos:end_of_number_pos], is_float 

726 

727class JsonTokenizer: 

728 """ 

729 Tokenizer used by JsonParser. 

730 """ 

731 

732 def __init__(self, text_io: typing.TextIO) -> None: 

733 """ 

734 Constructor. 

735 

736 :param text_io: Text stream to tokenize. 

737 """ 

738 self._io = text_io 

739 

740 self._content = self._io.read(JsonTokenizer.MAX_LINE_LEN) 

741 self._line_number = 1 

742 self._column_number = 1 

743 self._token_column_number = 1 

744 self._pos = 0 

745 self._set_token(JsonToken.BEGIN_OF_FILE if self._content else JsonToken.END_OF_FILE, None) 

746 self._decoder_result = JsonDecoder.Result.from_failure() 

747 

748 def next(self) -> JsonToken: 

749 """ 

750 Moves to next token. 

751 

752 :returns: Token. 

753 :raises JsonParserException: When unknown token is reached. 

754 """ 

755 

756 while not self._decode_next(): 

757 new_content = self._io.read(JsonTokenizer.MAX_LINE_LEN) 

758 if not new_content: 

759 if self._token == JsonToken.END_OF_FILE: 

760 self._token_column_number = self._column_number 

761 else: 

762 # stream is finished but last token is not EOF => value must be at the end 

763 self._set_token_value() 

764 

765 return self._token 

766 

767 self._content = self._content[self._pos:] 

768 self._content += new_content 

769 self._pos = 0 

770 

771 return self._token 

772 

773 def get_token(self) -> JsonToken: 

774 """ 

775 Gets current token. 

776 

777 :returns: Current token. 

778 """ 

779 return self._token 

780 

781 def get_value(self) -> typing.Any: 

782 """ 

783 Gets current value. 

784 

785 :returns: Current value. 

786 """ 

787 return self._value 

788 

789 def get_line(self) -> int: 

790 """ 

791 Gets line number of the current token. 

792 

793 :returns: Line number. 

794 """ 

795 return self._line_number 

796 

797 def get_column(self) -> int: 

798 """ 

799 Gets column number of the current token. 

800 

801 :returns: Column number. 

802 """ 

803 return self._token_column_number 

804 

805 def _decode_next(self) -> bool: 

806 if not self._skip_whitespaces(): 

807 return False 

808 

809 self._token_column_number = self._column_number 

810 

811 next_char = self._content[self._pos] 

812 if next_char == "{": 

813 self._set_token(JsonToken.BEGIN_OBJECT, next_char) 

814 self._set_position(self._pos + 1, self._column_number + 1) 

815 elif next_char == "}": 

816 self._set_token(JsonToken.END_OBJECT, next_char) 

817 self._set_position(self._pos + 1, self._column_number + 1) 

818 elif next_char == "[": 

819 self._set_token(JsonToken.BEGIN_ARRAY, next_char) 

820 self._set_position(self._pos + 1, self._column_number + 1) 

821 elif next_char == "]": 

822 self._set_token(JsonToken.END_ARRAY, next_char) 

823 self._set_position(self._pos + 1, self._column_number + 1) 

824 elif next_char == ":": 

825 self._set_token(JsonToken.KEY_SEPARATOR, next_char) 

826 self._set_position(self._pos + 1, self._column_number + 1) 

827 elif next_char == ",": 

828 self._set_token(JsonToken.ITEM_SEPARATOR, next_char) 

829 self._set_position(self._pos + 1, self._column_number + 1) 

830 else: 

831 self._decoder_result = JsonDecoder.decode_value(self._content, self._pos) 

832 if self._pos + self._decoder_result.num_read_chars >= len(self._content): 

833 return False # we are at the end of chunk => try to read more 

834 

835 self._set_token_value() 

836 

837 return True 

838 

839 def _skip_whitespaces(self) -> bool: 

840 while True: 

841 if self._pos >= len(self._content): 

842 self._set_token(JsonToken.END_OF_FILE, None) 

843 return False 

844 

845 next_char = self._content[self._pos] 

846 if next_char in (' ', '\t'): 

847 self._set_position(self._pos + 1, self._column_number + 1) 

848 elif next_char == '\n': 

849 self._line_number += 1 

850 self._set_position(self._pos + 1, 1) 

851 elif next_char == '\r': 

852 if self._pos + 1 >= len(self._content): 

853 self._set_token(JsonToken.END_OF_FILE, None) 

854 return False 

855 

856 next_next_char = self._content[self._pos + 1] 

857 self._line_number += 1 

858 self._set_position(self._pos + (2 if (next_next_char == '\n') else 1), 1) 

859 else: 

860 return True 

861 

862 def _set_token(self, new_token: JsonToken, new_value: typing.Any) -> None: 

863 self._token = new_token 

864 self._value = new_value 

865 

866 def _set_position(self, new_pos: int, new_column_number: int) -> None: 

867 self._pos = new_pos 

868 self._column_number = new_column_number 

869 

870 def _set_token_value(self) -> None: 

871 if not self._decoder_result.success: 

872 raise JsonParserException(f"JsonTokenizer:{self._line_number}:{self._token_column_number}: " 

873 f"Unknown token!") 

874 

875 self._set_token(JsonToken.VALUE, self._decoder_result.value) 

876 num_read_chars = self._decoder_result.num_read_chars 

877 self._set_position(self._pos + num_read_chars, self._column_number + num_read_chars) 

878 

879 MAX_LINE_LEN = 64 * 1024 

880 

881class JsonReader: 

882 """ 

883 Reads zserio object tree defined by a type info from a text stream. 

884 """ 

885 

886 def __init__(self, text_io: typing.TextIO) -> None: 

887 """ 

888 Constructor. 

889 

890 :param text_io: Text stream to read. 

891 """ 

892 

893 self._creator_adapter = JsonReader._CreatorAdapter() 

894 self._parser = JsonParser(text_io, self._creator_adapter) 

895 

896 def read(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> typing.Any: 

897 """ 

898 Reads a zserio object tree defined by the given type info from the text steam. 

899 

900 :param type_info: Type info defining the expected zserio object tree. 

901 :param arguments: Arguments of type defining the expected zserio object tree. 

902 :returns: Zserio object tree initialized using the JSON data. 

903 :raises PythonRuntimeException: When the JSON doesn't contain expected zserio object tree. 

904 """ 

905 

906 self._creator_adapter.set_type(type_info, *arguments) 

907 

908 try: 

909 self._parser.parse() 

910 except JsonParserException: 

911 raise 

912 except PythonRuntimeException as err: 

913 raise PythonRuntimeException(f"{err} (JsonParser:" 

914 f"{self._parser.get_line()}:{self._parser.get_column()})") from err 

915 

916 return self._creator_adapter.get() 

917 

918 class _ObjectValueAdapter(JsonParser.Observer): 

919 """ 

920 Adapter for values which are encoded as Json objects. 

921 """ 

922 

923 def get(self) -> typing.Any: 

924 """ 

925 Gets the parsed value. 

926 """ 

927 

928 raise NotImplementedError() 

929 

930 class _BitBufferAdapter(_ObjectValueAdapter): 

931 """ 

932 The adapter which allows to parse Bit Buffer object from JSON. 

933 """ 

934 

935 def __init__(self) -> None: 

936 """ 

937 Constructor. 

938 """ 

939 

940 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY 

941 self._buffer: typing.Optional[typing.List[int]] = None 

942 self._bit_size: typing.Optional[int] = None 

943 

944 def get(self) -> BitBuffer: 

945 """ 

946 Gets the created Bit Buffer object. 

947 

948 :returns: Parsed Bit Buffer object. 

949 :raises PythonRuntimeException: In case of invalid use. 

950 """ 

951 

952 if self._buffer is None or self._bit_size is None: 

953 raise PythonRuntimeException("JsonReader: Unexpected end in Bit Buffer!") 

954 return BitBuffer(bytes(self._buffer), self._bit_size) 

955 

956 def begin_object(self) -> None: 

957 raise PythonRuntimeException("JsonReader: Unexpected begin object in Bit Buffer!") 

958 

959 def end_object(self) -> None: 

960 raise PythonRuntimeException("JsonReader: Unexpected end object in Bit Buffer!") 

961 

962 def begin_array(self) -> None: 

963 if self._state == JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER: 

964 self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER 

965 else: 

966 raise PythonRuntimeException("JsonReader: Unexpected begin array in Bit Buffer!") 

967 

968 def end_array(self) -> None: 

969 if self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER: 

970 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY 

971 else: 

972 raise PythonRuntimeException("JsonReader: Unexpected end array in Bit Buffer!") 

973 

974 def visit_key(self, key: str) -> None: 

975 if self._state == JsonReader._BitBufferAdapter._State.VISIT_KEY: 

976 if key == "buffer": 

977 self._state = JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER 

978 elif key == "bitSize": 

979 self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE 

980 else: 

981 raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in Bit Buffer!") 

982 else: 

983 raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in Bit Buffer!") 

984 

985 def visit_value(self, value: typing.Any) -> None: 

986 if self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER and isinstance(value, int): 

987 if self._buffer is None: 

988 self._buffer = [value] 

989 else: 

990 self._buffer.append(value) 

991 elif (self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE and 

992 isinstance(value, int)): 

993 self._bit_size = value 

994 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY 

995 else: 

996 raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in Bit Buffer!") 

997 

998 class _State(enum.Enum): 

999 VISIT_KEY = enum.auto() 

1000 BEGIN_ARRAY_BUFFER = enum.auto() 

1001 VISIT_VALUE_BUFFER = enum.auto() 

1002 VISIT_VALUE_BITSIZE = enum.auto() 

1003 

1004 class _BytesAdapter(_ObjectValueAdapter): 

1005 """ 

1006 The adapter which allows to parse bytes object from JSON. 

1007 """ 

1008 

1009 def __init__(self) -> None: 

1010 """ 

1011 Constructor. 

1012 """ 

1013 

1014 self._state = JsonReader._BytesAdapter._State.VISIT_KEY 

1015 self._buffer: typing.Optional[bytearray] = None 

1016 

1017 def get(self) -> bytearray: 

1018 """ 

1019 Gets the created bytes object. 

1020 

1021 :returns: Parsed bytes object. 

1022 :raises PythonRuntimeException: In case of invalid use. 

1023 """ 

1024 

1025 if self._buffer is None: 

1026 raise PythonRuntimeException("JsonReader: Unexpected end in bytes!") 

1027 return self._buffer 

1028 

1029 def begin_object(self) -> None: 

1030 raise PythonRuntimeException("JsonReader: Unexpected begin object in bytes!") 

1031 

1032 def end_object(self) -> None: 

1033 raise PythonRuntimeException("JsonReader: Unexpected end object in bytes!") 

1034 

1035 def begin_array(self) -> None: 

1036 if self._state == JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER: 

1037 self._state = JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER 

1038 else: 

1039 raise PythonRuntimeException("JsonReader: Unexpected begin array in bytes!") 

1040 

1041 def end_array(self) -> None: 

1042 if self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER: 

1043 self._state = JsonReader._BytesAdapter._State.VISIT_KEY 

1044 else: 

1045 raise PythonRuntimeException("JsonReader: Unexpected end array in bytes!") 

1046 

1047 def visit_key(self, key: str) -> None: 

1048 if self._state == JsonReader._BytesAdapter._State.VISIT_KEY: 

1049 if key == "buffer": 

1050 self._state = JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER 

1051 else: 

1052 raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in bytes!") 

1053 else: 

1054 raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in bytes!") 

1055 

1056 def visit_value(self, value: typing.Any) -> None: 

1057 if self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER and isinstance(value, int): 

1058 if self._buffer is None: 

1059 self._buffer = bytearray([value]) 

1060 else: 

1061 self._buffer.append(value) 

1062 else: 

1063 raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in bytes!") 

1064 

1065 class _State(enum.Enum): 

1066 VISIT_KEY = enum.auto() 

1067 BEGIN_ARRAY_BUFFER = enum.auto() 

1068 VISIT_VALUE_BUFFER = enum.auto() 

1069 

1070 class _CreatorAdapter(JsonParser.Observer): 

1071 """ 

1072 The adapter which allows to use ZserioTreeCreator as an JsonReader observer. 

1073 """ 

1074 

1075 def __init__(self) -> None: 

1076 """ 

1077 Constructor. 

1078 """ 

1079 

1080 self._creator: typing.Optional[ZserioTreeCreator] = None 

1081 self._key_stack: typing.List[str] = [] 

1082 self._object: typing.Any = None 

1083 self._object_value_adapter: typing.Optional[JsonReader._ObjectValueAdapter] = None 

1084 

1085 def set_type(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> None: 

1086 """ 

1087 Sets type which shall be created next. Resets the current object. 

1088 

1089 :param type_info: Type info of the type which is to be created. 

1090 :param arguments: Arguments of type defining the expected zserio object tree. 

1091 """ 

1092 

1093 self._creator = ZserioTreeCreator(type_info, *arguments) 

1094 self._object = None 

1095 

1096 def get(self) -> typing.Any: 

1097 """ 

1098 Gets the created zserio object tree. 

1099 

1100 :returns: Zserio object tree. 

1101 :raises PythonRuntimeException: In case of invalid use. 

1102 """ 

1103 

1104 if not self._object: 

1105 raise PythonRuntimeException("JsonReader: Zserio tree not created!") 

1106 

1107 return self._object 

1108 

1109 def begin_object(self) -> None: 

1110 if self._object_value_adapter: 

1111 self._object_value_adapter.begin_object() 

1112 else: 

1113 if not self._creator: 

1114 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1115 

1116 if not self._key_stack: 

1117 self._creator.begin_root() 

1118 else: 

1119 if self._key_stack[-1]: 

1120 schema_type = self._creator.get_field_type(self._key_stack[-1]).schema_name 

1121 if schema_type == "extern": 

1122 self._object_value_adapter = JsonReader._BitBufferAdapter() 

1123 elif schema_type == "bytes": 

1124 self._object_value_adapter = JsonReader._BytesAdapter() 

1125 else: 

1126 self._creator.begin_compound(self._key_stack[-1]) 

1127 else: 

1128 schema_type = self._creator.get_element_type().schema_name 

1129 if schema_type == "extern": 

1130 self._object_value_adapter = JsonReader._BitBufferAdapter() 

1131 elif schema_type == "bytes": 

1132 self._object_value_adapter = JsonReader._BytesAdapter() 

1133 else: 

1134 self._creator.begin_compound_element() 

1135 

1136 def end_object(self) -> None: 

1137 if self._object_value_adapter: 

1138 value = self._object_value_adapter.get() 

1139 self._object_value_adapter = None 

1140 self.visit_value(value) 

1141 else: 

1142 if not self._creator: 

1143 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1144 

1145 if not self._key_stack: 

1146 self._object = self._creator.end_root() 

1147 self._creator = None 

1148 else: 

1149 if self._key_stack[-1]: 

1150 self._creator.end_compound() 

1151 self._key_stack.pop() # finish member 

1152 else: 

1153 self._creator.end_compound_element() 

1154 

1155 def begin_array(self) -> None: 

1156 if self._object_value_adapter: 

1157 self._object_value_adapter.begin_array() 

1158 else: 

1159 if not self._creator: 

1160 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1161 

1162 if not self._key_stack: 

1163 raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!") 

1164 

1165 self._creator.begin_array(self._key_stack[-1]) 

1166 

1167 self._key_stack.append("") 

1168 

1169 def end_array(self) -> None: 

1170 if self._object_value_adapter: 

1171 self._object_value_adapter.end_array() 

1172 else: 

1173 if not self._creator: 

1174 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1175 

1176 self._creator.end_array() 

1177 

1178 self._key_stack.pop() # finish array 

1179 self._key_stack.pop() # finish member 

1180 

1181 def visit_key(self, key: str) -> None: 

1182 if self._object_value_adapter: 

1183 self._object_value_adapter.visit_key(key) 

1184 else: 

1185 if not self._creator: 

1186 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1187 

1188 self._key_stack.append(key) 

1189 

1190 def visit_value(self, value: typing.Any) -> None: 

1191 if self._object_value_adapter: 

1192 self._object_value_adapter.visit_value(value) 

1193 else: 

1194 if not self._creator: 

1195 raise PythonRuntimeException("JsonReader: Adapter not initialized!") 

1196 

1197 if not self._key_stack: 

1198 raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!") 

1199 

1200 if self._key_stack[-1]: 

1201 expected_type_info = self._creator.get_field_type(self._key_stack[-1]) 

1202 self._creator.set_value(self._key_stack[-1], self._convert_value(value, expected_type_info)) 

1203 self._key_stack.pop() # finish member 

1204 else: 

1205 expected_type_info = self._creator.get_element_type() 

1206 self._creator.add_value_element(self._convert_value(value, expected_type_info)) 

1207 

1208 @staticmethod 

1209 def _convert_value(value: typing.Any, 

1210 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

1211 if value is None: 

1212 return None 

1213 

1214 if TypeAttribute.ENUM_ITEMS in type_info.attributes: 

1215 if isinstance(value, str): 

1216 return JsonReader._CreatorAdapter._enum_from_string(value, type_info) 

1217 else: 

1218 return type_info.py_type(value) 

1219 elif TypeAttribute.BITMASK_VALUES in type_info.attributes: 

1220 if isinstance(value, str): 

1221 return JsonReader._CreatorAdapter._bitmask_from_string(value, type_info) 

1222 else: 

1223 return type_info.py_type.from_value(value) 

1224 else: 

1225 return value 

1226 

1227 @staticmethod 

1228 def _enum_from_string(string_value: str, 

1229 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

1230 if string_value: 

1231 first_char = string_value[0] 

1232 if ('A' <= first_char <= 'Z') or ('a' <= first_char <= 'z') or first_char == '_': 

1233 py_item = JsonReader._CreatorAdapter._parse_enum_string_value(string_value, type_info) 

1234 if py_item is not None: 

1235 return py_item 

1236 # else it's a no match 

1237 

1238 raise PythonRuntimeException(f"JsonReader: Cannot create enum '{type_info.schema_name}' " 

1239 f"from string value '{string_value}'!") 

1240 

1241 @staticmethod 

1242 def _bitmask_from_string(string_value: str, 

1243 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

1244 if string_value: 

1245 first_char = string_value[0] 

1246 if ('A' <= first_char <= 'Z') or ('a' <= first_char <= 'z') or first_char == '_': 

1247 value = JsonReader._CreatorAdapter._parse_bitmask_string_value(string_value, type_info) 

1248 if value is not None: 

1249 return type_info.py_type.from_value(value) 

1250 elif '0' <= first_char <= '9': # bitmask can be only unsigned 

1251 value = JsonReader._CreatorAdapter._parse_bitmask_numeric_string_value(string_value) 

1252 if value is not None: 

1253 return type_info.py_type.from_value(value) 

1254 

1255 raise PythonRuntimeException(f"JsonReader: Cannot create bitmask '{type_info.schema_name}' " 

1256 f"from string value '{string_value}'!") 

1257 

1258 @staticmethod 

1259 def _parse_enum_string_value(string_value: str, 

1260 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

1261 for item_info in type_info.attributes[TypeAttribute.ENUM_ITEMS]: 

1262 if string_value == item_info.schema_name: 

1263 return item_info.py_item 

1264 

1265 return None 

1266 

1267 @staticmethod 

1268 def _parse_bitmask_string_value(string_value: str, 

1269 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any: 

1270 value = 0 

1271 identifiers = string_value.split('|') 

1272 for identifier_with_spaces in identifiers: 

1273 identifier = identifier_with_spaces.strip() 

1274 match = False 

1275 for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]: 

1276 if identifier == item_info.schema_name: 

1277 match = True 

1278 value |= item_info.py_item.value 

1279 break 

1280 

1281 if not match: 

1282 return None 

1283 

1284 return value 

1285 

1286 @staticmethod 

1287 def _parse_bitmask_numeric_string_value(string_value: str): 

1288 number_len = 1 

1289 while string_value[number_len] >= '0'and string_value[number_len] <= '9': 

1290 number_len += 1 

1291 

1292 return int(string_value[0:number_len])