# -*- test-case-name: twisted.internet.test.test_endpoints -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Implementations of L{IStreamServerEndpoint} and L{IStreamClientEndpoint} that
wrap the L{IReactorTCP}, L{IReactorSSL}, and L{IReactorUNIX} interfaces.
This also implements an extensible mini-language for describing endpoints,
parsed by the L{clientFromString} and L{serverFromString} functions.
@since: 10.1
"""
from __future__ import division, absolute_import
import os
import socket
from zope.interface import implementer, directlyProvides
import warnings
from twisted.internet import interfaces, defer, error, fdesc, threads
from twisted.internet.protocol import (
ClientFactory, Protocol, ProcessProtocol, Factory)
from twisted.internet.interfaces import IStreamServerEndpointStringParser
from twisted.internet.interfaces import IStreamClientEndpointStringParser
from twisted.python.filepath import FilePath
from twisted.python.failure import Failure
from twisted.python import log
from twisted.python.components import proxyForInterface
from twisted.plugin import IPlugin, getPlugins
from twisted.internet import stdio
from .interfaces import IFileDescriptorReceiver
# Names exported when this module is star-imported.
__all__ = ["TCP4ClientEndpoint", "SSL4ServerEndpoint"]
class _WrappingProtocol(Protocol):
    """
    Wrap another protocol in order to notify my user when a connection has
    been made.

    @ivar _connectedDeferred: The L{Deferred} fired with the wrapped protocol
        once the connection is made.
    @ivar _wrappedProtocol: The protocol every event is forwarded to.
    """

    def __init__(self, connectedDeferred, wrappedProtocol):
        """
        @param connectedDeferred: The L{Deferred} that will callback
            with the C{wrappedProtocol} when it is connected.

        @param wrappedProtocol: An L{IProtocol} provider that will be
            connected.
        """
        self._connectedDeferred = connectedDeferred
        self._wrappedProtocol = wrappedProtocol

        # Mirror the optional interfaces of the wrapped protocol on this
        # wrapper.  directlyProvides() *replaces* the directly provided
        # interfaces each time it is called, so every matching interface has
        # to be declared in one single call; declaring them one by one would
        # keep only the last one.
        optionalInterfaces = (interfaces.IHalfCloseableProtocol,
                              IFileDescriptorReceiver)
        provided = [iface for iface in optionalInterfaces
                    if iface.providedBy(self._wrappedProtocol)]
        if provided:
            directlyProvides(self, *provided)

    def logPrefix(self):
        """
        Transparently pass through the wrapped protocol's log prefix.

        @return: The wrapped protocol's own log prefix when it provides
            L{interfaces.ILoggingContext}, otherwise the name of its class.
        """
        if interfaces.ILoggingContext.providedBy(self._wrappedProtocol):
            return self._wrappedProtocol.logPrefix()
        return self._wrappedProtocol.__class__.__name__

    def connectionMade(self):
        """
        Connect the C{self._wrappedProtocol} to our C{self.transport} and
        callback C{self._connectedDeferred} with the C{self._wrappedProtocol}
        """
        # The wrapped protocol is attached to the transport first, so it is
        # already connected when the Deferred's callbacks receive it.
        self._wrappedProtocol.makeConnection(self.transport)
        self._connectedDeferred.callback(self._wrappedProtocol)

    def dataReceived(self, data):
        """
        Proxy C{dataReceived} calls to our C{self._wrappedProtocol}
        """
        return self._wrappedProtocol.dataReceived(data)

    def fileDescriptorReceived(self, descriptor):
        """
        Proxy C{fileDescriptorReceived} calls to our C{self._wrappedProtocol}
        """
        return self._wrappedProtocol.fileDescriptorReceived(descriptor)

    def connectionLost(self, reason):
        """
        Proxy C{connectionLost} calls to our C{self._wrappedProtocol}
        """
        return self._wrappedProtocol.connectionLost(reason)

    def readConnectionLost(self):
        """
        Proxy L{IHalfCloseableProtocol.readConnectionLost} to our
        C{self._wrappedProtocol}
        """
        self._wrappedProtocol.readConnectionLost()

    def writeConnectionLost(self):
        """
        Proxy L{IHalfCloseableProtocol.writeConnectionLost} to our
        C{self._wrappedProtocol}
        """
        self._wrappedProtocol.writeConnectionLost()
class _WrappingFactory(ClientFactory):
    """
    A client factory that delegates protocol construction to another factory
    and wraps each protocol it builds.

    @ivar _wrappedFactory: A provider of I{IProtocolFactory} whose
        buildProtocol method will be called and whose resulting protocol
        will be wrapped.

    @ivar _onConnection: A L{Deferred} that fires when the protocol is
        connected

    @ivar _connector: A L{connector <twisted.internet.interfaces.IConnector>}
        that is managing the current or previous connection attempt.
    """
    protocol = _WrappingProtocol

    def __init__(self, wrappedFactory):
        """
        @param wrappedFactory: A provider of I{IProtocolFactory} whose
            buildProtocol method will be called and whose resulting protocol
            will be wrapped.
        """
        self._wrappedFactory = wrappedFactory
        # Cancelling this Deferred is routed to _canceller below.
        self._onConnection = defer.Deferred(canceller=self._canceller)

    def startedConnecting(self, connector):
        """
        Record the connector that began a connection attempt so that the
        attempt can be stopped later.
        """
        self._connector = connector

    def _canceller(self, deferred):
        """
        Handle cancellation of the outgoing connection attempt: fail the
        L{Deferred} with an L{error.ConnectingCancelledError} and then stop
        the attempt.

        @param deferred: The L{Deferred <defer.Deferred>} that was cancelled;
            should be the same as C{self._onConnection}.
        @type deferred: L{Deferred <defer.Deferred>}

        @note: This relies on startedConnecting having been called, so it may
            seem as though there's a race condition where C{_connector} may
            not have been set.  However, using public APIs, this condition is
            impossible to catch, because a connection API
            (C{connectTCP}/C{SSL}/C{UNIX}) is always invoked before a
            L{_WrappingFactory}'s L{Deferred <defer.Deferred>} is returned to
            C{connect()}'s caller.

        @return: C{None}
        """
        destination = self._connector.getDestination()
        cancelled = error.ConnectingCancelledError(destination)
        # The errback must happen before the connector is stopped.
        deferred.errback(cancelled)
        self._connector.stopConnecting()

    def doStart(self):
        """
        Forward the start notification to the wrapped factory.
        """
        self._wrappedFactory.doStart()

    def doStop(self):
        """
        Forward the stop notification to the wrapped factory.
        """
        self._wrappedFactory.doStop()

    def buildProtocol(self, addr):
        """
        Ask C{self._wrappedFactory} to build a protocol and wrap the result;
        if that raises, errback the C{self._onConnection} L{Deferred}
        instead.

        @return: An instance of L{_WrappingProtocol} or C{None}
        """
        try:
            built = self._wrappedFactory.buildProtocol(addr)
        except:
            # errback() is called without an argument here, while the
            # exception raised by the wrapped factory is being handled.
            self._onConnection.errback()
            return None
        return self.protocol(self._onConnection, built)

    def clientConnectionFailed(self, connector, reason):
        """
        Errback the C{self._onConnection} L{Deferred} with C{reason} when the
        client connection fails, unless it has already been fired.
        """
        if self._onConnection.called:
            return
        self._onConnection.errback(reason)
@implementer(interfaces.ITransport)
class _ProcessEndpointTransport(
        proxyForInterface(interfaces.IProcessTransport, '_process')):
    """
    The L{ITransport} provider handed to the L{IProtocol} instance that was
    passed to the process endpoint.

    @ivar _process: An active process transport which will be used by write
        methods on this object to write data to a child process.
    @type _process: L{interfaces.IProcessTransport} provider
    """

    def write(self, data):
        """
        Send C{data} to the standard input of the child process.

        @param data: The data to write on stdin.
        """
        stdinDescriptor = 0
        self._process.writeToChild(stdinDescriptor, data)

    def writeSequence(self, data):
        """
        Send each chunk of C{data}, in order, to the standard input of the
        child process.

        @param data: The list of chunks to write on stdin.
        """
        stdinDescriptor = 0
        for piece in data:
            self._process.writeToChild(stdinDescriptor, piece)
@implementer(interfaces.IStreamServerEndpoint)
class _TCPServerEndpoint(object):
    """
    A stream server endpoint that listens on a TCP port.
    """

    def __init__(self, reactor, port, backlog, interface):
        """
        @param reactor: An L{IReactorTCP} provider.

        @param port: The port number used for listening
        @type port: int

        @param backlog: Size of the listen queue
        @type backlog: int

        @param interface: The hostname to bind to
        @type interface: str
        """
        self._reactor = reactor
        self._port = port
        self._backlog = backlog
        self._interface = interface

    def listen(self, protocolFactory):
        """
        Implement L{IStreamServerEndpoint.listen} by calling the reactor's
        C{listenTCP} with the stored port, backlog and interface.

        @param protocolFactory: The factory passed on to C{listenTCP}.

        @return: The result of L{defer.execute} applied to C{listenTCP}.
        """
        listenTCP = self._reactor.listenTCP
        return defer.execute(
            listenTCP,
            self._port,
            protocolFactory,
            backlog=self._backlog,
            interface=self._interface)
class TCP4ServerEndpoint(_TCPServerEndpoint):
    """
    A TCP server endpoint whose defaults suit an IPv4 configuration.
    """

    def __init__(self, reactor, port, backlog=50, interface=''):
        """
        @param reactor: An L{IReactorTCP} provider.

        @param port: The port number used for listening
        @type port: int

        @param backlog: Size of the listen queue, defaults to 50
        @type backlog: int

        @param interface: The hostname to bind to, defaults to '' (all)
        @type interface: str
        """
        _TCPServerEndpoint.__init__(
            self,
            reactor=reactor,
            port=port,
            backlog=backlog,
            interface=interface)
class TCP6ServerEndpoint(_TCPServerEndpoint):
    """
    A TCP server endpoint whose defaults suit an IPv6 configuration.
    """

    def __init__(self, reactor, port, backlog=50, interface='::'):
        """
        @param reactor: An L{IReactorTCP} provider.

        @param port: The port number used for listening
        @type port: int

        @param backlog: Size of the listen queue, defaults to 50
        @type backlog: int

        @param interface: The hostname to bind to, defaults to '::' (all)
        @type interface: str
        """
        _TCPServerEndpoint.__init__(
            self,
            reactor=reactor,
            port=port,
            backlog=backlog,
            interface=interface)
@implementer(interfaces.IStreamClientEndpoint)
class TCP4ClientEndpoint(object):
"""
TCP client endpoint with an IPv4 configuration.
"""
def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
"""
@param reactor: An L{IReactorTCP} provider
@param host: A hostname, used when connecting
@type host: str
@param port: The port number, used when connecting
@type port: int
Loading ...