import itertools import logging import random import signal import engineio import six from . import exceptions from . import namespace from . import packet default_logger = logging.getLogger('socketio.client') reconnecting_clients = [] def signal_handler(sig, frame): # pragma: no cover """SIGINT handler. Notify any clients that are in a reconnect loop to abort. Other disconnection tasks are handled at the engine.io level. """ for client in reconnecting_clients[:]: client._reconnect_abort.set() return original_signal_handler(sig, frame) original_signal_handler = signal.signal(signal.SIGINT, signal_handler) class Client(object): """A Socket.IO client. This class implements a fully compliant Socket.IO web client with support for websocket and long-polling transports. :param reconnection: ``True`` if the client should automatically attempt to reconnect to the server after an interruption, or ``False`` to not reconnect. The default is ``True``. :param reconnection_attempts: How many reconnection attempts to issue before giving up, or 0 for infinity attempts. The default is 0. :param reconnection_delay: How long to wait in seconds before the first reconnection attempt. Each successive attempt doubles this delay. :param reconnection_delay_max: The maximum delay between reconnection attempts. :param randomization_factor: Randomization amount for each delay between reconnection attempts. The default is 0.5, which means that each delay is randomly adjusted by +/- 50%. :param logger: To enable logging set to ``True`` or pass a logger object to use. To disable logging set to ``False``. The default is ``False``. :param binary: ``True`` to support binary payloads, ``False`` to treat all payloads as text. On Python 2, if this is set to ``True``, ``unicode`` values are treated as text, and ``str`` and ``bytes`` values are treated as binary. This option has no effect on Python 3, where text and binary payloads are always automatically discovered. :param json: An alternative json module to use for encoding and decoding packets. Custom json modules must have ``dumps`` and ``loads`` functions that are compatible with the standard library versions. The Engine.IO configuration supports the following settings: :param request_timeout: A timeout in seconds for requests. The default is 5 seconds. :param ssl_verify: ``True`` to verify SSL certificates, or ``False`` to skip SSL certificate verification, allowing connections to servers with self signed certificates. The default is ``True``. :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass a logger object to use. To disable logging set to ``False``. The default is ``False``. """ def __init__(self, reconnection=True, reconnection_attempts=0, reconnection_delay=1, reconnection_delay_max=5, randomization_factor=0.5, logger=False, binary=False, json=None, **kwargs): self.reconnection = reconnection self.reconnection_attempts = reconnection_attempts self.reconnection_delay = reconnection_delay self.reconnection_delay_max = reconnection_delay_max self.randomization_factor = randomization_factor self.binary = binary engineio_options = kwargs engineio_logger = engineio_options.pop('engineio_logger', None) if engineio_logger is not None: engineio_options['logger'] = engineio_logger if json is not None: packet.Packet.json = json engineio_options['json'] = json self.eio = self._engineio_client_class()(**engineio_options) self.eio.on('connect', self._handle_eio_connect) self.eio.on('message', self._handle_eio_message) self.eio.on('disconnect', self._handle_eio_disconnect) if not isinstance(logger, bool): self.logger = logger else: self.logger = default_logger if not logging.root.handlers and \ self.logger.level == logging.NOTSET: if logger: self.logger.setLevel(logging.INFO) else: self.logger.setLevel(logging.ERROR) self.logger.addHandler(logging.StreamHandler()) self.connection_url = None self.connection_headers = None self.connection_transports = None self.connection_namespaces = None self.socketio_path = None self.sid = None self.connected = False self.namespaces = [] self.handlers = {} self.namespace_handlers = {} self.callbacks = {} self._binary_packet = None self._reconnect_task = None self._reconnect_abort = self.eio.create_event() def is_asyncio_based(self): return False def on(self, event, handler=None, namespace=None): """Register an event handler. :param event: The event name. It can be any string. The event names ``'connect'``, ``'message'`` and ``'disconnect'`` are reserved and should not be used. :param handler: The function that should be invoked to handle the event. When this parameter is not given, the method acts as a decorator for the handler function. :param namespace: The Socket.IO namespace for the event. If this argument is omitted the handler is associated with the default namespace. Example usage:: # as a decorator: @sio.on('connect') def connect_handler(): print('Connected!') # as a method: def message_handler(msg): print('Received message: ', msg) sio.send( 'response') sio.on('message', message_handler) The ``'connect'`` event handler receives no arguments. The ``'message'`` handler and handlers for custom event names receive the message payload as only argument. Any values returned from a message handler will be passed to the client's acknowledgement callback function if it exists. The ``'disconnect'`` handler does not take arguments. """ namespace = namespace or '/' def set_handler(handler): if namespace not in self.handlers: self.handlers[namespace] = {} self.handlers[namespace][event] = handler return handler if handler is None: return set_handler set_handler(handler) def event(self, *args, **kwargs): """Decorator to register an event handler. This is a simplified version of the ``on()`` method that takes the event name from the decorated function. Example usage:: @sio.event def my_event(data): print('Received data: ', data) The above example is equivalent to:: @sio.on('my_event') def my_event(data): print('Received data: ', data) A custom namespace can be given as an argument to the decorator:: @sio.event(namespace='/test') def my_event(data): print('Received data: ', data) """ if len(args) == 1 and len(kwargs) == 0 and callable(args[0]): # the decorator was invoked without arguments # args[0] is the decorated function return self.on(args[0].__name__)(args[0]) else: # the decorator was invoked with arguments def set_handler(handler): return self.on(handler.__name__, *args, **kwargs)(handler) return set_handler def register_namespace(self, namespace_handler): """Register a namespace handler object. :param namespace_handler: An instance of a :class:`Namespace` subclass that handles all the event traffic for a namespace. """ if not isinstance(namespace_handler, namespace.ClientNamespace): raise ValueError('Not a namespace instance') if self.is_asyncio_based() != namespace_handler.is_asyncio_based(): raise ValueError('Not a valid namespace class for this client') namespace_handler._set_client(self) self.namespace_handlers[namespace_handler.namespace] = \ namespace_handler def connect(self, url, headers={}, transports=None, namespaces=None, socketio_path='socket.io'): """Connect to a Socket.IO server. :param url: The URL of the Socket.IO server. It can include custom query string parameters if required by the server. :param headers: A dictionary with custom headers to send with the connection request. :param transports: The list of allowed transports. Valid transports are ``'polling'`` and ``'websocket'``. If not given, the polling transport is connected first, then an upgrade to websocket is attempted. :param namespaces: The list of custom namespaces to connect, in addition to the default namespace. If not given, the namespace list is obtained from the registered event handlers. :param socketio_path: The endpoint where the Socket.IO server is installed. The default value is appropriate for most cases. Example usage:: sio = socketio.Client() sio.connect('http://localhost:5000') """ self.connection_url = url self.connection_headers = headers self.connection_transports = transports self.connection_namespaces = namespaces self.socketio_path = socketio_path if namespaces is None: namespaces = set(self.handlers.keys()).union( set(self.namespace_handlers.keys())) elif isinstance(namespaces, six.string_types): namespaces = [namespaces] self.connection_namespaces = namespaces self.namespaces = [n for n in namespaces if n != '/'] try: self.eio.connect(url, headers=headers, transports=transports, engineio_path=socketio_path) except engineio.exceptions.ConnectionError as exc: six.raise_from(exceptions.ConnectionError(exc.args[0]), None) self.connected = True def wait(self): """Wait until the connection with the server ends. Client applications can use this function to block the main thread during the life of the connection. """ while True: self.eio.wait() self.sleep(1) # give the reconnect task time to start up if not self._reconnect_task: break self._reconnect_task.join() if self.eio.state != 'connected': break def emit(self, event, data=None, namespace=None, callback=None): """Emit a custom event to one or more connected clients. :param event: The event name. It can be any string. The event names ``'connect'``, ``'message'`` and ``'disconnect'`` are reserved and should not be used. :param data: The data to send to the client or clients. Data can be of type ``str``, ``bytes``, ``list`` or ``dict``. If a ``list`` or ``dict``, the data will be serialized as JSON. :param namespace: The Socket.IO namespace for the event. If this argument is omitted the event is emitted to the default namespace. :param callback: If given, this function will be called to acknowledge the the client has received the message. The arguments that will be passed to the function are those provided by the client. Callback functions can only be used when addressing an individual client. """ namespace = namespace or '/' if namespace != '/' and namespace not in self.namespaces: raise exceptions.BadNamespaceError( namespace + ' is not a connected namespace.') self.logger.info('Emitting event "%s" [%s]', event, namespace) if callback is not None: id = self._generate_ack_id(namespace, callback) else: id = None if six.PY2 and not self.binary: binary = False # pragma: nocover else: binary = None # tuples are expanded to multiple arguments, everything else is sent # as a single argument if isinstance(data, tuple): data = list(data) elif data is not None: data = [data] else: data = [] self._send_packet(packet.Packet(packet.EVENT, namespace=namespace, data=[event] + data, id=id, binary=binary)) def send(self, data, namespace=None, callback=None): """Send a message to one or more connected clients. This function emits an event with the name ``'message'``. Use :func:`emit` to issue custom event names. :param data: The data to send to the client or clients. Data can be of type ``str``, ``bytes``, ``list`` or ``dict``. If a ``list`` or ``dict``, the data will be serialized as JSON. :param namespace: The Socket.IO namespace for the event. If this argument is omitted the event is emitted to the default namespace. :param callback: If given, this function will be called to acknowledge the the client has received the message. The arguments that will be passed to the function are those provided by the client. Callback functions can only be used when addressing an individual client. """ self.emit('message', data=data, namespace=namespace, callback=callback) def call(self, event, data=None, namespace=None, timeout=60): """Emit a custom event to a client and wait for the response. :param event: The event name. It can be any string. The event names ``'connect'``, ``'message'`` and ``'disconnect'`` are reserved and should not be used. :param data: The data to send to the client or clients. Data can be of type ``str``, ``bytes``, ``list`` or ``dict``. If a ``list`` or ``dict``, the data will be serialized as JSON. :param namespace: The Socket.IO namespace for the event. If this argument is omitted the event is emitted to the default namespace. :param timeout: The waiting timeout. If the timeout is reached before the client acknowledges the event, then a ``TimeoutError`` exception is raised. """ callback_event = self.eio.create_event() callback_args = [] def event_callback(*args): callback_args.append(args) callback_event.set() self.emit(event, data=data, namespace=namespace, callback=event_callback) if not callback_event.wait(timeout=timeout): raise exceptions.TimeoutError() return callback_args[0] if len(callback_args[0]) > 1 \ else callback_args[0][0] if len(callback_args[0]) == 1 \ else None def disconnect(self): """Disconnect from the server.""" # here we just request the disconnection # later in _handle_eio_disconnect we invoke the disconnect handler for n in self.namespaces: self._send_packet(packet.Packet(packet.DISCONNECT, namespace=n)) self._send_packet(packet.Packet( packet.DISCONNECT, namespace='/')) self.connected = False self.eio.disconnect(abort=True) def transport(self): """Return the name of the transport used by the client. The two possible values returned by this function are ``'polling'`` and ``'websocket'``. """ return self.eio.transport() def start_background_task(self, target, *args, **kwargs): """Start a background task using the appropriate async model. This is a utility function that applications can use to start a background task using the method that is compatible with the selected async mode. :param target: the target function to execute. :param args: arguments to pass to the function. :param kwargs: keyword arguments to pass to the function. This function returns an object compatible with the `Thread` class in the Python standard library. The `start()` method on this object is already called by this function. """ return self.eio.start_background_task(target, *args, **kwargs) def sleep(self, seconds=0): """Sleep for the requested amount of time using the appropriate async model. This is a utility function that applications can use to put a task to sleep without having to worry about using the correct call for the selected async mode. """ return self.eio.sleep(seconds) def _send_packet(self, pkt): """Send a Socket.IO packet to the server.""" encoded_packet = pkt.encode() if isinstance(encoded_packet, list): binary = False for ep in encoded_packet: self.eio.send(ep, binary=binary) binary = True else: self.eio.send(encoded_packet, binary=False) def _generate_ack_id(self, namespace, callback): """Generate a unique identifier for an ACK packet.""" namespace = namespace or '/' if namespace not in self.callbacks: self.callbacks[namespace] = {0: itertools.count(1)} id = six.next(self.callbacks[namespace][0]) self.callbacks[namespace][id] = callback return id def _handle_connect(self, namespace): namespace = namespace or '/' self.logger.info('Namespace {} is connected'.format(namespace)) self._trigger_event('connect', namespace=namespace) if namespace == '/': for n in self.namespaces: self._send_packet(packet.Packet(packet.CONNECT, namespace=n)) elif namespace not in self.namespaces: self.namespaces.append(namespace) def _handle_disconnect(self, namespace): if not self.connected: return namespace = namespace or '/' if namespace == '/': for n in self.namespaces: self._trigger_event('disconnect', namespace=n) self.namespaces = [] self._trigger_event('disconnect', namespace=namespace) if namespace in self.namespaces: self.namespaces.remove(namespace) if namespace == '/': self.connected = False def _handle_event(self, namespace, id, data): namespace = namespace or '/' self.logger.info('Received event "%s" [%s]', data[0], namespace) r = self._trigger_event(data[0], namespace, *data[1:]) if id is not None: # send ACK packet with the response returned by the handler # tuples are expanded as multiple arguments if r is None: data = [] elif isinstance(r, tuple): data = list(r) else: data = [r] if six.PY2 and not self.binary: binary = False # pragma: nocover else: binary = None self._send_packet(packet.Packet(packet.ACK, namespace=namespace, id=id, data=data, binary=binary)) def _handle_ack(self, namespace, id, data): namespace = namespace or '/' self.logger.info('Received ack [%s]', namespace) callback = None try: callback = self.callbacks[namespace][id] except KeyError: # if we get an unknown callback we just ignore it self.logger.warning('Unknown callback received, ignoring.') else: del self.callbacks[namespace][id] if callback is not None: callback(*data) def _handle_error(self, namespace, data): namespace = namespace or '/' self.logger.info('Connection to namespace {} was rejected'.format( namespace)) if data is None: data = tuple() elif not isinstance(data, (tuple, list)): data = (data,) self._trigger_event('connect_error', namespace, *data) if namespace in self.namespaces: self.namespaces.remove(namespace) if namespace == '/': self.namespaces = [] self.connected = False def _trigger_event(self, event, namespace, *args): """Invoke an application event handler.""" # first see if we have an explicit handler for the event if namespace in self.handlers and event in self.handlers[namespace]: return self.handlers[namespace][event](*args) # or else, forward the event to a namespace handler if one exists elif namespace in self.namespace_handlers: return self.namespace_handlers[namespace].trigger_event( event, *args) def _handle_reconnect(self): self._reconnect_abort.clear() reconnecting_clients.append(self) attempt_count = 0 current_delay = self.reconnection_delay while True: delay = current_delay current_delay *= 2 if delay > self.reconnection_delay_max: delay = self.reconnection_delay_max delay += self.randomization_factor * (2 * random.random() - 1) self.logger.info( 'Connection failed, new attempt in {:.02f} seconds'.format( delay)) if self._reconnect_abort.wait(delay): self.logger.info('Reconnect task aborted') break attempt_count += 1 try: self.connect(self.connection_url, headers=self.connection_headers, transports=self.connection_transports, namespaces=self.connection_namespaces, socketio_path=self.socketio_path) except (exceptions.ConnectionError, ValueError): pass else: self.logger.info('Reconnection successful') self._reconnect_task = None break if self.reconnection_attempts and \ attempt_count >= self.reconnection_attempts: self.logger.info( 'Maximum reconnection attempts reached, giving up') break reconnecting_clients.remove(self) def _handle_eio_connect(self): """Handle the Engine.IO connection event.""" self.logger.info('Engine.IO connection established') self.sid = self.eio.sid def _handle_eio_message(self, data): """Dispatch Engine.IO messages.""" if self._binary_packet: pkt = self._binary_packet if pkt.add_attachment(data): self._binary_packet = None if pkt.packet_type == packet.BINARY_EVENT: self._handle_event(pkt.namespace, pkt.id, pkt.data) else: self._handle_ack(pkt.namespace, pkt.id, pkt.data) else: pkt = packet.Packet(encoded_packet=data) if pkt.packet_type == packet.CONNECT: self._handle_connect(pkt.namespace) elif pkt.packet_type == packet.DISCONNECT: self._handle_disconnect(pkt.namespace) elif pkt.packet_type == packet.EVENT: self._handle_event(pkt.namespace, pkt.id, pkt.data) elif pkt.packet_type == packet.ACK: self._handle_ack(pkt.namespace, pkt.id, pkt.data) elif pkt.packet_type == packet.BINARY_EVENT or \ pkt.packet_type == packet.BINARY_ACK: self._binary_packet = pkt elif pkt.packet_type == packet.ERROR: self._handle_error(pkt.namespace, pkt.data) else: raise ValueError('Unknown packet type.') def _handle_eio_disconnect(self): """Handle the Engine.IO disconnection event.""" self.logger.info('Engine.IO connection dropped') if self.connected: for n in self.namespaces: self._trigger_event('disconnect', namespace=n) self._trigger_event('disconnect', namespace='/') self.namespaces = [] self.connected = False self.callbacks = {} self._binary_packet = None self.sid = None if self.eio.state == 'connected' and self.reconnection: self._reconnect_task = self.start_background_task( self._handle_reconnect) def _engineio_client_class(self): return engineio.Client