Много лет назад писал свой сокет с поддержкой SOCKS- и HTTPS-прокси.
Код страшно кривой, но вдруг пригодится…
from socket import socket as old_socket
import socket
import struct
class sup_socket(old_socket):
    """TCP socket that connects directly or through a proxy ("tunel").

    Supported proxies are SOCKS4, SOCKS5 (no authentication) and HTTPS
    (HTTP ``CONNECT``).  Call set_tunel() before connect() to route the
    connection through a proxy; without it connect() is a plain connect.

    Every failure is reported as ``socket.error`` carrying one of:
        'bad tunel format'  - malformed proxy specification
        'bad target format' - target host/port cannot be encoded
        'bad target'        - direct connection to the target failed
        'bad tunel'         - proxy unreachable, or it is not a proxy of
                              the configured kind
        'blocked'           - the proxy refused or failed to reach the target
    """

    # Proxy kind selected by set_tunel(): None (direct connection),
    # 'SOCKS4', 'SOCKS5' or 'HTTPS'.  The name shadows the stock socket
    # `type` attribute; it is kept because callers may already rely on it.
    type = None

    _TUNEL_TYPES = ('SOCKS4', 'SOCKS5', 'HTTPS')
    # todo: take the user-agent from the main config
    _USER_AGENT = 'Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1)'
    # Upper bound on the proxy's HTTP response headers, in bytes.
    _MAX_HEADER_BYTES = 2048

    def set_tunel(self, tunel):
        """Select the proxy that connect() will go through.

        Parameters:
            tunel - string in the format 'ip:port@type', where ip is a
                    dotted IPv4 address and type is 'socks4', 'socks5' or
                    'https' (case-insensitive).

        Raises:
            socket.error('bad tunel format') if the string is malformed.
            Nothing is stored on the socket in that case.
        """
        spec = tunel.split('@')
        if len(spec) != 2:
            raise socket.error('bad tunel format')
        kind = spec[1].upper()
        if kind not in self._TUNEL_TYPES:
            raise socket.error('bad tunel format')
        address = spec[0].split(':')
        if len(address) != 2:
            raise socket.error('bad tunel format')
        ip, port_text = address
        try:
            port = int(port_text)
            socket.inet_aton(ip)  # validation only, result is not needed
        except (ValueError, TypeError, socket.error):
            raise socket.error('bad tunel format')
        if not 0 <= port <= 65535:
            raise socket.error('bad tunel format')
        # Assign only after everything validated, so a rejected
        # specification never leaves the socket half-configured.
        self.type = kind
        self.tunel_ip = ip
        self.tunel_port = port

    def connect(self, target):
        """Connect to target, through the configured proxy if there is one.

        Parameters:
            target - (host, port) tuple.  For SOCKS4/SOCKS5 the host must
                     be a dotted IPv4 address; for HTTPS it is inserted
                     into the CONNECT request as given.

        Raises:
            socket.error with one of the messages listed on the class.
            If the proxy handshake fails the socket is closed.
        """
        if not self.type:
            # classic connect
            try:
                old_socket.connect(self, target)
            except Exception:
                raise socket.error('bad target')
            return

        handshakes = {
            'SOCKS4': self._handshake_socks4,
            'SOCKS5': self._handshake_socks5,
            'HTTPS': self._handshake_https,
        }
        if self.type not in handshakes:
            # Only reachable when `type` was assigned by hand.
            raise socket.error('bad tunel format')
        dest_host = target[0]
        dest_port = target[1]
        try:
            old_socket.connect(self, (self.tunel_ip, self.tunel_port))
        except socket.error:
            raise socket.error('bad tunel')
        try:
            handshakes[self.type](dest_host, dest_port)
        except Exception:
            # A half-negotiated tunnel is useless; never leave it open.
            old_socket.close(self)
            raise

    def _send_request(self, data):
        """Send all of data to the proxy; any failure means 'bad tunel'."""
        try:
            self.sendall(data)
        except socket.error:
            raise socket.error('bad tunel')

    def _recv_exact(self, count):
        """Read exactly count bytes from the proxy.

        recv() may legally return less than requested, so keep reading.
        Raises socket.error('bad tunel') on error or premature EOF.
        """
        chunks = []
        while count > 0:
            try:
                chunk = self.recv(count)
            except socket.error:
                raise socket.error('bad tunel')
            if not chunk:
                raise socket.error('bad tunel')
            chunks.append(chunk)
            count -= len(chunk)
        return b''.join(chunks)

    def _read_line(self, limit):
        """Read one line from the proxy, one byte at a time.

        Reading unbuffered guarantees that no byte following the line is
        consumed; after the tunnel is up those bytes belong to the caller.
        The result ends with a newline unless EOF was hit or limit bytes
        were read first.  Socket errors propagate to the caller.
        """
        line = bytearray()
        while len(line) < limit:
            byte = self.recv(1)
            if not byte:
                break
            line += byte
            if byte == b'\n':
                break
        return bytes(line)

    def _handshake_socks4(self, host, port):
        """Ask a SOCKS4 proxy to open a TCP connection to host:port."""
        version = 4
        command = 1  # CONNECT
        try:
            # The trailing NUL terminates the (empty) user id field.
            request = (struct.pack('!BBH', version, command, port)
                       + socket.inet_aton(host) + b'\x00')
        except Exception:
            raise socket.error('bad target format')
        self._send_request(request)
        reply = self._recv_exact(8)
        # The remaining 6 bytes (port and address) carry nothing we need.
        ver, code = struct.unpack('!BB', reply[:2])
        if ver != 0:  # the version byte of a SOCKS4 reply is always 0
            raise socket.error('bad tunel')
        if code == 91:  # request rejected or failed
            raise socket.error('blocked')
        if code != 90:  # anything but "request granted"
            raise socket.error('bad tunel')

    def _handshake_socks5(self, host, port):
        """Ask a SOCKS5 proxy (no authentication) to connect to host:port."""
        version = 5
        no_auth = 0
        # Greeting: version, number of offered methods, the methods.
        self._send_request(struct.pack('!BBB', version, 1, no_auth))
        ver, method = struct.unpack('!BB', self._recv_exact(2))
        if ver != version or method != no_auth:
            raise socket.error('bad tunel')

        command = 1  # CONNECT
        reserved = 0
        ipv4 = 1
        try:
            request = (struct.pack('!BBBB', version, command, reserved, ipv4)
                       + socket.inet_aton(host) + struct.pack('!H', port))
        except Exception:
            raise socket.error('bad target format')
        self._send_request(request)
        ver, code, _rsv, atyp = struct.unpack('!BBBB', self._recv_exact(4))
        if ver != version or code != 0:
            raise socket.error('blocked')

        # Consume the bound address and port so the stream is positioned
        # at the first byte of tunnelled data.
        if atyp == 1:  # IPv4: 4 address bytes + 2 port bytes
            self._recv_exact(4 + 2)
        elif atyp == 4:  # IPv6: 16 address bytes + 2 port bytes
            self._recv_exact(16 + 2)
        elif atyp == 3:  # domain name: length byte, name, 2 port bytes
            (name_len,) = struct.unpack('!B', self._recv_exact(1))
            self._recv_exact(name_len + 2)
        else:
            raise socket.error('bad tunel')

    def _handshake_https(self, host, port):
        """Ask an HTTP proxy to open a CONNECT tunnel to host:port."""
        try:
            request = ('CONNECT %s:%i HTTP/1.1\r\n'
                       'Host: %s:%i\r\n'
                       'User-Agent: %s\r\n\r\n'
                       % (host, port, host, port, self._USER_AGENT))
            if not isinstance(request, bytes):
                request = request.encode('ascii')
        except (TypeError, ValueError):
            raise socket.error('bad target format')
        self._send_request(request)

        try:
            status = self._read_line(self._MAX_HEADER_BYTES)
        except socket.error:
            raise socket.error('blocked')
        fields = status.split()
        # Expect "HTTP/x.y <code> ..."; any 2xx code means the tunnel is up.
        tunnel_open = (len(fields) >= 2
                       and fields[0].startswith(b'HTTP/')
                       and len(fields[1]) == 3
                       and fields[1].isdigit()
                       and fields[1].startswith(b'2'))
        if not tunnel_open:
            raise socket.error('blocked')

        # Skip the response headers up to and including the blank line.
        budget = self._MAX_HEADER_BYTES
        while True:
            try:
                line = self._read_line(budget)
            except socket.error:
                raise socket.error('bad tunel')
            if line in (b'\r\n', b'\n'):
                return
            budget -= len(line)
            if not line.endswith(b'\n') or budget <= 0:
                # EOF before the blank line, or oversized headers.
                raise socket.error('bad tunel')