Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/tests/test_bitstream.py: 100%

156 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-12-13 15:12 +0000

1import unittest 

2 

3from zserio.bitbuffer import BitBuffer 

4from zserio.bitreader import BitStreamReader 

5from zserio.bitsizeof import INT64_MIN 

6from zserio.bitwriter import BitStreamWriter 

7 

class BitStreamTest(unittest.TestCase):
    """Round-trip tests for BitStreamWriter and BitStreamReader.

    Each test writes a list of values with a writer method, reads them back with the matching reader
    method and checks that the values are unchanged. The shared helpers repeat the round trip for a range
    of leading zero-bit offsets so that the values also start at non byte-aligned positions.
    """

    def test_bits(self):
        """Round-trips unsigned values for every bit width from 1 to 64."""
        for numbits in range(1, 65):
            max_value = (1 << numbits) - 1
            values = [
                max_value,
                max_value >> 1,
                max_value >> 2,
                1,
                0,
                1,
                max_value >> 2,
                max_value >> 1,
                max_value,
            ]
            self._test_bits_impl(BitStreamWriter.write_bits, BitStreamReader.read_bits, values, numbits)

    def test_signed_bits(self):
        """Round-trips signed values for every bit width from 1 to 64."""
        for numbits in range(1, 65):
            min_value = -1 << (numbits - 1)
            max_value = (1 << (numbits - 1)) - 1
            # with numbits == 1 the range is [-1, 0], so +1 is replaced by -1
            small_positive = 1 if numbits != 1 else -1
            values = [
                min_value,
                max_value,
                min_value >> 1,
                max_value >> 1,
                min_value >> 2,
                max_value >> 2,
                -1,
                small_positive,
                0,
                small_positive,
                -1,
                max_value >> 2,
                min_value >> 2,
                max_value >> 1,
                min_value >> 1,
                max_value,
                min_value,
            ]
            self._test_bits_impl(BitStreamWriter.write_signed_bits, BitStreamReader.read_signed_bits, values,
                                 numbits)

    def test_varint16(self):
        """Round-trips the smallest and largest varint16 magnitudes of each encoded length."""
        values = [
            # 1 byte
            0,
            -1,
            +1,
            -((1 << (6)) - 1),
            +((1 << (6)) - 1),
            # 2 bytes
            -((1 << (6))),
            +((1 << (6))),
            -((1 << (6 + 8)) - 1),
            +((1 << (6 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varint16, BitStreamReader.read_varint16, values, 15)

    def test_varint32(self):
        """Round-trips the smallest and largest varint32 magnitudes of each encoded length."""
        values = [
            # 1 byte
            0,
            -((1)),
            +((1)),
            -((1 << (6)) - 1),
            +((1 << (6)) - 1),
            # 2 bytes
            -((1 << (6))),
            +((1 << (6))),
            -((1 << (6 + 7)) - 1),
            +((1 << (6 + 7)) - 1),
            # 3 bytes
            -((1 << (6 + 7))),
            +((1 << (6 + 7))),
            -((1 << (6 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7)) - 1),
            # 4 bytes
            -((1 << (6 + 7 + 7))),
            +((1 << (6 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 8)) - 1),
            +((1 << (6 + 7 + 7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varint32, BitStreamReader.read_varint32, values, 31)

    def test_varint64(self):
        """Round-trips the smallest and largest varint64 magnitudes of each encoded length."""
        values = [
            # 1 byte
            0,
            -((1)),
            +((1)),
            -((1 << (6)) - 1),
            +((1 << (6)) - 1),
            # 2 bytes
            -((1 << (6))),
            +((1 << (6))),
            -((1 << (6 + 7)) - 1),
            +((1 << (6 + 7)) - 1),
            # 3 bytes
            -((1 << (6 + 7))),
            +((1 << (6 + 7))),
            -((1 << (6 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7)) - 1),
            # 4 bytes (the maxima are one below the 5-byte minima listed next)
            -((1 << (6 + 7 + 7))),
            +((1 << (6 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7)) - 1),
            # 5 bytes
            -((1 << (6 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7)) - 1),
            # 6 bytes
            -((1 << (6 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 7 bytes
            -((1 << (6 + 7 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 8 bytes
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varint64, BitStreamReader.read_varint64, values, 63)

    def test_varint(self):
        """Round-trips the smallest and largest varint magnitudes of each encoded length and INT64_MIN."""
        values = [
            # 1 byte
            0,
            -((1)),
            +((1)),
            -((1 << (6)) - 1),
            +((1 << (6)) - 1),
            # 2 bytes
            -((1 << (6))),
            +((1 << (6))),
            -((1 << (6 + 7)) - 1),
            +((1 << (6 + 7)) - 1),
            # 3 bytes
            -((1 << (6 + 7))),
            +((1 << (6 + 7))),
            -((1 << (6 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7)) - 1),
            # 4 bytes (the maxima are one below the 5-byte minima listed next)
            -((1 << (6 + 7 + 7))),
            +((1 << (6 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7)) - 1),
            # 5 bytes
            -((1 << (6 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7)) - 1),
            # 6 bytes
            -((1 << (6 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 7 bytes
            -((1 << (6 + 7 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 8 bytes
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 9 bytes
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7))),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7))),
            -((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
            +((1 << (6 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
            # 1 byte
            INT64_MIN,  # special case, stored as -0
        ]

        self._test_impl(BitStreamWriter.write_varint, BitStreamReader.read_varint, values, 71)

    def test_varuint16(self):
        """Round-trips the smallest and largest varuint16 values of each encoded length."""
        values = [
            # 1 byte
            0,
            1,
            ((1 << (7)) - 1),
            # 2 bytes
            ((1 << (7))),
            ((1 << (7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varuint16, BitStreamReader.read_varuint16, values, 15)

    def test_varuint32(self):
        """Round-trips the smallest and largest varuint32 values of each encoded length."""
        values = [
            # 1 byte
            ((0)),
            ((1)),
            ((1 << (7)) - 1),
            # 2 bytes
            ((1 << (7))),
            ((1 << (7 + 7)) - 1),
            # 3 bytes
            ((1 << (7 + 7))),
            ((1 << (7 + 7 + 7)) - 1),
            # 4 bytes
            ((1 << (7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varuint32, BitStreamReader.read_varuint32, values, 31)

    def test_varuint64(self):
        """Round-trips the smallest and largest varuint64 values of each encoded length."""
        values = [
            # 1 byte
            ((0)),
            ((1)),
            ((1 << (7)) - 1),
            # 2 bytes
            ((1 << (7))),
            ((1 << (7 + 7)) - 1),
            # 3 bytes
            ((1 << (7 + 7))),
            ((1 << (7 + 7 + 7)) - 1),
            # 4 bytes (the maximum is one below the 5-byte minimum listed next)
            ((1 << (7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7)) - 1),
            # 5 bytes
            ((1 << (7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7)) - 1),
            # 6 bytes
            ((1 << (7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 7 bytes
            ((1 << (7 + 7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 8 bytes
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varuint64, BitStreamReader.read_varuint64, values, 63)

    def test_varuint(self):
        """Round-trips the smallest and largest varuint values of each encoded length."""
        values = [
            # 1 byte
            ((0)),
            ((1)),
            ((1 << (7)) - 1),
            # 2 bytes
            ((1 << (7))),
            ((1 << (7 + 7)) - 1),
            # 3 bytes
            ((1 << (7 + 7))),
            ((1 << (7 + 7 + 7)) - 1),
            # 4 bytes (the maximum is one below the 5-byte minimum listed next)
            ((1 << (7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7)) - 1),
            # 5 bytes
            ((1 << (7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7)) - 1),
            # 6 bytes
            ((1 << (7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 7 bytes
            ((1 << (7 + 7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 8 bytes (the maximum is one below the 9-byte minimum listed next)
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7)) - 1),
            # 9 bytes
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7 + 7 + 7 + 7 + 7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varuint, BitStreamReader.read_varuint, values, 71)

    def test_varsize(self):
        """Round-trips the smallest and largest varsize values of each encoded length."""
        values = [
            # 1 byte
            ((0)),
            ((1)),
            ((1 << (7)) - 1),
            # 2 bytes
            ((1 << (7))),
            ((1 << (7 + 7)) - 1),
            # 3 bytes
            ((1 << (7 + 7))),
            ((1 << (7 + 7 + 7)) - 1),
            # 4 bytes
            ((1 << (7 + 7 + 7))),
            ((1 << (7 + 7 + 7 + 7)) - 1),
            # 5 bytes
            ((1 << (7 + 7 + 7 + 7))),
            ((1 << (2 + 7 + 7 + 7 + 8)) - 1),
        ]

        self._test_impl(BitStreamWriter.write_varsize, BitStreamReader.read_varsize, values, 39)

    def test_float16(self):
        """Round-trips a set of float values through write_float16 / read_float16."""
        values = [
            -42.5,
            -2.0,
            0.0,
            0.6171875,
            0.875,
            2.0,
            9.875,
            42.5,
        ]

        self._test_impl(BitStreamWriter.write_float16, BitStreamReader.read_float16, values, 15)

    def test_float32(self):
        """Round-trips a set of float values through write_float32 / read_float32."""
        values = [
            -42.5,
            -2.0,
            0.0,
            0.6171875,
            0.875,
            2.0,
            9.875,
            42.5,
        ]

        self._test_impl(BitStreamWriter.write_float32, BitStreamReader.read_float32, values, 31)

    def test_float64(self):
        """Round-trips a set of float values through write_float64 / read_float64."""
        values = [
            -42.5,
            -2.0,
            0.0,
            0.6171875,
            0.875,
            2.0,
            9.875,
            42.5,
        ]

        self._test_impl(BitStreamWriter.write_float64, BitStreamReader.read_float64, values, 63)

    def test_string(self):
        """Round-trips plain, special-character (including an embedded NUL) and non-ASCII strings."""
        values = [
            "Hello World",
            "\n\t%^@(*aAzZ01234569$%^!?<>[]](){}-=+~:;/|\\\"\'Hello World2\0nonWrittenPart",
            b"Price: \xE2\x82\xAC 3 what's this? -> \xC2\xA2".decode("utf-8"),
        ]

        self._test_impl(BitStreamWriter.write_string, BitStreamReader.read_string, values, 7)

    def test_bool(self):
        """Round-trips a sequence of booleans."""
        values = [
            False,
            True,
            True,
            False,
            False,
            True,
            False,
            True,
            False,
            False,
            True,
            True,
            False,
        ]

        self._test_impl(BitStreamWriter.write_bool, BitStreamReader.read_bool, values, 1)

    def test_bitbuffer(self):
        """Round-trips bit buffers whose bit size is not a multiple of 8."""
        values = [
            BitBuffer(bytes([0xAB, 0x07]), 11),
            BitBuffer(bytes([0xAB, 0xCD, 0x7F]), 23),
        ]

        self._test_impl(BitStreamWriter.write_bitbuffer, BitStreamReader.read_bitbuffer, values, 7)

    def test_bytes(self):
        """Round-trips byte arrays."""
        values = [
            bytearray([0, 255]),
            bytearray([1, 127, 128, 254]),
        ]

        self._test_impl(BitStreamWriter.write_bytes, BitStreamReader.read_bytes, values, 7)

    def test_bitposition(self):
        """Checks that bitposition tracks writes and reads and that the reader position can be set."""
        writer = BitStreamWriter()
        writer.write_bits(0xaaaa, 16)
        self.assertEqual(16, writer.bitposition)
        writer.write_bits(0xff, 8)
        self.assertEqual(24, writer.bitposition)

        reader = BitStreamReader(buffer=writer.byte_array)
        self.assertEqual(0xaaaa, reader.read_bits(16))
        self.assertEqual(16, reader.bitposition)
        reader.bitposition = 8
        self.assertEqual(8, reader.bitposition)
        self.assertEqual(0xaaff, reader.read_bits(16))
        reader.bitposition = 13
        self.assertEqual(13, reader.bitposition)
        self.assertEqual(0x02, reader.read_bits(3))
        self.assertEqual(16, reader.bitposition)
        self.assertEqual(0xff, reader.read_bits(8))
        self.assertEqual(24, reader.bitposition)
        reader.bitposition = 0
        self.assertEqual(0, reader.bitposition)
        self.assertEqual(0xaaaaff, reader.read_bits(24))

    def test_alignto(self):
        """Checks that alignto() moves writer and reader to the next multiple of the given alignment."""
        writer = BitStreamWriter()
        writer.write_bits(5, 3)
        writer.alignto(8)
        self.assertEqual(8, writer.bitposition)
        writer.write_bits(0, 1)
        writer.alignto(16)
        self.assertEqual(16, writer.bitposition)
        writer.write_bits(0xaa, 9)
        writer.alignto(32)
        self.assertEqual(32, writer.bitposition)
        writer.write_bits(0xaca, 13)
        writer.alignto(64)
        self.assertEqual(64, writer.bitposition)
        writer.write_bits(0xcafe, 16)

        reader = BitStreamReader(buffer=writer.byte_array)
        self.assertEqual(5, reader.read_bits(3))
        reader.alignto(8)
        self.assertEqual(8, reader.bitposition)
        self.assertEqual(0, reader.read_bits(1))
        reader.alignto(16)
        self.assertEqual(16, reader.bitposition)
        self.assertEqual(0xaa, reader.read_bits(9))
        reader.alignto(32)
        self.assertEqual(32, reader.bitposition)
        self.assertEqual(0xaca, reader.read_bits(13))
        reader.alignto(64)
        self.assertEqual(64, reader.bitposition)
        self.assertEqual(0xcafe, reader.read_bits(16))

    def test_file(self):
        """Round-trips a stream through a file written by to_file() and loaded by from_file()."""
        # imported locally because only this test touches the file system
        import os
        import tempfile

        test_filename = "BitStreamTest.bin"
        writer = BitStreamWriter()
        writer.write_bits(13, 7)
        writer.write_string(test_filename)
        writer.write_varint(-123456)

        # use a throw-away directory so that no file is left behind in the working directory
        with tempfile.TemporaryDirectory() as temp_dir:
            test_path = os.path.join(temp_dir, test_filename)
            writer.to_file(test_path)

            reader = BitStreamReader.from_file(test_path)
            self.assertEqual(13, reader.read_bits(7))
            self.assertEqual(test_filename, reader.read_string())
            self.assertEqual(-123456, reader.read_varint())

    def _test_bits_impl(self, write_method, read_method, values, numbits):
        """Round-trips values of a fixed bit width at every start offset in range(numbits).

        :param write_method: Unbound writer method called as write_method(writer, value, numbits).
        :param read_method: Unbound reader method called as read_method(reader, numbits).
        :param values: Values to write and to expect back, in order.
        :param numbits: Bit width passed to both methods; also the number of start offsets tried.
        """
        for bit_pos in range(numbits):
            writer = BitStreamWriter()
            # shift the payload by bit_pos zero bits so that it does not start byte-aligned
            if bit_pos > 0:
                writer.write_bits(0, bit_pos)
            for value in values:
                write_method(writer, value, numbits)

            reader = BitStreamReader(buffer=writer.byte_array)
            if bit_pos > 0:
                reader.read_bits(bit_pos)
            for value in values:
                self.assertEqual(value, read_method(reader, numbits),
                                 f"[numbits={numbits}, bit_pos={bit_pos}]")

    def _test_impl(self, write_method, read_method, values, max_start_bit_pos):
        """Round-trips values at every start offset in range(max_start_bit_pos).

        :param write_method: Unbound writer method called as write_method(writer, value).
        :param read_method: Unbound reader method called as read_method(reader).
        :param values: Values to write and to expect back, in order.
        :param max_start_bit_pos: Exclusive upper bound of the leading zero-bit offsets to try.
        """
        for bit_pos in range(max_start_bit_pos):
            writer = BitStreamWriter()
            # offsets above 64 are written in two steps
            # (presumably because a single write_bits call takes at most 64 bits - TODO confirm)
            if bit_pos > 64:
                writer.write_bits(0, 64)
                writer.write_bits(0, bit_pos - 64)
            elif bit_pos > 0:
                writer.write_bits(0, bit_pos)
            for value in values:
                write_method(writer, value)

            reader = BitStreamReader(buffer=writer.byte_array)
            # skip the same padding that was written above
            if bit_pos > 64:
                reader.read_bits(64)
                reader.read_bits(bit_pos - 64)
            elif bit_pos > 0:
                reader.read_bits(bit_pos)
            for value in values:
                self.assertEqual(value, read_method(reader), f"[bit_pos={bit_pos}]")