Найти - Пользователи
Полная версия: Высоко нагуженный xml-rpc сервер
Начало » Network » Высоко нагуженный xml-rpc сервер
1
minotavr_x86
Есть сервер, который создает отчеты. Отчет создается от 10 до 60 секунд.
Сервер падает, если
1. число соединений больше 300
2. клиент отсоединяется до получения ответа
Главная проблема это то что отчеты должны делаться максимум в 5 потоков.
Подскажите как переделать сервер.
#-*-coding:utf-8-*-
'''
Created on 11.11.2011

@author: minotavr_x86
'''
import thread
from Queue import Queue
import signal
import SocketServer
import SimpleXMLRPCServer
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
from utils import create_hash
from psycopg2 import connect
from time import sleep
from report_generator import ReportGen
import os, sys
import optparse
from options import *

# TODO: Перезапуск офиса

class SimpleThreadedXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer.SimpleXMLRPCServer):
pass

class ReportTask():
def __init__(self,task):
self.is_active = True
self.task = task
self.result = None

def set_active(self, result):
self.result = result
self.is_active = False
return True

def get_task(self):
return self.task

def get_active(self):
return self.is_active

def get_result(self):
return self.result

class RFunc():
def new_report(self,task):
if lock_client:
return {'error':1, 'error_msg':'server stoping'}
r = ReportTask(task)
q.put(r)
while True:
if not r.get_active():
return r.get_result()
sleep(1)


class RequestHandler(SimpleXMLRPCRequestHandler):
def authorize(self):
import base64
authorization = self.headers.get('authorization', ' ')
scheme, challenge = authorization.split(' ', 1)
if scheme.lower() == 'basic':
decoded = base64.decodestring(challenge)
if ':' in decoded:
username, password = decoded.split(':')
con=connect ("""dbname = report_oo_dev
user = report_serv
host = 10.29.0.30
password= c2hd2h
""")
cur=con.cursor()
cur.execute("SELECT name,passwd FROM users")
otv = cur.fetchall()
if (username, password) in otv:
return True
return False
def do_POST(self):
if self.authorize():
_data = self.rfile.read(int(self.headers["content-length"]))
print type(_data)
print _data
_data = _data.decode('utf-8')
response = self.server._marshaled_dispatch(_data, getattr(self, '_dispatch', None))
self.send_response(200)
print str(len(response))
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
self.wfile.flush()
self.connection.shutdown(1)
SimpleXMLRPCRequestHandler.do_POST(self)
else:
self.send_error(403, 'не авторизованы')


def RPCServ(host, port):
s = SimpleThreadedXMLRPCServer((host,port), logRequests=True, requestHandler=RequestHandler)
s.register_instance(RFunc())
s.serve_forever()
print 'Start report server on ' + host + ':' + str(port)

def create_report(q,lock,host,port):
while True:
task = q.get()
if task == 'exit':
lock.release()
break
# {name_templ:<>,templ:<>, vars:{}, format:odt/pdf}
tmp_name_template = None
name_templ = None
item = task.get_task()
if 'templ' in item:
name_templ = tmp_name_templ = os.path.join(PATH_TMP,'templ-' +create_hash() + '.odt')
tmp_file = open(tmp_name_templ,'wb')
tmp_file.write(item['templ'].data)
tmp_file.close()
elif 'name_templ' in item:
name_templ = os.path.join(PATH_TEMPLATE, item['name_templ'])
if 'vars' in item and name_templ:
name_result = os.path.join(PATH_TMP, 'result-' +create_hash() + '.odt')
from com.sun.star.uno import Exception as UnoException
try:
report = ReportGen(host = host,port = port)
if report.open(template = name_templ):
report.render(item['vars'])
report.iteration(item['vars'])
report.render_tables(item['vars'])
if item['format'] == 'pdf':
report.save_as_PDF(name_result)
else:
report.save(name_result)
report.close()
tmp_file = open(name_result, 'rb')
tmp_result = xmlrpclib.Binary(tmp_file.read())
tmp_file.close()
task.set_active({'error':0, 'error_msg':'','result':tmp_result})
os.remove(name_result)
else:
task.set_active({'error':3, 'error_msg':'ненайден или неверен фаил шаблона'})
except UnoException as e:
task.set_active({'error':999, 'error_msg':e})
if 'templ' in item:
os.remove(tmp_name_templ)
else:
task.set_active({'error':2, 'error_msg':'неверное количество параметров'})

def stop(signum, frame):
print '\nExiting'
lock_client = True
sleep(10)
for i in range(max_oo):
q.put('exit')
while(any([l.locked() for l in lock_list])):
sleep(10)
sleep(20)
raise IOError

if __name__ == '__main__':
p = optparse.OptionParser(usage="usage: %prog [options] --host 10.29.29.12 --port 8081", version="%prog 0.1")
p.add_option('-p','--port',action='store',dest='port', default=8081,type="int", help=u"IP адрес сервера")
p.add_option('--host',action='store',dest='host', type='string', default='127.0.0.1', help=u"Порт сервера")
p.add_option('--threads',action='store',dest='max_oo', type='int', default=1, help=u"количество потоков Office")
opts, args = p.parse_args()
host = opts.host
port = opts.port
max_oo = opts.max_oo

lock_client = False
q = Queue()
lock_list = []


if max_oo >= len(hosts_oo):
print u'Слишком много потоков Office. Максимальное количество ' + str(len(hosts_oo))
sys.exit(0)

for i in range(max_oo):
lock = thread.allocate_lock()
lock.acquire()
lock_list.append(lock)
thread.start_new_thread(create_report,(q,lock,hosts_oo[i]['host'],hosts_oo[i]['port']))

thread.start_new_thread(RPCServ,(host,port))
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
try:
while True:
sleep(10000)
except IOError:
print 'Stop server'
s0rg
minotavr_x86
Сервер падает,
Как именно он падает?

minotavr_x86
1. число соединений больше 300
На какое количество клиентов вы рассчитываете?
minotavr_x86
s0rg
Как именно он падает?
Сервер перестает отвечать на запросы, но процесс продолжает работать.
В консое пишет
Traceback (most recent call last):
File "/usr/lib/python2.7/SocketServer.py", line 582, in process_request_thread
self.finish_request(request, client_address)
File "/usr/lib/python2.7/SocketServer.py", line 323, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/usr/lib/python2.7/SocketServer.py", line 641, in __init__
self.finish()
File "/usr/lib/python2.7/SocketServer.py", line 694, in finish
self.wfile.flush()
File "/usr/lib/python2.7/socket.py", line 303, in flush
self._sock.sendall(view[write_offset:write_offset+buffer_size])
error: [Errno 32] Broken pipe
s0rg
На какое количество клиентов вы рассчитываете?
500
s0rg
Ну давайте рассуждать )

Я бы под такую задачу взял Twisted (сейчас активно использую его в нескольких проэктах и очень доволен им) в нем также имеется возможность выполнять ‘тяжелые’ по ресурсам задачи в отдельных процессах, что как правило, эффективнее потоков.

Если вы не готовы/не хотите осваивать новый фреймворк - то я бы советовал все-таки перенести процесс генерации в отдельные процессы и подумать о системе кеширования для отчетов. Например если клиент отвалился - то он скорее всего полезет за тем же отчетом и отдавать готовое проще чем собирать заново. В пользу процессов еще говорит ваше требование о 5 генераторах отчетов - пул процессов очень удобная штука для таких вещей.

Отваливаются клиенты - вполне возможно что процесс генерации отчета выходит за рамки тайм-аута для сокета, выставлен ли SO_KEEPALIVE? Каковы тайм-ауты?
o7412369815963
xmlRPC в питоне вроде как блокирующий… т.е. тут все отчеты идут последовательно?


Я бы сделал асинхронное “общение” (это как бы “штатный” режим xml-rpc):

Клиент подключился на 0,001 сек, оставил заявку - получил ИД, отключился.
Подключился через минуту - забрал результат по ИД.- (тоже быстро - зависит от объема)

Сервер: принял заказ, запустил процесс-отчет, положил ссылку в массив,
процесс-отчет отработал - сложил в файлик.
Сервер по требованию берет файлик, отправляет клиенту.

итого - можно “держать” хоть 1000 хоть 10000 клиентов (зависит от передаваемых объемов)
Lexander
SimpleXMLRPCServer все таки не предназначен для больших нагрузок.
Присоединяюсь к s0rg за замену на Twisted.

Но, если заменить нельзя, то вариант o7412369815963 очень хорош.
minotavr_x86
На twisted я побывал переходить, но появлялась большая проблема я не никак не могу уловить суть в документации и понять как в нем писать.
Если кто нибудь может показать хотя бы макет кода моего сервера, то буду очень признателен.
s0rg
Мне помогло вот это:
http://krondo.com/?page_id=1327
minotavr_x86
С xmlrpc с горем пополам разобрался, но twisted так и не поддается.
Подскажите как сделать еще один потоки которые будет принимать задания от клиентов и возвращать результат.
Количество потоков create_report должно быть определенное количество и они должны делить задания между собой
from twisted.web import xmlrpc, server, http
import xmlrpclib
from psycopg2 import connect
from twisted.internet import defer
import os
Fault = xmlrpclib.Fault

class ReportXMLRPC(xmlrpc.XMLRPC):
def xmlrpc_new_report(self,task):
# Обращение к create_report
return True

def xmlrpc_update_template(self):
os.popen('hg up ~/templates')
return True

def create_report():
report = True
#Создание отчета
return report

if __name__ == '__main__':
from twisted.internet import reactor
r = ReportXMLRPC()
reactor.listenTCP(7080, server.Site(r))
reactor.run()
minotavr_x86
Написал вот такую штуку. Она работает, но мне кажется что написано как то не так.
Подскажите как лучше.
def xmlrpc_new_report(self,task):      
deferred = defer.Deferred()
def _gotContent(content):
deferred.callback(content)
deferToThread(lambda : create_report(task)).addCallbacks(_gotContent, deferred.errback)
return deferred
P.S. Преведущий вопрос до сих пор открыт. Может я конечно что то не так объяснил, так что выложу схему
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB