Coverage for /home/jenkins/workspace/NDS/Zserio/NDS_ZSERIO-linux-build/compiler/extensions/python/runtime/src/zserio/bitwriter.py: 100%
142 statements
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2023-12-13 15:12 +0000
1"""
2The module implements abstraction for writing data to the bit stream.
3"""
5from zserio.bitbuffer import BitBuffer
6from zserio.bitsizeof import (bitsizeof_varint16, bitsizeof_varint32,
7 bitsizeof_varint64, bitsizeof_varint,
8 bitsizeof_varuint16, bitsizeof_varuint32,
9 bitsizeof_varuint64, bitsizeof_varuint,
10 bitsizeof_varsize)
11from zserio.exception import PythonRuntimeException
12from zserio.float import float_to_uint16, float_to_uint32, float_to_uint64
13from zserio.limits import INT64_MIN
14from zserio.cppbind import import_cpp_class
class BitStreamWriter:
    """
    Bit stream writer using bytearray.

    Values are appended most significant bit first. The writer tracks the number of bits written
    so far, so consecutive writes do not have to start or end on a byte boundary.
    """

    def __init__(self) -> None:
        """
        Constructor.
        """

        # Underlying storage; it grows as bits are written.
        self._byte_array: bytearray = bytearray()
        # Number of bits written so far.
        self._bitposition: int = 0

    def write_bits(self, value: int, numbits: int) -> None:
        """
        Writes the given value with the given number of bits to the underlying storage.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
        """

        if numbits <= 0:
            raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' is less than 1!")

        self.write_bits_unchecked(value, numbits)

    def write_signed_bits(self, value: int, numbits: int) -> None:
        """
        Writes the given signed value with the given number of bits to the underlying storage.
        Provided for convenience.

        :param value: Signed value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range or if the number of bits is invalid.
        """

        if numbits <= 0:
            raise PythonRuntimeException(f"BitStreamWriter: numbits '{numbits}' is less than 1!")

        self.write_signed_bits_unchecked(value, numbits)

    def write_bits_unchecked(self, value: int, numbits: int) -> None:
        """
        Writes the given value with the given number of bits to the underlying storage.

        This method does not check that numbits > 0 and assumes that it's ensured by the caller.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        min_value = 0
        max_value = (1 << numbits) - 1
        if value < min_value or value > max_value:
            raise PythonRuntimeException(f"BitStreamWriter: Value '{value}' is out of the range "
                                         f"<{min_value},{max_value}>!")

        self._write_bits(value, numbits, signed=False)

    def write_signed_bits_unchecked(self, value: int, numbits: int) -> None:
        """
        Writes the given signed value with the given number of bits to the underlying storage.
        Provided for convenience.

        This method does not check that numbits > 0 and assumes that it's ensured by the caller.

        :param value: Signed value to write.
        :param numbits: Number of bits to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        # Two's complement range of a numbits wide signed integer.
        min_value = -(1 << (numbits - 1))
        max_value = (1 << (numbits - 1)) - 1
        if value < min_value or value > max_value:
            raise PythonRuntimeException(f"BitStreamWriter: Value '{value}' is out of the range "
                                         f"<{min_value},{max_value}>!")

        self._write_bits(value, numbits, signed=True)

    def write_varint16(self, value: int) -> None:
        """
        Writes a variable 16-bit signed integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 2, bitsizeof_varint16(value) // 8, is_signed=True)

    def write_varint32(self, value: int) -> None:
        """
        Writes a variable 32-bit signed integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 4, bitsizeof_varint32(value) // 8, is_signed=True)

    def write_varint64(self, value: int) -> None:
        """
        Writes a variable 64-bit signed integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 8, bitsizeof_varint64(value) // 8, is_signed=True)

    def write_varint(self, value: int) -> None:
        """
        Writes a variable signed integer value (up to 9 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        if value == INT64_MIN:
            self._write_bits(0x80, 8)  # INT64_MIN is stored as -0
        else:
            self._write_varnum(value, 9, bitsizeof_varint(value) // 8, is_signed=True)

    def write_varuint16(self, value: int) -> None:
        """
        Writes a variable 16-bit unsigned integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 2, bitsizeof_varuint16(value) // 8, is_signed=False)

    def write_varuint32(self, value: int) -> None:
        """
        Writes a variable 32-bit unsigned integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 4, bitsizeof_varuint32(value) // 8, is_signed=False)

    def write_varuint64(self, value: int) -> None:
        """
        Writes a variable 64-bit unsigned integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 8, bitsizeof_varuint64(value) // 8, is_signed=False)

    def write_varuint(self, value: int) -> None:
        """
        Writes a variable unsigned integer value (up to 9 bytes) to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 9, bitsizeof_varuint(value) // 8, is_signed=False)

    def write_varsize(self, value: int) -> None:
        """
        Writes a variable size integer value to the underlying storage.

        :param value: Value to write.
        :raises PythonRuntimeException: If the value is out of the range.
        """

        self._write_varnum(value, 5, bitsizeof_varsize(value) // 8, is_signed=False)

    def write_float16(self, value: float) -> None:
        """
        Writes a 16-bit float value to the underlying storage according to IEEE 754 binary16.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint16(value), 16)

    def write_float32(self, value: float) -> None:
        """
        Writes a 32-bit float value to the underlying storage according to IEEE 754 binary32.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint32(value), 32)

    def write_float64(self, value: float) -> None:
        """
        Writes a 64-bit float value to the underlying storage according to IEEE 754 binary64.

        :param value: Float value to write.
        """

        self.write_bits_unchecked(float_to_uint64(value), 64)

    def write_bytes(self, value: bytearray) -> None:
        """
        Writes the given bytes to the underlying storage. Length of the bytes is written
        as varsize at the beginning.

        :param value: Bytes to write.
        :raises PythonRuntimeException: If the length does not fit into varsize.
        """

        self.write_varsize(len(value))
        self._write_raw_bytes(value)

    def write_string(self, string: str) -> None:
        """
        Writes the given string to the underlying storage in UTF-8 encoding. Length of the string is written
        as varsize at the beginning.

        The stored length is the number of encoded bytes, not the number of characters.

        :param string: String to write.
        :raises PythonRuntimeException: If the encoded length does not fit into varsize.
        """

        string_bytes = string.encode("utf-8")
        self.write_varsize(len(string_bytes))
        self._write_raw_bytes(string_bytes)

    def write_bool(self, value: bool) -> None:
        """
        Writes bool in a single bit.

        :param value: Bool value to write.
        """

        self._write_bits(1 if value else 0, 1)

    def write_bitbuffer(self, bitbuffer: BitBuffer) -> None:
        """
        Writes a bit buffer to the underlying storage. Length of the bit buffer is written as varsize
        at the beginning.

        :param bitbuffer: Bit buffer to write.
        :raises PythonRuntimeException: If the bit size does not fit into varsize.
        """

        bitsize = bitbuffer.bitsize
        self.write_varsize(bitsize)

        write_buffer = bitbuffer.buffer
        num_bytes_to_write, num_rest_bits = divmod(bitsize, 8)

        # whole bytes first
        self._write_raw_bytes(write_buffer[0:num_bytes_to_write])

        # the remaining bits are taken from the top of the last byte
        if num_rest_bits > 0:
            self.write_bits_unchecked(write_buffer[num_bytes_to_write] >> (8 - num_rest_bits), num_rest_bits)

    @property
    def byte_array(self) -> bytes:
        """
        Gets internal bytearray.

        The returned object is the writer's own storage, not a copy.

        :returns: Underlying bytearray object.
        """

        return self._byte_array

    def to_file(self, filename: str) -> None:
        """
        Writes underlying bytearray to binary file.

        :param filename: File to write.
        """

        with open(filename, "wb") as file:
            file.write(self._byte_array)

    @property
    def bitposition(self) -> int:
        """
        Gets current bit position.

        :returns: Current bit position.
        """

        return self._bitposition

    def alignto(self, alignment: int) -> None:
        """
        Aligns the bit position according to the aligning value.

        Zero bits are written until the bit position is a multiple of the aligning value.

        :param alignment: An aligning value to use.
        """

        offset = self._bitposition % alignment
        if offset != 0:
            self._write_bits(0, alignment - offset)

    def _write_raw_bytes(self, data) -> None:
        """
        Appends whole bytes at the current bit position without any length prefix.

        :param data: Bytes-like object whose bytes are written in order.
        """

        if (self._bitposition & 0x07) != 0:
            # we are not aligned to byte, each byte straddles two bytes of the storage
            for byte in data:
                self.write_bits_unchecked(byte, 8)
        else:
            # we are aligned to byte, append first so that a failing append leaves the position untouched
            self._byte_array += data
            self._bitposition += len(data) * 8

    def _write_bits(self, value: int, numbits: int, *, signed: bool = False) -> None:
        """
        Appends numbits bits of the value at the current bit position without any range check.

        :param value: Value to write.
        :param numbits: Number of bits to write.
        :param signed: True to encode the value as two's complement signed integer.
        """

        # number of bits already occupied in the last byte of the storage and the free bits left in it
        buffer_last_byte_bits = self._bitposition % 8
        buffer_free_bits = (8 - buffer_last_byte_bits) if buffer_last_byte_bits != 0 else 0
        # number of bits held by the most significant byte of the value (1..8)
        value_first_byte_bits = numbits % 8 or 8
        # shift the value left so that its top bit lands on the first free bit of the storage
        if value_first_byte_bits <= buffer_free_bits:
            left_shift = buffer_free_bits - value_first_byte_bits
        else:
            left_shift = buffer_free_bits + 8 - value_first_byte_bits
        value <<= left_shift
        num_bytes = (numbits + left_shift + 7) // 8
        value_bytes = value.to_bytes(num_bytes, byteorder='big', signed=signed)
        if buffer_free_bits == 0:
            self._byte_array.extend(value_bytes)
        else:
            # merge the first byte into the partially filled last byte, masking off anything
            # (e.g. sign extension) above the free bits
            value_first_byte = value_bytes[0] & ((1 << buffer_free_bits) - 1)
            self._byte_array[-1] |= value_first_byte
            self._byte_array.extend(value_bytes[1:])

        self._bitposition += numbits

    def _write_varnum(self, value: int, max_var_bytes: int, num_var_bytes: int, *, is_signed: bool) -> None:
        """
        Writes a variable length integer as num_var_bytes bytes, most significant part first.

        :param value: Value to write.
        :param max_var_bytes: Maximum number of bytes the variable integer type can occupy.
        :param num_var_bytes: Number of bytes needed to store the given value.
        :param is_signed: True when the first byte carries a sign bit.
        """

        abs_value = abs(value)
        has_max_byte_range = num_var_bytes == max_var_bytes
        for i in range(num_var_bytes):
            byte = 0x00
            numbits = 8  # number of payload bits in this byte, reduced below by the flag bits
            has_next_byte = i < num_var_bytes - 1
            has_sign_bit = (is_signed and i == 0)
            if has_sign_bit:
                if value < 0:
                    byte |= 0x80
                numbits -= 1
            if has_next_byte:
                numbits -= 1
                byte |= (1 << numbits)  # use bit 6 if signed bit is present, use bit 7 otherwise
            else:  # this is the last byte
                if not has_max_byte_range:  # next byte flag isn't used in last byte in case of max byte range
                    numbits -= 1

            # the last byte of a maximum length encoding carries 8 payload bits instead of 7,
            # so all the preceding bytes are shifted by one extra bit
            shift_bits = (num_var_bytes - (i + 1)) * 7 + (1 if has_max_byte_range and has_next_byte else 0)
            byte |= (abs_value >> shift_bits) & VAR_NUM_BIT_MASKS[numbits - 1]
            self.write_bits_unchecked(byte, 8)
# Payload masks used by BitStreamWriter._write_varnum: the entry at index i keeps the lowest i + 1 bits.
VAR_NUM_BIT_MASKS = [(1 << bit_count) - 1 for bit_count in range(1, 9)]

# Prefer whatever import_cpp_class returns when it is truthy (presumably a C++ backed implementation,
# to be confirmed against zserio.cppbind), otherwise keep the pure Python class defined in this module.
BitStreamWriter = import_cpp_class("BitStreamWriter") or BitStreamWriter  # type: ignore