Форум сайта python.su
Есть сервер, который создает отчеты. Отчет создается от 10 до 60 секунд.
Сервер падает, если
1. число соединений больше 300
2. клиент отсоединяется до получения ответа
Главная проблема это то что отчеты должны делаться максимум в 5 потоков.
Подскажите как переделать сервер.
#-*-coding:utf-8-*-
'''
Created on 11.11.2011
@author: minotavr_x86
'''
import thread
from Queue import Queue
import signal
import SocketServer
import SimpleXMLRPCServer
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
from utils import create_hash
from psycopg2 import connect
from time import sleep
from report_generator import ReportGen
import os, sys
import optparse
from options import *
# TODO: Перезапуск офиса
class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
pass
class ReportTask():
def __init__(self,task):
self.is_active = True
self.task = task
self.result = None
def set_active(self, result):
self.result = result
self.is_active = False
return True
def get_task(self):
return self.task
def get_active(self):
return self.is_active
def get_result(self):
return self.result
class RFunc():
def new_report(self,task):
if lock_client:
return {'error':1, 'error_msg':'server stoping'}
r = ReportTask(task)
q.put(r)
while True:
if not r.get_active():
return r.get_result()
sleep(1)
class RequestHandler(SimpleXMLRPCRequestHandler):
def authorize(self):
import base64
authorization = self.headers.get('authorization', ' ')
scheme, challenge = authorization.split(' ', 1)
if scheme.lower() == 'basic':
decoded = base64.decodestring(challenge)
if ':' in decoded:
username, password = decoded.split(':')
con=connect ("""dbname = report_oo_dev
user = report_serv
host = 10.29.0.30
password= c2hd2h
""")
cur=con.cursor()
cur.execute("SELECT name,passwd FROM users")
otv = cur.fetchall()
if (username, password) in otv:
return True
return False
def do_POST(self):
if self.authorize():
_data = self.rfile.read(int(self.headers["content-length"]))
print type(_data)
print _data
_data = _data.decode('utf-8')
response = self.server._marshaled_dispatch(_data, getattr(self, '_dispatch', None))
self.send_response(200)
print str(len(response))
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
self.wfile.flush()
self.connection.shutdown(1)
SimpleXMLRPCRequestHandler.do_POST(self)
else:
self.send_error(403, 'не авторизованы')
def RPCServ(host, port):
s = SimpleThreadedXMLRPCServer((host,port), logRequests=True, requestHandler=RequestHandler)
s.register_instance(RFunc())
s.serve_forever()
print 'Start report server on ' + host + ':' + str(port)
def create_report(q,lock,host,port):
while True:
task = q.get()
if task == 'exit':
lock.release()
break
# {name_templ:<>,templ:<>, vars:{}, format:odt/pdf}
tmp_name_template = None
name_templ = None
item = task.get_task()
if 'templ' in item:
name_templ = tmp_name_templ = os.path.join(PATH_TMP,'templ-' +create_hash() + '.odt')
tmp_file = open(tmp_name_templ,'wb')
tmp_file.write(item['templ'].data)
tmp_file.close()
elif 'name_templ' in item:
name_templ = os.path.join(PATH_TEMPLATE, item['name_templ'])
if 'vars' in item and name_templ:
name_result = os.path.join(PATH_TMP, 'result-' +create_hash() + '.odt')
from com.sun.star.uno import Exception as UnoException
try:
report = ReportGen(host = host,port = port)
if report.open(template = name_templ):
report.render(item['vars'])
report.iteration(item['vars'])
report.render_tables(item['vars'])
if item['format'] == 'pdf':
report.save_as_PDF(name_result)
else:
report.save(name_result)
report.close()
tmp_file = open(name_result, 'rb')
tmp_result = xmlrpclib.Binary(tmp_file.read())
tmp_file.close()
task.set_active({'error':0, 'error_msg':'','result':tmp_result})
os.remove(name_result)
else:
task.set_active({'error':3, 'error_msg':'ненайден или неверен фаил шаблона'})
except UnoException as e:
task.set_active({'error':999, 'error_msg':e})
if 'templ' in item:
os.remove(tmp_name_templ)
else:
task.set_active({'error':2, 'error_msg':'неверное количество параметров'})
def stop(signum, frame):
print '\nExiting'
lock_client = True
sleep(10)
for i in range(max_oo):
q.put('exit')
while(any([l.locked() for l in lock_list])):
sleep(10)
sleep(20)
raise IOError
if __name__ == '__main__':
p = optparse.OptionParser(usage="usage: %prog [options] --host 10.29.29.12 --port 8081", version="%prog 0.1")
p.add_option('-p','--port',action='store',dest='port', default=8081,type="int", help=u"IP адрес сервера")
p.add_option('--host',action='store',dest='host', type='string', default='127.0.0.1', help=u"Порт сервера")
p.add_option('--threads',action='store',dest='max_oo', type='int', default=1, help=u"количество потоков Office")
opts, args = p.parse_args()
host = opts.host
port = opts.port
max_oo = opts.max_oo
lock_client = False
q = Queue()
lock_list = []
if max_oo >= len(hosts_oo):
print u'Слишком много потоков Office. Максимальное количество ' + str(len(hosts_oo))
sys.exit(0)
for i in range(max_oo):
lock = thread.allocate_lock()
lock.acquire()
lock_list.append(lock)
thread.start_new_thread(create_report,(q,lock,hosts_oo[i]['host'],hosts_oo[i]['port']))
thread.start_new_thread(RPCServ,(host,port))
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
try:
while True:
sleep(10000)
except IOError:
print 'Stop server'
Офлайн
minotavr_x86Как именно он падает?
Сервер падает,
minotavr_x86На какое количество клиентов вы рассчитываете?
1. число соединений больше 300
Офлайн
s0rgСервер перестает отвечать на запросы, но процесс продолжает работать.
Как именно он падает?
Traceback (most recent call last):
File "/usr/lib/python2.7/SocketServer.py", line 582, in process_request_thread
self.finish_request(request, client_address)
File "/usr/lib/python2.7/SocketServer.py", line 323, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/usr/lib/python2.7/SocketServer.py", line 641, in __init__
self.finish()
File "/usr/lib/python2.7/SocketServer.py", line 694, in finish
self.wfile.flush()
File "/usr/lib/python2.7/socket.py", line 303, in flush
self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe
s0rg500
На какое количество клиентов вы рассчитываете?
Офлайн
Ну давайте рассуждать )
Я бы под такую задачу взял Twisted (сейчас активно использую его в нескольких проэктах и очень доволен им) в нем также имеется возможность выполнять ‘тяжелые’ по ресурсам задачи в отдельных процессах, что как правило, эффективнее потоков.
Если вы не готовы/не хотите осваивать новый фреймворк - то я бы советовал все-таки перенести процесс генерации в отдельные процессы и подумать о системе кеширования для отчетов. Например если клиент отвалился - то он скорее всего полезет за тем же отчетом и отдавать готовое проще чем собирать заново. В пользу процессов еще говорит ваше требование о 5 генераторах отчетов - пул процессов очень удобная штука для таких вещей.
Отваливаются клиенты - вполне возможно что процесс генерации отчета выходит за рамки тайм-аута для сокета, выставлен ли SO_KEEPALIVE? Каковы тайм-ауты?
Офлайн
xmlRPC в питоне вроде как блокирующий… т.е. тут все отчеты идут последовательно?
Я бы сделал асинхронное “общение” (это как бы “штатный” режим xml-rpc):
Клиент подключился на 0,001 сек, оставил заявку - получил ИД, отключился.
Подключился через минуту - забрал результат по ИД.- (тоже быстро - зависит от объема)
Сервер: принял заказ, запустил процесс-отчет, положил ссылку в массив,
процесс-отчет отработал - сложил в файлик.
Сервер по требованию берет файлик, отправляет клиенту.
итого - можно “держать” хоть 1000 хоть 10000 клиентов (зависит от передаваемых объемов)
Офлайн
SimpleXMLRPCServer все таки не предназначен для больших нагрузок.
Присоединяюсь к s0rg за замену на Twisted.
Но, если заменить нельзя, то вариант o7412369815963 очень хорош.
Офлайн
На twisted я побывал переходить, но появлялась большая проблема я не никак не могу уловить суть в документации и понять как в нем писать.
Если кто нибудь может показать хотя бы макет кода моего сервера, то буду очень признателен.
Офлайн
Мне помогло вот это:
http://krondo.com/?page_id=1327
Офлайн
С xmlrpc с горем пополам разобрался, но twisted так и не поддается.
Подскажите как сделать еще один потоки которые будет принимать задания от клиентов и возвращать результат.
Количество потоков create_report должно быть определенное количество и они должны делить задания между собой
from twisted.web import xmlrpc, server, http
import xmlrpclib
from psycopg2 import connect
from twisted.internet import defer
import os
Fault = xmlrpclib.Fault
class ReportXMLRPC(xmlrpc.XMLRPC):
def xmlrpc_new_report(self,task):
# Обращение к create_report
return True
def xmlrpc_update_template(self):
os.popen('hg up ~/templates')
return True
def create_report():
report = True
#Создание отчета
return report
if __name__ == '__main__':
from twisted.internet import reactor
r = ReportXMLRPC()
reactor.listenTCP(7080, server.Site(r))
reactor.run()
Офлайн
Написал вот такую штуку. Она работает, но мне кажется что написано как то не так.
Подскажите как лучше.
def xmlrpc_new_report(self,task):
deferred = defer.Deferred()
def _gotContent(content):
deferred.callback(content)
deferToThread(lambda : create_report(task)).addCallbacks(_gotContent, deferred.errback)
return deferred
Отредактировано (Дек. 27, 2011 13:39:12)
Офлайн