Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/bitreader.py: 100%
330 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
1"""
2The module implements abstraction for reading data to the bit stream.
3"""
5import typing
7from zserio.bitbuffer import BitBuffer
8from zserio.limits import INT64_MIN
9from zserio.exception import PythonRuntimeException
10from zserio.float import uint16_to_float, uint32_to_float, uint64_to_float
11from zserio.cppbind import import_cpp_class
13class BitStreamReader:
14 """
15 Bit stream reader.
16 """
18 def __init__(self, buffer: bytes, bitsize: typing.Optional[int] = None) -> None:
19 """
20 Constructs bit stream reader from bytes buffer.
22 Because bit buffer size does not have to be byte aligned (divisible by 8), it's possible that not all
23 bits of the last byte are used. In this case, only most significant bits of the corresponded size are
24 used.
26 :param buffer: Bytes-like buffer to read as a bit stream.
27 :param bitsize: Number of bits stored in buffer to use.
28 :raises PythonRuntimeException: If bitsize is out of range.
29 """
31 if bitsize is None:
32 bitsize = len(buffer) * 8
33 elif len(buffer) * 8 < bitsize:
34 raise PythonRuntimeException(f"BitStreamReader: Bit size '{bitsize}' out of range "
35 f"for the given buffer byte size '{len(buffer)}'!")
37 self._buffer: bytes = buffer
38 self._bitsize: int = bitsize
39 self._bitposition: int = 0
41 @classmethod
42 def from_bitbuffer(cls: typing.Type['BitStreamReader'], bitbuffer: BitBuffer) -> 'BitStreamReader':
43 """
44 Constructs bit stream reader from bit buffer.
46 :param bitbuffer: Bit buffer to read as a bit stream.
47 """
49 instance = cls(bitbuffer.buffer, bitbuffer.bitsize)
51 return instance
53 @classmethod
54 def from_file(cls: typing.Type['BitStreamReader'], filename: str) -> 'BitStreamReader':
55 """
56 Constructs bit stream reader from file.
58 :param filename: Filename to read as a bit stream.
59 """
61 with open(filename, 'rb') as file:
62 return cls(file.read())
64 def read_bits(self, numbits: int) -> int:
65 """
66 Reads given number of bits from the bit stream as an unsigned integer.
68 :param numbits: Number of bits to read.
69 :returns: Read bits as an unsigned integer.
70 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
71 """
73 if numbits < 0:
74 raise PythonRuntimeException("BitStreamReader: Reading negative number of bits!")
76 return self.read_bits_unchecked(numbits)
78 def read_signed_bits(self, numbits: int) -> int:
79 """
80 Reads given number of bits from the bit stream as a signed integer.
82 :param numbits: Number of bits to read
83 :returns: Read bits as a signed integer.
84 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
85 """
87 if numbits < 0:
88 raise PythonRuntimeException("BitStreamReader: Reading negative number of bits!")
90 return self.read_signed_bits_unchecked(numbits)
92 def read_bits_unchecked(self, numbits: int) -> int:
93 """
94 Reads given number of bits from the bit stream as an unsigned integer.
96 This method does not check that numbits >= 0 and assumes that it's ensured by the caller.
98 :param numbits: Number of bits to read.
99 :returns: Read bits as an unsigned integer.
100 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
101 """
103 end_bitposition = self._bitposition + numbits
105 if end_bitposition > self._bitsize:
106 raise PythonRuntimeException("BitStreamReader: Reading behind the stream!")
108 start_byte = self._bitposition // 8
109 end_byte = (end_bitposition - 1) // 8
111 value = int.from_bytes(self._buffer[start_byte : end_byte + 1], byteorder='big', signed=False)
113 last_bits = end_bitposition % 8
114 if last_bits != 0:
115 value >>= (8 - last_bits)
116 value &= (1 << numbits) - 1
118 self._bitposition = end_bitposition
120 return value
122 def read_signed_bits_unchecked(self, numbits: int) -> int:
123 """
124 Reads given number of bits from the bit stream as a signed integer.
126 This method does not check that numbits >= 0 and assumes that it's ensured by the caller.
128 :param numbits: Number of bits to read
129 :returns: Read bits as a signed integer.
130 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream.
131 """
133 value = self.read_bits_unchecked(numbits)
135 if numbits != 0 and (value >> (numbits - 1)) != 0:
136 # signed
137 return value - (1 << numbits)
138 else:
139 # unsigned
140 return value
142 def read_varint16(self) -> int:
143 """
144 Reads variable 16-bit signed integer value from the bit stream.
146 :returns: Variable 16-bit signed integer value.
147 """
149 byte = self.read_bits_unchecked(8) # byte 1
150 sign = byte & VARINT_SIGN_1
151 result = byte & VARINT_BYTE_1
152 if byte & VARINT_HAS_NEXT_1 == 0:
153 return -result if sign != 0 else result
155 result = (result << 8) | self.read_bits_unchecked(8) # byte 2
156 return -result if sign else result
158 def read_varint32(self) -> int:
159 """
160 Reads variable 32-bit signed integer value from the bit stream.
162 :returns: Variable 32-bit signed integer value.
163 """
165 byte = self.read_bits_unchecked(8) # byte 1
166 sign = byte & VARINT_SIGN_1
167 result = byte & VARINT_BYTE_1
168 if byte & VARINT_HAS_NEXT_1 == 0:
169 return -result if sign else result
171 byte = self.read_bits_unchecked(8) # byte 2
172 result = result << 7 | (byte & VARINT_BYTE_N)
173 if byte & VARINT_HAS_NEXT_N == 0:
174 return -result if sign else result
176 byte = self.read_bits_unchecked(8) # byte 3
177 result = result << 7 | (byte & VARINT_BYTE_N)
178 if byte & VARINT_HAS_NEXT_N == 0:
179 return -result if sign else result
181 result = result << 8 | self.read_bits_unchecked(8) # byte 4
182 return -result if sign else result
184 def read_varint64(self) -> int:
185 """
186 Reads variable 64-bit signed integer value from the bit stream.
188 :returns: Variable 64-bit signed integer value.
189 """
191 byte = self.read_bits_unchecked(8) # byte 1
192 sign = byte & VARINT_SIGN_1
193 result = byte & VARINT_BYTE_1
194 if byte & VARINT_HAS_NEXT_1 == 0:
195 return -result if sign else result
197 byte = self.read_bits_unchecked(8) # byte 2
198 result = result << 7 | (byte & VARINT_BYTE_N)
199 if byte & VARINT_HAS_NEXT_N == 0:
200 return -result if sign else result
202 byte = self.read_bits_unchecked(8) # byte 3
203 result = result << 7 | (byte & VARINT_BYTE_N)
204 if byte & VARINT_HAS_NEXT_N == 0:
205 return -result if sign else result
207 byte = self.read_bits_unchecked(8) # byte 4
208 result = result << 7 | (byte & VARINT_BYTE_N)
209 if byte & VARINT_HAS_NEXT_N == 0:
210 return -result if sign else result
212 byte = self.read_bits_unchecked(8) # byte 5
213 result = result << 7 | (byte & VARINT_BYTE_N)
214 if byte & VARINT_HAS_NEXT_N == 0:
215 return -result if sign else result
217 byte = self.read_bits_unchecked(8) # byte 6
218 result = result << 7 | (byte & VARINT_BYTE_N)
219 if byte & VARINT_HAS_NEXT_N == 0:
220 return -result if sign else result
222 byte = self.read_bits_unchecked(8) # byte 7
223 result = result << 7 | (byte & VARINT_BYTE_N)
224 if byte & VARINT_HAS_NEXT_N == 0:
225 return -result if sign else result
227 result = result << 8 | self.read_bits_unchecked(8) # byte 8
228 return -result if sign else result
230 def read_varint(self) -> int:
231 """
232 Reads variable signed integer value (up to 9 bytes) from the bit stream.
234 :returns: Variable signed integer value (up to 9 bytes).
235 """
237 byte = self.read_bits_unchecked(8) # byte 1
238 sign = byte & VARINT_SIGN_1
239 result = byte & VARINT_BYTE_1
240 if byte & VARINT_HAS_NEXT_1 == 0:
241 return (INT64_MIN if result == 0 else -result) if sign else result
243 byte = self.read_bits_unchecked(8) # byte 2
244 result = result << 7 | (byte & VARINT_BYTE_N)
245 if byte & VARINT_HAS_NEXT_N == 0:
246 return -result if sign else result
248 byte = self.read_bits_unchecked(8) # byte 3
249 result = result << 7 | (byte & VARINT_BYTE_N)
250 if byte & VARINT_HAS_NEXT_N == 0:
251 return -result if sign else result
253 byte = self.read_bits_unchecked(8) # byte 4
254 result = result << 7 | (byte & VARINT_BYTE_N)
255 if byte & VARINT_HAS_NEXT_N == 0:
256 return -result if sign else result
258 byte = self.read_bits_unchecked(8) # byte 5
259 result = result << 7 | (byte & VARINT_BYTE_N)
260 if byte & VARINT_HAS_NEXT_N == 0:
261 return -result if sign else result
263 byte = self.read_bits_unchecked(8) # byte 6
264 result = result << 7 | (byte & VARINT_BYTE_N)
265 if byte & VARINT_HAS_NEXT_N == 0:
266 return -result if sign else result
268 byte = self.read_bits_unchecked(8) # byte 7
269 result = result << 7 | (byte & VARINT_BYTE_N)
270 if byte & VARINT_HAS_NEXT_N == 0:
271 return -result if sign else result
273 byte = self.read_bits_unchecked(8) # byte 8
274 result = result << 7 | (byte & VARINT_BYTE_N)
275 if byte & VARINT_HAS_NEXT_N == 0:
276 return -result if sign else result
278 result = result << 8 | self.read_bits_unchecked(8) # byte 9
279 return -result if sign else result
281 def read_varuint16(self) -> int:
282 """
283 Reads variable 16-bit unsigned integer value from the bit stream.
285 :returns: Variable 16-bit unsigned integer value.
286 """
288 byte = self.read_bits_unchecked(8) # byte 1
289 result = byte & VARUINT_BYTE
290 if byte & VARUINT_HAS_NEXT == 0:
291 return result
293 result = result << 8 | self.read_bits_unchecked(8) # byte 2
294 return result
296 def read_varuint32(self) -> int:
297 """
298 Reads variable 32-bit unsigned integer value from the bit stream.
300 :returns: Variable 32-bit unsigned integer value.
301 """
303 byte = self.read_bits_unchecked(8) # byte 1
304 result = byte & VARUINT_BYTE
305 if byte & VARUINT_HAS_NEXT == 0:
306 return result
308 byte = self.read_bits_unchecked(8) # byte 2
309 result = result << 7 | (byte & VARUINT_BYTE)
310 if byte & VARUINT_HAS_NEXT == 0:
311 return result
313 byte = self.read_bits_unchecked(8) # byte 3
314 result = result << 7 | (byte & VARUINT_BYTE)
315 if byte & VARUINT_HAS_NEXT == 0:
316 return result
318 result = result << 8 | self.read_bits_unchecked(8) # byte 4
319 return result
321 def read_varuint64(self) -> int:
322 """
323 Reads variable 64-bit unsigned integer value from the bit stream.
325 :returns: Variable 64-bit unsigned integer value.
326 """
328 byte = self.read_bits_unchecked(8) # byte 1
329 result = byte & VARUINT_BYTE
330 if byte & VARUINT_HAS_NEXT == 0:
331 return result
333 byte = self.read_bits_unchecked(8) # byte 2
334 result = result << 7 | (byte & VARUINT_BYTE)
335 if byte & VARUINT_HAS_NEXT == 0:
336 return result
338 byte = self.read_bits_unchecked(8) # byte 3
339 result = result << 7 | (byte & VARUINT_BYTE)
340 if byte & VARUINT_HAS_NEXT == 0:
341 return result
343 byte = self.read_bits_unchecked(8) # byte 4
344 result = result << 7 | (byte & VARUINT_BYTE)
345 if byte & VARUINT_HAS_NEXT == 0:
346 return result
348 byte = self.read_bits_unchecked(8) # byte 5
349 result = result << 7 | (byte & VARUINT_BYTE)
350 if byte & VARUINT_HAS_NEXT == 0:
351 return result
353 byte = self.read_bits_unchecked(8) # byte 6
354 result = result << 7 | (byte & VARUINT_BYTE)
355 if byte & VARUINT_HAS_NEXT == 0:
356 return result
358 byte = self.read_bits_unchecked(8) # byte 7
359 result = result << 7 | (byte & VARUINT_BYTE)
360 if byte & VARUINT_HAS_NEXT == 0:
361 return result
363 result = result << 8 | self.read_bits_unchecked(8) # byte 8
364 return result
366 def read_varuint(self) -> int:
367 """
368 Reads variable unsigned integer value (up to 9 bytes) from the bit stream.
370 :returns: Variable unsigned integer value (up to 9 bytes).
371 """
373 byte = self.read_bits_unchecked(8) # byte 1
374 result = byte & VARUINT_BYTE
375 if byte & VARUINT_HAS_NEXT == 0:
376 return result
378 byte = self.read_bits_unchecked(8) # byte 2
379 result = result << 7 | (byte & VARUINT_BYTE)
380 if byte & VARUINT_HAS_NEXT == 0:
381 return result
383 byte = self.read_bits_unchecked(8) # byte 3
384 result = result << 7 | (byte & VARUINT_BYTE)
385 if byte & VARUINT_HAS_NEXT == 0:
386 return result
388 byte = self.read_bits_unchecked(8) # byte 4
389 result = result << 7 | (byte & VARUINT_BYTE)
390 if byte & VARUINT_HAS_NEXT == 0:
391 return result
393 byte = self.read_bits_unchecked(8) # byte 5
394 result = result << 7 | (byte & VARUINT_BYTE)
395 if byte & VARUINT_HAS_NEXT == 0:
396 return result
398 byte = self.read_bits_unchecked(8) # byte 6
399 result = result << 7 | (byte & VARUINT_BYTE)
400 if byte & VARUINT_HAS_NEXT == 0:
401 return result
403 byte = self.read_bits_unchecked(8) # byte 7
404 result = result << 7 | (byte & VARUINT_BYTE)
405 if byte & VARUINT_HAS_NEXT == 0:
406 return result
408 byte = self.read_bits_unchecked(8) # byte 8
409 result = result << 7 | (byte & VARUINT_BYTE)
410 if byte & VARUINT_HAS_NEXT == 0:
411 return result
413 result = result << 8 | self.read_bits_unchecked(8) # byte 9
414 return result
416 def read_varsize(self) -> int:
417 """
418 Reads variable size integer value from the bit stream.
420 :returns: Variable size integer value.
421 :raises PythonRuntimeException: If read variable size integer is out of range.
422 """
424 byte = self.read_bits_unchecked(8) # byte 1
425 result = byte & VARUINT_BYTE
426 if byte & VARUINT_HAS_NEXT == 0:
427 return result
429 byte = self.read_bits_unchecked(8) # byte 2
430 result = result << 7 | (byte & VARUINT_BYTE)
431 if byte & VARUINT_HAS_NEXT == 0:
432 return result
434 byte = self.read_bits_unchecked(8) # byte 3
435 result = result << 7 | (byte & VARUINT_BYTE)
436 if byte & VARUINT_HAS_NEXT == 0:
437 return result
439 byte = self.read_bits_unchecked(8) # byte 4
440 result = result << 7 | (byte & VARUINT_BYTE)
441 if byte & VARUINT_HAS_NEXT == 0:
442 return result
444 result = result << 8 | self.read_bits_unchecked(8) # byte 5
445 if result > VARSIZE_MAX_VALUE:
446 raise PythonRuntimeException(f"BitStreamReader: Read value '{result}' is out of range "
447 "for varsize type!")
449 return result
451 def read_float16(self) -> float:
452 """
453 Read 16-bits from the stream as a float value encoded according to IEEE 754 binary16.
455 :returns: Read float value.
456 :raises PythonRuntimeException: If the reading goes behind the stream.
457 """
459 return uint16_to_float(self.read_bits_unchecked(16))
461 def read_float32(self) -> float:
462 """
463 Read 32-bits from the stream as a float value encoded according to IEEE 754 binary32.
465 :returns: Read float value.
466 :raises PythonRuntimeException: If the reading goes behind the stream.
467 """
469 return uint32_to_float(self.read_bits_unchecked(32))
471 def read_float64(self) -> float:
472 """
473 Read 64-bits from the stream as a float value encoded according to IEEE 754 binary64.
475 :returns: Read float value.
476 :raises PythonRuntimeException: If the reading goes behind the stream.
477 """
479 return uint64_to_float(self.read_bits_unchecked(64))
481 def read_bytes(self) -> bytearray:
482 """
483 Reads bytes from the stream.
485 :returns: Read bytes.
486 :raises PythonRuntimeException: If the reading goes behind the stream.
487 """
489 length = self.read_varsize()
490 begin_bitposition = self._bitposition
492 if (begin_bitposition & 0x07) != 0:
493 # we are not aligned to byte
494 value = bytearray()
495 for _ in range(length):
496 value.append(self.read_bits_unchecked(8))
497 else:
498 # we are aligned to byte
499 self.bitposition = begin_bitposition + length * 8
500 value = bytearray(length)
501 begin_byte_position = begin_bitposition // 8
502 value[0:length] = self._buffer[begin_byte_position:begin_byte_position+length]
504 return value
506 def read_string(self) -> str:
507 """
508 Reads string from the stream.
510 :returns: Read string.
511 :raises PythonRuntimeException: If the reading goes behind the stream.
512 """
514 length = self.read_varsize()
515 begin_bitposition = self._bitposition
516 if (begin_bitposition & 0x07) != 0:
517 # we are not aligned to byte
518 value = bytearray()
519 for _ in range(length):
520 value.append(self.read_bits_unchecked(8))
521 else:
522 # we are aligned to byte
523 self.bitposition = begin_bitposition + length * 8
524 value = bytearray(length)
525 begin_byte_position = begin_bitposition // 8
526 value[0:length] = self._buffer[begin_byte_position:begin_byte_position + length]
528 return value.decode("utf-8")
530 def read_bool(self) -> bool:
531 """
532 Reads single bit as a bool value.
534 :returns: Read bool values.
535 :raises PythonRuntimeException: If the reading goes behind the stream.
536 """
538 return self.read_bits_unchecked(1) != 0
540 def read_bitbuffer(self) -> BitBuffer:
541 """
542 Reads a bit buffer from the stream.
544 :returns: Read bit buffer.
545 :raises PythonRuntimeException: If the reading goes behind the stream.
546 """
548 bitsize = self.read_varsize()
549 num_bytes_to_read = bitsize // 8
550 num_rest_bits = bitsize - num_bytes_to_read * 8
551 bytesize = (bitsize + 7) // 8
552 read_buffer = bytearray(bytesize)
553 begin_bitposition = self._bitposition
554 if (begin_bitposition & 0x07) != 0:
555 # we are not aligned to byte
556 for i in range(num_bytes_to_read):
557 read_buffer[i] = self.read_bits_unchecked(8)
558 else:
559 # we are aligned to byte
560 self.bitposition = begin_bitposition + num_bytes_to_read * 8
561 begin_byte_position = begin_bitposition // 8
562 read_buffer[0:num_bytes_to_read] = self._buffer[begin_byte_position:
563 begin_byte_position + num_bytes_to_read]
565 if num_rest_bits != 0:
566 read_buffer[num_bytes_to_read] = self.read_bits(num_rest_bits) << (8 - num_rest_bits)
568 return BitBuffer(read_buffer, bitsize)
570 @property
571 def bitposition(self) -> int:
572 """
573 Gets current bit position.
575 :returns: Current bit position.
576 """
578 return self._bitposition
580 @bitposition.setter
581 def bitposition(self, bitposition: int) -> None:
582 """
583 Sets bit position.
585 :param bitposition: New bit position.
586 :raises PythonRuntimeException: If the position is not within the stream.
587 """
589 if bitposition < 0:
590 raise PythonRuntimeException("BitStreamReader: Cannot set negative bit position!")
591 if bitposition > self._bitsize:
592 raise PythonRuntimeException("BitStreamReader: Setting bit position behind the stream!")
594 self._bitposition = bitposition
596 def alignto(self, alignment: int) -> None:
597 """
598 Aligns the bit position according to the aligning value.
600 :param alignment: An aligning value to use.
601 :raises PythonRuntimeException: If the aligning moves behind the stream."
602 """
604 offset = self._bitposition % alignment
605 if offset != 0:
606 self.bitposition = self._bitposition + alignment - offset
608 @property
609 def buffer_bitsize(self) -> int:
610 """
611 Gets size of the underlying buffer in bits.
613 :returns: Buffer bit size.
614 """
616 return self._bitsize
618VARINT_SIGN_1 = 0x80
619VARINT_BYTE_1 = 0x3f
620VARINT_BYTE_N = 0x7f
621VARINT_HAS_NEXT_1 = 0x40
622VARINT_HAS_NEXT_N = 0x80
623VARUINT_BYTE = 0x7f
624VARUINT_HAS_NEXT = 0x80
625VARSIZE_MAX_VALUE = (1 << 31) - 1
627_BitStreamReaderCpp = import_cpp_class("BitStreamReader")
628if _BitStreamReaderCpp is not None:
629 BitStreamReader = _BitStreamReaderCpp # type: ignore
631 def _bitstreamreader_fromfile(filename: str) -> 'BitStreamReader':
632 with open(filename, 'rb') as file:
633 return BitStreamReader(file.read())
634 BitStreamReader.from_file = _bitstreamreader_fromfile