Сервер падает, если
1. число соединений больше 300
2. клиент отсоединяется до получения ответа
Главная проблема это то что отчеты должны делаться максимум в 5 потоков.
Подскажите как переделать сервер.
#-*-coding:utf-8-*-
'''
Created on 11.11.2011
@author: minotavr_x86
'''
import thread
from Queue import Queue
import signal
import SocketServer
import SimpleXMLRPCServer
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
from utils import create_hash
from psycopg2 import connect
from time import sleep
from report_generator import ReportGen
import os, sys
import optparse
from options import *
# TODO: Перезапуск офиса
class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
pass
class ReportTask():
def __init__(self,task):
self.is_active = True
self.task = task
self.result = None
def set_active(self, result):
self.result = result
self.is_active = False
return True
def get_task(self):
return self.task
def get_active(self):
return self.is_active
def get_result(self):
return self.result
class RFunc():
def new_report(self,task):
if lock_client:
return {'error':1, 'error_msg':'server stoping'}
r = ReportTask(task)
q.put(r)
while True:
if not r.get_active():
return r.get_result()
sleep(1)
class RequestHandler(SimpleXMLRPCRequestHandler):
def authorize(self):
import base64
authorization = self.headers.get('authorization', ' ')
scheme, challenge = authorization.split(' ', 1)
if scheme.lower() == 'basic':
decoded = base64.decodestring(challenge)
if ':' in decoded:
username, password = decoded.split(':')
con=connect ("""dbname = report_oo_dev
user = report_serv
host = 10.29.0.30
password= c2hd2h
""")
cur=con.cursor()
cur.execute("SELECT name,passwd FROM users")
otv = cur.fetchall()
if (username, password) in otv:
return True
return False
def do_POST(self):
if self.authorize():
_data = self.rfile.read(int(self.headers["content-length"]))
print type(_data)
print _data
_data = _data.decode('utf-8')
response = self.server._marshaled_dispatch(_data, getattr(self, '_dispatch', None))
self.send_response(200)
print str(len(response))
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
self.wfile.flush()
self.connection.shutdown(1)
SimpleXMLRPCRequestHandler.do_POST(self)
else:
self.send_error(403, 'не авторизованы')
def RPCServ(host, port):
s = SimpleThreadedXMLRPCServer((host,port), logRequests=True, requestHandler=RequestHandler)
s.register_instance(RFunc())
s.serve_forever()
print 'Start report server on ' + host + ':' + str(port)
def create_report(q,lock,host,port):
while True:
task = q.get()
if task == 'exit':
lock.release()
break
# {name_templ:<>,templ:<>, vars:{}, format:odt/pdf}
tmp_name_template = None
name_templ = None
item = task.get_task()
if 'templ' in item:
name_templ = tmp_name_templ = os.path.join(PATH_TMP,'templ-' +create_hash() + '.odt')
tmp_file = open(tmp_name_templ,'wb')
tmp_file.write(item['templ'].data)
tmp_file.close()
elif 'name_templ' in item:
name_templ = os.path.join(PATH_TEMPLATE, item['name_templ'])
if 'vars' in item and name_templ:
name_result = os.path.join(PATH_TMP, 'result-' +create_hash() + '.odt')
from com.sun.star.uno import Exception as UnoException
try:
report = ReportGen(host = host,port = port)
if report.open(template = name_templ):
report.render(item['vars'])
report.iteration(item['vars'])
report.render_tables(item['vars'])
if item['format'] == 'pdf':
report.save_as_PDF(name_result)
else:
report.save(name_result)
report.close()
tmp_file = open(name_result, 'rb')
tmp_result = xmlrpclib.Binary(tmp_file.read())
tmp_file.close()
task.set_active({'error':0, 'error_msg':'','result':tmp_result})
os.remove(name_result)
else:
task.set_active({'error':3, 'error_msg':'ненайден или неверен фаил шаблона'})
except UnoException as e:
task.set_active({'error':999, 'error_msg':e})
if 'templ' in item:
os.remove(tmp_name_templ)
else:
task.set_active({'error':2, 'error_msg':'неверное количество параметров'})
def stop(signum, frame):
print '\nExiting'
lock_client = True
sleep(10)
for i in range(max_oo):
q.put('exit')
while(any([l.locked() for l in lock_list])):
sleep(10)
sleep(20)
raise IOError
if __name__ == '__main__':
p = optparse.OptionParser(usage="usage: %prog [options] --host 10.29.29.12 --port 8081", version="%prog 0.1")
p.add_option('-p','--port',action='store',dest='port', default=8081,type="int", help=u"IP адрес сервера")
p.add_option('--host',action='store',dest='host', type='string', default='127.0.0.1', help=u"Порт сервера")
p.add_option('--threads',action='store',dest='max_oo', type='int', default=1, help=u"количество потоков Office")
opts, args = p.parse_args()
host = opts.host
port = opts.port
max_oo = opts.max_oo
lock_client = False
q = Queue()
lock_list = []
if max_oo >= len(hosts_oo):
print u'Слишком много потоков Office. Максимальное количество ' + str(len(hosts_oo))
sys.exit(0)
for i in range(max_oo):
lock = thread.allocate_lock()
lock.acquire()
lock_list.append(lock)
thread.start_new_thread(create_report,(q,lock,hosts_oo[i]['host'],hosts_oo[i]['port']))
thread.start_new_thread(RPCServ,(host,port))
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
try:
while True:
sleep(10000)
except IOError:
print 'Stop server'