Source code for ebu_tt_live.config.backend

import logging
from .common import ConfigurableComponent, Namespace, RequiredConfig
from ebu_tt_live.strings import ERR_CONF_WS_SERVER_PROTOCOL_MISMATCH
from ebu_tt_live.errors import ConfigurationError
from ebu_tt_live.strings import ERR_NO_SUCH_COMPONENT


log = logging.getLogger(__name__)


class BackendBase(ConfigurableComponent):
    """
    Base class for backends.

    It keeps a set of registered configurators and a list of components
    that asked to be started, and declares the scheduling interface
    (:meth:`call_once`, :meth:`call_periodically`) that concrete backends
    must implement.
    """

    _components_to_start = None
    # NOTE(review): the instance attribute created in __init__ is called
    # ``_all_configurators``; nothing visible in this class reads
    # ``_all_components``. Confirm whether this declaration is a leftover.
    _all_components = None

    def __init__(self, config, local_config):
        """
        Initialise the configurable base, passing this object as the backend,
        then create the empty bookkeeping containers.

        :param config: forwarded unchanged to ``ConfigurableComponent``
        :param local_config: forwarded unchanged to ``ConfigurableComponent``
        """
        super(BackendBase, self).__init__(config, local_config, backend=self)
        self._components_to_start = []
        self._all_configurators = set()

    def register_configurator(self, configurator):
        """
        Add ``configurator`` to the set of known configurators.

        :param configurator: object to remember; must be hashable
        """
        self._all_configurators.add(configurator)

    def start(self):
        """
        Call ``start()`` on every registered configurator, other than this
        backend itself, that was also registered via
        :meth:`register_component_start`.
        """
        for configurator in self._all_configurators:
            # Evaluated per item, immediately before it would be started.
            startable = (
                configurator != self
                and configurator in self._components_to_start
            )
            if not startable:
                continue
            log.info('Starting component: {}'.format(configurator))
            configurator.start()

    def register_component_start(self, component):
        """
        Mark ``component`` as one that :meth:`start` should start, and log
        the current list of such components at info level.

        :param component: object to append to the start list
        """
        self._components_to_start.append(component)
        log.info(self._components_to_start)

    def call_once(self, func, delay=0.0, result_callback=None, error_callback=None, *args, **kwargs):
        """
        Schedule a single call of ``func``. Must be overridden.

        :param func: callable to schedule
        :param delay: delay before the call
        :param result_callback: optional callback for the result
        :param error_callback: optional callback for a failure
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def call_periodically(self, func, interval=0.0, result_callback=None, error_callback=None, *args, **kwargs):
        """
        Schedule repeated calls of ``func``. Must be overridden.

        :param func: callable to schedule
        :param interval: interval between calls
        :param result_callback: optional callback for the result
        :param error_callback: optional callback for a failure
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()
class DummyBackend(BackendBase):
    """
    Backend that does not schedule anything: every scheduling request is
    only recorded in a list on the instance.
    """

    _simple_calls = None
    _periodic_calls = None

    def __init__(self, config, local_config):
        """
        Create the two empty recording lists, then run the base initialiser.

        :param config: forwarded to ``BackendBase``
        :param local_config: forwarded to ``BackendBase``
        """
        # The lists are created before the base initialiser runs.
        self._simple_calls = []
        self._periodic_calls = []
        super(DummyBackend, self).__init__(config=config, local_config=local_config)

    def call_once(self, func, delay=0.0, result_callback=None, error_callback=None, *args, **kwargs):
        """
        Record a one-off call request in ``_simple_calls``.

        Extra positional and keyword arguments are accepted but not stored.

        :param func: callable that would be scheduled
        :param delay: requested delay
        :param result_callback: requested result callback
        :param error_callback: requested error callback
        """
        request = dict(
            func=func,
            delay=delay,
            result_callback=result_callback,
            error_callback=error_callback
        )
        self._simple_calls.append(request)

    def call_periodically(self, func, interval=0.0, result_callback=None, error_callback=None, *args, **kwargs):
        """
        Record a periodic call request in ``_periodic_calls``.

        Extra positional and keyword arguments are accepted but not stored.

        :param func: callable that would be scheduled
        :param interval: requested interval
        :param result_callback: requested result callback
        :param error_callback: requested error callback
        """
        request = dict(
            func=func,
            interval=interval,
            result_callback=result_callback,
            error_callback=error_callback
        )
        self._periodic_calls.append(request)
class TwistedBackend(BackendBase):
    """
    Backend built on the helpers exposed by ``ebu_tt_live.twisted``.

    It creates websocket server/client factories for producers and
    consumers, keeps track of the server factories it created per listen
    URI, and implements the scheduling interface with the ``task`` module
    imported from ``ebu_tt_live.twisted``.
    """

    required_config = Namespace()

    _websocket = None
    _reactor = None
    _task = None
    _ws_twisted_producer_type = None
    _ws_twisted_consumer_type = None
    _wsl_twisted_producer_type = None
    _wsl_twisted_consumer_type = None
    _ws_twisted_producers = None
    _ws_twisted_consumers = None
    _wsl_twisted_servers = None
    _ws_twisted_servers = None

    def __init__(self, config, local_config):
        """
        Bind the twisted helpers to the instance, create the (empty) server
        registries and then run the base initialiser.

        :param config: forwarded to ``BackendBase``
        :param local_config: forwarded to ``BackendBase``
        """
        # Imported here rather than at module level, so ebu_tt_live.twisted
        # is only imported when a TwistedBackend is instantiated.
        from ebu_tt_live.twisted import websocket, reactor, task, TwistedWSPushProducer, TwistedWSConsumer, \
            TwistedConsumer, TwistedPullProducer
        self._websocket = websocket
        self._reactor = reactor
        self._task = task
        self._ws_twisted_producer_type = TwistedWSPushProducer
        self._wsl_twisted_producer_type = TwistedPullProducer
        self._ws_twisted_consumer_type = TwistedWSConsumer
        self._wsl_twisted_consumer_type = TwistedConsumer
        # Server factories keyed by listen URI string, one registry per
        # protocol flavour (legacy = "wsl").
        self._wsl_twisted_servers = {}
        self._ws_twisted_servers = {}
        super(TwistedBackend, self).__init__(config=config, local_config=local_config)

    def start(self):
        """
        Start the registered components, then run the reactor.
        """
        super(TwistedBackend, self).start()
        self._reactor.run()

    def _crosscheck_ws_server_uri(self, listen, legacy=False):
        """
        Look up a server factory already registered for ``listen``.

        :param listen: URI string of the websocket server
        :param legacy: True when the caller wants a legacy ("wsl") server
        :return: the registered server factory for ``listen``, or None when
            there is none
        :raises ConfigurationError: when the address is registered for the
            other protocol flavour than the one requested
        """
        ws_server = self._ws_twisted_servers.get(listen)
        wsl_server = self._wsl_twisted_servers.get(listen)

        if (ws_server and legacy) or (wsl_server and not legacy):
            raise ConfigurationError(
                ERR_CONF_WS_SERVER_PROTOCOL_MISMATCH.format(
                    address=listen
                )
            )
        if ws_server:
            return ws_server
        if wsl_server:
            return wsl_server
        return None

    def _ws_create_server_factory(self, listen, producer=None, consumer=None):
        """
        Create a ``BroadcastServerFactory`` for ``listen``, register it under
        its URI string and call its ``listen()``.

        :param listen: URI object providing ``geturl()``
        :param producer: optional producer handed to the factory
        :param consumer: optional consumer handed to the factory
        :return: the new server factory
        """
        server_factory = self._websocket.BroadcastServerFactory(
            listen.geturl(),
            producer=producer,
            consumer=consumer
        )
        server_factory.protocol = self._websocket.BroadcastServerProtocol
        self._ws_twisted_servers[listen.geturl()] = server_factory
        server_factory.listen()
        return server_factory

    def _ws_create_client_factories(self, connect, producer=None, consumer=None, proxy=None):
        """
        Create one ``BroadcastClientFactory`` per destination and call its
        ``connect()``.

        :param connect: iterable of URI objects providing ``geturl()``
        :param producer: optional producer handed to every factory
        :param consumer: optional consumer handed to every factory
        :param proxy: value assigned as-is to each factory's ``proxy``
            attribute
        """
        for dst in connect:
            client_factory = self._websocket.BroadcastClientFactory(
                url=dst.geturl(),
                producer=producer,
                consumer=consumer
            )
            client_factory.protocol = self._websocket.BroadcastClientProtocol
            client_factory.proxy = proxy
            client_factory.connect()

    def ws_backend_producer(self, custom_producer, listen=None, connect=None, proxy=None):
        """
        Return a twisted producer wired to a server socket and/or client
        connections.

        1. With a ``listen`` address:

           1. registered for the legacy protocol: ``ConfigurationError``
           2. registered and its factory already has a producer: that
              producer is reused (``custom_producer`` is not wrapped again)
           3. registered without a producer: a new producer is created and
              assigned to the existing factory
           4. not registered: a new producer and a new server factory are
              created

        2. With ``connect`` destinations: client factories are created with
           the producer chosen above.

        NOTE(review): an earlier description of this method listed "address
        used by another producer" as an error; the code reuses the existing
        producer instead - confirm which is intended.

        :param custom_producer: object wrapped by a newly created producer
        :param listen: optional URI object to listen on
        :param connect: optional iterable of URI objects to connect to
        :param proxy: optional proxy passed to the client factories
        :return: the twisted producer instance
        :raises ConfigurationError: on protocol mismatch at ``listen``
        """
        server_factory = self._crosscheck_ws_server_uri(listen.geturl()) if listen else None
        twisted_producer = server_factory.producer if server_factory else None

        if not twisted_producer:
            twisted_producer = self._ws_twisted_producer_type(
                custom_producer=custom_producer
            )

        if listen:
            if server_factory:
                server_factory.producer = twisted_producer
            else:
                self._ws_create_server_factory(
                    listen=listen,
                    producer=twisted_producer
                )

        if connect:
            self._ws_create_client_factories(
                connect=connect,
                producer=twisted_producer,
                proxy=proxy
            )

        return twisted_producer

    def ws_backend_consumer(self, custom_consumer, listen=None, connect=None, proxy=None):
        """
        Return a twisted consumer wired to a server socket and/or client
        connections.

        Mirrors :meth:`ws_backend_producer`: an existing consumer on the
        server factory registered for ``listen`` is reused, otherwise a new
        one wrapping ``custom_consumer`` is created and attached to the
        existing or a newly created server factory; client factories are
        created for every ``connect`` destination.

        :param custom_consumer: object wrapped by a newly created consumer
        :param listen: optional URI object to listen on
        :param connect: optional iterable of URI objects to connect to
        :param proxy: optional proxy passed to the client factories
        :return: the twisted consumer instance
        :raises ConfigurationError: on protocol mismatch at ``listen``
        """
        server_factory = self._crosscheck_ws_server_uri(listen.geturl()) if listen else None
        twisted_consumer = server_factory.consumer if server_factory else None

        if not twisted_consumer:
            twisted_consumer = self._ws_twisted_consumer_type(
                custom_consumer=custom_consumer
            )

        if listen:
            if server_factory:
                server_factory.consumer = twisted_consumer
            else:
                self._ws_create_server_factory(
                    listen=listen,
                    consumer=twisted_consumer
                )

        if connect:
            self._ws_create_client_factories(
                connect=connect,
                consumer=twisted_consumer,
                proxy=proxy
            )

        return twisted_consumer

    def wsl_backend_producer(self, uri, custom_producer):
        """
        Return the producer of the legacy server factory for ``uri``,
        creating the producer and the factory when none is registered yet.

        :param uri: URI object providing ``geturl()``
        :param custom_producer: object wrapped by a newly created producer
        :return: the ``producer`` attribute of the server factory
        :raises ConfigurationError: when ``uri`` is already registered for
            the non-legacy protocol
        """
        server_factory = self._crosscheck_ws_server_uri(listen=uri.geturl(), legacy=True)
        if not server_factory:
            twisted_producer = self._wsl_twisted_producer_type(
                custom_producer=custom_producer
            )
            server_factory = self._websocket.LegacyBroadcastServerFactory(
                uri.geturl()
            )
            server_factory.protocol = self._websocket.LegacyBroadcastServerProtocol
            server_factory.registerProducer(twisted_producer, streaming=True)
            self._wsl_twisted_servers[uri.geturl()] = server_factory
            server_factory.listen()
        return server_factory.producer

    def wsl_backend_consumer(self, uri, custom_consumer, proxy=None):
        """
        Create a legacy client factory consuming from ``uri`` and call its
        ``connect()``.

        :param uri: URI object providing ``geturl()``
        :param custom_consumer: object wrapped by the new twisted consumer
        :param proxy: optional ``"host:port"`` string
        """
        factory_args = {}
        if proxy:
            proxyHost, proxyPort = proxy.split(':')
            factory_args['proxy'] = {'host': proxyHost, 'port': int(proxyPort)}
        factory = self._websocket.LegacyBroadcastClientFactory(
            url=uri.geturl(),
            consumer=self._wsl_twisted_consumer_type(
                custom_consumer=custom_consumer
            ),
            **factory_args
        )
        factory.protocol = self._websocket.LegacyBroadcastClientProtocol
        factory.connect()

    def call_once(self, func, delay=0.0, result_callback=None, error_callback=None, *args, **kwargs):
        """
        Schedule a single call of ``func`` through ``deferLater``.

        :param func: callable to run
        :param delay: delay handed to ``deferLater``
        :param result_callback: optional callback added to the deferred
        :param error_callback: optional errback added to the deferred
        :param args: extra positional arguments forwarded to ``func``
        :param kwargs: extra keyword arguments forwarded to ``func``
        """
        # delay and func are passed positionally: passing them by keyword
        # next to *args made any extra positional argument collide with the
        # ``delay`` parameter (TypeError: multiple values).
        d = self._task.deferLater(self._reactor, delay, func, *args, **kwargs)
        if result_callback is not None:
            d.addCallback(result_callback)
        if error_callback is not None:
            d.addErrback(error_callback)

    def call_periodically(self, func, interval=0.0, result_callback=None, error_callback=None, *args, **kwargs):
        """
        Schedule repeated calls of ``func`` through ``LoopingCall``; the
        loop is started with ``now=False``.

        :param func: callable to run
        :param interval: interval handed to ``LoopingCall.start``
        :param result_callback: optional callback added to the deferred
            returned by ``start``
        :param error_callback: optional errback added to that deferred
        :param args: extra positional arguments forwarded to ``func``
        :param kwargs: extra keyword arguments forwarded to ``func``
        """
        # func is passed positionally for the same reason as in call_once:
        # ``f=func`` together with *args raised TypeError as soon as any
        # extra positional argument was supplied.
        looping_call = self._task.LoopingCall(func, *args, **kwargs)
        d = looping_call.start(interval, now=False)
        if result_callback is not None:
            d.addCallback(result_callback)
        if error_callback is not None:
            d.addErrback(error_callback)
# Maps the configured backend type name to the backend class.
backend_by_type = dict(
    twisted=TwistedBackend,
    dummy=DummyBackend
)
def get_backend(backend_name):
    """
    Return the backend class registered in ``backend_by_type`` under
    ``backend_name``.

    :param backend_name: key to look up in ``backend_by_type``
    :return: the backend class
    :raises ConfigurationError: when no backend is registered under
        ``backend_name``
    """
    try:
        # Subscript lookup, not ``.get()``: ``.get()`` returns None for a
        # missing key and never raises KeyError, which left the error
        # branch below unreachable.
        return backend_by_type[backend_name]
    except KeyError:
        raise ConfigurationError(
            ERR_NO_SUCH_COMPONENT.format(
                type_name=backend_name
            )
        )
class UniversalBackend(RequiredConfig):
    """
    Configuration holder declaring a ``backend`` namespace with a ``type``
    option; the option defaults to ``'twisted'`` and its string value is
    converted with :func:`get_backend`.
    """

    required_config = Namespace()

    # The same Namespace object is reachable both as
    # ``required_config.backend`` and as the class attribute ``backend``.
    backend = Namespace()
    required_config.backend = backend
    backend.add_option(
        'type',
        default='twisted',
        from_string_converter=get_backend
    )