.. module:: configuration
:synopsis: Implements the configurations model.
.. moduleauthor:: Mariano Montone <marianomontone@gmail.com>
#import xml.etree.ElementTree as et
from util import *
from lxml import etree as et
import datetime
import dependencies
import logging
import email.utils
import urlparse
[docs]class ConfigurationSchema():
"""Configuration schemas define the configurations structure. They have a name, a list of parents, and a list of sections with options definitions.
Configuration schemas can be composed by inheriting from multiple parents. Configuration sections from the parents appear in the child configuration schema. For instance, a full stack web framework configuration schema could inherit from a generic Web schema for web server configuration, and another Database schema for database connection configuration.
Configuration schemas have sections, each containing other sections and schema options. The schemas sections and options can be manipulated in the tree appearing on the left of the configuration schemas navigator.
_configuration_schemas = {}
[docs] def get_named(cls, name):
"""Get the configuration schema registered with name *name*"""
schema = cls._configuration_schemas.get(name)
if schema is None:
raise Exception(name + ' schema not found')
return schema
[docs] def register_schema(cls, schema):
"""Register a schema, globally. This is called automatically when a configuration schema is created."""
logging.info("Registering schema " + schema.name)
cls._configuration_schemas[schema.name] = schema
[docs] def unregister_schema(cls, schema):
"""Unregister a schema, globally."""
del cls._configuration_schemas[schema.name]
[docs] def configuration_schemas(cls):
"""Return the list of globally registered configuration schemas"""
return cls._configuration_schemas.values()
def __init__(self, name='', **args):
self._name = name
self._direct_sections = []
self._documentation = args.get('documentation') or ""
self._parents = args.get("parents") or []
[docs] def section(self, section):
"""Add section *section* to schema"""
section.parent = self
return self
[docs] def direct_sections(self):
"""Return schema's direct sections (without looking at schema's parents sections)"""
return self._direct_sections
[docs] def sections(self):
"""Return schema's sections recursively considering its parents"""
sections = []
for parent in self.parents():
return sections
[docs] def move_section_backwards(self, section):
"""Move *section* backwards. For moving sections from UI"""
index = self._direct_sections.index(section)
if index > 0:
prev_section = self._direct_sections[index - 1]
self._direct_sections[index - 1] = section
self._direct_sections[index] = prev_section
[docs] def move_section_forward(self, section):
"""Move *section* forward (reorder). For moving sections from UI"""
index = self._direct_sections.index(section)
if index < len(self._direct_sections) - 1:
next_section = self._direct_sections[index - 1]
self._direct_sections[index + 1] = section
self._direct_sections[index] = next_section
[docs] def get_section(self, name):
"""Get section with name"""
return next((s for s in self._direct_sections if s.name == name), None)
[docs] def remove_section(self, section):
"""Remove *section* from the schema"""
[docs] def parents(self):
"""Returns the schema's parents list"""
return map(lambda name: ConfigurationSchema.get_named(name), self._parents)
[docs] def set_parents(self, parents):
"""Sets the schema's parents to *parents* list"""
self._parents = []
for parent in parents:
return self
[docs] def add_parent(self, parent):
"""Adds *parent* to the list of schema's parents"""
if isinstance(parent, str):
def name(self):
"""The schema's name"""
return self._name
[docs] def name(self, value):
self._name = value
return self
def documentation(self):
"""The schema's documentation"""
return self._documentation
[docs] def documentation(self, value):
self._documentation = value
return self
def remove(self):
def __str__(self):
return self.name
def path(self):
return ()
def option_in_path(self, path):
section = next((s for s in self.sections() if s.name == path[0]), None)
if not section:
raise Exception('Section ' + path[1] + ' not found')
return section.option_in_path(path[1:])
[docs]class ConfigurationSchemaSection:
"""Configuration schema section"""
def __init__(self, name='', **args):
self._name = name
self._subsections = []
self._options = []
self._documentation = args.get('documentation') or ''
self._parent = args.get('parent') or None
self._dependency_expression = None
def name(self):
"""Section name"""
return self._name
[docs] def name(self, value):
self._name = value
return self
[docs] def subsections(self):
"""Return the section's subsections"""
return self._subsections
[docs] def add_section(self, section):
"""Adds section *section* as a subsection"""
section.parent = self
return self
[docs] def remove_section(self, section):
"""Removes the subsection *section*"""
[docs] def options(self):
"""Returns the section's options list"""
return self._options
[docs] def add_option(self, option):
"""Adds option *option* to the section"""
option.section = self
return self
[docs] def remove_option(self, option):
"""Removes option *option* from the section"""
[docs] def get_option(self, name):
"""Gets section option with name *name* from the section.
If it is not found, None is return"""
return next((opt for opt in self._options if opt.name == name), None)
def documentation(self):
"""The section's documentation"""
return self._documentation
[docs] def documentation(self, value):
self._documentation = value
return self
def parent(self):
"""The section's parent. It can be the configuration schema if it is a top-level section, of the parent section if it is a subsection"""
return self._parent
[docs] def parent(self, value):
self._parent = value
return self
def dependency_expression(self):
"""The section's dependency expression"""
return self._dependency_expression
[docs] def dependency_expression(self, value):
self._dependency_expression = value
def remove(self):
def path(self):
return self.parent.path() + (self.name,)
def schema(self):
if isinstance(self.parent, ConfigurationSchema):
return self.parent
return self.parent.schema()
def move_backwards(self):
def move_forward(self):
def move_option_backwards(self, option):
index = self._options.index(option)
if index > 0:
prev_option = self._options[index - 1]
self._options[index - 1] = option
self._options[index] = prev_option
def move_option_forward(self, option):
index = self._options.index(option)
if index < len(self._options) - 1:
next_option = self._options[index - 1]
self._options[index + 1] = option
self._options[index] = next_option
def option_in_path(self, path):
if len(path) > 1:
# There are subsections in path
subsection = next((s for s in self._subsections if s.name == path[0]), None)
if not subsection:
raise Exception('Subsection not found ' + path[0])
return subsection.option_in_path(path[1:])
# Find the option
option = next((o for o in self._options if o.name == path[0]), None)
if not option:
raise Exception('Option not found ' + path[0])
return option
def validate(self, config):
# Validate the section using config
errors = {}
for option in self.options():
option_value, origin = config.option_value(option)
if config.option_is_enabled(option) and option.is_required and option.default_value is None and option_value is None:
errors[option.name] = {'option':option, 'message': option.path_string() + ' is required'}
if option_value is not None:
error = option.validate(option_value)
if error is not None:
errors[option.name] = {'option':option, 'message' : option.path_string() + ": " + error}
if len(errors.values()) > 0:
return errors
return None
class ConfigurationSchemaOption:
def __init__(self, name, option_type, **args):
self._name = name
self._option_type = option_type
self._documentation = args.get('documentation') or ''
self._section = None
self._is_required = args.get('required') or True
self._default_value = args.get('default_value') or None
self._dependency_expression = None
def name(self):
return self._name
def name(self, value):
self._name = value
return self
def option_type(self):
return self._option_type
def option_type(self, value):
self._option_type = value
return self
def documentation(self):
return self._documentation
def documentation(self, value):
self._documentation = value
return self
def section(self):
return self._section
def section(self, value):
self._section = value
return self
def remove(self):
def is_required(self):
return self._is_required
def is_required(self, value):
self._is_required = value
return self
def default_value(self):
return self._default_value
def default_value(self, value):
self._default_value = value
return self
def dependency_expression(self):
return self._dependency_expression
def dependency_expression(self, value):
self._dependency_expression = value
def path(self):
return self.section.path() + (self.name,)
def path_string(self):
return '.'.join(self.path())
def schema(self):
return self.section.schema()
def move_forward(self):
def move_backwards(self):
def parse_value(self, value):
return self._option_type.parse_value(value)
def unparse_value(self, value):
return self.option_type.unparse_value(value)
def display_value(self, value):
return self.option_type.display_value(value)
def validate(self, value):
return self.option_type.validate(value)
class OptionType(object):
_name = None
def option_name(cls):
return cls._name
def get_named(cls, name):
option_type = next((option_type for option_type in all_subclasses(cls) if option_type.option_name() == name), None)
if not option_type:
raise Exception('No option type named ' + name)
return option_type
def option_types(cls):
return all_subclasses(OptionType)
def name(self):
return self.__class__.option_name()
def parse_value(self, value):
return value
def unparse_value(self, value):
return str(value)
def display_value(self, value):
return str(value)
def validate(self, value):
return None
def __str__(self):
return self.name
class StringOptionType(OptionType):
_name = "String"
def accept(self, visitor):
return visitor.visit_StringOptionType(self)
class NumberOptionType(OptionType):
_name = "Number"
def accept(self, visitor):
return visitor.visit_NumberOptionType(self)
def parse_value(self, value):
return int(value)
def validate(self, value):
if not isinstance(value, int):
return "Should be a number"
class BooleanOptionType(OptionType):
_name = "Boolean"
def accept(self, visitor):
return visitor.visit_BooleanOptionType(self)
def parse_value(self, value):
return value == 'True'
def validate(self, value):
if not isinstance(value, bool):
return "Should be a boolean"
class EmailOptionType(OptionType):
_name = "Email"
def accept(self, visitor):
return visitor.visit_EmailOptionType(self)
def validate(self, value):
x, y = email.utils.parseaddr(value)
if x == '' and y == '':
return str(value) + " is an invalid email"
class URIOptionType(OptionType):
_name = "URI"
def accept(self, visitor):
return visitor.visit_URIOptionType(self)
def validate(self, value):
uri = urlparse.urlparse(value)
#print "URI validation result: " + str(uri)
class FilenameOptionType(OptionType):
_name = "Filename"
def accept(self, visitor):
return visitor.visit_FilenameOptionType(self)
class DirectoryOptionType(OptionType):
_name = "Directory"
def accept(self, visitor):
return visitor.visit_DirectoryOptionType(self)
class ColorOptionType(OptionType):
_name = "Color"
def accept(self, visitor):
return visitor.visit_ColorOptionType(self)
class TimezoneOptionType(OptionType):
_name = "Timezone"
def accept(self, visitor):
return visitor.visit_TimezoneOptionType(self)
class CountryOptionType(OptionType):
_name = "Country"
def accept(self, visitor):
return visitor.visit_CountryOptionType(self)
class LanguageOptionType(OptionType):
_name = "Language"
def accept(self, visitor):
return visitor.visit_LanguageOptionType(self)
class CurrencyOptionType(OptionType):
_name = "Currency"
def accept(self, visitor):
return visitor.visit_CurrencyOptionType(self)
class DateOptionType(OptionType):
_name = "Date"
def accept(self, visitor):
return visitor.visit_DateOptionType(self)
def parse_value(self, value):
return datetime.datetime.strptime(value, "%d/%m/%Y")
def unparse_value(self, value):
return value.strftime("%d/%m/%Y")
def display_value(self, value):
return value.strftime("%d/%m/%Y")
class TimeOptionType(OptionType):
_name = "Time"
def accept(self, visitor):
return visitor.visit_TimeOptionType(self)
def parse_value(self, value):
return datetime.datetime.strptime(value, "%H:%M:%S")
def unparse_value(self, value):
return value.strftime("%H:%M:%S")
def display_value(self, value):
return value.strftime("%H:%M:%S")
class DatetimeOptionType(OptionType):
_name = "Datetime"
def accept(self, visitor):
return visitor.visit_DatetimeOptionType(self)
def parse_value(self, value):
tuple = eval(value)
return (datetime.datetime.strptime(tuple[0], "%d/%m/%Y"),
datetime.datetime.strptime(tuple[1], "%H:%M:%S"))
def unparse_value(self, value):
datestring = value[0].strftime("%d/%m/%Y")
timestring = value[1].strftime("%H:%M:%S")
return "('" + datestring + "','" + timestring + "')"
def display_value(self, value):
datestring = value[0].strftime("%d/%m/%Y")
timestring = value[1].strftime("%H:%M:%S")
return datestring + " " + timestring
class ChoiceOptionType(OptionType):
_name = "Choice"
def __init__(self, options=None):
if options is None:
self._options = options
def options(self):
return self._options
def add_option(self, option):
logging.debug("Adding option " + option + " to " + str(self))
def accept(self, visitor):
return visitor.visit_ChoiceOptionType(self)
def validate(self, value):
if not value in self.options():
return '\'' + str(value) + '\' is not a valid choice (' + ', '.join(self.options()) + ')'
class ListOptionType(OptionType):
_name = "List"
def __init__(self, options=None):
if options is None:
self._options = []
self._options = options
def options(self):
return self._options
def add_option(self, option):
def accept(self, visitor):
return visitor.visit_ListOptionType(self)
def parse_value(self, value):
return eval(value)
def display_value(self, value):
return ', '.join(value)
def validate(self, value):
vals = list(set(value) - set(self.options()))
if len(vals) > 0:
return str(vals) + ' are not a valid list members (' + ', '.join(self.options()) + ')'
class MaybeOptionType(OptionType):
_name = "Maybe"
def __init__(self, option_type=None):
self._option_type = option_type
def option_type(self):
return self._option_type
def option_type(self, value):
self._option_type = value
return self
def accept(self, visitor):
return visitor.visit_MaybeOptionType(self)
class OneOfOptionType(OptionType):
_name = "One of"
def __init__(self):
def option_types(self):
return self._option_types
def option_types(self, value):
self._option_types = value
return self
def accept(self, visitor):
return visitor.visit_OneOfOptionType(self)
class ManyOptionType(OptionType):
_name = "Many"
def __init__(self, option_type=None):
self._option_type = option_type
def option_type(self):
return self._option_type
def option_type(self, value):
self._option_type = value
return self
def accept(self, visitor):
return visitor.visit_ManyOptionType(self)
def parse_value(self, str):
return eval(str)
def display_value(self, value):
return ', '.join(value)
def __str__(self):
return self.name + '(' + str(self.option_type) + ')'
class CustomOptionType(OptionType):
_custom_option_types = {}
def get_named(cls, name):
option_type = cls._custom_option_types.get(name)
if option_type is None:
raise Exception(name + ' custom option type not found')
return option_type
def register_custom_option_type(cls, option_type):
logging.info("Registering custom option type " + option_type.name)
cls._custom_option_types[option_type.name] = option_type
def unregister_custom_option_type(cls, option_type):
del cls._custom_option_types[option_type.name]
def custom_option_types(cls):
return cls._custom_option_types.values()
def __init__(self, name, attributes=None):
self._name = name
if attributes is not None:
self._attributes = attributes
self._attributes = []
def attributes(self):
return self._attributes
def attributes(self, value):
self._attributes = value
return self
def add_attribute(self, name, type):
self._attributes.append((name, type))
def name(self):
return self._name
def name(self, value):
self._name = value
return self
def accept(self, visitor):
return visitor.visit_CustomOptionType(self)
class Configuration(object):
_configurations = {}
def get_named(cls, name):
config = cls._configurations.get(name)
if config is None:
raise Exception(name + ' configuration not found')
return config
def register_config(cls, config):
logging.info("Registering config " + config.name)
cls._configurations[config.name] = config
def unregister_config(cls, config):
del cls._configurations[config.name]
def configurations(cls):
return cls._configurations.values()
def __init__(self, name='', schema=None, **options):
self._name = name
if schema is not None:
self._schema = schema.name
self._schema = None
self._parent = options.get('parent') or None
self._options = {}
self._documentation = options.get('documentation') or ''
def name(self):
return self._name
def name(self, value):
self._name = value
return self
def schema(self):
if self._schema is not None:
return ConfigurationSchema.get_named(self._schema)
def schema(self, value):
self._schema = value.name
return self
def parent(self):
if self._parent is None:
return None
return Configuration.get_named(self._parent)
def parent(self, value):
if value is None:
self._parent = None
elif isinstance(value, str):
self._parent = value
self._parent = value.name
return self
def documentation(self):
return self._documentation
def documentation(self, value):
self._documentation = value
return self
def set_option_value(self, schema_option, value):
logging.info(self.name + ' configuration: Setting ' + schema_option.path_string() + ' to ' + str(value))
option = ConfigurationOption(schema_option, value=value)
self._options[schema_option] = option
def unset_option(self, schema_option):
if self._options.get(schema_option):
del self._options[schema_option]
def option_value(self, schema_option):
option = self._options.get(schema_option, None)
if option:
# Return a tuple, the option value and its origin (the configuration that has its value)
return option.value, self
if self.parent:
return self.parent.option_value(schema_option)
return None, None
def option_is_enabled(self, schema_option):
if schema_option.dependency_expression is None:
return True
logging.debug("Option enabled " + schema_option.path_string() + ": " + str(schema_option.dependency_expression.evaluate(self)))
return schema_option.dependency_expression.evaluate(self)
def sections(self):
return self.schema.sections()
def options(self):
return self._options.values()
def validate(self):
errors = []
for section in self.sections():
section_errors = section.validate(self)
if section_errors:
if len(errors) > 0:
logging.info('Configuration ' + self.name + ' is not valid: ' + str(errors))
return errors
logging.info('Configuration ' + self.name + ' is valid')
return None
def __str__(self):
return self.name
class ConfigurationOption():
def __init__(self, schema, **options):
self._schema = schema
self._value = None
if options.get('value') is not None:
self._value = options.get('value')
def schema(self):
return self._schema
def schema(self, value):
self._schema = value
return self
def name(self):
return self._schema.name
def value(self):
return self._value
def value(self, value):
self._value = value
return self
def path(self):
return self.schema.path()
def path_string(self):
return self.schema.path_string()
def __hash__(self):
return hash(self.path())
def __eq__(self, other):
return self.path() == other.path()
def __str__(self):
return self.name
def unparse_value(self, value):
return self.schema.unparse_value(value)
def display_value(self, value):
return self.schema.display_value(value)
def validate(self):
return self.schema.validate(self.value)
class Serializer:
class XMLSerializer(Serializer):
class ConfigurationsXMLSerializer(XMLSerializer):
def __init__(self):
self._root = et.Element('configurations')
def serialize(self, config):
config_element = et.SubElement(self._root, 'configuration')
config_element.attrib['name'] = config.name
schema = et.SubElement(config_element, 'schema')
schema.attrib['name'] = config.schema.name
if config.parent:
parent = et.SubElement(config_element, 'parent')
parent.attrib['name'] = config.parent.name
for option in config.options():
option_elem = et.SubElement(config_element, 'option')
option_elem.attrib['path'] = option.path_string()
option_elem.attrib['value'] = self.serialize_option_value(option)
def serialize_option_value(self, option):
return option.unparse_value(option.value)
def write(self, recipient):
tree = et.ElementTree(self._root)
tree.write(recipient, pretty_print=True)
class ConfigurationsXMLUnserializer():
def __init__(self):
self._configs = []
def read(self, source):
self._tree = et.parse(source)
return self._configs
def unserialize(self):
for config in self._tree.getroot():
name = config.attrib['name']
doc = config.findtext('documentation')
schema_name = config.find('schema').attrib['name']
schema = ConfigurationSchema.get_named(schema_name)
cfg = Configuration(name, schema, documentation=doc)
parent = config.find('parent')
if parent is not None:
cfg.parent = parent.attrib['name']
for option in config.iterchildren(tag='option'):
path = option.attrib['path'].split('.')
opt = schema.option_in_path(path)
value = opt.option_type.parse_value(option.attrib['value'])
logging.debug('Unserialize: Setting ' + str(opt) + ' value: ' + str(value))
cfg.set_option_value(opt, value)
return self._configs
class ConfigurationSchemasXMLSerializer(XMLSerializer):
def __init__(self):
self._root = et.Element("schemas")
def serialize(self, schema):
schema_element = et.SubElement(self._root, "schema")
schema_element.attrib['name'] = schema.name
doc = et.SubElement(schema_element, 'documentation')
doc.text = schema.documentation
for parent in schema.parents():
parent_element = et.SubElement(schema_element, 'parent')
parent_element.attrib['name'] = parent.name
for section in schema.direct_sections():
self.serialize_section(section, schema_element)
def serialize_section(self, section, sections):
section_element = et.SubElement(sections, 'section')
section_element.attrib['name'] = section.name
doc = et.SubElement(section_element, 'documentation')
doc.text = section.documentation
for option in section.options():
self.serialize_option(option, section_element)
for subsection in section.subsections():
self.serialize_section(subsection, section_element)
def serialize_option(self, option, options):
option_elem = et.SubElement(options, 'option')
option_elem.attrib['name'] = option.name
doc = et.SubElement(option_elem, 'documentation')
doc.text = option.documentation
option_type = et.SubElement(option_elem, 'type')
self.serialize_option_type(option.option_type, option_type)
required = et.SubElement(option_elem, 'required')
required.text = 'True' if option.is_required else 'False'
if option.default_value is not None:
default_value = et.SubElement(option_elem, 'default')
default_value.text = option.option_type.unparse_value(option.default_value)
if option.dependency_expression is not None:
dependency_expression = et.SubElement(option_elem, 'dependency')
dependency_expression.text = str(option.dependency_expression)
def serialize_option_type(self, option_type, element):
element.attrib['name'] = option_type.name
self._option_elem = element
visitor_method = getattr(self, "serialize" + option_type.name + "OptionType", None)
if callable(visitor_method):
def serializeChoiceOptionType(self, option_type):
for option in option_type.options():
opt = et.SubElement(self._option_elem, 'option')
opt.attrib['value'] = option
def serializeListOptionType(self, option_type):
for option in option_type.options():
opt = et.SubElement(self._option_elem, 'option')
opt.attrib['value'] = option
def serializeManyOptionType(self, option_type):
many_type = option_type.option_type
many_type_elem = et.SubElement(self._option_elem, 'type')
many_type_elem.attrib['name'] = many_type.name
self.serialize_option_type(option_type.option_type, many_type_elem)
def write(self, recipient):
tree = et.ElementTree(self._root)
tree.write(recipient, pretty_print=True)
class ConfigurationSchemasXMLUnserializer():
def __init__(self, **cfg):
self._schemas = []
self._tree = None
def read(self, source):
self._tree = et.parse(source)
return self._schemas
def unserialize(self):
for schema in self._tree.getroot():
name = schema.attrib['name']
doc = schema.findtext('documentation')
sch = ConfigurationSchema(name, documentation=doc)
for parent in schema.iterchildren(tag='parent'):
for section in schema.iterchildren(tag='section'):
return self._schemas
def unserialize_section(self, section_elem):
name = section_elem.attrib['name']
doc = section_elem.findtext('documentation')
logging.debug("Unserializing section " + name)
section = ConfigurationSchemaSection(name, documentation=doc)
for option in section_elem.iterchildren(tag='option'):
for subsection in section_elem.iterchildren(tag='section'):
return section
def unserialize_option(self, option_elem):
name = option_elem.attrib['name']
doc = option_elem.findtext('documentation')
is_required = option_elem.findtext('required')
default_value = option_elem.findtext('default')
option_type_elem = option_elem.find('type')
option_type = self.unserialize_option_type(option_type_elem)
dependency_expression = option_elem.findtext('dependency')
option = ConfigurationSchemaOption(name, option_type, documentation=doc)
option.is_required = is_required is not None and eval(is_required)
if default_value is not None:
option.default_value = option_type.parse_value(default_value)
if dependency_expression is not None:
option.dependency_expression = DependencyExpressionParser.parse_expression(dependency_expression)
return option
def unserialize_option_type(self, option_type_elem):
self._option_type_elem = option_type_elem
name = option_type_elem.attrib['name']
option_type_class = OptionType.get_named(name)
option_type = option_type_class()
visitor_method = getattr(self, "unserialize" + option_type.name + "OptionType", None)
if callable(visitor_method):
return visitor_method(option_type)
return option_type
def unserializeChoiceOptionType(self, option_type):
for option in self._option_type_elem.iterchildren(tag='option'):
return option_type
def unserializeListOptionType(self, option_type):
for option in self._option_type_elem.iterchildren(tag='option'):
return option_type
def unserializeManyOptionType(self, option_type):
many_type_elem = self._option_type_elem.find('type')
many_type = self.unserialize_option_type(many_type_elem)
option_type.option_type = many_type
return option_type
class YAMLSerializer(Serializer):
# Dependency expressions
class DependencyExpression(object):
def __init__(self, **args):
def evaluate(self, config):
class BooleanConnector(DependencyExpression):
def __init__(self, arg1, arg2):
self._arg1 = arg1
self._arg2 = arg2
def arg1(self):
return self._arg1
def arg2(self):
return self._arg2
class DEAnd(BooleanConnector):
def __str__(self):
return str(self.arg1) + ' AND ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) and self.arg2.evaluate(config)
class DEOr(BooleanConnector):
def __str__(self):
return str(self.arg1) + ' OR ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) or self.arg2.evaluate(config)
class DEXor(BooleanConnector):
def __str__(self):
return str(self.arg1) + ' XOR ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) != self.arg2.evaluate(config)
class BooleanOperation(DependencyExpression):
def __init__(self, arg1, arg2):
self._arg1 = arg1
self._arg2 = arg2
def arg1(self):
return self._arg1
def arg2(self):
return self._arg2
class DEEqual(BooleanOperation):
def __str__(self):
return str(self.arg1) + ' = ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) == self.arg2.evaluate(config)
class DEGreaterThan(BooleanOperation):
def __str__(self):
return str(self.arg1) + ' > ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) > self.arg2.evaluate(config)
class DELowerThan(BooleanOperation):
def __str__(self):
return str(self.arg1) + ' < ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) < self.arg2.evaluate(config)
class DEDifferentFrom(BooleanOperation):
def __str__(self):
return str(self.arg1) + ' <> ' + str(self.arg2)
def evaluate(self, config):
return self.arg1.evaluate(config) <> self.arg2.evaluate(config)
class DEOptionPath(DependencyExpression):
def __init__(self, path):
self._path = path
def path(self):
return self._path
def __str__(self):
return '.'.join(self.path)
def evaluate(self, config):
option = config.schema.option_in_path(self.path)
value, origin = config.option_value(option)
return value
class DEString(DependencyExpression):
def __init__(self, value):
self._value = value
def __str__(self):
return str(self._value)
def evaluate(self, config):
return self._value
class DEBoolean(DependencyExpression):
def __init__(self, value):
self._value = value
def __str__(self):
return str(self._value)
def evaluate(self, config):
return self._value
class DENumber(DependencyExpression):
def __init__(self, value):
self._value = value
def __str__(self):
return str(self._value)
def evaluate(self, config):
return self._value
class DependencyExpressionParser():
def parse_expression(cls, expression):
parser = dependencies.dependenciesParser(parseinfo=False)
ast = parser.parse(expression, whitespace='',
return ast
def bool_literal(self, ast):
if ast == 'True':
return DEBoolean(True)
elif ast == 'False':
return DEBoolean(False)
raise Exception('Error parsing literal boolean ' + str(ast))
def option_path(self, ast):
if isinstance(ast, list):
path = [ast[0]]
return DEOptionPath(path)
return DEOptionPath([ast])
def identifier(self, ast):
name = ast[0]
if len(ast) > 1:
for x in ast[1]:
name = name + x
return name.strip()
def literal_string(self, ast):
string = ''
for x in ast:
string = string + x
return DEString(string)
def boolexp(self, ast):
#print "Boolexp: " + str(ast)
if isinstance(ast, list):
term = ast[0]
connector = ast[1]
exp = ast[2]
if connector == 'AND':
return DEAnd(term, exp)
elif connector == 'OR':
return DEOr(term, exp)
elif connector == 'XOR':
return DEXor(term, exp)
raise Exception('Error parsing boolean expression ' + str(ast))
return ast
def boolterm(self, ast):
#print "Boolterm: " + str(ast)
if isinstance(ast, list):
option_path = ast[0]
operation = ast[1]
value = ast[2]
if operation == '=':
return DEEqual(option_path, value)
elif operation == '>':
return DEGreaterThan(option_path, value)
elif operation == '<':
return DELowerThan(option_path, value)
elif operation == '<>':
return DEDifferentFrom(option_path, value)
return ast
def number(self, ast):
str = ''
for x in ast:
str = string + x
return DENumber(int(str))