Уведомления

Группа в Telegram: @pythonsu

#1 Дек. 23, 2011 07:56:50

minotavr_x86
От:
Зарегистрирован: 2010-05-21
Сообщения: 69
Репутация: +  0  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

Есть сервер, который создает отчеты. Отчет создается от 10 до 60 секунд.
Сервер падает, если
1. число соединений больше 300
2. клиент отсоединяется до получения ответа
Главная проблема это то что отчеты должны делаться максимум в 5 потоков.
Подскажите как переделать сервер.

#-*-coding:utf-8-*-
'''
Created on 11.11.2011

@author: minotavr_x86
'''
import thread
from Queue import Queue
import signal
import SocketServer
import SimpleXMLRPCServer
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
from utils import create_hash
from psycopg2 import connect
from time import sleep
from report_generator import ReportGen
import os, sys
import optparse
from options import *

# TODO: Перезапуск офиса

class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
pass

class ReportTask():
def __init__(self,task):
self.is_active = True
self.task = task
self.result = None

def set_active(self, result):
self.result = result
self.is_active = False
return True

def get_task(self):
return self.task

def get_active(self):
return self.is_active

def get_result(self):
return self.result

class RFunc():
def new_report(self,task):
if lock_client:
return {'error':1, 'error_msg':'server stoping'}
r = ReportTask(task)
q.put(r)
while True:
if not r.get_active():
return r.get_result()
sleep(1)


class RequestHandler(SimpleXMLRPCRequestHandler):
def authorize(self):
import base64
authorization = self.headers.get('authorization', ' ')
scheme, challenge = authorization.split(' ', 1)
if scheme.lower() == 'basic':
decoded = base64.decodestring(challenge)
if ':' in decoded:
username, password = decoded.split(':')
con=connect ("""dbname = report_oo_dev
user = report_serv
host = 10.29.0.30
password= c2hd2h
""")
cur=con.cursor()
cur.execute("SELECT name,passwd FROM users")
otv = cur.fetchall()
if (username, password) in otv:
return True
return False
def do_POST(self):
if self.authorize():
_data = self.rfile.read(int(self.headers["content-length"]))
print type(_data)
print _data
_data = _data.decode('utf-8')
response = self.server._marshaled_dispatch(_data, getattr(self, '_dispatch', None))
self.send_response(200)
print str(len(response))
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
self.wfile.flush()
self.connection.shutdown(1)
SimpleXMLRPCRequestHandler.do_POST(self)
else:
self.send_error(403, 'не авторизованы')


def RPCServ(host, port):
s = SimpleThreadedXMLRPCServer((host,port), logRequests=True, requestHandler=RequestHandler)
s.register_instance(RFunc())
s.serve_forever()
print 'Start report server on ' + host + ':' + str(port)

def create_report(q,lock,host,port):
while True:
task = q.get()
if task == 'exit':
lock.release()
break
# {name_templ:<>,templ:<>, vars:{}, format:odt/pdf}
tmp_name_template = None
name_templ = None
item = task.get_task()
if 'templ' in item:
name_templ = tmp_name_templ = os.path.join(PATH_TMP,'templ-' +create_hash() + '.odt')
tmp_file = open(tmp_name_templ,'wb')
tmp_file.write(item['templ'].data)
tmp_file.close()
elif 'name_templ' in item:
name_templ = os.path.join(PATH_TEMPLATE, item['name_templ'])
if 'vars' in item and name_templ:
name_result = os.path.join(PATH_TMP, 'result-' +create_hash() + '.odt')
from com.sun.star.uno import Exception as UnoException
try:
report = ReportGen(host = host,port = port)
if report.open(template = name_templ):
report.render(item['vars'])
report.iteration(item['vars'])
report.render_tables(item['vars'])
if item['format'] == 'pdf':
report.save_as_PDF(name_result)
else:
report.save(name_result)
report.close()
tmp_file = open(name_result, 'rb')
tmp_result = xmlrpclib.Binary(tmp_file.read())
tmp_file.close()
task.set_active({'error':0, 'error_msg':'','result':tmp_result})
os.remove(name_result)
else:
task.set_active({'error':3, 'error_msg':'ненайден или неверен фаил шаблона'})
except UnoException as e:
task.set_active({'error':999, 'error_msg':e})
if 'templ' in item:
os.remove(tmp_name_templ)
else:
task.set_active({'error':2, 'error_msg':'неверное количество параметров'})

def stop(signum, frame):
print '\nExiting'
lock_client = True
sleep(10)
for i in range(max_oo):
q.put('exit')
while(any([l.locked() for l in lock_list])):
sleep(10)
sleep(20)
raise IOError

if __name__ == '__main__':
p = optparse.OptionParser(usage="usage: %prog [options] --host 10.29.29.12 --port 8081", version="%prog 0.1")
p.add_option('-p','--port',action='store',dest='port', default=8081,type="int", help=u"IP адрес сервера")
p.add_option('--host',action='store',dest='host', type='string', default='127.0.0.1', help=u"Порт сервера")
p.add_option('--threads',action='store',dest='max_oo', type='int', default=1, help=u"количество потоков Office")
opts, args = p.parse_args()
host = opts.host
port = opts.port
max_oo = opts.max_oo

lock_client = False
q = Queue()
lock_list = []


if max_oo >= len(hosts_oo):
print u'Слишком много потоков Office. Максимальное количество ' + str(len(hosts_oo))
sys.exit(0)

for i in range(max_oo):
lock = thread.allocate_lock()
lock.acquire()
lock_list.append(lock)
thread.start_new_thread(create_report,(q,lock,hosts_oo[i]['host'],hosts_oo[i]['port']))

thread.start_new_thread(RPCServ,(host,port))
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
try:
while True:
sleep(10000)
except IOError:
print 'Stop server'



Офлайн

#2 Дек. 23, 2011 08:12:29

s0rg
От:
Зарегистрирован: 2011-06-05
Сообщения: 777
Репутация: +  25  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

minotavr_x86
Сервер падает,
Как именно он падает?

minotavr_x86
1. число соединений больше 300
На какое количество клиентов вы рассчитываете?

Офлайн

#3 Дек. 23, 2011 08:48:35

minotavr_x86
От:
Зарегистрирован: 2010-05-21
Сообщения: 69
Репутация: +  0  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

s0rg
Как именно он падает?
Сервер перестает отвечать на запросы, но процесс продолжает работать.
В консое пишет
Traceback (most recent call last):
File "/usr/lib/python2.7/SocketServer.py", line 582, in process_request_thread
self.finish_request(request, client_address)
File "/usr/lib/python2.7/SocketServer.py", line 323, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/usr/lib/python2.7/SocketServer.py", line 641, in __init__
self.finish()
File "/usr/lib/python2.7/SocketServer.py", line 694, in finish
self.wfile.flush()
File "/usr/lib/python2.7/socket.py", line 303, in flush
self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe
s0rg
На какое количество клиентов вы рассчитываете?
500



Офлайн

#4 Дек. 23, 2011 12:15:34

s0rg
От:
Зарегистрирован: 2011-06-05
Сообщения: 777
Репутация: +  25  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

Ну давайте рассуждать )

Я бы под такую задачу взял Twisted (сейчас активно использую его в нескольких проэктах и очень доволен им) в нем также имеется возможность выполнять ‘тяжелые’ по ресурсам задачи в отдельных процессах, что как правило, эффективнее потоков.

Если вы не готовы/не хотите осваивать новый фреймворк - то я бы советовал все-таки перенести процесс генерации в отдельные процессы и подумать о системе кеширования для отчетов. Например если клиент отвалился - то он скорее всего полезет за тем же отчетом и отдавать готовое проще чем собирать заново. В пользу процессов еще говорит ваше требование о 5 генераторах отчетов - пул процессов очень удобная штука для таких вещей.

Отваливаются клиенты - вполне возможно что процесс генерации отчета выходит за рамки тайм-аута для сокета, выставлен ли SO_KEEPALIVE? Каковы тайм-ауты?

Офлайн

#5 Дек. 23, 2011 15:41:08

o7412369815963
От:
Зарегистрирован: 2009-06-17
Сообщения: 1986
Репутация: +  32  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

xmlRPC в питоне вроде как блокирующий… т.е. тут все отчеты идут последовательно?


Я бы сделал асинхронное “общение” (это как бы “штатный” режим xml-rpc):

Клиент подключился на 0,001 сек, оставил заявку - получил ИД, отключился.
Подключился через минуту - забрал результат по ИД.- (тоже быстро - зависит от объема)

Сервер: принял заказ, запустил процесс-отчет, положил ссылку в массив,
процесс-отчет отработал - сложил в файлик.
Сервер по требованию берет файлик, отправляет клиенту.

итого - можно “держать” хоть 1000 хоть 10000 клиентов (зависит от передаваемых объемов)

Офлайн

#6 Дек. 23, 2011 20:28:13

Lexander
От:
Зарегистрирован: 2008-09-19
Сообщения: 1139
Репутация: +  33  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

SimpleXMLRPCServer все таки не предназначен для больших нагрузок.
Присоединяюсь к s0rg за замену на Twisted.

Но, если заменить нельзя, то вариант o7412369815963 очень хорош.



Офлайн

#7 Дек. 26, 2011 06:07:56

minotavr_x86
От:
Зарегистрирован: 2010-05-21
Сообщения: 69
Репутация: +  0  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

На twisted я побывал переходить, но появлялась большая проблема я не никак не могу уловить суть в документации и понять как в нем писать.
Если кто нибудь может показать хотя бы макет кода моего сервера, то буду очень признателен.



Офлайн

#8 Дек. 26, 2011 06:49:49

s0rg
От:
Зарегистрирован: 2011-06-05
Сообщения: 777
Репутация: +  25  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

Мне помогло вот это:
http://krondo.com/?page_id=1327

Офлайн

#9 Дек. 26, 2011 11:09:29

minotavr_x86
От:
Зарегистрирован: 2010-05-21
Сообщения: 69
Репутация: +  0  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

С xmlrpc с горем пополам разобрался, но twisted так и не поддается.
Подскажите как сделать еще один потоки которые будет принимать задания от клиентов и возвращать результат.
Количество потоков create_report должно быть определенное количество и они должны делить задания между собой

from twisted.web import xmlrpc, server, http
import xmlrpclib
from psycopg2 import connect
from twisted.internet import defer
import os
Fault = xmlrpclib.Fault

class ReportXMLRPC(xmlrpc.XMLRPC):
def xmlrpc_new_report(self,task):
# Обращение к create_report
return True

def xmlrpc_update_template(self):
os.popen('hg up ~/templates')
return True

def create_report():
report = True
#Создание отчета
return report

if __name__ == '__main__':
from twisted.internet import reactor
r = ReportXMLRPC()
reactor.listenTCP(7080, server.Site(r))
reactor.run()



Офлайн

#10 Дек. 27, 2011 13:36:10

minotavr_x86
От:
Зарегистрирован: 2010-05-21
Сообщения: 69
Репутация: +  0  -
Профиль   Отправить e-mail  

Высоко нагуженный xml-rpc сервер

Написал вот такую штуку. Она работает, но мне кажется что написано как то не так.
Подскажите как лучше.

def xmlrpc_new_report(self,task):      
deferred = defer.Deferred()
def _gotContent(content):
deferred.callback(content)
deferToThread(lambda : create_report(task)).addCallbacks(_gotContent, deferred.errback)
return deferred
P.S. Преведущий вопрос до сих пор открыт. Может я конечно что то не так объяснил, так что выложу схему



Отредактировано (Дек. 27, 2011 13:39:12)

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version