"""Base class for implementing servers"""
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
import sys
import _socket
import errno
from gevent.greenlet import Greenlet
from gevent.event import Event
from gevent.hub import get_hub
from gevent._compat import string_types, integer_types, xrange


__all__ = ['BaseServer']


# We define a helper function to handle closing the socket in
# do_handle; We'd like to bind it to a kwarg to avoid *any* lookups at
# all, but that's incompatible with the calling convention of
# do_handle. On CPython, this is ~20% faster than creating and calling
# a closure and ~10% faster than using a @staticmethod. (In theory, we
# could create a closure only once in set_handle, to wrap self._handle,
# but this is safer from a  backwards compat standpoint.)
# we also avoid unpacking the *args tuple when calling/spawning this object
# for a tiny improvement (benchmark shows a wash)
def _handle_and_close_when_done(handle, close, args_tuple):
    try:
        return handle(*args_tuple)
    finally:
        close(*args_tuple)


class BaseServer(object):
    """
    An abstract base class that implements some common functionality for the servers in gevent.

    :param listener: Either be an address that the server should bind
        on or a :class:`gevent.socket.socket` instance that is already
        bound (and put into listening mode in case of TCP socket).

    :keyword handle: If given, the request handler. The request
        handler can be defined in a few ways. Most commonly,
        subclasses will implement a ``handle`` method as an
        instance method. Alternatively, a function can be passed
        as the ``handle`` argument to the constructor. In either
        case, the handler can later be changed by calling
        :meth:`set_handle`.

        When the request handler returns, the socket used for the
        request will be closed. Therefore, the handler must not return if
        the socket is still in use (for example, by manually spawned greenlets).

    :keyword spawn: If provided, is called to create a new
        greenlet to run the handler. By default,
        :func:`gevent.spawn` is used (meaning there is no
        artificial limit on the number of concurrent requests). Possible values for *spawn*:

        - a :class:`gevent.pool.Pool` instance -- ``handle`` will be executed
          using :meth:`gevent.pool.Pool.spawn` only if the pool is not full.
          While it is full, no new connections are accepted;
        - :func:`gevent.spawn_raw` -- ``handle`` will be executed in a raw
          greenlet which has a little less overhead then :class:`gevent.Greenlet` instances spawned by default;
        - ``None`` -- ``handle`` will be executed right away, in the :class:`Hub` greenlet.
          ``handle`` cannot use any blocking functions as it would mean switching to the :class:`Hub`.
        - an integer -- a shortcut for ``gevent.pool.Pool(integer)``

    .. versionchanged:: 1.1a1
       When the *handle* function returns from processing a connection,
       the client socket will be closed. This resolves the non-deterministic
       closing of the socket, fixing ResourceWarnings under Python 3 and PyPy.

    """
    # pylint: disable=too-many-instance-attributes,bare-except,broad-except

    #: the number of seconds to sleep in case there was an error in accept() call
    #: for consecutive errors the delay will double until it reaches max_delay
    #: when accept() finally succeeds the delay will be reset to min_delay again
    min_delay = 0.01
    max_delay = 1

    #: Sets the maximum number of consecutive accepts that a process may perform on
    #: a single wake up. High values give higher priority to high connection rates,
    #: while lower values give higher priority to already established connections.
    #: Default is 100. Note, that in case of multiple working processes on the same
    #: listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it
    #: to 1 when environ["wsgi.multiprocess"] is true)
    max_accept = 100

    _spawn = Greenlet.spawn

    #: the default timeout that we wait for the client connections to close in stop()
    stop_timeout = 1

    fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)

    def __init__(self, listener, handle=None, spawn='default'):
        self._stop_event = Event()
        self._stop_event.set()
        self._watcher = None
        self._timer = None
        self._handle = None
        # XXX: FIXME: Subclasses rely on the presence or absence of the
        # `socket` attribute to determine whether we are open/should be opened.
        # Instead, have it be None.
        self.pool = None
        try:
            self.set_listener(listener)
            self.set_spawn(spawn)
            self.set_handle(handle)
            self.delay = self.min_delay
            self.loop = get_hub().loop
            if self.max_accept < 1:
                raise ValueError('max_accept must be positive int: %r' % (self.max_accept, ))
        except:
            self.close()
            raise

    def set_listener(self, listener):
        if hasattr(listener, 'accept'):
            if hasattr(listener, 'do_handshake'):
                raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, ))
            self.family = listener.family
            self.address = listener.getsockname()
            self.socket = listener
        else:
            self.family, self.address = parse_address(listener)

    def set_spawn(self, spawn):
        if spawn == 'default':
            self.pool = None
            self._spawn = self._spawn
        elif hasattr(spawn, 'spawn'):
            self.pool = spawn
            self._spawn = spawn.spawn
        elif isinstance(spawn, integer_types):
            from gevent.pool import Pool
            self.pool = Pool(spawn)
            self._spawn = self.pool.spawn
        else:
            self.pool = None
            self._spawn = spawn
        if hasattr(self.pool, 'full'):
            self.full = self.pool.full
        if self.pool is not None:
            self.pool._semaphore.rawlink(self._start_accepting_if_started)

    def set_handle(self, handle):
        if handle is not None:
            self.handle = handle
        if hasattr(self, 'handle'):
            self._handle = self.handle
        else:
            raise TypeError("'handle' must be provided")

    def _start_accepting_if_started(self, _event=None):
        if self.started:
            self.start_accepting()

    def start_accepting(self):
        if self._watcher is None:
            # just stop watcher without creating a new one?
            self._watcher = self.loop.io(self.socket.fileno(), 1)
            self._watcher.start(self._do_read)

    def stop_accepting(self):
        if self._watcher is not None:
            self._watcher.stop()
            self._watcher.close()
            self._watcher = None
        if self._timer is not None:
            self._timer.stop()
            self._timer.close()
            self._timer = None

    def do_handle(self, *args):
        spawn = self._spawn
        handle = self._handle
        close = self.do_close

        try:
            if spawn is None:
                _handle_and_close_when_done(handle, close, args)
            else:
                spawn(_handle_and_close_when_done, handle, close, args)
        except:
            close(*args)
            raise

    def do_close(self, *args):
        pass

    def do_read(self):
        raise NotImplementedError()

    def _do_read(self):
        for _ in xrange(self.max_accept):
            if self.full():
                self.stop_accepting()
                return
            try:
                args = self.do_read()
                self.delay = self.min_delay
                if not args:
                    return
            except:
                self.loop.handle_error(self, *sys.exc_info())
                ex = sys.exc_info()[1]
                if self.is_fatal_error(ex):
                    self.close()
                    sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
                    return
                if self.delay >= 0:
                    self.stop_accepting()
                    self._timer = self.loop.timer(self.delay)
                    self._timer.start(self._start_accepting_if_started)
                    self.delay = min(self.max_delay, self.delay * 2)
                break
            else:
                try:
                    self.do_handle(*args)
                except:
                    self.loop.handle_error((args[1:], self), *sys.exc_info())
                    if self.delay >= 0:
                        self.stop_accepting()
                        self._timer = self.loop.timer(self.delay)
                        self._timer.start(self._start_accepting_if_started)
                        self.delay = min(self.max_delay, self.delay * 2)
                    break

    def full(self):
        # copied from self.pool
        # pylint: disable=method-hidden
        return False

    def __repr__(self):
        return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo())

    def __str__(self):
        return '<%s %s>' % (type(self).__name__, self._formatinfo())

    def _formatinfo(self):
        if hasattr(self, 'socket'):
            try:
                fileno = self.socket.fileno()
            except Exception as ex:
                fileno = str(ex)
            result = 'fileno=%s ' % fileno
        else:
            result = ''
        try:
            if isinstance(self.address, tuple) and len(self.address) == 2:
                result += 'address=%s:%s' % self.address
            else:
                result += 'address=%s' % (self.address, )
        except Exception as ex:
            result += str(ex) or '<error>'

        handle = self.__dict__.get('handle')
        if handle is not None:
            fself = getattr(handle, '__self__', None)
            try:
                if fself is self:
                    # Checks the __self__ of the handle in case it is a bound
                    # method of self to prevent recursivly defined reprs.
                    handle_repr = '<bound method %s.%s of self>' % (
                        self.__class__.__name__,
                        handle.__name__,
                    )
                else:
                    handle_repr = repr(handle)

                result += ' handle=' + handle_repr
            except Exception as ex:
                result += str(ex) or '<error>'

        return result

    @property
    def server_host(self):
        """IP address that the server is bound to (string)."""
        if isinstance(self.address, tuple):
            return self.address[0]

    @property
    def server_port(self):
        """Port that the server is bound to (an integer)."""
        if isinstance(self.address, tuple):
            return self.address[1]

    def init_socket(self):
        """If the user initialized the server with an address rather than socket,
        then this function will create a socket, bind it and put it into listening mode.

        It is not supposed to be called by the user, it is called by :meth:`start` before starting
        the accept loop."""

    @property
    def started(self):
        return not self._stop_event.is_set()

    def start(self):
        """Start accepting the connections.

        If an address was provided in the constructor, then also create a socket,
        bind it and put it into the listening mode.
        """
        self.init_socket()
        self._stop_event.clear()
        try:
            self.start_accepting()
        except:
            self.close()
            raise

    def close(self):
        """Close the listener socket and stop accepting."""
        self._stop_event.set()
        try:
            self.stop_accepting()
        finally:
            try:
                self.socket.close()
            except Exception:
                pass
            finally:
                self.__dict__.pop('socket', None)
                self.__dict__.pop('handle', None)
                self.__dict__.pop('_handle', None)
                self.__dict__.pop('_spawn', None)
                self.__dict__.pop('full', None)
                if self.pool is not None:
                    self.pool._semaphore.unlink(self._start_accepting_if_started)
                    # If the pool's semaphore had a notifier already started,
                    # there's a reference cycle we're a part of
                    # (self->pool->semaphere-hub callback->semaphore)
                    # But we can't destroy self.pool, because self.stop()
                    # calls this method, and then wants to join self.pool()

    @property
    def closed(self):
        return not hasattr(self, 'socket')

    def stop(self, timeout=None):
        """
        Stop accepting the connections and close the listening socket.

        If the server uses a pool to spawn the requests, then
        :meth:`stop` also waits for all the handlers to exit. If there
        are still handlers executing after *timeout* has expired
        (default 1 second, :attr:`stop_timeout`), then the currently
        running handlers in the pool are killed.

        If the server does not use a pool, then this merely stops accepting connections;
        any spawned greenlets that are handling requests continue running until
        they naturally complete.
        """
        self.close()
        if timeout is None:
            timeout = self.stop_timeout
        if self.pool:
            self.pool.join(timeout=timeout)
            self.pool.kill(block=True, timeout=1)


    def serve_forever(self, stop_timeout=None):
        """Start the server if it hasn't been already started and wait until it's stopped."""
        # add test that serve_forever exists on stop()
        if not self.started:
            self.start()
        try:
            self._stop_event.wait()
        finally:
            Greenlet.spawn(self.stop, timeout=stop_timeout).join()

    def is_fatal_error(self, ex):
        return isinstance(ex, _socket.error) and ex.args[0] in self.fatal_errors


def _extract_family(host):
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
        return _socket.AF_INET6, host
    return _socket.AF_INET, host


def _parse_address(address):
    if isinstance(address, tuple):
        if not address[0] or ':' in address[0]:
            return _socket.AF_INET6, address
        return _socket.AF_INET, address

    if ((isinstance(address, string_types) and ':' not in address)
            or isinstance(address, integer_types)): # noqa (pep8 E129)
        # Just a port
        return _socket.AF_INET6, ('', int(address))

    if not isinstance(address, string_types):
        raise TypeError('Expected tuple or string, got %s' % type(address))

    host, port = address.rsplit(':', 1)
    family, host = _extract_family(host)
    if host == '*':
        host = ''
    return family, (host, int(port))


def parse_address(address):
    try:
        return _parse_address(address)
    except ValueError as ex: # pylint:disable=try-except-raise
        raise ValueError('Failed to parse address %r: %s' % (address, ex))