Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/json.py: 100%
726 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
1"""
2The module implements WalkObserver for writing of zserio objects to JSON format.
3"""
5import enum
6import io
7import json
8import typing
10from zserio.bitbuffer import BitBuffer
11from zserio.creator import ZserioTreeCreator
12from zserio.exception import PythonRuntimeException
13from zserio.typeinfo import TypeInfo, RecursiveTypeInfo, TypeAttribute, MemberInfo
14from zserio.walker import WalkObserver
16class JsonEnumerableFormat(enum.Enum):
17 """
18 Configuration for writing of enumerable types.
19 """
21 #: Print as JSON integral value.
22 NUMBER = enum.auto()
23 #: Print as JSON string according to the following rules:
24 #:
25 #: #. Enums
26 #:
27 #: * when an exact match with an enumerable item is found, the item name is used - e.g. "FIRST",
28 #: * when no exact match is found, it's an invalid value, the integral value is converted to string
29 #: and an appropriate comment is included - e.g. "10 /\* no match \*/".
30 #:
31 #: #. Bitmasks
32 #:
33 #: * when an exact mach with or-ed bitmask values is found, it's used - e.g. "READ | WRITE",
34 #: * when no exact match is found, but some or-ed values match, the integral value is converted
35 #: to string and the or-ed values are included in a comment - e.g. "127 /\* READ | CREATE \*/",
36 #: * when no match is found at all, the integral value is converted to string and an appropriate
37 #: comment is included - e.g. "13 /\* no match \*/".
38 STRING = enum.auto()
40class JsonWriter(WalkObserver):
41 """
42 Walker observer which dumps zserio objects to JSON format.
43 """
45 def __init__(self, *, text_io: typing.Optional[typing.TextIO] = None,
46 enumerable_format: JsonEnumerableFormat = JsonEnumerableFormat.STRING,
47 item_separator: typing.Optional[str] = None,
48 key_separator: typing.Optional[str] = None,
49 indent: typing.Union[str, int] = None) -> None:
50 """
51 Constructor.
53 :param text_io: Optional text stream for JSON output, io.StringIO is used by default.
54 :param item_separator: Optional item separator, default is ', ' if indent is None, ',' otherwise.
55 :param key_separator: Optional key separator, default is ': '.
56 :param enumerable_format: Optional enumerable format to use, default is JsonEnumerableFormat.STRING.
57 :param indent: String or (non-negative) integer defining the indent. If not None, newlines are inserted.
58 """
60 self._io : typing.TextIO = text_io if text_io else io.StringIO()
61 self._item_separator : str = item_separator if item_separator else ("," if indent is not None else ", ")
62 self._key_separator : str = key_separator if key_separator else ": "
63 self._enumerable_format = enumerable_format
65 self._indent : typing.Optional[str] = (
66 (indent if isinstance(indent, str) else " " * indent) if indent is not None else None
67 )
69 self._is_first = True
70 self._level = 0
71 self._json_encoder = JsonEncoder()
73 def get_io(self) -> typing.TextIO:
74 """
75 Gets the underlying text stream.
77 :returns: Underlying text steam.
78 """
80 return self._io
82 def begin_root(self, _compound: typing.Any) -> None:
83 self._begin_object()
85 def end_root(self, _compound: typing.Any) -> None:
86 self._end_object()
88 def begin_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
89 self._begin_item()
91 self._write_key(member_info.schema_name)
93 self._begin_array()
95 def end_array(self, array: typing.List[typing.Any], member_info: MemberInfo) -> None:
96 self._end_array()
98 self._end_item()
100 def begin_compound(self, compound: typing.Any, member_info: MemberInfo,
101 element_index: typing.Optional[int] = None) -> None:
102 self._begin_item()
104 if element_index is None:
105 self._write_key(member_info.schema_name)
107 self._begin_object()
109 def end_compound(self, compound: typing.Any, member_info: MemberInfo,
110 _element_index: typing.Optional[int] = None) -> None:
111 self._end_object()
113 self._end_item()
115 def visit_value(self, value: typing.Any, member_info: MemberInfo,
116 element_index: typing.Optional[int] = None) -> None:
117 self._begin_item()
119 if element_index is None:
120 self._write_key(member_info.schema_name)
122 self._write_value(value, member_info)
124 self._end_item()
126 def _begin_item(self):
127 if not self._is_first:
128 self._io.write(self._item_separator)
130 if self._indent is not None:
131 self._io.write("\n")
132 if self._indent:
133 self._io.write(self._indent * self._level)
135 def _end_item(self):
136 self._is_first = False
138 def _begin_object(self):
139 self._io.write("{")
141 self._is_first = True
142 self._level += 1
144 def _end_object(self):
145 if self._indent is not None:
146 self._io.write("\n")
147 self._level -= 1
148 if self._indent:
149 self._io.write(self._indent * self._level)
151 self._io.write("}")
153 def _begin_array(self):
154 self._io.write("[")
156 self._is_first = True
157 self._level += 1
159 def _end_array(self):
160 if self._indent is not None:
161 self._io.write("\n")
162 self._level -= 1
163 if self._indent:
164 self._io.write(self._indent * self._level)
166 self._io.write("]")
168 def _write_key(self, key: str) -> None:
169 self._io.write(f"{self._json_encoder.encode_value(key)}{self._key_separator}")
171 def _write_value(self, value: typing.Any, member_info: MemberInfo) -> None:
172 if value is None:
173 self._io.write(self._json_encoder.encode_value(None))
174 return
176 type_info = member_info.type_info
177 if type_info.schema_name == "extern":
178 self._write_bitbuffer(value)
179 elif type_info.schema_name == "bytes":
180 self._write_bytes(value)
181 else:
182 if TypeAttribute.ENUM_ITEMS in type_info.attributes:
183 if self._enumerable_format == JsonEnumerableFormat.STRING:
184 self._write_stringified_enum(value, type_info)
185 else:
186 self._io.write(self._json_encoder.encode_value(value.value))
187 elif TypeAttribute.BITMASK_VALUES in type_info.attributes:
188 if self._enumerable_format == JsonEnumerableFormat.STRING:
189 self._write_stringified_bitmask(value, type_info)
190 else:
191 self._io.write(self._json_encoder.encode_value(value.value))
192 else:
193 self._io.write(self._json_encoder.encode_value(value))
195 def _write_bitbuffer(self, value: BitBuffer) -> None:
196 self._begin_object()
197 self._begin_item()
198 self._write_key("buffer")
199 self._begin_array()
200 for byte in value.buffer:
201 self._begin_item()
202 self._io.write(self._json_encoder.encode_value(byte))
203 self._end_item()
204 self._end_array()
205 self._end_item()
206 self._begin_item()
207 self._write_key("bitSize")
208 self._io.write(self._json_encoder.encode_value(value.bitsize))
209 self._end_item()
210 self._end_object()
212 def _write_bytes(self, value: bytes) -> None:
213 self._begin_object()
214 self._begin_item()
215 self._write_key("buffer")
216 self._begin_array()
217 for byte in value:
218 self._begin_item()
219 self._io.write(self._json_encoder.encode_value(byte))
220 self._end_item()
221 self._end_array()
222 self._end_item()
223 self._end_object()
225 def _write_stringified_enum(self, value: typing.Any,
226 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
227 for item in type_info.attributes[TypeAttribute.ENUM_ITEMS]:
228 if item.py_item == value:
229 # exact match
230 self._io.write(self._json_encoder.encode_value(item.schema_name))
231 return
233 #no match
234 self._io.write(self._json_encoder.encode_value(str(value.value) + " /* no match */"))
236 def _write_stringified_bitmask(self, value: typing.Any,
237 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
238 string_value = ""
239 bitmask_value = value.value
240 value_check = 0
242 for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]:
243 is_zero = item_info.py_item.value == 0
244 if ((not is_zero and (bitmask_value & item_info.py_item.value == item_info.py_item.value)) or
245 (is_zero and bitmask_value == 0)):
246 value_check |= item_info.py_item.value
247 if string_value:
248 string_value += " | "
249 string_value += item_info.schema_name
251 if not string_value:
252 # no match
253 string_value += str(bitmask_value) + " /* no match */"
254 elif bitmask_value != value_check:
255 # partial match
256 string_value = str(bitmask_value) + " /* partial match: " + string_value + " */"
257 # else exact match
259 self._io.write(self._json_encoder.encode_value(string_value))
262class JsonEncoder:
263 """
264 Converts zserio values to Json string representation.
265 """
267 def __init__(self) -> None:
268 """
269 Constructor.
270 """
272 self._encoder = json.JSONEncoder(ensure_ascii=False)
274 def encode_value(self, value: typing.Any) -> str:
275 """
276 Encodes value to JSON string representation.
278 :param value: Value to encode.
280 :returns: Value encoded to string as a valid JSON value.
281 """
283 return self._encoder.encode(value)
285class JsonToken(enum.Enum):
286 """
287 Tokens used by Json Tokenizer.
288 """
290 BEGIN_OF_FILE = enum.auto()
291 END_OF_FILE = enum.auto()
292 BEGIN_OBJECT = enum.auto()
293 END_OBJECT = enum.auto()
294 BEGIN_ARRAY = enum.auto()
295 END_ARRAY = enum.auto()
296 KEY_SEPARATOR = enum.auto()
297 ITEM_SEPARATOR = enum.auto()
298 VALUE = enum.auto()
300class JsonParserException(PythonRuntimeException):
301 """
302 Exception used to distinguish exceptions from the JsonParser.
303 """
305class JsonParser:
306 """
307 Json Parser.
309 Parses the JSON on the fly and calls an observer.
310 """
312 class Observer:
313 """
314 Json parser observer.
315 """
317 def begin_object(self) -> None:
318 """
319 Called when a JSON object begins - i.e. on '{'.
320 """
322 raise NotImplementedError()
324 def end_object(self) -> None:
325 """
326 Called when a JSON object ends - i.e. on '}'.
327 """
329 raise NotImplementedError()
331 def begin_array(self) -> None:
332 """
333 Called when a JSON array begins - i.e. on '['.
334 """
336 raise NotImplementedError()
338 def end_array(self) -> None:
339 """
340 Called when a JSON array ends - i.e. on ']'.
341 """
343 raise NotImplementedError()
345 def visit_key(self, key: str) -> None:
346 """
347 Called on a JSON key.
349 :param key: Key value.
350 """
352 raise NotImplementedError()
354 def visit_value(self, value: typing.Any) -> None:
355 """
356 Called on a JSON value.
358 :param value: JSON value.
359 """
361 raise NotImplementedError()
363 def __init__(self, text_io: typing.TextIO, observer: Observer) -> None:
364 """
365 Constructor.
367 :param text_io: Text stream to parse.
368 :param observer: Observer to use.
369 """
371 self._tokenizer = JsonTokenizer(text_io)
372 self._observer = observer
374 def parse(self) -> bool:
375 """
376 Parses single JSON element from the text stream.
378 :returns: True when end-of-file is reached, False otherwise (i.e. another JSON element is present).
379 :raises JsonParserException: When parsing fails.
380 """
382 if self._tokenizer.get_token() == JsonToken.BEGIN_OF_FILE:
383 self._tokenizer.next()
385 if self._tokenizer.get_token() == JsonToken.END_OF_FILE:
386 return True
388 self._parse_element()
390 return self._tokenizer.get_token() == JsonToken.END_OF_FILE
392 def get_line(self) -> int:
393 """
394 Gets current line number.
396 :returns: Line number.
397 """
399 return self._tokenizer.get_line()
401 def get_column(self) -> int:
402 """
403 Gets current column number.
405 :returns: Column number.
406 """
408 return self._tokenizer.get_column()
410 def _parse_element(self) -> None:
411 token = self._tokenizer.get_token()
412 if token == JsonToken.BEGIN_ARRAY:
413 self._parse_array()
414 elif token == JsonToken.BEGIN_OBJECT:
415 self._parse_object()
416 elif token == JsonToken.VALUE:
417 self._parse_value()
418 else:
419 self._raise_unexpected_token(JsonParser.ELEMENT_TOKENS)
421 def _parse_array(self) -> None:
422 self._observer.begin_array()
423 token = self._tokenizer.next()
425 if token in JsonParser.ELEMENT_TOKENS:
426 self._parse_elements()
428 self._consume_token(JsonToken.END_ARRAY)
429 self._observer.end_array()
431 def _parse_elements(self) -> None:
432 self._parse_element()
433 while self._tokenizer.get_token() == JsonToken.ITEM_SEPARATOR:
434 self._tokenizer.next()
435 self._parse_element()
437 def _parse_object(self) -> None:
438 self._observer.begin_object()
439 token = self._tokenizer.next()
440 if token == JsonToken.VALUE:
441 self._parse_members()
443 self._consume_token(JsonToken.END_OBJECT)
444 self._observer.end_object()
446 def _parse_members(self) -> None:
447 self._parse_member()
448 while self._tokenizer.get_token() == JsonToken.ITEM_SEPARATOR:
449 self._tokenizer.next()
450 self._parse_member()
452 def _parse_member(self) -> None:
453 self._check_token(JsonToken.VALUE)
454 key = self._tokenizer.get_value()
455 if not isinstance(key, str):
456 raise JsonParserException(f"JsonParser:{self.get_line()}:{self.get_column()}: "
457 f"Key must be a string value!")
459 self._observer.visit_key(key)
460 self._tokenizer.next()
462 self._consume_token(JsonToken.KEY_SEPARATOR)
464 self._parse_element()
466 def _parse_value(self) -> None:
467 self._observer.visit_value(self._tokenizer.get_value())
468 self._tokenizer.next()
470 def _consume_token(self, token: JsonToken) -> None:
471 self._check_token(token)
472 self._tokenizer.next()
474 def _check_token(self, token: JsonToken) -> None:
475 if self._tokenizer.get_token() != token:
476 self._raise_unexpected_token([token])
478 def _raise_unexpected_token(self, expecting: typing.List[JsonToken]) -> None:
479 msg = (f"JsonParser:{self.get_line()}:{self.get_column()}: "
480 f"Unexpected token: {self._tokenizer.get_token()}")
481 if self._tokenizer.get_value() is not None:
482 msg += f" ('{self._tokenizer.get_value()}')"
483 if len(expecting) == 1:
484 msg += f", expecting {expecting[0]}!"
485 else:
486 msg += ", expecting one of [" + ", ".join([str(token) for token in expecting]) + "]!"
488 raise JsonParserException(msg)
490 ELEMENT_TOKENS = [JsonToken.BEGIN_OBJECT, JsonToken.BEGIN_ARRAY, JsonToken.VALUE]
492class JsonDecoder:
493 """
494 JSON value decoder.
495 """
497 @staticmethod
498 def decode_value(content: str, pos: int) -> 'JsonDecoder.Result':
499 """
500 Decodes the JSON value from the string.
502 :param content: String which contains encoded JSON value.
503 :param pos: Position from zero in content where the encoded JSON value begins.
505 :returns: Decoder result object.
506 """
508 if pos >= len(content):
509 return JsonDecoder.Result.from_failure()
511 first_char = content[pos]
512 if first_char == 'n':
513 return JsonDecoder._decode_literal(content, pos, "null", None)
515 if first_char == 't':
516 return JsonDecoder._decode_literal(content, pos, "true", True)
518 if first_char == 'f':
519 return JsonDecoder._decode_literal(content, pos, "false", False)
521 if first_char == 'N':
522 return JsonDecoder._decode_literal(content, pos, "NaN", float("nan"))
524 if first_char == 'I':
525 return JsonDecoder._decode_literal(content, pos, "Infinity", float("inf"))
527 if first_char == '"':
528 return JsonDecoder._decode_string(content, pos)
530 if first_char == '-':
531 if pos + 1 >= len(content):
532 return JsonDecoder.Result.from_failure(1)
534 second_char = content[pos + 1]
535 if second_char == 'I':
536 return JsonDecoder._decode_literal(content, pos, "-Infinity", float("-inf"))
538 return JsonDecoder._decode_number(content, pos)
540 return JsonDecoder._decode_number(content, pos)
542 class Result:
543 """
544 Decoder result value.
545 """
547 def __init__(self, success: bool, value: typing.Any, num_read_chars: int):
548 """
549 Constructor.
550 """
552 self._success = success
553 self._value = value
554 self._num_read_chars = num_read_chars
556 @classmethod
557 def from_failure(cls: typing.Type['JsonDecoder.Result'], num_read_chars:int = 0):
558 """
559 Creates decoder result value in case of failure.
561 :param num_read_chars: Number of processed characters.
562 """
563 instance = cls(False, None, num_read_chars)
565 return instance
567 @classmethod
568 def from_success(cls: typing.Type['JsonDecoder.Result'], value: typing.Any, num_read_chars:int = 0):
569 """
570 Creates decoder result value in case of success.
572 :param value: Decoded value.
573 :param num_read_chars: Number of read characters.
574 """
575 instance = cls(True, value, num_read_chars)
577 return instance
579 @property
580 def success(self) -> bool:
581 """
582 Gets the decoder result.
584 :returns: True in case of success, otherwise false.
585 """
587 return self._success
589 @property
590 def value(self) -> typing.Any:
591 """
592 Gets the decoded JSON value.
594 :returns: Decoded JSON value or None in case of failure.
595 """
597 return self._value
599 @property
600 def num_read_chars(self) -> int:
601 """
602 Gets the number of read characters from the string which contains encoded JSON value.
604 In case of failure, it returns the number of processed (read) characters.
606 :returns: Number of read characters.
607 """
609 return self._num_read_chars
611 @staticmethod
612 def _decode_literal(content: str, pos: int, text: str, decoded_object) -> 'JsonDecoder.Result':
613 text_length = len(text)
614 if pos + text_length > len(content):
615 return JsonDecoder.Result.from_failure(len(content) - pos)
617 sub_content = content[pos : pos + text_length]
618 if sub_content == text:
619 return JsonDecoder.Result.from_success(decoded_object, text_length)
621 return JsonDecoder.Result.from_failure(text_length)
623 @staticmethod
624 def _decode_string(content: str, pos: int) -> 'JsonDecoder.Result':
625 decoded_string = ""
626 end_of_string_pos = pos + 1 # we know that at the beginning is '"'
627 while True:
628 if end_of_string_pos >= len(content):
629 return JsonDecoder.Result.from_failure(end_of_string_pos - pos)
631 next_char = content[end_of_string_pos]
632 end_of_string_pos += 1
633 if next_char == '\\':
634 if end_of_string_pos >= len(content):
635 return JsonDecoder.Result.from_failure(end_of_string_pos - pos)
637 next_next_char = content[end_of_string_pos]
638 end_of_string_pos += 1
639 if next_next_char in ('\\', '"'):
640 decoded_string += next_next_char
641 elif next_next_char == 'b':
642 decoded_string += '\b'
643 elif next_next_char == 'f':
644 decoded_string += '\f'
645 elif next_next_char == 'n':
646 decoded_string += '\n'
647 elif next_next_char == 'r':
648 decoded_string += '\r'
649 elif next_next_char == 't':
650 decoded_string += '\t'
651 elif next_next_char == 'u': # unicode escape
652 unicode_escape_len = 4
653 end_of_string_pos += unicode_escape_len
654 if end_of_string_pos >= len(content):
655 return JsonDecoder.Result.from_failure(len(content) - pos)
656 sub_content = content[end_of_string_pos - unicode_escape_len - 2 : end_of_string_pos]
657 decoded_unicode = JsonDecoder._decode_unicode_escape(sub_content)
658 if decoded_unicode is not None:
659 decoded_string += decoded_unicode
660 else:
661 return JsonDecoder.Result.from_failure(end_of_string_pos - pos)
662 else:
663 # unknown escape character, not decoded...
664 return JsonDecoder.Result.from_failure(end_of_string_pos - pos)
665 elif next_char == '"':
666 break
667 else:
668 decoded_string += next_char
670 return JsonDecoder.Result.from_success(decoded_string, end_of_string_pos - pos)
672 @staticmethod
673 def _decode_unicode_escape(content: str) -> typing.Optional[str]:
674 try:
675 return bytes(content, "ascii").decode("unicode-escape")
676 except ValueError:
677 return None
679 @staticmethod
680 def _decode_number(content: str, pos: int) -> 'JsonDecoder.Result':
681 number_content, is_float = JsonDecoder._extract_number(content, pos)
682 number_length = len(number_content)
683 if number_length == 0:
684 return JsonDecoder.Result.from_failure(1)
686 try:
687 if is_float:
688 float_number = float(number_content)
689 return JsonDecoder.Result.from_success(float_number, number_length)
690 else:
691 int_number = int(number_content)
692 return JsonDecoder.Result.from_success(int_number, number_length)
693 except ValueError:
694 return JsonDecoder.Result.from_failure(number_length)
696 @staticmethod
697 def _extract_number(content: str, pos: int) -> typing.Tuple[str, bool]:
698 end_of_number_pos = pos
699 if content[end_of_number_pos] == '-': # we already know that there is something after '-'
700 end_of_number_pos += 1
701 is_float = False
702 accept_sign = False
703 while end_of_number_pos < len(content):
704 next_char = content[end_of_number_pos]
706 if accept_sign:
707 accept_sign = False
708 if next_char in ('+', '-'):
709 end_of_number_pos += 1
710 continue
712 if next_char.isdigit():
713 end_of_number_pos += 1
714 continue
716 if not is_float and (next_char in ('.', 'e', 'E')):
717 end_of_number_pos += 1
718 is_float = True
719 if next_char in ('e', 'E'):
720 accept_sign = True
721 continue
723 break # pragma: no cover (to satisfy test coverage)
725 return content[pos:end_of_number_pos], is_float
727class JsonTokenizer:
728 """
729 Tokenizer used by JsonParser.
730 """
732 def __init__(self, text_io: typing.TextIO) -> None:
733 """
734 Constructor.
736 :param text_io: Text stream to tokenize.
737 """
738 self._io = text_io
740 self._content = self._io.read(JsonTokenizer.MAX_LINE_LEN)
741 self._line_number = 1
742 self._column_number = 1
743 self._token_column_number = 1
744 self._pos = 0
745 self._set_token(JsonToken.BEGIN_OF_FILE if self._content else JsonToken.END_OF_FILE, None)
746 self._decoder_result = JsonDecoder.Result.from_failure()
748 def next(self) -> JsonToken:
749 """
750 Moves to next token.
752 :returns: Token.
753 :raises JsonParserException: When unknown token is reached.
754 """
756 while not self._decode_next():
757 new_content = self._io.read(JsonTokenizer.MAX_LINE_LEN)
758 if not new_content:
759 if self._token == JsonToken.END_OF_FILE:
760 self._token_column_number = self._column_number
761 else:
762 # stream is finished but last token is not EOF => value must be at the end
763 self._set_token_value()
765 return self._token
767 self._content = self._content[self._pos:]
768 self._content += new_content
769 self._pos = 0
771 return self._token
773 def get_token(self) -> JsonToken:
774 """
775 Gets current token.
777 :returns: Current token.
778 """
779 return self._token
781 def get_value(self) -> typing.Any:
782 """
783 Gets current value.
785 :returns: Current value.
786 """
787 return self._value
789 def get_line(self) -> int:
790 """
791 Gets line number of the current token.
793 :returns: Line number.
794 """
795 return self._line_number
797 def get_column(self) -> int:
798 """
799 Gets column number of the current token.
801 :returns: Column number.
802 """
803 return self._token_column_number
805 def _decode_next(self) -> bool:
806 if not self._skip_whitespaces():
807 return False
809 self._token_column_number = self._column_number
811 next_char = self._content[self._pos]
812 if next_char == "{":
813 self._set_token(JsonToken.BEGIN_OBJECT, next_char)
814 self._set_position(self._pos + 1, self._column_number + 1)
815 elif next_char == "}":
816 self._set_token(JsonToken.END_OBJECT, next_char)
817 self._set_position(self._pos + 1, self._column_number + 1)
818 elif next_char == "[":
819 self._set_token(JsonToken.BEGIN_ARRAY, next_char)
820 self._set_position(self._pos + 1, self._column_number + 1)
821 elif next_char == "]":
822 self._set_token(JsonToken.END_ARRAY, next_char)
823 self._set_position(self._pos + 1, self._column_number + 1)
824 elif next_char == ":":
825 self._set_token(JsonToken.KEY_SEPARATOR, next_char)
826 self._set_position(self._pos + 1, self._column_number + 1)
827 elif next_char == ",":
828 self._set_token(JsonToken.ITEM_SEPARATOR, next_char)
829 self._set_position(self._pos + 1, self._column_number + 1)
830 else:
831 self._decoder_result = JsonDecoder.decode_value(self._content, self._pos)
832 if self._pos + self._decoder_result.num_read_chars >= len(self._content):
833 return False # we are at the end of chunk => try to read more
835 self._set_token_value()
837 return True
839 def _skip_whitespaces(self) -> bool:
840 while True:
841 if self._pos >= len(self._content):
842 self._set_token(JsonToken.END_OF_FILE, None)
843 return False
845 next_char = self._content[self._pos]
846 if next_char in (' ', '\t'):
847 self._set_position(self._pos + 1, self._column_number + 1)
848 elif next_char == '\n':
849 self._line_number += 1
850 self._set_position(self._pos + 1, 1)
851 elif next_char == '\r':
852 if self._pos + 1 >= len(self._content):
853 self._set_token(JsonToken.END_OF_FILE, None)
854 return False
856 next_next_char = self._content[self._pos + 1]
857 self._line_number += 1
858 self._set_position(self._pos + (2 if (next_next_char == '\n') else 1), 1)
859 else:
860 return True
862 def _set_token(self, new_token: JsonToken, new_value: typing.Any) -> None:
863 self._token = new_token
864 self._value = new_value
866 def _set_position(self, new_pos: int, new_column_number: int) -> None:
867 self._pos = new_pos
868 self._column_number = new_column_number
870 def _set_token_value(self) -> None:
871 if not self._decoder_result.success:
872 raise JsonParserException(f"JsonTokenizer:{self._line_number}:{self._token_column_number}: "
873 f"Unknown token!")
875 self._set_token(JsonToken.VALUE, self._decoder_result.value)
876 num_read_chars = self._decoder_result.num_read_chars
877 self._set_position(self._pos + num_read_chars, self._column_number + num_read_chars)
879 MAX_LINE_LEN = 64 * 1024
881class JsonReader:
882 """
883 Reads zserio object tree defined by a type info from a text stream.
884 """
886 def __init__(self, text_io: typing.TextIO) -> None:
887 """
888 Constructor.
890 :param text_io: Text stream to read.
891 """
893 self._creator_adapter = JsonReader._CreatorAdapter()
894 self._parser = JsonParser(text_io, self._creator_adapter)
896 def read(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> typing.Any:
897 """
898 Reads a zserio object tree defined by the given type info from the text steam.
900 :param type_info: Type info defining the expected zserio object tree.
901 :param arguments: Arguments of type defining the expected zserio object tree.
902 :returns: Zserio object tree initialized using the JSON data.
903 :raises PythonRuntimeException: When the JSON doesn't contain expected zserio object tree.
904 """
906 self._creator_adapter.set_type(type_info, *arguments)
908 try:
909 self._parser.parse()
910 except JsonParserException:
911 raise
912 except PythonRuntimeException as err:
913 raise PythonRuntimeException(f"{err} (JsonParser:"
914 f"{self._parser.get_line()}:{self._parser.get_column()})") from err
916 return self._creator_adapter.get()
918 class _ObjectValueAdapter(JsonParser.Observer):
919 """
920 Adapter for values which are encoded as Json objects.
921 """
923 def get(self) -> typing.Any:
924 """
925 Gets the parsed value.
926 """
928 raise NotImplementedError()
930 class _BitBufferAdapter(_ObjectValueAdapter):
931 """
932 The adapter which allows to parse Bit Buffer object from JSON.
933 """
935 def __init__(self) -> None:
936 """
937 Constructor.
938 """
940 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY
941 self._buffer: typing.Optional[typing.List[int]] = None
942 self._bit_size: typing.Optional[int] = None
944 def get(self) -> BitBuffer:
945 """
946 Gets the created Bit Buffer object.
948 :returns: Parsed Bit Buffer object.
949 :raises PythonRuntimeException: In case of invalid use.
950 """
952 if self._buffer is None or self._bit_size is None:
953 raise PythonRuntimeException("JsonReader: Unexpected end in Bit Buffer!")
954 return BitBuffer(bytes(self._buffer), self._bit_size)
956 def begin_object(self) -> None:
957 raise PythonRuntimeException("JsonReader: Unexpected begin object in Bit Buffer!")
959 def end_object(self) -> None:
960 raise PythonRuntimeException("JsonReader: Unexpected end object in Bit Buffer!")
962 def begin_array(self) -> None:
963 if self._state == JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER:
964 self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER
965 else:
966 raise PythonRuntimeException("JsonReader: Unexpected begin array in Bit Buffer!")
968 def end_array(self) -> None:
969 if self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER:
970 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY
971 else:
972 raise PythonRuntimeException("JsonReader: Unexpected end array in Bit Buffer!")
974 def visit_key(self, key: str) -> None:
975 if self._state == JsonReader._BitBufferAdapter._State.VISIT_KEY:
976 if key == "buffer":
977 self._state = JsonReader._BitBufferAdapter._State.BEGIN_ARRAY_BUFFER
978 elif key == "bitSize":
979 self._state = JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE
980 else:
981 raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in Bit Buffer!")
982 else:
983 raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in Bit Buffer!")
985 def visit_value(self, value: typing.Any) -> None:
986 if self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BUFFER and isinstance(value, int):
987 if self._buffer is None:
988 self._buffer = [value]
989 else:
990 self._buffer.append(value)
991 elif (self._state == JsonReader._BitBufferAdapter._State.VISIT_VALUE_BITSIZE and
992 isinstance(value, int)):
993 self._bit_size = value
994 self._state = JsonReader._BitBufferAdapter._State.VISIT_KEY
995 else:
996 raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in Bit Buffer!")
998 class _State(enum.Enum):
999 VISIT_KEY = enum.auto()
1000 BEGIN_ARRAY_BUFFER = enum.auto()
1001 VISIT_VALUE_BUFFER = enum.auto()
1002 VISIT_VALUE_BITSIZE = enum.auto()
1004 class _BytesAdapter(_ObjectValueAdapter):
1005 """
1006 The adapter which allows to parse bytes object from JSON.
1007 """
1009 def __init__(self) -> None:
1010 """
1011 Constructor.
1012 """
1014 self._state = JsonReader._BytesAdapter._State.VISIT_KEY
1015 self._buffer: typing.Optional[bytearray] = None
1017 def get(self) -> bytearray:
1018 """
1019 Gets the created bytes object.
1021 :returns: Parsed bytes object.
1022 :raises PythonRuntimeException: In case of invalid use.
1023 """
1025 if self._buffer is None:
1026 raise PythonRuntimeException("JsonReader: Unexpected end in bytes!")
1027 return self._buffer
1029 def begin_object(self) -> None:
1030 raise PythonRuntimeException("JsonReader: Unexpected begin object in bytes!")
1032 def end_object(self) -> None:
1033 raise PythonRuntimeException("JsonReader: Unexpected end object in bytes!")
1035 def begin_array(self) -> None:
1036 if self._state == JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER:
1037 self._state = JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER
1038 else:
1039 raise PythonRuntimeException("JsonReader: Unexpected begin array in bytes!")
1041 def end_array(self) -> None:
1042 if self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER:
1043 self._state = JsonReader._BytesAdapter._State.VISIT_KEY
1044 else:
1045 raise PythonRuntimeException("JsonReader: Unexpected end array in bytes!")
1047 def visit_key(self, key: str) -> None:
1048 if self._state == JsonReader._BytesAdapter._State.VISIT_KEY:
1049 if key == "buffer":
1050 self._state = JsonReader._BytesAdapter._State.BEGIN_ARRAY_BUFFER
1051 else:
1052 raise PythonRuntimeException(f"JsonReader: Unknown key '{key}' in bytes!")
1053 else:
1054 raise PythonRuntimeException(f"JsonReader: Unexpected key '{key}' in bytes!")
1056 def visit_value(self, value: typing.Any) -> None:
1057 if self._state == JsonReader._BytesAdapter._State.VISIT_VALUE_BUFFER and isinstance(value, int):
1058 if self._buffer is None:
1059 self._buffer = bytearray([value])
1060 else:
1061 self._buffer.append(value)
1062 else:
1063 raise PythonRuntimeException(f"JsonReader: Unexpected value '{value}' in bytes!")
1065 class _State(enum.Enum):
1066 VISIT_KEY = enum.auto()
1067 BEGIN_ARRAY_BUFFER = enum.auto()
1068 VISIT_VALUE_BUFFER = enum.auto()
1070 class _CreatorAdapter(JsonParser.Observer):
1071 """
1072 The adapter which allows to use ZserioTreeCreator as an JsonReader observer.
1073 """
1075 def __init__(self) -> None:
1076 """
1077 Constructor.
1078 """
1080 self._creator: typing.Optional[ZserioTreeCreator] = None
1081 self._key_stack: typing.List[str] = []
1082 self._object: typing.Any = None
1083 self._object_value_adapter: typing.Optional[JsonReader._ObjectValueAdapter] = None
1085 def set_type(self, type_info: TypeInfo, *arguments: typing.List[typing.Any]) -> None:
1086 """
1087 Sets type which shall be created next. Resets the current object.
1089 :param type_info: Type info of the type which is to be created.
1090 :param arguments: Arguments of type defining the expected zserio object tree.
1091 """
1093 self._creator = ZserioTreeCreator(type_info, *arguments)
1094 self._object = None
1096 def get(self) -> typing.Any:
1097 """
1098 Gets the created zserio object tree.
1100 :returns: Zserio object tree.
1101 :raises PythonRuntimeException: In case of invalid use.
1102 """
1104 if not self._object:
1105 raise PythonRuntimeException("JsonReader: Zserio tree not created!")
1107 return self._object
1109 def begin_object(self) -> None:
1110 if self._object_value_adapter:
1111 self._object_value_adapter.begin_object()
1112 else:
1113 if not self._creator:
1114 raise PythonRuntimeException("JsonReader: Adapter not initialized!")
1116 if not self._key_stack:
1117 self._creator.begin_root()
1118 else:
1119 if self._key_stack[-1]:
1120 schema_type = self._creator.get_field_type(self._key_stack[-1]).schema_name
1121 if schema_type == "extern":
1122 self._object_value_adapter = JsonReader._BitBufferAdapter()
1123 elif schema_type == "bytes":
1124 self._object_value_adapter = JsonReader._BytesAdapter()
1125 else:
1126 self._creator.begin_compound(self._key_stack[-1])
1127 else:
1128 schema_type = self._creator.get_element_type().schema_name
1129 if schema_type == "extern":
1130 self._object_value_adapter = JsonReader._BitBufferAdapter()
1131 elif schema_type == "bytes":
1132 self._object_value_adapter = JsonReader._BytesAdapter()
1133 else:
1134 self._creator.begin_compound_element()
1136 def end_object(self) -> None:
1137 if self._object_value_adapter:
1138 value = self._object_value_adapter.get()
1139 self._object_value_adapter = None
1140 self.visit_value(value)
1141 else:
1142 if not self._creator:
1143 raise PythonRuntimeException("JsonReader: Adapter not initialized!")
1145 if not self._key_stack:
1146 self._object = self._creator.end_root()
1147 self._creator = None
1148 else:
1149 if self._key_stack[-1]:
1150 self._creator.end_compound()
1151 self._key_stack.pop() # finish member
1152 else:
1153 self._creator.end_compound_element()
1155 def begin_array(self) -> None:
1156 if self._object_value_adapter:
1157 self._object_value_adapter.begin_array()
1158 else:
1159 if not self._creator:
1160 raise PythonRuntimeException("JsonReader: Adapter not initialized!")
1162 if not self._key_stack:
1163 raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!")
1165 self._creator.begin_array(self._key_stack[-1])
1167 self._key_stack.append("")
1169 def end_array(self) -> None:
1170 if self._object_value_adapter:
1171 self._object_value_adapter.end_array()
1172 else:
1173 if not self._creator:
1174 raise PythonRuntimeException("JsonReader: Adapter not initialized!")
1176 self._creator.end_array()
1178 self._key_stack.pop() # finish array
1179 self._key_stack.pop() # finish member
1181 def visit_key(self, key: str) -> None:
1182 if self._object_value_adapter:
1183 self._object_value_adapter.visit_key(key)
1184 else:
1185 if not self._creator:
1186 raise PythonRuntimeException("JsonReader: Adapter not initialized!")
1188 self._key_stack.append(key)
1190 def visit_value(self, value: typing.Any) -> None:
1191 if self._object_value_adapter:
1192 self._object_value_adapter.visit_value(value)
1193 else:
1194 if not self._creator:
1195 raise PythonRuntimeException("JsonReader: Adapter not initialized!")
1197 if not self._key_stack:
1198 raise PythonRuntimeException("JsonReader: ZserioTreeCreator expects json object!")
1200 if self._key_stack[-1]:
1201 expected_type_info = self._creator.get_field_type(self._key_stack[-1])
1202 self._creator.set_value(self._key_stack[-1], self._convert_value(value, expected_type_info))
1203 self._key_stack.pop() # finish member
1204 else:
1205 expected_type_info = self._creator.get_element_type()
1206 self._creator.add_value_element(self._convert_value(value, expected_type_info))
1208 @staticmethod
1209 def _convert_value(value: typing.Any,
1210 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
1211 if value is None:
1212 return None
1214 if TypeAttribute.ENUM_ITEMS in type_info.attributes:
1215 if isinstance(value, str):
1216 return JsonReader._CreatorAdapter._enum_from_string(value, type_info)
1217 else:
1218 return type_info.py_type(value)
1219 elif TypeAttribute.BITMASK_VALUES in type_info.attributes:
1220 if isinstance(value, str):
1221 return JsonReader._CreatorAdapter._bitmask_from_string(value, type_info)
1222 else:
1223 return type_info.py_type.from_value(value)
1224 else:
1225 return value
1227 @staticmethod
1228 def _enum_from_string(string_value: str,
1229 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
1230 if string_value:
1231 first_char = string_value[0]
1232 if ('A' <= first_char <= 'Z') or ('a' <= first_char <= 'z') or first_char == '_':
1233 py_item = JsonReader._CreatorAdapter._parse_enum_string_value(string_value, type_info)
1234 if py_item is not None:
1235 return py_item
1236 # else it's a no match
1238 raise PythonRuntimeException(f"JsonReader: Cannot create enum '{type_info.schema_name}' "
1239 f"from string value '{string_value}'!")
1241 @staticmethod
1242 def _bitmask_from_string(string_value: str,
1243 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
1244 if string_value:
1245 first_char = string_value[0]
1246 if ('A' <= first_char <= 'Z') or ('a' <= first_char <= 'z') or first_char == '_':
1247 value = JsonReader._CreatorAdapter._parse_bitmask_string_value(string_value, type_info)
1248 if value is not None:
1249 return type_info.py_type.from_value(value)
1250 elif '0' <= first_char <= '9': # bitmask can be only unsigned
1251 value = JsonReader._CreatorAdapter._parse_bitmask_numeric_string_value(string_value)
1252 if value is not None:
1253 return type_info.py_type.from_value(value)
1255 raise PythonRuntimeException(f"JsonReader: Cannot create bitmask '{type_info.schema_name}' "
1256 f"from string value '{string_value}'!")
1258 @staticmethod
1259 def _parse_enum_string_value(string_value: str,
1260 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
1261 for item_info in type_info.attributes[TypeAttribute.ENUM_ITEMS]:
1262 if string_value == item_info.schema_name:
1263 return item_info.py_item
1265 return None
1267 @staticmethod
1268 def _parse_bitmask_string_value(string_value: str,
1269 type_info: typing.Union[TypeInfo, RecursiveTypeInfo]) -> typing.Any:
1270 value = 0
1271 identifiers = string_value.split('|')
1272 for identifier_with_spaces in identifiers:
1273 identifier = identifier_with_spaces.strip()
1274 match = False
1275 for item_info in type_info.attributes[TypeAttribute.BITMASK_VALUES]:
1276 if identifier == item_info.schema_name:
1277 match = True
1278 value |= item_info.py_item.value
1279 break
1281 if not match:
1282 return None
1284 return value
1286 @staticmethod
1287 def _parse_bitmask_numeric_string_value(string_value: str):
1288 number_len = 1
1289 while string_value[number_len] >= '0'and string_value[number_len] <= '9':
1290 number_len += 1
1292 return int(string_value[0:number_len])