Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/bitreader.py: 100%

330 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-12-13 15:12 +0000

1""" 

2The module implements abstraction for reading data to the bit stream. 

3""" 

4 

5import typing 

6 

7from zserio.bitbuffer import BitBuffer 

8from zserio.limits import INT64_MIN 

9from zserio.exception import PythonRuntimeException 

10from zserio.float import uint16_to_float, uint32_to_float, uint64_to_float 

11from zserio.cppbind import import_cpp_class 

12 

13class BitStreamReader: 

14 """ 

15 Bit stream reader. 

16 """ 

17 

18 def __init__(self, buffer: bytes, bitsize: typing.Optional[int] = None) -> None: 

19 """ 

20 Constructs bit stream reader from bytes buffer. 

21 

22 Because bit buffer size does not have to be byte aligned (divisible by 8), it's possible that not all 

23 bits of the last byte are used. In this case, only most significant bits of the corresponded size are 

24 used. 

25 

26 :param buffer: Bytes-like buffer to read as a bit stream. 

27 :param bitsize: Number of bits stored in buffer to use. 

28 :raises PythonRuntimeException: If bitsize is out of range. 

29 """ 

30 

31 if bitsize is None: 

32 bitsize = len(buffer) * 8 

33 elif len(buffer) * 8 < bitsize: 

34 raise PythonRuntimeException(f"BitStreamReader: Bit size '{bitsize}' out of range " 

35 f"for the given buffer byte size '{len(buffer)}'!") 

36 

37 self._buffer: bytes = buffer 

38 self._bitsize: int = bitsize 

39 self._bitposition: int = 0 

40 

41 @classmethod 

42 def from_bitbuffer(cls: typing.Type['BitStreamReader'], bitbuffer: BitBuffer) -> 'BitStreamReader': 

43 """ 

44 Constructs bit stream reader from bit buffer. 

45 

46 :param bitbuffer: Bit buffer to read as a bit stream. 

47 """ 

48 

49 instance = cls(bitbuffer.buffer, bitbuffer.bitsize) 

50 

51 return instance 

52 

53 @classmethod 

54 def from_file(cls: typing.Type['BitStreamReader'], filename: str) -> 'BitStreamReader': 

55 """ 

56 Constructs bit stream reader from file. 

57 

58 :param filename: Filename to read as a bit stream. 

59 """ 

60 

61 with open(filename, 'rb') as file: 

62 return cls(file.read()) 

63 

64 def read_bits(self, numbits: int) -> int: 

65 """ 

66 Reads given number of bits from the bit stream as an unsigned integer. 

67 

68 :param numbits: Number of bits to read. 

69 :returns: Read bits as an unsigned integer. 

70 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream. 

71 """ 

72 

73 if numbits < 0: 

74 raise PythonRuntimeException("BitStreamReader: Reading negative number of bits!") 

75 

76 return self.read_bits_unchecked(numbits) 

77 

78 def read_signed_bits(self, numbits: int) -> int: 

79 """ 

80 Reads given number of bits from the bit stream as a signed integer. 

81 

82 :param numbits: Number of bits to read 

83 :returns: Read bits as a signed integer. 

84 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream. 

85 """ 

86 

87 if numbits < 0: 

88 raise PythonRuntimeException("BitStreamReader: Reading negative number of bits!") 

89 

90 return self.read_signed_bits_unchecked(numbits) 

91 

92 def read_bits_unchecked(self, numbits: int) -> int: 

93 """ 

94 Reads given number of bits from the bit stream as an unsigned integer. 

95 

96 This method does not check that numbits >= 0 and assumes that it's ensured by the caller. 

97 

98 :param numbits: Number of bits to read. 

99 :returns: Read bits as an unsigned integer. 

100 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream. 

101 """ 

102 

103 end_bitposition = self._bitposition + numbits 

104 

105 if end_bitposition > self._bitsize: 

106 raise PythonRuntimeException("BitStreamReader: Reading behind the stream!") 

107 

108 start_byte = self._bitposition // 8 

109 end_byte = (end_bitposition - 1) // 8 

110 

111 value = int.from_bytes(self._buffer[start_byte : end_byte + 1], byteorder='big', signed=False) 

112 

113 last_bits = end_bitposition % 8 

114 if last_bits != 0: 

115 value >>= (8 - last_bits) 

116 value &= (1 << numbits) - 1 

117 

118 self._bitposition = end_bitposition 

119 

120 return value 

121 

122 def read_signed_bits_unchecked(self, numbits: int) -> int: 

123 """ 

124 Reads given number of bits from the bit stream as a signed integer. 

125 

126 This method does not check that numbits >= 0 and assumes that it's ensured by the caller. 

127 

128 :param numbits: Number of bits to read 

129 :returns: Read bits as a signed integer. 

130 :raises PythonRuntimeException: If the numbits is invalid number of the reading goes behind the stream. 

131 """ 

132 

133 value = self.read_bits_unchecked(numbits) 

134 

135 if numbits != 0 and (value >> (numbits - 1)) != 0: 

136 # signed 

137 return value - (1 << numbits) 

138 else: 

139 # unsigned 

140 return value 

141 

142 def read_varint16(self) -> int: 

143 """ 

144 Reads variable 16-bit signed integer value from the bit stream. 

145 

146 :returns: Variable 16-bit signed integer value. 

147 """ 

148 

149 byte = self.read_bits_unchecked(8) # byte 1 

150 sign = byte & VARINT_SIGN_1 

151 result = byte & VARINT_BYTE_1 

152 if byte & VARINT_HAS_NEXT_1 == 0: 

153 return -result if sign != 0 else result 

154 

155 result = (result << 8) | self.read_bits_unchecked(8) # byte 2 

156 return -result if sign else result 

157 

158 def read_varint32(self) -> int: 

159 """ 

160 Reads variable 32-bit signed integer value from the bit stream. 

161 

162 :returns: Variable 32-bit signed integer value. 

163 """ 

164 

165 byte = self.read_bits_unchecked(8) # byte 1 

166 sign = byte & VARINT_SIGN_1 

167 result = byte & VARINT_BYTE_1 

168 if byte & VARINT_HAS_NEXT_1 == 0: 

169 return -result if sign else result 

170 

171 byte = self.read_bits_unchecked(8) # byte 2 

172 result = result << 7 | (byte & VARINT_BYTE_N) 

173 if byte & VARINT_HAS_NEXT_N == 0: 

174 return -result if sign else result 

175 

176 byte = self.read_bits_unchecked(8) # byte 3 

177 result = result << 7 | (byte & VARINT_BYTE_N) 

178 if byte & VARINT_HAS_NEXT_N == 0: 

179 return -result if sign else result 

180 

181 result = result << 8 | self.read_bits_unchecked(8) # byte 4 

182 return -result if sign else result 

183 

184 def read_varint64(self) -> int: 

185 """ 

186 Reads variable 64-bit signed integer value from the bit stream. 

187 

188 :returns: Variable 64-bit signed integer value. 

189 """ 

190 

191 byte = self.read_bits_unchecked(8) # byte 1 

192 sign = byte & VARINT_SIGN_1 

193 result = byte & VARINT_BYTE_1 

194 if byte & VARINT_HAS_NEXT_1 == 0: 

195 return -result if sign else result 

196 

197 byte = self.read_bits_unchecked(8) # byte 2 

198 result = result << 7 | (byte & VARINT_BYTE_N) 

199 if byte & VARINT_HAS_NEXT_N == 0: 

200 return -result if sign else result 

201 

202 byte = self.read_bits_unchecked(8) # byte 3 

203 result = result << 7 | (byte & VARINT_BYTE_N) 

204 if byte & VARINT_HAS_NEXT_N == 0: 

205 return -result if sign else result 

206 

207 byte = self.read_bits_unchecked(8) # byte 4 

208 result = result << 7 | (byte & VARINT_BYTE_N) 

209 if byte & VARINT_HAS_NEXT_N == 0: 

210 return -result if sign else result 

211 

212 byte = self.read_bits_unchecked(8) # byte 5 

213 result = result << 7 | (byte & VARINT_BYTE_N) 

214 if byte & VARINT_HAS_NEXT_N == 0: 

215 return -result if sign else result 

216 

217 byte = self.read_bits_unchecked(8) # byte 6 

218 result = result << 7 | (byte & VARINT_BYTE_N) 

219 if byte & VARINT_HAS_NEXT_N == 0: 

220 return -result if sign else result 

221 

222 byte = self.read_bits_unchecked(8) # byte 7 

223 result = result << 7 | (byte & VARINT_BYTE_N) 

224 if byte & VARINT_HAS_NEXT_N == 0: 

225 return -result if sign else result 

226 

227 result = result << 8 | self.read_bits_unchecked(8) # byte 8 

228 return -result if sign else result 

229 

230 def read_varint(self) -> int: 

231 """ 

232 Reads variable signed integer value (up to 9 bytes) from the bit stream. 

233 

234 :returns: Variable signed integer value (up to 9 bytes). 

235 """ 

236 

237 byte = self.read_bits_unchecked(8) # byte 1 

238 sign = byte & VARINT_SIGN_1 

239 result = byte & VARINT_BYTE_1 

240 if byte & VARINT_HAS_NEXT_1 == 0: 

241 return (INT64_MIN if result == 0 else -result) if sign else result 

242 

243 byte = self.read_bits_unchecked(8) # byte 2 

244 result = result << 7 | (byte & VARINT_BYTE_N) 

245 if byte & VARINT_HAS_NEXT_N == 0: 

246 return -result if sign else result 

247 

248 byte = self.read_bits_unchecked(8) # byte 3 

249 result = result << 7 | (byte & VARINT_BYTE_N) 

250 if byte & VARINT_HAS_NEXT_N == 0: 

251 return -result if sign else result 

252 

253 byte = self.read_bits_unchecked(8) # byte 4 

254 result = result << 7 | (byte & VARINT_BYTE_N) 

255 if byte & VARINT_HAS_NEXT_N == 0: 

256 return -result if sign else result 

257 

258 byte = self.read_bits_unchecked(8) # byte 5 

259 result = result << 7 | (byte & VARINT_BYTE_N) 

260 if byte & VARINT_HAS_NEXT_N == 0: 

261 return -result if sign else result 

262 

263 byte = self.read_bits_unchecked(8) # byte 6 

264 result = result << 7 | (byte & VARINT_BYTE_N) 

265 if byte & VARINT_HAS_NEXT_N == 0: 

266 return -result if sign else result 

267 

268 byte = self.read_bits_unchecked(8) # byte 7 

269 result = result << 7 | (byte & VARINT_BYTE_N) 

270 if byte & VARINT_HAS_NEXT_N == 0: 

271 return -result if sign else result 

272 

273 byte = self.read_bits_unchecked(8) # byte 8 

274 result = result << 7 | (byte & VARINT_BYTE_N) 

275 if byte & VARINT_HAS_NEXT_N == 0: 

276 return -result if sign else result 

277 

278 result = result << 8 | self.read_bits_unchecked(8) # byte 9 

279 return -result if sign else result 

280 

281 def read_varuint16(self) -> int: 

282 """ 

283 Reads variable 16-bit unsigned integer value from the bit stream. 

284 

285 :returns: Variable 16-bit unsigned integer value. 

286 """ 

287 

288 byte = self.read_bits_unchecked(8) # byte 1 

289 result = byte & VARUINT_BYTE 

290 if byte & VARUINT_HAS_NEXT == 0: 

291 return result 

292 

293 result = result << 8 | self.read_bits_unchecked(8) # byte 2 

294 return result 

295 

296 def read_varuint32(self) -> int: 

297 """ 

298 Reads variable 32-bit unsigned integer value from the bit stream. 

299 

300 :returns: Variable 32-bit unsigned integer value. 

301 """ 

302 

303 byte = self.read_bits_unchecked(8) # byte 1 

304 result = byte & VARUINT_BYTE 

305 if byte & VARUINT_HAS_NEXT == 0: 

306 return result 

307 

308 byte = self.read_bits_unchecked(8) # byte 2 

309 result = result << 7 | (byte & VARUINT_BYTE) 

310 if byte & VARUINT_HAS_NEXT == 0: 

311 return result 

312 

313 byte = self.read_bits_unchecked(8) # byte 3 

314 result = result << 7 | (byte & VARUINT_BYTE) 

315 if byte & VARUINT_HAS_NEXT == 0: 

316 return result 

317 

318 result = result << 8 | self.read_bits_unchecked(8) # byte 4 

319 return result 

320 

321 def read_varuint64(self) -> int: 

322 """ 

323 Reads variable 64-bit unsigned integer value from the bit stream. 

324 

325 :returns: Variable 64-bit unsigned integer value. 

326 """ 

327 

328 byte = self.read_bits_unchecked(8) # byte 1 

329 result = byte & VARUINT_BYTE 

330 if byte & VARUINT_HAS_NEXT == 0: 

331 return result 

332 

333 byte = self.read_bits_unchecked(8) # byte 2 

334 result = result << 7 | (byte & VARUINT_BYTE) 

335 if byte & VARUINT_HAS_NEXT == 0: 

336 return result 

337 

338 byte = self.read_bits_unchecked(8) # byte 3 

339 result = result << 7 | (byte & VARUINT_BYTE) 

340 if byte & VARUINT_HAS_NEXT == 0: 

341 return result 

342 

343 byte = self.read_bits_unchecked(8) # byte 4 

344 result = result << 7 | (byte & VARUINT_BYTE) 

345 if byte & VARUINT_HAS_NEXT == 0: 

346 return result 

347 

348 byte = self.read_bits_unchecked(8) # byte 5 

349 result = result << 7 | (byte & VARUINT_BYTE) 

350 if byte & VARUINT_HAS_NEXT == 0: 

351 return result 

352 

353 byte = self.read_bits_unchecked(8) # byte 6 

354 result = result << 7 | (byte & VARUINT_BYTE) 

355 if byte & VARUINT_HAS_NEXT == 0: 

356 return result 

357 

358 byte = self.read_bits_unchecked(8) # byte 7 

359 result = result << 7 | (byte & VARUINT_BYTE) 

360 if byte & VARUINT_HAS_NEXT == 0: 

361 return result 

362 

363 result = result << 8 | self.read_bits_unchecked(8) # byte 8 

364 return result 

365 

366 def read_varuint(self) -> int: 

367 """ 

368 Reads variable unsigned integer value (up to 9 bytes) from the bit stream. 

369 

370 :returns: Variable unsigned integer value (up to 9 bytes). 

371 """ 

372 

373 byte = self.read_bits_unchecked(8) # byte 1 

374 result = byte & VARUINT_BYTE 

375 if byte & VARUINT_HAS_NEXT == 0: 

376 return result 

377 

378 byte = self.read_bits_unchecked(8) # byte 2 

379 result = result << 7 | (byte & VARUINT_BYTE) 

380 if byte & VARUINT_HAS_NEXT == 0: 

381 return result 

382 

383 byte = self.read_bits_unchecked(8) # byte 3 

384 result = result << 7 | (byte & VARUINT_BYTE) 

385 if byte & VARUINT_HAS_NEXT == 0: 

386 return result 

387 

388 byte = self.read_bits_unchecked(8) # byte 4 

389 result = result << 7 | (byte & VARUINT_BYTE) 

390 if byte & VARUINT_HAS_NEXT == 0: 

391 return result 

392 

393 byte = self.read_bits_unchecked(8) # byte 5 

394 result = result << 7 | (byte & VARUINT_BYTE) 

395 if byte & VARUINT_HAS_NEXT == 0: 

396 return result 

397 

398 byte = self.read_bits_unchecked(8) # byte 6 

399 result = result << 7 | (byte & VARUINT_BYTE) 

400 if byte & VARUINT_HAS_NEXT == 0: 

401 return result 

402 

403 byte = self.read_bits_unchecked(8) # byte 7 

404 result = result << 7 | (byte & VARUINT_BYTE) 

405 if byte & VARUINT_HAS_NEXT == 0: 

406 return result 

407 

408 byte = self.read_bits_unchecked(8) # byte 8 

409 result = result << 7 | (byte & VARUINT_BYTE) 

410 if byte & VARUINT_HAS_NEXT == 0: 

411 return result 

412 

413 result = result << 8 | self.read_bits_unchecked(8) # byte 9 

414 return result 

415 

416 def read_varsize(self) -> int: 

417 """ 

418 Reads variable size integer value from the bit stream. 

419 

420 :returns: Variable size integer value. 

421 :raises PythonRuntimeException: If read variable size integer is out of range. 

422 """ 

423 

424 byte = self.read_bits_unchecked(8) # byte 1 

425 result = byte & VARUINT_BYTE 

426 if byte & VARUINT_HAS_NEXT == 0: 

427 return result 

428 

429 byte = self.read_bits_unchecked(8) # byte 2 

430 result = result << 7 | (byte & VARUINT_BYTE) 

431 if byte & VARUINT_HAS_NEXT == 0: 

432 return result 

433 

434 byte = self.read_bits_unchecked(8) # byte 3 

435 result = result << 7 | (byte & VARUINT_BYTE) 

436 if byte & VARUINT_HAS_NEXT == 0: 

437 return result 

438 

439 byte = self.read_bits_unchecked(8) # byte 4 

440 result = result << 7 | (byte & VARUINT_BYTE) 

441 if byte & VARUINT_HAS_NEXT == 0: 

442 return result 

443 

444 result = result << 8 | self.read_bits_unchecked(8) # byte 5 

445 if result > VARSIZE_MAX_VALUE: 

446 raise PythonRuntimeException(f"BitStreamReader: Read value '{result}' is out of range " 

447 "for varsize type!") 

448 

449 return result 

450 

451 def read_float16(self) -> float: 

452 """ 

453 Read 16-bits from the stream as a float value encoded according to IEEE 754 binary16. 

454 

455 :returns: Read float value. 

456 :raises PythonRuntimeException: If the reading goes behind the stream. 

457 """ 

458 

459 return uint16_to_float(self.read_bits_unchecked(16)) 

460 

461 def read_float32(self) -> float: 

462 """ 

463 Read 32-bits from the stream as a float value encoded according to IEEE 754 binary32. 

464 

465 :returns: Read float value. 

466 :raises PythonRuntimeException: If the reading goes behind the stream. 

467 """ 

468 

469 return uint32_to_float(self.read_bits_unchecked(32)) 

470 

471 def read_float64(self) -> float: 

472 """ 

473 Read 64-bits from the stream as a float value encoded according to IEEE 754 binary64. 

474 

475 :returns: Read float value. 

476 :raises PythonRuntimeException: If the reading goes behind the stream. 

477 """ 

478 

479 return uint64_to_float(self.read_bits_unchecked(64)) 

480 

481 def read_bytes(self) -> bytearray: 

482 """ 

483 Reads bytes from the stream. 

484 

485 :returns: Read bytes. 

486 :raises PythonRuntimeException: If the reading goes behind the stream. 

487 """ 

488 

489 length = self.read_varsize() 

490 begin_bitposition = self._bitposition 

491 

492 if (begin_bitposition & 0x07) != 0: 

493 # we are not aligned to byte 

494 value = bytearray() 

495 for _ in range(length): 

496 value.append(self.read_bits_unchecked(8)) 

497 else: 

498 # we are aligned to byte 

499 self.bitposition = begin_bitposition + length * 8 

500 value = bytearray(length) 

501 begin_byte_position = begin_bitposition // 8 

502 value[0:length] = self._buffer[begin_byte_position:begin_byte_position+length] 

503 

504 return value 

505 

506 def read_string(self) -> str: 

507 """ 

508 Reads string from the stream. 

509 

510 :returns: Read string. 

511 :raises PythonRuntimeException: If the reading goes behind the stream. 

512 """ 

513 

514 length = self.read_varsize() 

515 begin_bitposition = self._bitposition 

516 if (begin_bitposition & 0x07) != 0: 

517 # we are not aligned to byte 

518 value = bytearray() 

519 for _ in range(length): 

520 value.append(self.read_bits_unchecked(8)) 

521 else: 

522 # we are aligned to byte 

523 self.bitposition = begin_bitposition + length * 8 

524 value = bytearray(length) 

525 begin_byte_position = begin_bitposition // 8 

526 value[0:length] = self._buffer[begin_byte_position:begin_byte_position + length] 

527 

528 return value.decode("utf-8") 

529 

530 def read_bool(self) -> bool: 

531 """ 

532 Reads single bit as a bool value. 

533 

534 :returns: Read bool values. 

535 :raises PythonRuntimeException: If the reading goes behind the stream. 

536 """ 

537 

538 return self.read_bits_unchecked(1) != 0 

539 

540 def read_bitbuffer(self) -> BitBuffer: 

541 """ 

542 Reads a bit buffer from the stream. 

543 

544 :returns: Read bit buffer. 

545 :raises PythonRuntimeException: If the reading goes behind the stream. 

546 """ 

547 

548 bitsize = self.read_varsize() 

549 num_bytes_to_read = bitsize // 8 

550 num_rest_bits = bitsize - num_bytes_to_read * 8 

551 bytesize = (bitsize + 7) // 8 

552 read_buffer = bytearray(bytesize) 

553 begin_bitposition = self._bitposition 

554 if (begin_bitposition & 0x07) != 0: 

555 # we are not aligned to byte 

556 for i in range(num_bytes_to_read): 

557 read_buffer[i] = self.read_bits_unchecked(8) 

558 else: 

559 # we are aligned to byte 

560 self.bitposition = begin_bitposition + num_bytes_to_read * 8 

561 begin_byte_position = begin_bitposition // 8 

562 read_buffer[0:num_bytes_to_read] = self._buffer[begin_byte_position: 

563 begin_byte_position + num_bytes_to_read] 

564 

565 if num_rest_bits != 0: 

566 read_buffer[num_bytes_to_read] = self.read_bits(num_rest_bits) << (8 - num_rest_bits) 

567 

568 return BitBuffer(read_buffer, bitsize) 

569 

570 @property 

571 def bitposition(self) -> int: 

572 """ 

573 Gets current bit position. 

574 

575 :returns: Current bit position. 

576 """ 

577 

578 return self._bitposition 

579 

580 @bitposition.setter 

581 def bitposition(self, bitposition: int) -> None: 

582 """ 

583 Sets bit position. 

584 

585 :param bitposition: New bit position. 

586 :raises PythonRuntimeException: If the position is not within the stream. 

587 """ 

588 

589 if bitposition < 0: 

590 raise PythonRuntimeException("BitStreamReader: Cannot set negative bit position!") 

591 if bitposition > self._bitsize: 

592 raise PythonRuntimeException("BitStreamReader: Setting bit position behind the stream!") 

593 

594 self._bitposition = bitposition 

595 

596 def alignto(self, alignment: int) -> None: 

597 """ 

598 Aligns the bit position according to the aligning value. 

599 

600 :param alignment: An aligning value to use. 

601 :raises PythonRuntimeException: If the aligning moves behind the stream." 

602 """ 

603 

604 offset = self._bitposition % alignment 

605 if offset != 0: 

606 self.bitposition = self._bitposition + alignment - offset 

607 

608 @property 

609 def buffer_bitsize(self) -> int: 

610 """ 

611 Gets size of the underlying buffer in bits. 

612 

613 :returns: Buffer bit size. 

614 """ 

615 

616 return self._bitsize 

617 

618VARINT_SIGN_1 = 0x80 

619VARINT_BYTE_1 = 0x3f 

620VARINT_BYTE_N = 0x7f 

621VARINT_HAS_NEXT_1 = 0x40 

622VARINT_HAS_NEXT_N = 0x80 

623VARUINT_BYTE = 0x7f 

624VARUINT_HAS_NEXT = 0x80 

625VARSIZE_MAX_VALUE = (1 << 31) - 1 

626 

627_BitStreamReaderCpp = import_cpp_class("BitStreamReader") 

628if _BitStreamReaderCpp is not None: 

629 BitStreamReader = _BitStreamReaderCpp # type: ignore 

630 

631 def _bitstreamreader_fromfile(filename: str) -> 'BitStreamReader': 

632 with open(filename, 'rb') as file: 

633 return BitStreamReader(file.read()) 

634 BitStreamReader.from_file = _bitstreamreader_fromfile