# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 Alexander Shorin
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.
#
import logging
import socket
from .asynclib import Dispatcher, loop
from .codec import decode_message, is_chunked_message, join
from .constants import ACK, NAK, ENCODING
from .exceptions import InvalidState, NotAccepted
from .protocol import ASTMProtocol, STATE
log = logging.getLogger(__name__)
__all__ = ['BaseRecordsDispatcher', 'RequestHandler', 'Server']
[docs]class BaseRecordsDispatcher(object):
"""Dispatcher of received ASTM records by :class:`RequestHandler`."""
#: Encoding of received messages.
encoding = ENCODING
def __init__(self, encoding=None):
self.encoding = encoding or self.encoding
self.dispatch = {
'H': self.on_header,
'C': self.on_comment,
'P': self.on_patient,
'O': self.on_order,
'R': self.on_result,
'L': self.on_terminator
}
self.wrappers = {}
def __call__(self, message):
seq, records, cs = decode_message(message, self.encoding)
for record in records:
self.dispatch.get(record[0], self.on_unknown)(self.wrap(record))
def wrap(self, record):
rtype = record[0]
if rtype in self.wrappers:
return self.wrappers[rtype](*record)
return record
[docs] def on_patient(self, record):
"""Patient record handler."""
[docs] def on_order(self, record):
"""Order record handler."""
[docs] def on_result(self, record):
"""Result record handler."""
[docs] def on_terminator(self, record):
"""Terminator record handler."""
[docs] def on_unknown(self, record):
"""Fallback handler for dispatcher."""
[docs]class RequestHandler(ASTMProtocol):
"""ASTM protocol request handler.
:param sock: Socket object.
:param dispatcher: Request handler records dispatcher instance.
:type dispatcher: :class:`BaseRecordsDispatcher`
"""
def __init__(self, sock, dispatcher):
super(RequestHandler, self).__init__(sock)
self.set_init_state()
self._chunks = []
host, port = sock.getpeername() if sock is not None else (None, None)
self.client_info = {'host': host, 'port': port}
self.dispatcher = dispatcher
def on_enq(self):
if self.state == STATE.init:
self.set_transfer_state()
return ACK
else:
raise NotAccepted('ENQ is not expected while handler in state %r'
% self.state)
def on_ack(self):
raise NotAccepted('Server should not be ACKed.')
def on_nak(self):
raise NotAccepted('Server should not be NAKed.')
def on_eot(self):
if self.state != STATE.transfer:
raise InvalidState('Server is not ready to accept EOT message.')
self.set_init_state()
def on_message(self):
if self.state != STATE.transfer:
self.discard_input_buffers()
return NAK
else:
try:
self.handle_message(self._last_recv_data)
return ACK
except Exception:
log.exception('Error occurred on message handling.')
return NAK
def handle_message(self, message):
if self.is_chunked_transfer is None:
self.is_chunked_transfer = is_chunked_message(message)
if self.is_chunked_transfer:
self._chunks.append(message)
elif self._chunks:
self._chunks.append(message)
self.dispatcher(join(self._chunks))
self._chunks = []
else:
self.dispatcher(message)
def discard_input_buffers(self):
self._chunks = []
return super(RequestHandler, self).discard_input_buffers()
[docs]class Server(Dispatcher):
"""Asyncore driven ASTM server.
:param host: Server IP address or hostname.
:type host: str
:param port: Server port number.
:type port: int
:param request: Custom server request handler. If omitted the
:class:`RequestHandler` will be used by default.
:param dispatcher: Custom request handler records dispatcher. If omitted the
:class:`BaseRecordsDispatcher` will be used by default.
"""
request = RequestHandler
dispatcher = BaseRecordsDispatcher
def __init__(self, host='localhost', port=15200,
request=None, dispatcher=None):
super(Server, self).__init__()
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.pool = []
if request is not None:
self.request = request
if dispatcher is not None:
self.dispatcher = dispatcher
def handle_accept(self):
pair = self.accept()
if pair is None:
return
sock, addr = pair
self.request(sock, self.dispatcher())
super(Server, self).handle_accept()
[docs] def serve_forever(self, *args, **kwargs):
"""Enters into the :func:`polling loop <asynclib.loop>` to let server
handle incoming requests."""
loop(*args, **kwargs)