Source code for astm.codec

# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 Alexander Shorin
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.
#

from collections import Iterable
from .compat import unicode
from .constants import (
    STX, ETX, ETB, CR, LF, CRLF,
    FIELD_SEP, COMPONENT_SEP, RECORD_SEP, REPEAT_SEP, ENCODING
)
try:
    from itertools import izip_longest
except ImportError: # Python 3
    from itertools import zip_longest as izip_longest

[docs]def decode(data, encoding=ENCODING): """Common ASTM decoding function that tries to guess which kind of data it handles. If `data` starts with STX character (``0x02``) than probably it is full ASTM message with checksum and other system characters. If `data` starts with digit character (``0-9``) than probably it is frame of records leading by his sequence number. No checksum is expected in this case. Otherwise it counts `data` as regular record structure. Note, that `data` should be bytes, not unicode string even if you know his `encoding`. :param data: ASTM data object. :type data: bytes :param encoding: Data encoding. :type encoding: str :return: List of ASTM records with unicode data. :rtype: list """ if not isinstance(data, bytes): raise TypeError('bytes expected, got %r' % data) if data.startswith(STX): # may be decode message \x02...\x03CS\r\n seq, records, cs = decode_message(data, encoding) return records byte = data[:1].decode() if byte.isdigit(): seq, records = decode_frame(data, encoding) return records return [decode_record(data, encoding)]
[docs]def decode_message(message, encoding): """Decodes complete ASTM message that is sent or received due communication routines. It should contains checksum that would be additionally verified. :param message: ASTM message. :type message: bytes :param encoding: Data encoding. :type encoding: str :returns: Tuple of three elements: * :class:`int` frame sequence number. * :class:`list` of records with unicode data. * :class:`bytes` checksum. :raises: * :exc:`ValueError` if ASTM message is malformed. * :exc:`AssertionError` if checksum verification fails. """ if not isinstance(message, bytes): raise TypeError('bytes expected, got %r' % message) if not (message.startswith(STX) and message.endswith(CRLF)): raise ValueError('Malformed ASTM message. Expected that it will started' ' with %x and followed by %x%x characters. Got: %r' ' ' % (ord(STX), ord(CR), ord(LF), message)) stx, frame_cs = message[0], message[1:-2] frame, cs = frame_cs[:-2], frame_cs[-2:] ccs = make_checksum(frame) assert cs == ccs, 'Checksum failure: expected %r, calculated %r' % (cs, ccs) seq, records = decode_frame(frame, encoding) return seq, records, cs.decode()
[docs]def decode_frame(frame, encoding): """Decodes ASTM frame: list of records followed by sequence number.""" if not isinstance(frame, bytes): raise TypeError('bytes expected, got %r' % frame) if frame.endswith(CR + ETX): frame = frame[:-2] elif frame.endswith(ETB): frame = frame[:-1] else: raise ValueError('Incomplete frame data %r.' ' Expected trailing <CR><ETX> or <ETB> chars' % frame) seq = frame[:1].decode() if not seq.isdigit(): raise ValueError('Malformed ASTM frame. Expected leading seq number %r' '' % frame) seq, records = int(seq), frame[1:] return seq, [decode_record(record, encoding) for record in records.split(RECORD_SEP)]
[docs]def decode_record(record, encoding): """Decodes ASTM record message.""" fields = [] for item in record.split(FIELD_SEP): if REPEAT_SEP in item: item = decode_repeated_component(item, encoding) elif COMPONENT_SEP in item: item = decode_component(item, encoding) else: item = item.decode(encoding) fields.append([None, item][bool(item)]) return fields
[docs]def decode_component(field, encoding): """Decodes ASTM field component.""" return [[None, item.decode(encoding)][bool(item)] for item in field.split(COMPONENT_SEP)]
[docs]def decode_repeated_component(component, encoding): """Decodes ASTM field repeated component.""" return [decode_component(item, encoding) for item in component.split(REPEAT_SEP)]
[docs]def encode(records, encoding=ENCODING, size=None): """Encodes list of records into single ASTM message, also called as "packed" message. If you need to get each record as standalone message use :func:`iter_encode` instead. If the result message is too large (greater than :const:`MAX_MESSAGE_SIZE`), than it will be splitted by chunks. :param records: List of ASTM records. :type records: list :param encoding: Data encoding. :type encoding: str :return: List of ASTM message chunks. :rtype: list """ msg = encode_message(1, records, encoding) if size is not None and len(msg[1:-5]) > size: return list(split(msg, size)) return [msg]
[docs]def iter_encode(records, encoding=ENCODING, size=None): """Encodes and emits each record as separate message. If the result message is too large (greater than :const:`MAX_MESSAGE_SIZE`), than it will be splitted by chunks. :yields: ASTM message chunks. :rtype: str """ idx = 1 for record in records: msg = encode_message(idx, [record], encoding) if size is not None and len(msg) > size: for chunk in split(msg, size): idx += 1 yield chunk else: idx += 1 yield msg
[docs]def encode_message(seq, records, encoding): """Encodes ASTM message. :param seq: Frame sequence number. :type seq: int :param records: List of ASTM records. :type records: list :param encoding: Data encoding. :type encoding: str :return: ASTM complete message with checksum and other control characters. :rtype: str """ data = RECORD_SEP.join(encode_record(record, encoding) for record in records) data = b''.join((str(seq).encode(), data, CR, ETX)) return b''.join([STX, data, make_checksum(data), CR, LF])
[docs]def encode_record(record, encoding): """Encodes single ASTM record. :param record: ASTM record. Each :class:`str`-typed item counted as field value, one level nested :class:`list` counted as components and second leveled - as repeated components. :type record: list :param encoding: Data encoding. :type encoding: str :returns: Encoded ASTM record. :rtype: str """ fields = [] _append = fields.append for field in record: if isinstance(field, bytes): _append(field) elif isinstance(field, unicode): _append(field.encode(encoding)) elif isinstance(field, Iterable): _append(encode_component(field, encoding)) elif field is None: _append(b'') else: _append(unicode(field).encode(encoding)) return FIELD_SEP.join(fields)
[docs]def encode_component(component, encoding): """Encodes ASTM record field components.""" items = [] _append = items.append for item in component: if isinstance(item, bytes): _append(item) elif isinstance(item, unicode): _append(item.encode(encoding)) elif isinstance(item, Iterable): return encode_repeated_component(component, encoding) elif item is None: _append(b'') else: _append(unicode(item).encode(encoding)) return COMPONENT_SEP.join(items).rstrip(COMPONENT_SEP)
[docs]def encode_repeated_component(components, encoding): """Encodes repeated components.""" return REPEAT_SEP.join(encode_component(item, encoding) for item in components)
[docs]def make_checksum(message): """Calculates checksum for specified message. :param message: ASTM message. :type message: bytes :returns: Checksum value that is actually byte sized integer in hex base :rtype: bytes """ if not isinstance(message[0], int): message = map(ord, message) return hex(sum(message) & 0xFF)[2:].upper().zfill(2).encode()
def make_chunks(s, n): iter_bytes = (s[i:i+1] for i in range(len(s))) return [b''.join(item) for item in izip_longest(*[iter_bytes]*n, fillvalue=b'')] def split(msg, size): stx, frame, msg, tail = msg[:1], msg[1:2], msg[2:-6], msg[-6:] assert stx == STX assert frame.isdigit() assert tail.endswith(CRLF) frame = int(frame) chunks = make_chunks(msg, size) chunks, last = chunks[:-1], chunks[-1] idx = 0 for idx, chunk in enumerate(chunks): item = b''.join([str(idx + frame).encode(), chunk, ETB]) yield b''.join([STX, item, make_checksum(item), CRLF]) item = b''.join([str(idx + frame + 1).encode(), last, CR, ETX]) yield b''.join([STX, item, make_checksum(item), CRLF]) def join(chunks): chunks = list(chunks) chunks, last = chunks[:-1], chunks[-1] msg = b'1' + b''.join(c[2:-5] for c in chunks) + last[2:-4] return b''.join([STX, msg, make_checksum(msg), CRLF])
[docs]def is_chunked_message(message): """Checks plain message for chunked byte.""" length = len(message) if len(message) < 5: return False if ETB not in message: return False return message.index(ETB) == length - 5

Project Versions