from datetime import timedelta
from ebu_tt_live.bindings import get_xml_parsing_context
from ebu_tt_live.errors import LogicError, SemanticValidationError, OutsideSegmentError
from ebu_tt_live.strings import ERR_SEMANTIC_VALIDATION_TIMING_TYPE
class TimingValidationMixin(object):
    """
    This mixin is meant to be applied to timed elements (body, div, p, span) and provides parser hooks for timing
    attributes as well as a generic semantic validation for timing attributes in the document's timeBase.

    The semantic hooks communicate through the ``dataset`` dictionary handed in by the caller. The keys used
    by this class are ``timing_syncbase``, ``timing_begin_stack``, ``timing_end_stack``, ``availability_time``,
    ``tt_element``, ``document``, ``segment_begin`` and ``segment_end``.
    """

    # Results of the last semantic timing pass; None until computed (or when the end is unbounded).
    _computed_begin_time = None
    _computed_end_time = None

    @property
    def computed_begin_time(self):
        """Begin time computed by the last semantic timing pass, or None."""
        return self._computed_begin_time

    @property
    def computed_end_time(self):
        """End time computed by the last semantic timing pass, or None."""
        return self._computed_end_time

    def _pre_timing_set_attribute(self, attr_en, attr_use):
        """
        Publish the local name of the attribute being set in the XML parsing context.

        :param attr_en: Object whose ``localName()`` gives the attribute name
        :param attr_use: Unused here
        """
        # Pass in the timing_attribute_name to the context to help the timing type constructor refuse creation
        context = get_xml_parsing_context()
        if context is not None:
            # This means we are in XML parsing mode
            context['timing_attribute_name'] = attr_en.localName()

    def _post_timing_set_attribute(self, attr_use):
        """
        Remove the attribute name published by :meth:`_pre_timing_set_attribute`, if any.

        :param attr_use: Unused here
        """
        context = get_xml_parsing_context()
        if context is not None:
            # Clean up after successful creation
            context.pop('timing_attribute_name', None)

    def _pre_init_variables(self, dataset, element_content):
        """
        Initialise the temporary state used during one semantic timing pass.

        :param dataset: Semantic dataset; kept on the instance until :meth:`_post_cleanup_variables`
        :param element_content: Unused here
        """
        # NOTE(review): a falsy timedelta (e.g. zero) is stored as None here, i.e. it is treated
        # the same as an attribute that was not specified -- confirm this is intended.
        self._begin_timedelta = (self.begin.timedelta or None) if self.begin else None
        self._end_timedelta = (self.end.timedelta or None) if self.end else None
        # We make sure the computed times are always None at the beginning because a stale value
        # can cause a LogicError
        self._computed_begin_time = None
        self._computed_end_time = None
        self._semantic_dataset = dataset

    def _element_badly_timed(self, value, element):
        """
        Tell whether ``element`` has both begin and end set with the end not after the begin.

        :param value: Unused here
        :param element: Object exposing ``begin`` and ``end`` attributes
        :return: True if both are set and ``end <= begin``
        """
        return (
            element.begin is not None
            and element.end is not None
            and element.end <= element.begin
        )

    def _well_timed_children(self):
        """
        Collect the child values that are timed elements and are not explicitly badly timed.

        A real list is returned (not a lazy ``filter`` object) so that callers can test it for
        emptiness and iterate over it more than once.

        :return: list of child binding instances
        """
        # All timing containers are complexTypes so we can call orderedContent safely
        child_values = [content.value for content in self.orderedContent()]
        return [
            child for child in child_values
            if isinstance(child, TimingValidationMixin)
            and not child._element_badly_timed(value=None, element=child)
        ]

    def _post_cleanup_variables(self):
        """Drop the temporary state created by :meth:`_pre_init_variables`."""
        del self._semantic_dataset
        del self._begin_timedelta
        del self._end_timedelta

    def _pre_assign_end(self, proposed_end):
        """
        Push ``proposed_end`` onto the end stack and record it as the computed end time.

        :param proposed_end: End time to assign
        """
        self._semantic_dataset['timing_end_stack'].append(proposed_end)
        self._computed_end_time = proposed_end

    def _pre_calculate_end(self):
        """
        Compute and assign the end time if the element specifies one.

        The end is the current syncbase plus the element's end offset, capped by the top of the
        end stack when the stack is not empty.
        """
        if self._end_timedelta is None:
            return
        dataset = self._semantic_dataset
        proposed_end = dataset['timing_syncbase'] + self._end_timedelta
        if dataset['timing_end_stack']:
            # If there was already an end time in some parent element it limits ours.
            proposed_end = min(proposed_end, dataset['timing_end_stack'][-1])
        # If we have it assign it
        self._pre_assign_end(proposed_end)

    def _pre_assign_begin(self, proposed_begin):
        """
        Advance the syncbase by ``proposed_begin`` (if given) and set the computed begin time.

        :param proposed_begin: Begin offset of this element or None
        """
        dataset = self._semantic_dataset
        if proposed_begin is not None:
            # Store the element's activation begin time by pushing it onto the stack.
            dataset['timing_begin_stack'].append(proposed_begin)
            dataset['timing_syncbase'] += proposed_begin

        # If we have a non-zero availability time we need to factor it in BUT the syncbase stays
        if dataset['availability_time']:
            self._computed_begin_time = max(dataset['timing_syncbase'], dataset['availability_time'])
        else:
            self._computed_begin_time = dataset['timing_syncbase']

    def _pre_calculate_begin(self):
        """Assign the begin time from the element's own begin offset."""
        self._pre_assign_begin(self._begin_timedelta)

    def _post_calculate_begin(self, children):
        """
        The computed begin time shall be moved down to match that of the earliest child begin time in case the
        container does not specify a begin time itself. NOTE: This does not modify the syncbase.

        :param children: Iterable of child elements exposing ``computed_begin_time``
        :return: None
        """
        # Materialise first: a lazy iterable is always truthy and can only be consumed once.
        children = list(children) if children is not None else []
        if not children:
            return
        earliest_child_computed_begin = min(child.computed_begin_time for child in children)
        if earliest_child_computed_begin > self._computed_begin_time:
            # Adjustment scenario
            self._computed_begin_time = earliest_child_computed_begin

    def _semantic_preprocess_timing(self, dataset, element_content):
        """
        As the validator traverses in a Depth First Search this is the hook function to call on the way DOWN.

        Steps to take:
          - Initialize temporary variables
          - Calculate end timing if element defines an end time
          - Calculate begin time and syncbase for children

        :param dataset: Semantic dataset from semantic validation framework
        :param element_content: PyXB's binding placeholder for this binding instance
        """
        self._pre_init_variables(dataset, element_content)
        self._pre_calculate_end()
        # The begin assignment must happen last otherwise the syncbase will be wrong
        # in calculations happening after syncbase adjustment.
        self._pre_calculate_begin()

    def _post_pop_begin(self):
        """
        Undo the begin push made on the way down, if there was one.

        :return: The popped begin offset or None if this element did not push one
        """
        if self._begin_timedelta is None:
            return None
        # We pushed on the stack it is time to pop it. It could probably be removed
        # and replaced with self._begin_timedelta
        begin_timedelta = self._semantic_dataset['timing_begin_stack'].pop()
        self._semantic_dataset['timing_syncbase'] -= begin_timedelta
        return begin_timedelta

    def _post_pop_end(self):
        """
        Undo the end push made on the way down, if there was one.

        :return: The popped end time or None if this element did not push one
        """
        if self._end_timedelta is None:
            return None
        # We pushed on the stack it is time to pop it
        return self._semantic_dataset['timing_end_stack'].pop()

    def _semantic_postprocess_timing(self, dataset, element_content):
        """
        As the validator traverses in a Depth First Search this is the hook function to call on the way UP

        Steps to take:
          - Fill in end times if element doesn't define end time
            - Try using computed_end_time information from its children
            - If no children are found look at parents end time constraints.
          - Finalize computed_begin_time if begin is not specified using computed_begin_time of children.

        :param dataset: Semantic dataset from semantic validation framework
        :param element_content: PyXB's binding placeholder for this binding instance
        :raises LogicError: if the popped end value and the computed end time disagree about being set
        """
        begin_timedelta = self._post_pop_begin()
        # This end timedelta is an absolute calculated value on the timeline. Not relative.
        end_timedelta = self._post_pop_end()

        # This is just a simple sanity check. It should never be triggered.
        # Should the calculation be changed this filters out an obvious source of error.
        if (end_timedelta is None) != (self.computed_end_time is None):
            raise LogicError()

        # If the forward running part of the traversal could not assign an end time we can use the backwards
        # route which is in a way similar to dynamic programming because we take the children computed times
        # and take the maximum value from them. SPECIAL case: a single leaf element with an undefined end time
        # renders the entire branch undefined
        children = None
        if self.computed_end_time is None:
            # This requires calculation based on the timings in its children, but we don't want
            # to bother with explicitly badly timed elements so those are filtered out.
            children = self._well_timed_children()
            # Order of statements is important
            if not children:
                # This means we are in a timing container leaf.
                if not self._semantic_dataset['timing_end_stack']:
                    # Here we go an endless document. Pointless but for clarity's sake assign it
                    # explicitly to None.
                    self._computed_end_time = None
                else:
                    # Try to assign it the last specified ancestor
                    self._computed_end_time = self._semantic_dataset['timing_end_stack'][-1]
            else:
                children_computed_end_times = [item.computed_end_time for item in children]
                if None in children_computed_end_times:
                    # The endless document case propagates up
                    self._computed_end_time = None
                else:
                    # Propagate the longest end time among the children
                    self._computed_end_time = max(children_computed_end_times)

        # When we are the body element we need to check that our explicit timings
        # are valid, i.e. deal with end before begin by discarding this element
        # from computed time calculation as per spec requirement. Since we exclude all
        # elements where this is the case using the _element_badly_timed
        # function as a filter, this only applies to the body element (on which
        # the filter function doesn't get called).
        if isinstance(self, BodyTimingValidationMixin) \
                and self._element_badly_timed(value=None, element=self):
            self._computed_end_time = None
            self._computed_begin_time = timedelta(0)

        if begin_timedelta is None:
            if children is None:
                children = self._well_timed_children()
            self._post_calculate_begin(children=children)

        self._post_cleanup_variables()

    # The mixin approach is used since there are multiple timed element types.
    # The inspected values are all attributes of the element so they do not take part in the
    # traversal directly; we process them in the timed element's context instead: body, div, p, span

    def _semantic_timebase_validation(self, dataset, element_content):
        """
        Check that the begin and end attribute types are compatible with the document's timeBase.

        Attributes that are None or whose value has no ``compatible_timebases`` method are skipped.

        :param dataset: Semantic dataset; ``dataset['tt_element'].timeBase`` is read
        :param element_content: Unused here
        :raises SemanticValidationError: if the timeBase is not listed for the attribute
        """
        time_base = dataset['tt_element'].timeBase
        for attr_name in ('begin', 'end'):
            attr_value = getattr(self, attr_name)
            if attr_value is None or not hasattr(attr_value, 'compatible_timebases'):
                continue
            # Check typing of the attribute against the timebase
            timebases = attr_value.compatible_timebases()
            if time_base not in timebases[attr_name]:
                raise SemanticValidationError(
                    ERR_SEMANTIC_VALIDATION_TIMING_TYPE.format(
                        attr_type=type(attr_value),
                        attr_value=attr_value,
                        attr_name=attr_name,
                        time_base=time_base
                    )
                )

    def _semantic_manage_timeline(self, dataset, element_content):
        """
        Register this element on the timeline of ``dataset['document']``.

        :param dataset: Semantic dataset holding the document instance
        :param element_content: Unused here
        """
        # Get the document instance
        doc = dataset['document']
        # Register on timeline
        doc.add_to_timeline(self)

    # This section covers the copying operations of timed containers.

    def is_in_segment(self, begin=None, end=None):
        """
        Tell whether the computed times of this element overlap the given segment.

        :param begin: Segment begin or None for no lower bound
        :param end: Segment end or None for no upper bound
        :return: False if the element ends at or before ``begin`` or begins at or after ``end``
        """
        if begin is not None \
                and self.computed_end_time is not None \
                and self.computed_end_time <= begin:
            return False
        if end is not None and self.computed_begin_time >= end:
            return False
        return True

    def _assert_in_segment(self, dataset, element_content=None):
        """
        Raise if this element lies outside the segment described by the dataset.

        :param dataset: Semantic dataset providing ``segment_begin`` and ``segment_end``
        :param element_content: Unused here
        :raises OutsideSegmentError: if :meth:`is_in_segment` returns False
        """
        inside = self.is_in_segment(
            begin=dataset['segment_begin'],
            end=dataset['segment_end']
        )
        if not inside:
            raise OutsideSegmentError()

    def is_timed_leaf(self):
        """Return False; this base implementation never reports a timed leaf."""
        return False

    def _semantic_copy_apply_leaf_timing(self, copied_instance, dataset, element_content=None):
        """
        Set the timing attributes of ``copied_instance`` for a segment copy.

        Non-leaf copies get their begin, end and (if present) dur cleared. Leaf copies get begin and
        end set from this element's computed times trimmed to the segment bounds.

        :param copied_instance: The copy whose timing attributes are written
        :param dataset: Semantic dataset providing ``tt_element``, ``segment_begin`` and ``segment_end``
        :param element_content: Unused here
        """
        if not copied_instance.is_timed_leaf():
            copied_instance.begin = None
            copied_instance.end = None
            if hasattr(copied_instance, 'dur'):
                copied_instance.dur = None
            return

        tt_elem = dataset['tt_element']
        trimmed_begin = self.computed_begin_time
        trimmed_end = self.computed_end_time
        segment_begin = dataset['segment_begin']
        segment_end = dataset['segment_end']

        if segment_begin is not None and segment_begin > trimmed_begin:
            trimmed_begin = segment_begin
        if segment_end is not None and (trimmed_end is None or trimmed_end > segment_end):
            trimmed_end = segment_end

        # Create compatible timing types
        copied_instance.begin = tt_elem.get_timing_type(trimmed_begin)
        copied_instance.end = tt_elem.get_timing_type(trimmed_end)
class BodyTimingValidationMixin(TimingValidationMixin):
    """
    The body element seems to be an exception from too many rules, which makes one common validator pretty
    difficult to manage. This subclass gathers the extensions/limitations that apply to the body element but
    not to timed containers in general in the EBU-TT-Live spec; here that is the handling of the ``dur``
    attribute.
    """

    def _pre_init_variables(self, dataset, element_content):
        """
        Initialise the temporary state of the base class plus the dur offset.

        :param dataset: Semantic dataset from semantic validation framework
        :param element_content: PyXB's binding placeholder for this binding instance
        """
        super(BodyTimingValidationMixin, self)._pre_init_variables(dataset, element_content)
        # A falsy dur value or a falsy dur timedelta both end up as None.
        self._dur_timedelta = (self.dur.timedelta or None) if self.dur else None

    def _post_cleanup_variables(self):
        """Drop the dur offset, then let the base class drop the rest of the temporary state."""
        del self._dur_timedelta
        super(BodyTimingValidationMixin, self)._post_cleanup_variables()

    def _pre_calculate_end(self):
        """
        Compute and assign the end time taking the dur attribute into account.

        Without dur the base class calculation is used. With dur, the duration is added to the begin
        offset if there is one, otherwise to the availability time, and the result is capped by the
        end offset when that is specified.
        """
        # This is all for the body element because of the dur attribute
        if self._dur_timedelta is None:
            # Fallback case: no duration specified, behave the same as the other containers.
            # WARNING this assigns the end itself so we are done
            super(BodyTimingValidationMixin, self)._pre_calculate_end()
            return

        if self._begin_timedelta is not None:
            end_from_dur = self._dur_timedelta + self._begin_timedelta
        else:
            # In this case the document ends at availability time + dur
            end_from_dur = self._semantic_dataset['availability_time'] + self._dur_timedelta

        if self._end_timedelta is not None:
            # Both dur and end are given: the earlier of the two wins
            proposed_end = min(end_from_dur, self._end_timedelta)
        else:
            proposed_end = end_from_dur

        self._pre_assign_end(proposed_end)

    def _pre_calculate_begin(self):
        """Assign the begin time from the element's own begin offset."""
        self._pre_assign_begin(self._begin_timedelta)

    def _post_pop_end(self):
        """
        Undo the end push made on the way down; either end or dur causes a push.

        :return: The popped end time or None if neither end nor dur was specified
        """
        if self._end_timedelta is None and self._dur_timedelta is None:
            return None
        # We pushed on the stack it is time to pop it
        return self._semantic_dataset['timing_end_stack'].pop()

    def _semantic_timebase_validation(self, dataset, element_content):
        """
        Run the base class begin/end checks and then check dur against the document's timeBase.

        :param dataset: Semantic dataset; ``dataset['tt_element'].timeBase`` is read
        :param element_content: PyXB's binding placeholder for this binding instance
        :raises SemanticValidationError: if the timeBase is not listed for the dur attribute
        """
        super(BodyTimingValidationMixin, self)._semantic_timebase_validation(dataset, element_content)
        time_base = dataset['tt_element'].timeBase
        if self.dur is None or not hasattr(self.dur, 'compatible_timebases'):
            return
        # Check typing of dur attribute against the timebase
        allowed = self.dur.compatible_timebases()
        if time_base in allowed['dur']:
            return
        raise SemanticValidationError(
            ERR_SEMANTIC_VALIDATION_TIMING_TYPE.format(
                attr_type=type(self.dur),
                attr_value=self.dur,
                attr_name='dur',
                time_base=time_base
            )
        )