from .interface import INode, IConsumerNode, IProducerNode
from ebu_tt_live.carriage.interface import IConsumerCarriage, IProducerCarriage
from ebu_tt_live.errors import ComponentCompatError, DataCompatError, UnexpectedSequenceIdentifierError
from ebu_tt_live.strings import ERR_INCOMPATIBLE_COMPONENT, ERR_INCOMPATIBLE_DATA_EXPECTED, ERR_INCOMPATIBLE_DATA_PROVIDED
from ebu_tt_live.utils import RingBufferWithCallback
from ebu_tt_live.documents import SubtitleDocument
import logging
log = logging.getLogger(__name__)
class __AbstractNode(INode):
    """Common base of the abstract node classes: holds the node identifier."""

    # Class-level default; replaced per instance in __init__.
    _node_id = None

    def __init__(self, node_id, **kwargs):
        """Store the identifier of this node.

        :param node_id: Identifier to keep for this node.
        :param kwargs: Accepted so cooperative constructors can pass extra
            keyword arguments down the chain; not used here.
        """
        self._node_id = node_id

    def __repr__(self):
        """Return a debug string with the class, node id and memory address."""
        template = '<{name}, ID:{id} at {address} >'
        memory_address = hex(id(self))
        return template.format(
            name=self.__class__,
            id=self._node_id,
            address=memory_address
        )

    @property
    def node_id(self):
        """Identifier of this node (readable and writable)."""
        return self._node_id

    @node_id.setter
    def node_id(self, value):
        self._node_id = value
class AbstractProducerNode(IProducerNode, __AbstractNode):
    """Base class for nodes that emit data through a producer carriage."""

    # Carriage attached to this node; None until one is registered.
    _producer_carriage = None

    def __init__(self, node_id, producer_carriage=None, **kwargs):
        """Initialise the node and optionally attach a producer carriage.

        :param node_id: Identifier forwarded to the parent constructor.
        :param producer_carriage: Optional carriage; when given it is
            registered via :meth:`register_producer_carriage`.
        :param kwargs: Forwarded unchanged to the parent constructor.
        """
        super(AbstractProducerNode, self).__init__(node_id=node_id, **kwargs)
        if producer_carriage is None:
            return
        self.register_producer_carriage(producer_carriage)

    def register_producer_carriage(self, producer_carriage):
        """Validate a producer carriage and attach it to this node.

        :param producer_carriage: Carriage to attach.
        :raises ComponentCompatError: if the carriage is not an
            ``IProducerCarriage`` instance.
        :raises DataCompatError: if ``self.provides()`` differs from what
            the carriage ``expects()``.
        """
        # First gate: the component must implement the carriage interface.
        if not isinstance(producer_carriage, IProducerCarriage):
            interface_error = ERR_INCOMPATIBLE_COMPONENT.format(
                component=producer_carriage,
                expected_interface=IProducerCarriage
            )
            raise ComponentCompatError(interface_error)

        # Second gate: the data type produced must match what is expected.
        if self.provides() != producer_carriage.expects():
            data_error = ERR_INCOMPATIBLE_DATA_EXPECTED.format(
                component=producer_carriage,
                expects=producer_carriage.expects(),
                provides=self.provides()
            )
            raise DataCompatError(data_error)

        # Link both directions: remember the carriage and announce ourselves.
        self._producer_carriage = producer_carriage
        producer_carriage.register_producer_node(node=self)

    @property
    def producer_carriage(self):
        """The registered producer carriage, or None if none is attached."""
        return self._producer_carriage

    def resume_producing(self):
        """Trigger processing without an input document.

        Calls ``self.process_document`` with ``document=None``; that method
        is not defined in this class and is expected to come from elsewhere
        in the class hierarchy.
        """
        self.process_document(document=None)
class AbstractConsumerNode(IConsumerNode, __AbstractNode):
    """Base class for nodes that receive data through a consumer carriage.

    Besides carriage registration it provides helpers to restrict input to
    a single sequence and to detect documents that were already received.
    """

    # Carriage attached to this node; None until one is registered.
    _consumer_carriage = None
    # Maps sequence identifier -> buffer of sequence numbers already seen.
    # Created per instance in __init__.
    _seen_docs = None
    # Sequence identifier of the first document given to
    # limit_sequence_to_one(); None until that happens.
    _first_input_document_sequence = None

    def __init__(self, node_id, consumer_carriage=None, **kwargs):
        """Initialise the node and optionally attach a consumer carriage.

        :param node_id: Identifier forwarded to the parent constructor.
        :param consumer_carriage: Optional carriage; when given it is
            registered via :meth:`register_consumer_carriage`.
        :param kwargs: Forwarded unchanged to the parent constructor.
        """
        super(AbstractConsumerNode, self).__init__(node_id=node_id, **kwargs)
        # Create the de-duplication state before registering: registration
        # hands a reference to this node to the carriage, so the node should
        # already be fully initialised at that point.
        self._seen_docs = {}
        if consumer_carriage is not None:
            self.register_consumer_carriage(consumer_carriage)

    def is_document(self, document):
        """Return True if ``document`` is a ``SubtitleDocument`` instance."""
        return isinstance(document, SubtitleDocument)

    def limit_sequence_to_one(self, document):
        """Reject documents that do not belong to the first sequence seen.

        The sequence identifier of the first document passed in is
        remembered; any later document carrying a different identifier is
        logged and rejected.

        :param document: Object exposing ``sequence_identifier``.
        :raises UnexpectedSequenceIdentifierError: if the identifier differs
            from the remembered one.
        """
        if self._first_input_document_sequence is None:
            self._first_input_document_sequence = document.sequence_identifier
        if self._first_input_document_sequence != document.sequence_identifier:
            log.error(
                'Sequences limited to one: expecting: {}, received: {}'.format(
                    self._first_input_document_sequence,
                    document.sequence_identifier
                )
            )
            raise UnexpectedSequenceIdentifierError('Rejecting new sequence identifier')

    def check_if_document_seen(self, document=None, sequence_identifier=None, sequence_number=None):
        """Record a document and report whether it is new.

        Despite the name, the return value is True when the document has
        NOT been seen before (it is then recorded) and False when it was
        already recorded.

        :param document: Optional object exposing ``sequence_identifier``
            and ``sequence_number``; when given, its values override the
            two explicit arguments.
        :param sequence_identifier: Sequence identifier, used when no
            document is given.
        :param sequence_number: Sequence number, used when no document is
            given.
        :returns: True if newly recorded, False if already seen.
        :raises ValueError: if the identifier or the number is missing.
        """
        # Compare against None explicitly so the outcome does not depend on
        # how the document type defines truthiness.
        if document is not None:
            sequence_identifier = document.sequence_identifier
            sequence_number = document.sequence_number
        if sequence_identifier is None or sequence_number is None:
            raise ValueError(
                'Either a document or both sequence_identifier and '
                'sequence_number must be provided'
            )

        seen_numbers = self._seen_docs.get(sequence_identifier)
        if seen_numbers is None:
            # First document of this sequence: create its buffer only now
            # instead of building a throwaway one on every call.
            seen_numbers = RingBufferWithCallback(maxlen=100)
            self._seen_docs[sequence_identifier] = seen_numbers
        elif sequence_number in seen_numbers:
            return False
        seen_numbers.append(sequence_number)
        return True

    def register_consumer_carriage(self, consumer_carriage):
        """Validate a consumer carriage and attach it to this node.

        :param consumer_carriage: Carriage to attach.
        :raises ComponentCompatError: if the carriage is not an
            ``IConsumerCarriage`` instance.
        :raises DataCompatError: if ``self.expects()`` differs from what
            the carriage ``provides()``.
        """
        if not isinstance(consumer_carriage, IConsumerCarriage):
            raise ComponentCompatError(
                ERR_INCOMPATIBLE_COMPONENT.format(
                    component=consumer_carriage,
                    expected_interface=IConsumerCarriage
                )
            )
        if self.expects() != consumer_carriage.provides():
            # NOTE(review): the carriage's provides() is passed as `expects`
            # and this node's expects() as `provides`. The module also imports
            # ERR_INCOMPATIBLE_DATA_PROVIDED, which may have been intended
            # here - confirm against the message templates before changing.
            raise DataCompatError(
                ERR_INCOMPATIBLE_DATA_EXPECTED.format(
                    component=consumer_carriage,
                    expects=consumer_carriage.provides(),
                    provides=self.expects()
                )
            )
        self._consumer_carriage = consumer_carriage
        self._consumer_carriage.register_consumer_node(node=self)

    @property
    def consumer_carriage(self):
        """The registered consumer carriage, or None if none is attached."""
        return self._consumer_carriage
class AbstractCombinedNode(AbstractConsumerNode, AbstractProducerNode):
    """Node that is both a consumer and a producer.

    It adds no behaviour of its own: the constructor passes both carriages
    up the cooperative ``super()`` chain of its two parent classes.
    """

    def __init__(self, node_id, producer_carriage=None, consumer_carriage=None, **kwargs):
        """Initialise both the consumer and the producer side.

        :param node_id: Identifier of the node.
        :param producer_carriage: Optional carriage for the producer side.
        :param consumer_carriage: Optional carriage for the consumer side.
        :param kwargs: Extra keyword arguments forwarded to the parents.
        """
        # The three named parameters can never also appear in kwargs, so
        # merging them into one mapping cannot overwrite a caller's value.
        init_arguments = dict(
            kwargs,
            node_id=node_id,
            producer_carriage=producer_carriage,
            consumer_carriage=consumer_carriage
        )
        super(AbstractCombinedNode, self).__init__(**init_arguments)