Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/bitwriter.py: 100%

142 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2023-12-13 15:12 +0000

1""" 

2The module implements abstraction for writing data to the bit stream. 

3""" 

4 

5from zserio.bitbuffer import BitBuffer 

6from zserio.bitsizeof import (bitsizeof_varint16, bitsizeof_varint32, 

7 bitsizeof_varint64, bitsizeof_varint, 

8 bitsizeof_varuint16, bitsizeof_varuint32, 

9 bitsizeof_varuint64, bitsizeof_varuint, 

10 bitsizeof_varsize) 

11from zserio.exception import PythonRuntimeException 

12from zserio.float import float_to_uint16, float_to_uint32, float_to_uint64 

13from zserio.limits import INT64_MIN 

14from zserio.cppbind import import_cpp_class 

15 

16class BitStreamWriter: 

17 """ 

18 Bit stream writer using bytearray. 

19 """ 

20 

21 def __init__(self) -> None: 

22 """ 

23 Constructor. 

24 """ 

25 

26 self._byte_array: bytearray = bytearray() 

27 self._bitposition: int = 0 

28 

29 def write_bits(self, value: int, numbits: int) -> None: 

30 """ 

31 Writes the given value with the given number of bits to the underlying storage. 

32 

33 :param value: Value to write. 

34 :param numbits: Number of bits to write. 

35 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid. 

36 """ 

37 

38 if numbits <= 0: 

39 raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' is less than 1!") 

40 

41 self.write_bits_unchecked(value, numbits) 

42 

43 def write_signed_bits(self, value: int, numbits: int) -> None: 

44 """ 

45 Writes the given signed value with the given number of bits to the underlying storage. 

46 Provided for convenience. 

47 

48 :param value: Signed value to write. 

49 :param numbits: Number of bits to write. 

50 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid. 

51 """ 

52 

53 if numbits <= 0: 

54 raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' is less than 1!") 

55 

56 self.write_signed_bits_unchecked(value, numbits) 

57 

58 def write_bits_unchecked(self, value: int, numbits: int) -> None: 

59 """ 

60 Writes the given value with the given number of bits to the underlying storage. 

61 

62 This method does not check that numbits > 0 and assumes that it's ensured by the caller. 

63 

64 :param value: Value to write. 

65 :param numbits: Number of bits to write. 

66 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid. 

67 """ 

68 

69 min_value = 0 

70 max_value = (1 << numbits) - 1 

71 if value < min_value or value > max_value: 

72 raise PythonRuntimeException(f"BitStreamWriter: Value '{value}' is out of the range " 

73 f"<{min_value},{max_value}>!") 

74 

75 self._write_bits(value, numbits, signed=False) 

76 

77 def write_signed_bits_unchecked(self, value: int, numbits: int) -> None: 

78 """ 

79 Writes the given signed value with the given number of bits to the underlying storage. 

80 Provided for convenience. 

81 

82 This method does not check that numbits > 0 and assumes that it's ensured by the caller. 

83 

84 :param value: Signed value to write. 

85 :param numbits: Number of bits to write. 

86 :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid. 

87 """ 

88 

89 min_value = -(1 << (numbits - 1)) 

90 max_value = (1 << (numbits - 1)) - 1 

91 if value < min_value or value > max_value: 

92 raise PythonRuntimeException(f"BitStreamWriter: Value '{value}' is out of the range " 

93 f"<{min_value},{max_value}>!") 

94 

95 self._write_bits(value, numbits, signed=True) 

96 

97 def write_varint16(self, value: int) -> None: 

98 """ 

99 Writes a variable 16-bit signed integer value to the underlying storage. 

100 

101 :param value: Value to write. 

102 :raises PythonRuntimeException: If the value is out of the range. 

103 """ 

104 

105 self._write_varnum(value, 2, bitsizeof_varint16(value) // 8, is_signed=True) 

106 

107 def write_varint32(self, value: int) -> None: 

108 """ 

109 Writes a variable 32-bit signed integer value to the underlying storage. 

110 

111 :param value: Value to write. 

112 :raises PythonRuntimeException: If the value is out of the range. 

113 """ 

114 

115 self._write_varnum(value, 4, bitsizeof_varint32(value) // 8, is_signed=True) 

116 

117 def write_varint64(self, value: int) -> None: 

118 """ 

119 Writes a variable 16-bit signed integer value to the underlying storage. 

120 

121 :param value: Value to write. 

122 :raises PythonRuntimeException: If the value is out of the range. 

123 """ 

124 

125 self._write_varnum(value, 8, bitsizeof_varint64(value) // 8, is_signed=True) 

126 

127 def write_varint(self, value: int) -> None: 

128 """ 

129 Writes a variable signed integer value (up to 9 bytes) to the underlying storage. 

130 

131 :param value: Value to write. 

132 :raises PythonRuntimeException: If the value is out of the range. 

133 """ 

134 

135 if value == INT64_MIN: 

136 self._write_bits(0x80, 8) # INT64_MIN is stored as -0 

137 else: 

138 self._write_varnum(value, 9, bitsizeof_varint(value) // 8, is_signed=True) 

139 

140 def write_varuint16(self, value: int) -> None: 

141 """ 

142 Writes a variable 16-bit unsigned integer value to the underlying storage. 

143 

144 :param value: Value to write. 

145 :raises PythonRuntimeException: If the value is out of the range. 

146 """ 

147 

148 self._write_varnum(value, 2, bitsizeof_varuint16(value) // 8, is_signed=False) 

149 

150 def write_varuint32(self, value: int) -> None: 

151 """ 

152 Writes a variable 32-bit unsigned integer value to the underlying storage. 

153 

154 :param value: Value to write. 

155 :raises PythonRuntimeException: If the value is out of the range. 

156 """ 

157 

158 self._write_varnum(value, 4, bitsizeof_varuint32(value) // 8, is_signed=False) 

159 

160 def write_varuint64(self, value: int) -> None: 

161 """ 

162 Writes a variable 16-bit unsigned integer value to the underlying storage. 

163 

164 :param value: Value to write. 

165 :raises PythonRuntimeException: If the value is out of the range. 

166 """ 

167 

168 self._write_varnum(value, 8, bitsizeof_varuint64(value) // 8, is_signed=False) 

169 

170 def write_varuint(self, value: int) -> None: 

171 """ 

172 Writes a variable unsigned integer value (up to 9 bytes) to the underlying storage. 

173 

174 :param value: Value to write. 

175 :raises PythonRuntimeException: If the value is out of the range. 

176 """ 

177 

178 self._write_varnum(value, 9, bitsizeof_varuint(value) // 8, is_signed=False) 

179 

180 def write_varsize(self, value: int) -> None: 

181 """ 

182 Writes a variable size integer value to the underlying storage. 

183 

184 :param value: Value to write. 

185 :raises PythonRuntimeException: If the value is out of the range. 

186 """ 

187 

188 self._write_varnum(value, 5, bitsizeof_varsize(value) // 8, is_signed=False) 

189 

190 def write_float16(self, value: float) -> None: 

191 """ 

192 Writes a 16-bit float value to the underlying storage according to IEEE 754 binary16. 

193 

194 :param value: Float value to write. 

195 """ 

196 

197 self.write_bits_unchecked(float_to_uint16(value), 16) 

198 

199 def write_float32(self, value: float) -> None: 

200 """ 

201 Writes a 32-bit float value to the underlying storage according to IEEE 754 binary32. 

202 

203 :param value: Float value to write. 

204 """ 

205 

206 self.write_bits_unchecked(float_to_uint32(value), 32) 

207 

208 def write_float64(self, value: float) -> None: 

209 """ 

210 Writes a 64-bit float value to the underlying storage according to IEEE 754 binary64. 

211 

212 :param value: Float value to write. 

213 """ 

214 

215 self.write_bits_unchecked(float_to_uint64(value), 64) 

216 

217 def write_bytes(self, value: bytearray): 

218 """ 

219 Writes the given bytes to the underlying storage. Length of the bytes is written 

220 as varsize at the beginning. 

221 

222 :param value: Bytes to write. 

223 """ 

224 

225 length = len(value) 

226 self.write_varsize(length) 

227 

228 begin_bitposition = self._bitposition 

229 if (begin_bitposition & 0x07) != 0: 

230 # we are not aligned to byte 

231 for byte in value: 

232 self.write_bits_unchecked(byte, 8) 

233 else: 

234 # we are aligned to byte 

235 self._bitposition += length * 8 

236 self._byte_array += value[0:length] 

237 

238 def write_string(self, string: str) -> None: 

239 """ 

240 Writes the given string to the underlying storage in UTF-8 encoding. Length of the string is written 

241 as varsize at the beginning. 

242 

243 :param string: String to write. 

244 """ 

245 

246 string_bytes = string.encode("utf-8") 

247 length = len(string_bytes) 

248 self.write_varsize(length) 

249 

250 begin_bitposition = self._bitposition 

251 if (begin_bitposition & 0x07) != 0: 

252 # we are not aligned to byte 

253 for string_byte in string_bytes: 

254 self.write_bits_unchecked(string_byte, 8) 

255 else: 

256 # we are aligned to byte 

257 self._bitposition += length * 8 

258 self._byte_array += string_bytes[0:length] 

259 

260 def write_bool(self, value: bool) -> None: 

261 """ 

262 Writes bool in a single bit. 

263 

264 :param value: Bool value to write. 

265 """ 

266 

267 self._write_bits(1 if value else 0, 1) 

268 

269 def write_bitbuffer(self, bitbuffer: BitBuffer) -> None: 

270 """ 

271 Writes a bit buffer to the underlying storage. Length of the bit buffer is written as varsize 

272 at the beginning. 

273 

274 :param bitbuffer: Bit buffer to write. 

275 """ 

276 

277 bitsize = bitbuffer.bitsize 

278 self.write_varsize(bitsize) 

279 

280 write_buffer = bitbuffer.buffer 

281 num_bytes_to_write = bitsize // 8 

282 num_rest_bits = bitsize - num_bytes_to_write * 8 

283 begin_bitposition = self._bitposition 

284 if (begin_bitposition & 0x07) != 0: 

285 # we are not aligned to byte 

286 for i in range(num_bytes_to_write): 

287 self.write_bits_unchecked(write_buffer[i], 8) 

288 else: 

289 # we are aligned to byte 

290 self._bitposition += num_bytes_to_write * 8 

291 self._byte_array += write_buffer[0:num_bytes_to_write] 

292 

293 if num_rest_bits > 0: 

294 self.write_bits_unchecked(write_buffer[num_bytes_to_write] >> (8 - num_rest_bits), num_rest_bits) 

295 

296 @property 

297 def byte_array(self) -> bytes: 

298 """ 

299 Gets internal bytearray. 

300 

301 :returns: Underlying bytearray object. 

302 """ 

303 

304 return self._byte_array 

305 

306 def to_file(self, filename: str) -> None: 

307 """ 

308 Writes underlying bytearray to binary file. 

309 

310 :param filename: File to write. 

311 """ 

312 

313 with open(filename, "wb") as file: 

314 file.write(self._byte_array) 

315 

316 @property 

317 def bitposition(self) -> int: 

318 """ 

319 Gets current bit position. 

320 

321 :returns: Current bit position. 

322 """ 

323 

324 return self._bitposition 

325 

326 def alignto(self, alignment: int) -> None: 

327 """ 

328 Aligns the bit position according to the aligning value. 

329 

330 :param alignment: An aligning value to use. 

331 """ 

332 

333 offset = self._bitposition % alignment 

334 if offset != 0: 

335 self._write_bits(0, alignment - offset) 

336 

337 def _write_bits(self, value: int, numbits: int, *, signed: bool = False) -> None: 

338 buffer_last_byte_bits = self._bitposition % 8 

339 buffer_free_bits = (8 - buffer_last_byte_bits) if buffer_last_byte_bits != 0 else 0 

340 value_first_byte_bits = numbits % 8 or 8 

341 if value_first_byte_bits <= buffer_free_bits: 

342 left_shift = buffer_free_bits - value_first_byte_bits 

343 else: 

344 left_shift = buffer_free_bits + 8 - value_first_byte_bits 

345 value <<= left_shift 

346 num_bytes = (numbits + left_shift + 7) // 8 

347 value_bytes = value.to_bytes(num_bytes, byteorder='big', signed=signed) 

348 if buffer_free_bits == 0: 

349 self._byte_array.extend(value_bytes) 

350 else: 

351 value_first_byte = value_bytes[0] & ((1 << buffer_free_bits) - 1) 

352 self._byte_array[-1] |= value_first_byte 

353 self._byte_array.extend(value_bytes[1:]) 

354 

355 self._bitposition += numbits 

356 

357 def _write_varnum(self, value: int, max_var_bytes: int, num_var_bytes: int, *, is_signed: bool) -> None: 

358 abs_value = abs(value) 

359 has_max_byte_range = num_var_bytes == max_var_bytes 

360 for i in range(num_var_bytes): 

361 byte = 0x00 

362 numbits = 8 

363 has_next_byte = i < num_var_bytes - 1 

364 has_sign_bit = (is_signed and i == 0) 

365 if has_sign_bit: 

366 if value < 0: 

367 byte |= 0x80 

368 numbits -= 1 

369 if has_next_byte: 

370 numbits -= 1 

371 byte |= (1 << numbits) # use bit 6 if signed bit is present, use bit 7 otherwise 

372 else: # this is the last byte 

373 if not has_max_byte_range: # next byte flag isn't used in last byte in case of max byte range 

374 numbits -= 1 

375 

376 shift_bits = (num_var_bytes - (i + 1)) * 7 + (1 if has_max_byte_range and has_next_byte else 0) 

377 byte |= (abs_value >> shift_bits) & VAR_NUM_BIT_MASKS[numbits - 1] 

378 self.write_bits_unchecked(byte, 8) 

379 

380VAR_NUM_BIT_MASKS = [0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff] 

381 

382BitStreamWriter = import_cpp_class("BitStreamWriter") or BitStreamWriter # type: ignore