# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Tests for L{twisted.protocol.socks}, an implementation of the SOCKSv4 and SOCKSv4a protocols. """ import struct, socket from twisted.trial import unittest from twisted.test import proto_helpers from twisted.internet import defer, address, reactor from twisted.internet.error import DNSLookupError from twisted.protocols import socks class StringTCPTransport(proto_helpers.StringTransport): stringTCPTransport_closing = False peer = None def getPeer(self): return self.peer def getHost(self): return address.IPv4Address('TCP', '2.3.4.5', 42) def loseConnection(self): self.stringTCPTransport_closing = True class FakeResolverReactor: """ Bare-bones reactor with deterministic behavior for the resolve method. """ def __init__(self, names): """ @type names: C{dict} containing C{str} keys and C{str} values. @param names: A hostname to IP address mapping. The IP addresses are stringified dotted quads. """ self.names = names def resolve(self, hostname): """ Resolve a hostname by looking it up in the C{names} dictionary. """ try: return defer.succeed(self.names[hostname]) except KeyError: return defer.fail( DNSLookupError("FakeResolverReactor couldn't find " + hostname)) class SOCKSv4Driver(socks.SOCKSv4): # last SOCKSv4Outgoing instantiated driver_outgoing = None # last SOCKSv4IncomingFactory instantiated driver_listen = None def connectClass(self, host, port, klass, *args): # fake it proto = klass(*args) proto.transport = StringTCPTransport() proto.transport.peer = address.IPv4Address('TCP', host, port) proto.connectionMade() self.driver_outgoing = proto return defer.succeed(proto) def listenClass(self, port, klass, *args): # fake it factory = klass(*args) self.driver_listen = factory if port == 0: port = 1234 return defer.succeed(('6.7.8.9', port)) class Connect(unittest.TestCase): """ Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol. """ def setUp(self): self.sock = SOCKSv4Driver() self.sock.transport = StringTCPTransport() self.sock.connectionMade() self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"}) def tearDown(self): outgoing = self.sock.driver_outgoing if outgoing is not None: self.assert_(outgoing.transport.stringTCPTransport_closing, "Outgoing SOCKS connections need to be closed.") def test_simple(self): self.sock.dataReceived( struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('1.2.3.4')) self.assert_(not self.sock.transport.stringTCPTransport_closing) self.assert_(self.sock.driver_outgoing is not None) # pass some data through self.sock.dataReceived('hello, world') self.assertEqual(self.sock.driver_outgoing.transport.value(), 'hello, world') # the other way around self.sock.driver_outgoing.dataReceived('hi there') self.assertEqual(self.sock.transport.value(), 'hi there') self.sock.connectionLost('fake reason') def test_socks4aSuccessfulResolution(self): """ If the destination IP address has zeros for the first three octets and non-zero for the fourth octet, the client is attempting a v4a connection. A hostname is specified after the user ID string and the server connects to the address that hostname resolves to. @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol} """ # send the domain name "localhost" to be resolved clientRequest = ( struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('0.0.0.1') + 'fooBAZ\0' + 'localhost\0') # Deliver the bytes one by one to exercise the protocol's buffering # logic. FakeResolverReactor's resolve method is invoked to "resolve" # the hostname. for byte in clientRequest: self.sock.dataReceived(byte) sent = self.sock.transport.value() self.sock.transport.clear() # Verify that the server responded with the address which will be # connected to. self.assertEqual( sent, struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1')) self.assertFalse(self.sock.transport.stringTCPTransport_closing) self.assertNotIdentical(self.sock.driver_outgoing, None) # Pass some data through and verify it is forwarded to the outgoing # connection. self.sock.dataReceived('hello, world') self.assertEqual( self.sock.driver_outgoing.transport.value(), 'hello, world') # Deliver some data from the output connection and verify it is # passed along to the incoming side. self.sock.driver_outgoing.dataReceived('hi there') self.assertEqual(self.sock.transport.value(), 'hi there') self.sock.connectionLost('fake reason') def test_socks4aFailedResolution(self): """ Failed hostname resolution on a SOCKSv4a packet results in a 91 error response and the connection getting closed. """ # send the domain name "failinghost" to be resolved clientRequest = ( struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('0.0.0.1') + 'fooBAZ\0' + 'failinghost\0') # Deliver the bytes one by one to exercise the protocol's buffering # logic. FakeResolverReactor's resolve method is invoked to "resolve" # the hostname. for byte in clientRequest: self.sock.dataReceived(byte) # Verify that the server responds with a 91 error. sent = self.sock.transport.value() self.assertEqual( sent, struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0')) # A failed resolution causes the transport to drop the connection. self.assertTrue(self.sock.transport.stringTCPTransport_closing) self.assertIdentical(self.sock.driver_outgoing, None) def test_accessDenied(self): self.sock.authorize = lambda code, server, port, user: 0 self.sock.dataReceived( struct.pack('!BBH', 4, 1, 4242) + socket.inet_aton('10.2.3.4') + 'fooBAR' + '\0') self.assertEqual(self.sock.transport.value(), struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0')) self.assert_(self.sock.transport.stringTCPTransport_closing) self.assertIdentical(self.sock.driver_outgoing, None) def test_eofRemote(self): self.sock.dataReceived( struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() # pass some data through self.sock.dataReceived('hello, world') self.assertEqual(self.sock.driver_outgoing.transport.value(), 'hello, world') # now close it from the server side self.sock.driver_outgoing.transport.loseConnection() self.sock.driver_outgoing.connectionLost('fake reason') def test_eofLocal(self): self.sock.dataReceived( struct.pack('!BBH', 4, 1, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() # pass some data through self.sock.dataReceived('hello, world') self.assertEqual(self.sock.driver_outgoing.transport.value(), 'hello, world') # now close it from the client side self.sock.connectionLost('fake reason') class Bind(unittest.TestCase): """ Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol. """ def setUp(self): self.sock = SOCKSv4Driver() self.sock.transport = StringTCPTransport() self.sock.connectionMade() self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"}) ## def tearDown(self): ## # TODO ensure the listen port is closed ## listen = self.sock.driver_listen ## if listen is not None: ## self.assert_(incoming.transport.stringTCPTransport_closing, ## "Incoming SOCKS connections need to be closed.") def test_simple(self): self.sock.dataReceived( struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9')) self.assert_(not self.sock.transport.stringTCPTransport_closing) self.assert_(self.sock.driver_listen is not None) # connect incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) self.assertNotIdentical(incoming, None) incoming.transport = StringTCPTransport() incoming.connectionMade() # now we should have the second reply packet sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0')) self.assert_(not self.sock.transport.stringTCPTransport_closing) # pass some data through self.sock.dataReceived('hello, world') self.assertEqual(incoming.transport.value(), 'hello, world') # the other way around incoming.dataReceived('hi there') self.assertEqual(self.sock.transport.value(), 'hi there') self.sock.connectionLost('fake reason') def test_socks4a(self): """ If the destination IP address has zeros for the first three octets and non-zero for the fourth octet, the client is attempting a v4a connection. A hostname is specified after the user ID string and the server connects to the address that hostname resolves to. @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol} """ # send the domain name "localhost" to be resolved clientRequest = ( struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('0.0.0.1') + 'fooBAZ\0' + 'localhost\0') # Deliver the bytes one by one to exercise the protocol's buffering # logic. FakeResolverReactor's resolve method is invoked to "resolve" # the hostname. for byte in clientRequest: self.sock.dataReceived(byte) sent = self.sock.transport.value() self.sock.transport.clear() # Verify that the server responded with the address which will be # connected to. self.assertEqual( sent, struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9')) self.assertFalse(self.sock.transport.stringTCPTransport_closing) self.assertNotIdentical(self.sock.driver_listen, None) # connect incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345)) self.assertNotIdentical(incoming, None) incoming.transport = StringTCPTransport() incoming.connectionMade() # now we should have the second reply packet sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0')) self.assertNotIdentical( self.sock.transport.stringTCPTransport_closing, None) # Deliver some data from the output connection and verify it is # passed along to the incoming side. self.sock.dataReceived('hi there') self.assertEqual(incoming.transport.value(), 'hi there') # the other way around incoming.dataReceived('hi there') self.assertEqual(self.sock.transport.value(), 'hi there') self.sock.connectionLost('fake reason') def test_socks4aFailedResolution(self): """ Failed hostname resolution on a SOCKSv4a packet results in a 91 error response and the connection getting closed. """ # send the domain name "failinghost" to be resolved clientRequest = ( struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('0.0.0.1') + 'fooBAZ\0' + 'failinghost\0') # Deliver the bytes one by one to exercise the protocol's buffering # logic. FakeResolverReactor's resolve method is invoked to "resolve" # the hostname. for byte in clientRequest: self.sock.dataReceived(byte) # Verify that the server responds with a 91 error. sent = self.sock.transport.value() self.assertEqual( sent, struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0')) # A failed resolution causes the transport to drop the connection. self.assertTrue(self.sock.transport.stringTCPTransport_closing) self.assertIdentical(self.sock.driver_outgoing, None) def test_accessDenied(self): self.sock.authorize = lambda code, server, port, user: 0 self.sock.dataReceived( struct.pack('!BBH', 4, 2, 4242) + socket.inet_aton('10.2.3.4') + 'fooBAR' + '\0') self.assertEqual(self.sock.transport.value(), struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0')) self.assert_(self.sock.transport.stringTCPTransport_closing) self.assertIdentical(self.sock.driver_listen, None) def test_eofRemote(self): self.sock.dataReceived( struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() # connect incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) self.assertNotIdentical(incoming, None) incoming.transport = StringTCPTransport() incoming.connectionMade() # now we should have the second reply packet sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0')) self.assert_(not self.sock.transport.stringTCPTransport_closing) # pass some data through self.sock.dataReceived('hello, world') self.assertEqual(incoming.transport.value(), 'hello, world') # now close it from the server side incoming.transport.loseConnection() incoming.connectionLost('fake reason') def test_eofLocal(self): self.sock.dataReceived( struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() # connect incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345)) self.assertNotIdentical(incoming, None) incoming.transport = StringTCPTransport() incoming.connectionMade() # now we should have the second reply packet sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 90, 0) + socket.inet_aton('0.0.0.0')) self.assert_(not self.sock.transport.stringTCPTransport_closing) # pass some data through self.sock.dataReceived('hello, world') self.assertEqual(incoming.transport.value(), 'hello, world') # now close it from the client side self.sock.connectionLost('fake reason') def test_badSource(self): self.sock.dataReceived( struct.pack('!BBH', 4, 2, 34) + socket.inet_aton('1.2.3.4') + 'fooBAR' + '\0') sent = self.sock.transport.value() self.sock.transport.clear() # connect from WRONG address incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666)) self.assertIdentical(incoming, None) # Now we should have the second reply packet and it should # be a failure. The connection should be closing. sent = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(sent, struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0')) self.assert_(self.sock.transport.stringTCPTransport_closing)