Source code for astm.mapping

# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 Alexander Shorin
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.
#

import datetime
import decimal
import inspect
import time
import warnings
from operator import itemgetter
from itertools import islice
try:
    from itertools import izip_longest
except ImportError: # Python 3
    from itertools import zip_longest as izip_longest
    from .compat import basestring, unicode, long


def make_string(value):
    """Coerce *value* to a text (``unicode``) string.

    Text is returned untouched, byte strings are decoded as UTF-8 and any
    other object is converted by calling ``unicode`` on it.
    """
    if isinstance(value, unicode):
        return value
    if isinstance(value, bytes):
        return unicode(value, 'utf-8')
    return unicode(value)


class Field(object):
    """Base mapping field class.

    A field is a data descriptor: it reads from and writes to the ``_data``
    dict of the instance it is accessed on, keyed by the field ``name``.

    :param name: key used in the owner's ``_data`` dict.
    :param default: value (or callable producing the value) returned while
                    nothing is stored for the field.
    :param required: flag kept on the field as ``self.required``.
    :param length: maximum allowed length of the stored string, or ``None``
                   for no limit.
    """

    def __init__(self, name=None, default=None, required=False, length=None):
        self.name = name
        self.default = default
        self.required = required
        self.length = length

    def __get__(self, instance, owner):
        # Class-level access gives back the descriptor itself.
        if instance is None:
            return self
        raw = instance._data.get(self.name)
        if raw is not None:
            return self._get_value(raw)
        # Nothing stored: fall back to the default, calling it when possible.
        fallback = self.default
        if fallback is None:
            return None
        if hasattr(fallback, '__call__'):
            return fallback()
        return fallback

    def __set__(self, instance, value):
        # ``None`` is stored as is; everything else goes through conversion.
        stored = None if value is None else self._set_value(value)
        instance._data[self.name] = stored

    def _get_value(self, value):
        """Convert a stored value to its Python form (identity here)."""
        return value

    def _set_value(self, value):
        """Convert *value* to the stored string form.

        :raises ValueError: if the string is longer than ``self.length``.
        """
        text = make_string(value)
        limit = self.length
        if limit is not None and len(text) > limit:
            raise ValueError('Field %r value is too long (max %d, got %d)'
                             '' % (self.name, limit, len(text)))
        return text
class MetaMapping(type):
    """Metaclass that gathers :class:`Field` attributes into ``_fields``.

    ``_fields`` is an ordered list of ``(name, field)`` pairs built from the
    ``_fields`` of every base class, then the :class:`Field` instances found
    in the class body, then an explicit ``_fields`` entry of the class body
    if there is one. A later definition of an already known name replaces
    the earlier one in place, keeping its position.
    """

    def __new__(mcs, name, bases, d):
        fields = []
        names = []

        def merge_fields(items):
            for key, field in items:
                # Unnamed fields take the attribute name they are bound to.
                if field.name is None:
                    field.name = key
                if key not in names:
                    fields.append((key, field))
                    names.append(key)
                else:
                    fields[names.index(key)] = (key, field)

        for base in bases:
            if hasattr(base, '_fields'):
                merge_fields(base._fields)
        merge_fields([(k, v) for k, v in d.items() if isinstance(v, Field)])
        if '_fields' in d:
            merge_fields(d['_fields'])
        d['_fields'] = fields
        return super(MetaMapping, mcs).__new__(mcs, name, bases, d)


_MappingProxy = MetaMapping('_MappingProxy', (object,), {})  # Python 3 workaround


class Mapping(_MappingProxy):
    """Ordered collection of field values backed by the ``_data`` dict.

    Positional arguments are matched to the fields in declaration order,
    keyword arguments by field name. A field that receives no value (or
    ``None``) is assigned whatever reading the field currently yields, i.e.
    its default.

    :raises ValueError: if an argument does not correspond to any field.
    """

    def __init__(self, *args, **kwargs):
        fieldnames = map(itemgetter(0), self._fields)
        # izip_longest pads missing positional values with None; surplus
        # positional values end up under the None key and are reported below.
        values = dict(izip_longest(fieldnames, args))
        values.update(kwargs)
        self._data = {}
        for attrname, field in self._fields:
            attrval = values.pop(attrname, None)
            if attrval is None:
                setattr(self, attrname, getattr(self, attrname))
            else:
                setattr(self, attrname, attrval)
        if values:
            raise ValueError('Unexpected kwargs found: %r' % values)

    @classmethod
    def build(cls, *a):
        """Create a ``Generic<cls>`` subclass from the given named fields.

        :raises ValueError: if any of the fields has no name.
        """
        fields = []
        newcls = type('Generic' + cls.__name__, (cls,), {})
        for field in a:
            if field.name is None:
                raise ValueError('Name is required for ordered fields.')
            setattr(newcls, field.name, field)
            fields.append((field.name, field))
        newcls._fields = fields
        return newcls

    def __getitem__(self, key):
        return self.values()[key]

    def __setitem__(self, key, value):
        setattr(self, self._fields[key][0], value)

    def __delitem__(self, key):
        # Deleting resets the stored value instead of removing the field.
        self._data[self._fields[key][0]] = None

    def __iter__(self):
        return iter(self.values())

    def __contains__(self, item):
        return item in self.values()

    def __len__(self):
        return len(self._data)

    def __eq__(self, other):
        """Compare field values, in order, with the items of *other*.

        Objects without a length (``None``, numbers, ...) are never equal
        to a mapping.
        """
        try:
            same_size = len(self) == len(other)
        except TypeError:
            # ``other`` is not sized, so it cannot hold matching values.
            return False
        if not same_size:
            return False
        for key, value in zip(self.keys(), other):
            if getattr(self, key) != value:
                return False
        return True

    def __ne__(self, other):
        return not (self == other)

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
                           ', '.join('%s=%r' % (key, value)
                                     for key, value in self.items()))

    def keys(self):
        """Return the field names in declaration order."""
        return [key for key, field in self._fields]

    def values(self):
        """Return the field values in declaration order."""
        return [getattr(self, key) for key in self.keys()]

    def items(self):
        """Return ``(name, value)`` pairs in declaration order."""
        return [(key, getattr(self, key)) for key, field in self._fields]

    def to_astm(self):
        """Return the stored values as nested plain lists.

        Nested mappings, and mappings inside list values, are expanded
        recursively.

        :raises ValueError: if a required field holds ``None``.
        """
        def values(obj):
            for key, field in obj._fields:
                value = obj._data[key]
                if isinstance(value, Mapping):
                    yield list(values(value))
                elif isinstance(value, list):
                    stack = []
                    for item in value:
                        if isinstance(item, Mapping):
                            stack.append(list(values(item)))
                        else:
                            stack.append(item)
                    yield stack
                elif value is None and field.required:
                    raise ValueError('Field %r value should not be None' % key)
                else:
                    yield value
        return list(values(self))
class Record(Mapping):
    """ASTM record mapping class.

    Plain :class:`Mapping` subclass; it adds no fields or behaviour itself.
    """
class Component(Mapping):
    """ASTM component mapping class.

    Plain :class:`Mapping` subclass; it adds no fields or behaviour itself.
    """
class TextField(Field):
    """Mapping field for string values."""

    def _set_value(self, value):
        """Store *value*, which has to be a string.

        :raises TypeError: if *value* is not a string.
        """
        if not isinstance(value, basestring):
            # 1-tuple keeps a tuple value from being unpacked as the
            # %-format arguments.
            raise TypeError('String value expected, got %r' % (value,))
        return super(TextField, self)._set_value(value)
class ConstantField(Field):
    """Mapping field for constant values.

    The field always reads as its ``default`` and only accepts assignments
    that are equal to it.

    >>> class Record(Mapping):
    ...     type = ConstantField(default='S')
    >>> rec = Record()
    >>> rec.type
    'S'
    >>> rec.type = 'W'
    Traceback (most recent call last):
        ...
    ValueError: Field changing not allowed: got 'W', accepts 'S'

    :param name: field name.
    :param default: the constant; must not be ``None``.
    :param field: field whose ``_get_value`` normalises assigned values
                  before they are compared with the constant.
    :raises ValueError: if no constant is given.
    """

    # NOTE: the ``Field()`` default is created once and shared between
    # instances; this class only calls its ``_get_value``.
    def __init__(self, name=None, default=None, field=Field()):
        # Constant fields are always required and have no length limit.
        super(ConstantField, self).__init__(name, default, True, None)
        self.field = field
        if self.default is None:
            raise ValueError('Constant value should be defined')

    def _get_value(self, value):
        return self.default

    def _set_value(self, value):
        value = self.field._get_value(value)
        if self.default != value:
            raise ValueError('Field changing not allowed: got %r, accepts %r'
                             % (value, self.default))
        return super(ConstantField, self)._set_value(value)
class IntegerField(Field):
    """Mapping field for integer values."""

    def _get_value(self, value):
        return int(value)

    def _set_value(self, value):
        """Store *value*, converting it with ``int`` when it is not an integer.

        :raises TypeError: if the conversion fails.
        """
        if not isinstance(value, (int, long)):
            try:
                value = self._get_value(value)
            except Exception:
                # 1-tuple keeps a tuple value from being unpacked as the
                # %-format arguments.
                raise TypeError('Integer value expected, got %r' % (value,))
        return super(IntegerField, self)._set_value(value)
class DecimalField(Field):
    """Mapping field for decimal values."""

    def _get_value(self, value):
        return decimal.Decimal(value)

    def _set_value(self, value):
        """Store a numeric *value* (int, long, float or ``Decimal``).

        :raises TypeError: for any other type.
        """
        if not isinstance(value, (int, long, float, decimal.Decimal)):
            # 1-tuple keeps a tuple value from being unpacked as the
            # %-format arguments.
            raise TypeError('Decimal value expected, got %r' % (value,))
        return super(DecimalField, self)._set_value(value)
class DateField(Field):
    """Mapping field for storing dates as ``%Y%m%d`` strings.

    Reading the field parses the stored string with ``strptime`` and so
    yields a :class:`datetime.datetime`.
    """

    format = '%Y%m%d'

    def _get_value(self, value):
        return datetime.datetime.strptime(value, self.format)

    def _set_value(self, value):
        """Return *value* formatted with ``self.format``.

        Strings are parsed first, so a string that does not match the
        format raises ``ValueError``.

        :raises TypeError: if *value* is neither a string nor a date/datetime.
        """
        if isinstance(value, basestring):
            value = self._get_value(value)
        if not isinstance(value, (datetime.datetime, datetime.date)):
            # 1-tuple keeps a tuple value from being unpacked as the
            # %-format arguments.
            raise TypeError('Datetime value expected, got %r' % (value,))
        return value.strftime(self.format)
class TimeField(Field):
    """Mapping field for storing times as ``%H%M%S`` strings."""

    format = '%H%M%S'

    def _get_value(self, value):
        """Parse a string into :class:`datetime.time`; pass others through.

        :raises ValueError: if the string does not match ``self.format``.
        """
        if isinstance(value, basestring):
            try:
                value = value.split('.', 1)[0]  # strip out microseconds
                value = datetime.time(*time.strptime(value, self.format)[3:6])
            except ValueError:
                raise ValueError('Value %r does not match format %s'
                                 % (value, self.format))
        return value

    def _set_value(self, value):
        """Return *value* formatted with ``self.format``, without microseconds.

        :raises TypeError: if *value* is neither a string nor a time/datetime.
        """
        if isinstance(value, basestring):
            value = self._get_value(value)
        if not isinstance(value, (datetime.datetime, datetime.time)):
            # 1-tuple keeps a tuple value from being unpacked as the
            # %-format arguments.
            raise TypeError('Datetime value expected, got %r' % (value,))
        if isinstance(value, datetime.datetime):
            value = value.time()
        return value.replace(microsecond=0).strftime(self.format)
class DateTimeField(Field):
    """Mapping field for storing date/time values as ``%Y%m%d%H%M%S``."""

    format = '%Y%m%d%H%M%S'

    def _get_value(self, value):
        return datetime.datetime.strptime(value, self.format)

    def _set_value(self, value):
        """Return *value* formatted with ``self.format``.

        Strings are parsed first, so a string that does not match the
        format raises ``ValueError``.

        :raises TypeError: if *value* is neither a string nor a date/datetime.
        """
        if isinstance(value, basestring):
            value = self._get_value(value)
        if not isinstance(value, (datetime.datetime, datetime.date)):
            # 1-tuple keeps a tuple value from being unpacked as the
            # %-format arguments.
            raise TypeError('Datetime value expected, got %r' % (value,))
        return value.strftime(self.format)
class SetField(Field):
    """Mapping field for predefined set of values.

    :param values: iterable of accepted values; empty or ``None`` means
                   that nothing is accepted.
    :param field: field used to convert values on read and on write.
    """

    # NOTE: the ``Field()`` default is created once and shared between
    # instances; this class only calls its conversion methods.
    def __init__(self, name=None, default=None, required=False, length=None,
                 values=None, field=Field()):
        super(SetField, self).__init__(name, default, required, length)
        self.field = field
        self.values = set(values) if values else set()

    def _get_value(self, value):
        return self.field._get_value(value)

    def _set_value(self, value):
        """Store *value* if it belongs to the allowed set.

        :raises ValueError: if *value* is not one of ``self.values``.
        """
        value = self.field._get_value(value)
        if value not in self.values:
            # 1-tuple: a tuple value would otherwise be unpacked as the
            # %-format arguments and raise TypeError instead.
            raise ValueError('Unexpectable value %r' % (value,))
        # NOTE(review): storing is delegated to ``self.field``, so
        # ``self.length`` is not checked on this path -- confirm intended.
        return self.field._set_value(value)
class ComponentField(Field):
    """Mapping field for storing record component.

    :param mapping: mapping class used to build component values.
    :param name: field name.
    :param default: default component; when falsy, ``mapping()`` is used.
    """

    def __init__(self, mapping, name=None, default=None):
        self.mapping = mapping
        # NOTE(review): the fallback instance is built once, here, and kept
        # as ``self.default`` -- confirm that sharing it is intended.
        if not default:
            default = mapping()
        super(ComponentField, self).__init__(name, default)

    def _get_value(self, value):
        # dict -> keyword arguments, ready instance -> as is,
        # anything else -> positional arguments.
        if isinstance(value, dict):
            return self.mapping(**value)
        if isinstance(value, self.mapping):
            return value
        return self.mapping(*value)

    def _set_value(self, value):
        if isinstance(value, dict):
            return self.mapping(**value)
        if isinstance(value, self.mapping):
            return value
        # A bare string is a single positional value, not a sequence of
        # characters.
        args = [value] if isinstance(value, basestring) else value
        return self.mapping(*args)
class RepeatedComponentField(Field):
    """Mapping field for storing list of record components.

    :param field: a :class:`ComponentField` instance, or a :class:`Mapping`
                  subclass that gets wrapped into a :class:`ComponentField`.
    :param name: field name.
    :param default: default list; when falsy, ``[]`` is used.
    """

    def __init__(self, field, name=None, default=None):
        if isinstance(field, ComponentField):
            self.field = field
        else:
            assert isinstance(field, type) and issubclass(field, Mapping)
            self.field = ComponentField(field)
        default = default or []
        super(RepeatedComponentField, self).__init__(name, default)

    class Proxy(list):
        """List-like view over the stored list of components.

        Reads convert items with ``field._get_value``, writes convert them
        with ``field._set_value`` and modify the wrapped list ``self.list``.
        """

        def __init__(self, seq, field):
            list.__init__(self, seq)
            self.list = seq
            self.field = field

        def _to_list(self):
            return [list(self.field._get_value(item)) for item in self.list]

        def __add__(self, other):
            obj = type(self)(self.list, self.field)
            obj.extend(other)
            return obj

        def __iadd__(self, other):
            self.extend(other)
            return self

        def __mul__(self, other):
            return type(self)(self.list * other, self.field)

        def __imul__(self, other):
            self.list *= other
            return self

        def __lt__(self, other):
            return self._to_list() < other

        def __le__(self, other):
            return self._to_list() <= other

        def __eq__(self, other):
            return self._to_list() == other

        def __ne__(self, other):
            return self._to_list() != other

        def __ge__(self, other):
            return self._to_list() >= other

        def __gt__(self, other):
            return self._to_list() > other

        def __repr__(self):
            return '<ListProxy %s %r>' % (self.list, list(self))

        def __str__(self):
            return str(self.list)

        def __unicode__(self):
            return unicode(self.list)

        def __delitem__(self, index):
            del self.list[index]

        def __getitem__(self, index):
            # Python 3 has no __getslice__: slices arrive here and must give
            # a proxy over the sub-list, not a single component.
            if isinstance(index, slice):
                return self.__class__(self.list[index], self.field)
            return self.field._get_value(self.list[index])

        def __setitem__(self, index, value):
            # Python 3 has no __setslice__: convert every item of the
            # assigned sequence instead of treating it as one component.
            if isinstance(index, slice):
                self.list[index] = [self.field._set_value(v) for v in value]
            else:
                self.list[index] = self.field._set_value(value)

        def __delslice__(self, i, j):
            del self.list[i:j]

        def __getslice__(self, i, j):
            return self.__class__(self.list[i:j], self.field)

        def __setslice__(self, i, j, seq):
            self.list[i:j] = [self.field._set_value(v) for v in seq]

        def __contains__(self, value):
            for item in self:
                if item == value:
                    return True
            return False

        def __iter__(self):
            for index in range(len(self)):
                yield self[index]

        def __len__(self):
            return len(self.list)

        def __nonzero__(self):
            return bool(self.list)

        def __reduce__(self):
            return self.list.__reduce__()

        def __reduce_ex__(self, *args, **kwargs):
            return self.list.__reduce_ex__(*args, **kwargs)

        def append(self, item):
            self.list.append(self.field._set_value(item))

        def count(self, value):
            return self._to_list().count(value)

        def extend(self, other):
            self.list.extend([self.field._set_value(i) for i in other])

        def index(self, value, start=None, stop=None):
            start = start or 0
            for idx, item in enumerate(islice(self, start, stop)):
                if item == value:
                    return idx + start
            else:
                # 1-tuple: a tuple value would otherwise be unpacked as the
                # %-format arguments and raise TypeError instead.
                raise ValueError('%r not in list' % (value,))

        def insert(self, index, object):
            self.list.insert(index, self.field._set_value(object))

        def remove(self, value):
            for item in self:
                if item == value:
                    return self.list.remove(value)
            # 1-tuple for the same reason as in ``index``.
            raise ValueError('Value %r not in list' % (value,))

        def pop(self, index=-1):
            return self.field._get_value(self.list.pop(index))

        def sort(self, cmp=None, key=None, reverse=False):
            raise NotImplementedError('In place sorting not allowed.')

    # update docstrings from list
    for name, obj in inspect.getmembers(Proxy):
        if (getattr(list, name, None) is None
                or name in ['__module__', '__doc__']):
            continue
        if not inspect.isfunction(obj):
            continue
        obj.__doc__ = getattr(list, name).__doc__
    del name, obj

    def _get_value(self, value):
        return self.Proxy(value, self.field)

    def _set_value(self, value):
        return [self.field._set_value(item) for item in value]
class NotUsedField(Field):
    """Placeholder field for a value that must not be used.

    Its value conversion always yields ``None``; assigning a non-``None``
    value emits a :exc:`UserWarning` and the value is dropped.
    """

    def __init__(self, name=None):
        super(NotUsedField, self).__init__(name)

    def _get_value(self, value):
        return None

    def _set_value(self, value):
        message = ('Field %r is not used, any assignments are omitted'
                   '' % self.name)
        warnings.warn(message, UserWarning)
        return None

Project Versions