Source code for formbar.form
import logging
import sqlalchemy as sa
from formbar.renderer import FormRenderer, get_renderer
from formbar.rules import Rule, Expression
from formbar.converters import (
DeserializeException, from_python, to_python
)
log = logging.getLogger(__name__)
def remove_ws(data):
"""Helper function which removes trailing and leading whitespaces
for all values in the given dictionary. The dictionary usually
contains the submitted data."""
clean = {}
for key in data:
if isinstance(data[key], unicode):
# This may happen for lists e.g when sumitting multiple
# selection in checkboxes.
clean[key] = data[key].strip()
else:
clean[key] = data[key]
return clean
def get_attributes(cls):
return [prop.key for prop in sa.orm.class_mapper(cls).iterate_properties
if isinstance(prop, sa.orm.ColumnProperty)
or isinstance(prop, sa.orm.RelationshipProperty)]
def get_relations(cls):
return [prop.key for prop in sa.orm.class_mapper(cls).iterate_properties
if isinstance(prop, sa.orm.RelationshipProperty)]
class Error(Exception):
"""Base class for exceptions in this module."""
pass
class StateError(Error):
"""Exception raised for state errors while processing the form.
:msg: explanation of the error
"""
def __init__(self, msg):
self.msg = msg
class Validator(object):
"""Validator class for external validators. External validators can
be used to implement more complicated validations on the converted
data in the form. The validator has access to all submitted values
of the form. Validation happens on the converted pythonic values
from the submitted formdata. Additionally a context can be provided
to the validator to provide additional data needed for the
validation."""
def __init__(self, field, error, callback, context=None, triggers="error"):
"""Initialize a new Validator
:field: Name of the field which should be validated.
:error: Error message which should be show at the field when
validation fails.
:callback: Python callable which actually will do the check.
:context: Add additional data which can be provided to the callback.
:triggers: Set what kind of error message will be generated.
Everything else than "error" will trigger a warning
message. Default to error.
"""
self._field = field
self._error = error
self._callback = callback
self._context = context
self._triggers = triggers
def check(self, data):
"""Checker method which will call the callback of the validator
to actually do the validation on the provided data. Will return
True or False."""
try:
return self._callback(self._field, data)
except TypeError:
return self._callback(self._field, data, self._context)
[docs]class Form(object):
"""Class for forms. The form will take care for rendering the form,
validating the submitted data and saving the data back to the
item.
The form must be instanciated with an instance of an ``Form``
configuration and optional an SQLAlchemy mapped item.
If an SQLAlchemy mapped item is provided there are some basic
validation is done based on the defintion in the database. Further
the save method will save the values directly into the database.
If no item was provided than a dummy item will be created with the
attributes of the configured fields in the form.
"""
def __init__(self, config, item=None, dbsession=None, translate=None,
change_page_callback={}, renderers={}, request=None,
csrf_token=None, eval_url=None, url_prefix="", locale=None):
"""Initialize the form with ``Form`` configuration instance and
optional an SQLAlchemy mapped object.
:config: FormConfiguration.
:item: SQLAlchemy mapped instance
:dbsession: dbsession
:translate: Translation function which returns a translated
string for a given msgid
:set_page_callback: Url which will be called when the user
changes the currently selected page.
:renderers: A optional dictionary of custom renderers which are
provided to the form to render specific form elements. The key
is the type of the renderer as named in the formular
configuration.
:request: Current request (See
http://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html
when using in connection with ringo)
:csrf_token: Token to which will be included as hidden field in
the form to prevent CSRF attacks.
:eval_url: External URL for rule evaluation. If defined this URL is
called to evaluate client side rules with a AJAX request. The rule
to evaluate is provided in a GET request in the "rule" paramenter.
The return Value is a JSON response with success attribute set
depending on the result of the evaluation and the error message in
the data attribute in case the evaluation fails.
:url_prefix: Prefix which can be used for all URL in the form.
:locale: String of the locale of the form. Used for proper
display of the date and number functions.
"""
self._config = config
self._item = item
self._dbsession = dbsession
self._request = request
self._csrf_token = csrf_token
self._url_prefix = url_prefix
self._eval_url = eval_url
if self._url_prefix:
self._eval_url = self._url_prefix + self._eval_url
if locale:
self._locale = locale
else:
self._locale = "en"
if translate:
self._translate = translate
else:
self._translate = lambda msgid: msgid
self.validated = False
"""Flag to indicate if the form has been validated. Init value
is False. which means no validation has been done."""
self.external_validators = []
"""List with external validators. Will be called an form validation."""
self.current_page = 0
"""Number of the currently selected page"""
self.change_page_callback = change_page_callback
"""Dictionary with some parameters used to call an URL when the
user changes the currently selected page. The dictionary has the
following keys:
* url: Name of the URL which will be called
* item (optional): A string which is send to the URL as GET
paramenter. Often this is the name of the element (clazzname)
* itemid (optional): The id of the currently editied element.
The url will have the additional parameter "page" which holds
the currently selected page.
"""
self.external_renderers = renderers
"""Dictionary with external provided custom renderers."""
self.fields = self._build_fields()
"""Dictionary with fields."""
self.data = {}
"""After submission this Dictionary will contain the
validated data on successfull validation. Else this Dictionary
will be empty"""
self.submitted_data = {}
"""The submitted data from the user. If validation fails, then
this values are used to rerender the form."""
self.loaded_data = self.serialize(self._get_data_from_item())
"""This is the initial data loaded from the given item. Used to
render the readonly forms"""
self.merged_data = {}
"""This is merged date from the initial data loaded from the
given item. And userprovied data. The data is available after
rendering the form"""
self.warnings = []
"""Form wide warnings. This list contains warnings which affect
the entire form and not specific fields. These warnings are show
at the top of evere page."""
self.errors = []
"""Form wide errors. This list contains errors which affect
the entire form and not specific fields. These errors are show
at the top of evere page."""
def _set_current_field_data(self, data):
for key in self.fields:
value = data.get(key)
if value or isinstance(value, int):
field = self.fields[key]
field.set_value(value)
def _set_previous_field_data(self, data):
for key in self.fields:
value = data.get(key)
if value or isinstance(value, int):
field = self.fields[key]
field.set_previous_value(value)
def _get_data_from_item(self):
values = {}
if not self._item:
return values
for name, field in self._config.get_fields().iteritems():
try:
values[name] = getattr(self._item, name)
except AttributeError:
values[name] = None
return values
def _filter_values(self, values):
"""Will return a filtered dictionary of the given values
dictionary which only contains values which are actually part of
the current form.
:values: Dicionary with unfilterd values
:returns: Dicionary with filtered values
"""
filtered = {}
for fieldname, field in self.fields.iteritems():
if fieldname in values:
filtered[fieldname] = values[fieldname]
return filtered
def deserialize(self, data):
"""Returns a dictionary with pythonized data data. Usually this
is the submitted data coming from a form. The dictionary will
include all values provided in the initial data dictionary
converted into python datatype.
:data: Dictionary with serialized data
:returns: Dictionary with deserialized data
"""
deserialized = {}
# Load relations of the item. Those are needed to deserialize
# the relations.
relation_names = {}
try:
mapper = sa.orm.object_mapper(self._item)
relation_properties = filter(
lambda p: isinstance(p,
sa.orm.properties.RelationshipProperty),
mapper.iterate_properties)
for prop in relation_properties:
relation_names[prop.key] = prop
except sa.orm.exc.UnmappedInstanceError:
if not self._item:
pass # The form is not mapped to an item.
else:
raise
for fieldname, value in self._filter_values(data).iteritems():
field = self.fields.get(fieldname)
try:
serialized = data.get(field.name)
deserialized[fieldname] = to_python(field,
serialized,
relation_names)
except DeserializeException as ex:
self._add_error(field.name,
self._translate(ex.message) % ex.value)
log.debug("Deserialized values: %s" % deserialized)
return deserialized
def serialize(self, data):
"""Returns a dictionary with serialized data from the forms
item. The dictionary will include all attributes
and relations values of the items. The key in the dictionary is
the name of the relation/attribute. In case of relations the
value in the dictionary is the "id" value of the related item.
If no item is present then return a empty dict.
:returns: Dictionary with serialized values of the item.
"""
serialized = {}
for fieldname, value in self._filter_values(data).iteritems():
field = self.fields.get(fieldname)
serialized[fieldname] = from_python(field, value)
log.debug("Serialized values: %s" % serialized)
return serialized
def _build_fields(self):
"""Returns a dictionary with all Field instances which are
configured for this form.
:returns: Dictionary with Field instances
"""
fields = {}
for name, field in self._config.get_fields().iteritems():
fields[name] = Field(self, field, self._translate)
return fields
def has_errors(self):
"""Returns True if one of the fields in the form has errors"""
for field in self.fields.values():
if len(field.get_errors()) > 0:
return True
return len(self.errors) != 0
def has_warnings(self):
"""Returns True if one of the fields in the form has warnings"""
for field in self.fields.values():
if len(field.get_warnings()) > 0:
return True
return len(self.warnings) != 0
[docs] def get_errors(self, page=None):
"""Returns a dictionary of all errors in the form. If page
parameter is given, then only the errors for fields on the given
page are returned. This dictionary will contain the errors if
the validation fails. The key of the dictionary is the fieldname
of the field. As a field can have more than one error the value
is a list.
:page: Dictionary with errors
:returns: Dictionary with errors
"""
if page is not None:
fields_on_page = self._config.get_fields(page)
errors = {}
for field in self.fields.values():
if page is not None and field.name not in fields_on_page:
continue
if len(field.get_errors()) > 0:
errors[field.name] = field.get_errors()
if len(self.errors) != 0 and page is None:
errors[""] = self.errors
return errors
[docs] def get_warnings(self, page=None):
"""Returns a dictionary of all warnings in the form. If page
parameter is given, then only the warnings for fields on the given
page are returned. This dictionary will contain the warnings if
the validation fails. The key of the dictionary is the fieldname
of the field. As a field can have more than one warning the value
is a list.
:page: Name of the page
:returns: Dictionary with warnings
"""
if page is not None:
fields_on_page = self._config.get_fields(page)
warnings = {}
for field in self.fields.values():
if page is not None and field.name not in fields_on_page:
continue
if len(field.get_warnings()) > 0:
warnings[field.name] = field.get_warnings()
if len(self.warnings) != 0 and page is None:
warnings[""] = self.warnings
return warnings
def get_field(self, name):
return self.fields[name]
def add_validator(self, validator):
return self.external_validators.append(validator)
[docs] def render(self, values={}, page=0, buttons=True,
previous_values={}, outline=True):
"""Returns the rendererd form as an HTML string.
:values: Dictionary with values to be prefilled/overwritten in
the rendered form.
:previous_values: Dictionary of values of the last saved state
of the item. If provided a diff between the
current and previous values will be renderered
in readonly mode.
:outline: Boolean flag to indicate that the outline for pages
should be rendered. Defaults to true.
:returns: Rendered form.
"""
self.current_page = page
if self.submitted_data:
item_values = self.submitted_data
else:
item_values = self.loaded_data
# Merge the items_values with the extra provided values. Extra
# values will overwrite the item_values.
values = dict(item_values.items() + values.items())
self.merged_data = values
# Set current and previous values of the fields in the form.
self._set_current_field_data(values)
self._set_previous_field_data(previous_values)
# Add csrf_token to the values dictionary
values['csrf_token'] = self._csrf_token
renderer = FormRenderer(self, self._translate)
form = renderer.render(buttons=buttons, outline=outline)
return form
def _add_error(self, fieldname, error):
if fieldname is None:
self.errors.append(error)
else:
field = self.get_field(fieldname)
if isinstance(error, list):
for err in error:
field.add_error(err)
else:
field.add_error(error)
def _add_warning(self, fieldname, warning):
if fieldname is None:
self.warnings.append(warning)
else:
field = self.get_field(fieldname)
if isinstance(warning, list):
for war in warning:
field.add_warning(war)
else:
field.add_warning(warning)
[docs] def validate(self, submitted=None):
"""Returns True if the validation succeeds else False.
Validation of the data happens in three stages:
1. Prevalidation. Custom rules that are checked before any
datatype checks on type conversations are made.
2. Basic type checks and type conversation. Type checks and type
conversation is done based on the data type of the field and
further constraint defined in the database if the form is
instanciated with an SQLAlchemy mapped item.
3. Postvalidation. Custom rules that are checked after the type
conversation was done. Note: Postevaluation is only done for
successfull converted values.
4. External Validators. External defined checks done on teh
converted values. Note: Validators are only called for
successfull converted values
All errors are stored in the errors dictionary through the
process of validation. After the validation finished the values
are stored in the data dictionary. In case there has been errors
the dictionary will contain the origin submitted data.
:submitted: Dictionary with submitted values.
:returns: True or False
"""
if not submitted:
unvalidated = self.loaded_data
else:
try:
unvalidated = submitted.mixed()
except AttributeError:
unvalidated = submitted
unvalidated = remove_ws(unvalidated)
log.debug("Submitted data: %s" % unvalidated)
self.submitted_data = unvalidated
converted = self.deserialize(unvalidated)
# Validate the fields. Ignore fields which are disabled in
# conditionals First get list of fields which are still in the
# form after conditionals has be evaluated
fields_to_check = self._config.get_fields(values=converted,
reload_fields=True,
evaluate=True)
for fieldname, field in fields_to_check.iteritems():
field = self.fields[fieldname]
for rule in field.get_rules():
if rule.mode == "pre":
result = rule.evaluate(unvalidated)
elif fieldname not in converted:
# Ignore rule if the value can't be converted.
continue
else:
result = rule.evaluate(converted)
if not result:
if rule.triggers == "warning":
self._add_warning(fieldname, rule.msg)
else:
self._add_error(fieldname, rule.msg)
# Custom validation. User defined external validators.
for validator in self.external_validators:
if validator._field not in converted and validator._field is not None:
# Ignore validator if the value can't be converted.
continue
if not validator.check(converted):
if validator._triggers == "error":
self._add_error(validator._field, validator._error)
else:
self._add_warning(validator._field, validator._error)
# If the form is valid. Save the converted and validated data
# into the data dictionary.
has_errors = self.has_errors()
if not has_errors:
self.data = converted
self.validated = True
return not has_errors
[docs] def save(self):
"""Will save the validated data back into the item. In case of
an SQLAlchemy mapped item the data will be stored into the
database.
:returns: Item with validated data.
"""
if not self.validated:
raise StateError('Saving is not possible without prior validation')
if self.has_errors():
raise StateError('Saving is not possible if form has errors')
# Only save if there is actually an item.
if self._item is not None:
# TODO: Iterate over fields here. Fields should know their value
# and if they are a relation or not (torsten) <2013-07-24 23:24>
for key, value in self.data.iteritems():
setattr(self._item, key, value)
# If the item has no id, then we assume it is a new item. So
# add it to the database session.
if not self._item.id:
self._dbsession.add(self._item)
return self._item
class Field(object):
"""Wrapper for fields in the form. The purpose of this class is to
provide a common interface for the renderer independent to the
underlying implementation detail of the field."""
def __init__(self, form, config, translate):
"""Initialize the field with the given field configuration.
:config: Field configuration
"""
self._form = form
self._config = config
self.sa_property = self._get_sa_property()
self._translate = translate
self.renderer = get_renderer(self, translate)
self._errors = []
self._warnings = []
# Set default value
value = getattr(self._config, "value")
# If value begins with '%' then consider the following string as
# a brabbel expression and set the value of the default value to
# the result of the evaluation of the expression.
if value and value.startswith("%"):
form_values = self._form._get_data_from_item()
value = Expression(value.strip("%")).evaluate(values=form_values)
# If value begins with '$' then consider the string as attribute
# name of the item in the form and get the value
elif value and value.startswith("$"):
try:
# Special logic for ringo items.
if (self.renderer.render_type == "info"
and hasattr(self._form._item, "get_value")):
value = self._form._item.get_value(value.strip("$"),
expand=True)
else:
value = getattr(self._form._item, value.strip("$"))
except IndexError, e:
log.error("Error while accessing attribute '%s': %s"
% (value, e))
value = None
except AttributeError, e:
log.error("Error while accessing attribute '%s': %s"
% (value, e))
value = None
self.value = value
self.previous_value = None
"""Value as string of the field. Will be set on rendering the
form"""
def __getattr__(self, name):
"""Make attributes from the configuration directly available"""
return getattr(self._config, name)
def _get_sa_mapped_class(self):
# TODO: Raise Exception if this field is not a relation. (None)
# <2013-07-25 07:44>
return self.sa_property.mapper.class_
def _get_sa_property(self):
if not self._form._item:
return None
mapper = sa.orm.object_mapper(self._form._item)
for prop in mapper.iterate_properties:
if prop.key == self.name:
return prop
def get_type(self):
"""Returns the datatype of the field."""
if self._config.type:
return self._config.type
if self.sa_property:
try:
column = self.sa_property.columns[0]
dtype = str(column.type)
if dtype == "TEXT" or dtype.find("VARCHAR") > -1:
return "string"
elif dtype == "DATE":
return "date"
elif dtype == "INTEGER":
return "integer"
elif dtype == "BOOLEAN":
return "boolean"
else:
log.warning('Unhandled datatype: %s' % dtype)
except AttributeError:
return self.sa_property.direction.name.lower()
return "string"
def get_rules(self):
"""Returns a list of configured rules for the field."""
return self.rules
def set_value(self, value):
self.value = value
def set_previous_value(self, value):
self.previous_value = value
def get_value(self, default=None, expand=False):
return self._get_value(self.value, default, expand)
def get_previous_value(self, default=None, expand=False):
return self._get_value(self.previous_value, default, expand)
def _get_value(self, value, default, expand):
if expand:
if not isinstance(value, list):
value = [value]
ex_values = []
options = self.get_options()
for opt in options:
for v in value:
if unicode(v) == unicode(opt[1]):
ex_values.append("%s" % opt[0])
return ", ".join(ex_values)
else:
if value:
return from_python(self, value)
elif default:
return default
else:
return value
def _build_filter_rule(self, expr_str, item):
t = expr_str.split(" ")
for x in t:
# % marks the options in the selection field. It is used to
# iterate over the options in the selection. I case the
# options are SQLAlchemy based options the variable can be
# used to access a attribute of the item. E.g. %id will
# access the id of the current option item. For user defined
# options "%" can be used to iterate over the user defined
# options. In this case the value is the value of the
# option.
if x.startswith("%"):
key = x.strip("%")
if len(key) == 0:
# User defined option
value = item[1]
else:
value = getattr(item, key)
# @ marks the item of the current fields form item.
elif x.startswith("@"):
key = x.strip("@")
value = getattr(self._form._item, key)
# $ special attributes of the current form.
elif x.startswith("$"):
tmpitem = None
value = None
tokens = x.split(".")
if len(tokens) > 1:
key = tokens[0].strip("$")
attribute = ".".join(tokens[1:])
# FIXME: This is a bad assumption that there is a
# user within a request. (ti) <2014-07-09 11:18>
if key == "user":
tmpitem = self._form._request.user
else:
value = self._form.merged_data.get(tokens[0].strip("$"))
if tmpitem and not value:
value = getattr(tmpitem, attribute)
if hasattr(value, '__call__'):
value = value()
else:
value = None
if value is not None:
if isinstance(value, list):
value = "[%s]" % ",".join("'%s'"
% unicode(v) for v in value)
expr_str = expr_str.replace(x, value)
else:
expr_str = expr_str.replace(x, "'%s'" % unicode(value))
return Rule(expr_str)
def _load_options_from_db(self):
# Get mapped clazz for the field
try:
clazz = self._get_sa_mapped_class()
return self._form._dbsession.query(clazz)
except:
# Catch exception here. This exception can happen when
# rendering the form in the preview of the formeditor. In
# this case the item is None and will fail to get the mapped
# class.
log.error("Can not get a mappend class for '%s' "
"to load the option from db" % self.name)
return []
def filter_options(self, options):
"""Will return a of tuples with options. The given options can
be either a list of SQLAlchemy mapped items (In case the options
come directly from the database) or a list of tuples with option
name and values. (In case of userdefined options in the form)
:options: List of items or tuples
:returns: List of tuples.
"""
filtered_options = []
for option in options:
if isinstance(option, tuple):
# User defined options
o_value = option[1]
o_label = option[0]
else:
# Options loaded from the database
o_value = option.id
o_label = option
if self._config.renderer and self._config.renderer.filter:
rule = self._build_filter_rule(self._config.renderer.filter,
option)
if rule.evaluate({}):
filtered_options.append((o_label, o_value, True))
else:
filtered_options.append((o_label, o_value, False))
else:
filtered_options.append((o_label, o_value, True))
return filtered_options
def get_options(self):
"""Will return a list of tuples containing the options of the
field. The tuple contains in the following order:
1. the display value of the option,
2. its value and
3. a boolean flag if the options is a filtered one and
should not be visible in the selection.
Options can be filtered by defining the filter attribute of the
renderer. The expression will be applied on every option in the
selection. Keyword beginning with % are handled as variable. On
rule evaluation the keyword in the expression will be replaced
with the value of the item with the name of the variable.
Filtering is currently actually only done for selection based on
the SQLAlchemy model and which are loaded from the database.
"""
options = []
_ = self._form._translate
if self.get_type() == 'manytoone':
options.append((_("no selection"), "", True))
user_defined_options = self._config.options
if (isinstance(user_defined_options, list)
and len(user_defined_options) > 0):
for option in self.filter_options(user_defined_options):
options.append((option[0], option[1], option[2]))
elif isinstance(user_defined_options, str):
for option in self._form.merged_data.get(user_defined_options):
options.append((option[0], option[1], True))
elif self._form._dbsession:
options.extend(self.filter_options(self._load_options_from_db()))
else:
# TODO: Try to get the session from the item. Ther must be
# somewhere the already bound session. (torsten) <2013-07-23 00:27>
log.warning('No db connection configured for this form. Can '
'not load options')
return []
return options
def add_error(self, error):
self._errors.append(error)
def add_warning(self, warning):
self._warnings.append(warning)
def get_errors(self):
return self._errors
def get_warnings(self):
return self._warnings
def render(self):
"""Returns the rendererd HTML for the field"""
return self.renderer.render()
def is_relation(self):
return isinstance(self.sa_property,
sa.orm.RelationshipProperty)
def is_desired(self):
"""Returns true if field is set as desired in field configuration"""
return self.desired
def is_required(self):
"""Returns true if the required flag of the field configuration
is set"""
return self.required
def is_readonly(self):
"""Returns true if either the readonly flag of the field
configuration is set or the whole form is marked as readonly"""
return self.readonly or False