Найти - Пользователи
Полная версия: Повторное подключение сокета вызывает некорректное поведение веб-сервера
Начало » Python для экспертов » Повторное подключение сокета вызывает некорректное поведение веб-сервера
1
ioprst
Есть два py скрипта (python 2.7), которые запускаются на linux машине (.service):

1) Процесс П, который обрабатывает некие данные (запускается в GUI).

2) Сервер Flask. Позволяет клиенту через запросы получить актуальные данные из П и внести изменения.

Т.к. сервер должен возвращать данные П, то между ними (двумя процессами) организован обмен через сокеты (не некоторому собственному протоколу).

Два процесса могут быть запущены независимо друг от друга, т.е сначала может быть запущен П, а затем уже сервер, или наоборот. Объединить два процесса в один нельзя. Из-за отсутствия последовательности запуска необходимо организовать “динамическое” соединение через сокет. Т.е., если П активен, а сервер запускается, то сервер подключается к сокету, открытому на П. Иначе, если П запускается, а сервер уже работает, то сервер будет пытаться подключиться к сокету П, пока ему это не удастся сделать (пока нет соединения с сокетом, сервер на любой из запросов будет возвращать статус 500).

Основные части кода

Процесс П включает в себя два метода (вызываются по кнопке в GUI):

process_start запускаются при запуске процесса;

process_stop запускается при остановке процесса.
 from threading import Thread
import socket
import pickle
from datetime import datetime
 
def process_start():
    global working
    working = True
 
    global SocketThread
    SocketThread = Thread(target=SocketThreadProc)
    # SocketThread.daemon = True
    SocketThread.start()
 
def process_stop():
    global working
    working = False
 
    sock.close()
    SocketThread.join()
 
def SocketThreadProc():
    global sock
 
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('localhost', 9090))
    sock.listen(1)
 
    global conn
 
    while working:
        try:
            sock.settimeout(1)
            conn, addr = sock.accept()
            conn.setblocking(1)
            decoder = PacketDecoder()
 
            now = str(datetime.now())
            print "{}: {} connected.".format(now, ":".join(map(str, addr)))
 
            try:
                while working:
                    data = conn.recv(1024)
                    data = bytearray(data)
                    decoder.feed(data)
                    for packet in decoder.decode():
                        process_packet(packet)  #
            except Exception as err:
                print 'Sub try error:', err
            finally:
                conn.close()
                now = str(datetime.now())
                print "{}: {} closed.".format(now, ":".join(map(str, addr)))
        except socket.timeout as err:
            print err
            pass
        except socket.error as err:
            print err
            break
 
def process_packet(packet):
    _, set_method, _ = PacketDecoder.parse_header(packet[:8])
    data = pickle.loads(packet[8:])
 
    if not set_method:
        # GET
        # разбор пакета, формирование пакета для ответа
        ...
        conn.send(resp_packet)  # отправка пакета серверу
    else:
        # SET
        # разбор пакета, внесение изменений в данные процесса П
        ...
Сервер Flask

Два метода:

get вызывается каждые 2 сек (интервал задается). Формирует пакет, отправляет в сокет, получает ответ от процесса П и возвращает результат клиенту.

set вызывается при взаимодействии пользователя с формой (например, кнопка). Формирует пакет, отправляет серверу. Клиенту возвращает 200, если все хорошо.
 @app.route('/get-data', methods=['POST'])
def get():
    global sock_connected
 
    if not sock_connected:
        socket_connection()
        return Response({'success': False}, 500, {'ContentType': 'application/json'})
 
    # формирование пакета package
    ...
    try:
        sock.send(package)
 
        # прием ответа от процесса П
        data = bytearray(sock.recv(1024))
        llength = list(data[:3])
        length = (llength[0] << 16) | (llength[1] << 8) | llength[2]
        data = data[3:]
        while len(data) != length:
            data += bytearray(sock.recv(1024))
        data = pickle.loads(data)
 
        res = dict()
        for d in data["data"]:
            res[d['name']] = d['value']
        return json.dumps(res)
    except socket.error:
        # [10053] errno.WSAECONNABORTED стандартная ошибки
        sock.close()
        sock_connected = False
        return Response({'success': False}, 500, {'ContentType': 'application/json'})
 
 
@app.route('/set-data', methods=['POST'])
def set():
    global sock_connected
 
    if not sock_connected:
        socket_connection()
        return Response({'success': False}, 500, {'ContentType': 'application/json'})
 
    # формирование пакета package
    ...
    try:
        sock.send(package)
        return Response({'success': True}, 200, {'ContentType': 'application/json'})
    except socket.error:
        # [10053] errno.WSAECONNABORTED стандартная ошибки
        sock.close()
        sock_connected = False
        return Response({'success': False}, 500, {'ContentType': 'application/json'})
 
 
def socket_connection():
    global sock
    global sock_connected
    sock_connected = False
 
    sock = socket.socket()
    try:
        sock.connect(('localhost', 9090))
        sock_connected = True
    except socket.error as error:
        if error.errno == errno.ECONNREFUSED:
            print('Error: ECONNREFUSED')
        else:
            print(error)
    print 'STATUS SOCKET CONNECTED:', sock_connected
 
if __name__ == '__main__':
    socket_connection()
    app.run(host='0.0.0.0')
Если изначально запущен сервер, и я запускаю/останавливаю процесс П, то все отлично, если П отключен, сервер возвращает код 500, как только П запускается, статус 200.

Если изначально запущен процесс П, то при запуске сервера все тоже отлично.

Проблема возникает тогда, когда запущен процесс П, и я насколько раз запускаю/останавливаю сервер (выше я указал, что первый запуск проблем не вызывает). Не смотря на то, что серверу удается подключиться к сокету процесса П (это видно в логах: функция socket_connection выдает STATUS SOCKET CONNECTED: True), сервер работает некорректно: вызов get-data (каждые 2 сек) происходит, но не обрабатывается. В логах Flask не отображается, что пришел запрос get-data. В отладчике хрома (F12 - Network) все запросы get-data красные, а статус - “canceled”.

А вот если кликнуть на кнопку, то set-data нормально обрабатывается сервером. В логах Flask есть. Статус - 200.

Подскажите, пожалуйста, в чем может быть причина.
py.user.next
ioprst
2) Сервер Flask.
Вообще, он только для отладки применяется.

https://flask.palletsprojects.com/en/1.1.x/deploying/
While lightweight and easy to use, Flask’s built-in server is not suitable for production as it doesn’t scale well.
https://flask.palletsprojects.com/en/1.1.x/tutorial/deploy/#run-with-a-production-server
When running publicly rather than in development, you should not use the built-in development server (flask run). The development server is provided by Werkzeug for convenience, but is not designed to be particularly efficient, stable, or secure.
https://flask.palletsprojects.com/en/1.1.x/deploying/wsgi-standalone/
Короче, gunicorn надо ставить.

ioprst
  
@app.route('/get-data', methods=['POST'])
def get():
    global sock_connected
И глобальные переменные там нельзя делать, так как это не является потокобезопасным. Там есть специальная потокобезопасная переменная flask.g, через неё и идут все обмены между функциями в пределах одного request'а. А каждый request запускается в своём потоке, для которого и создаётся своя глобальная внутрипоточная flask.g.

ioprst
  
return Response({'success': False}, 500, {'ContentType': 'application/json'})
И Response можно не писать, а просто кортеж возвращать, так как кортеж передаётся потом в функцию make_response() неявно.


ioprst
Проблема возникает тогда, когда запущен процесс П, и я насколько раз запускаю/останавливаю сервер
ioprst
Т.к. сервер должен возвращать данные П, то между ними (двумя процессами) организован обмен через сокеты (не некоторому собственному протоколу).
Я думаю, в этой схеме чего-то не хватает. У тебя должно быть приложение на Flask; стоять оно должно правильно. У тебя должен быть процесс, обрабатывающий сами данные; работать с данными он должен правильно, чётко: должно быть точно видно, где он начал, где он закончил. И вот когда данные можно передать из приложения в процесс и когда данные можно вернуть из процесса в приложение, должен быть такой узел, который умеет ждать те данные и эти и передавать их туда-сюда, когда они точно готовы. Диспетчер сообщений, короче.

ioprst
Подскажите, пожалуйста, в чем может быть причина.
В сокетах. Ты их никак не контролируешь. Да и не надо это делать.
ioprst
py.user.next
Да и не надо это делать.
почему не нужно контролировать сокеты?
py.user.next
ioprst
почему не нужно контролировать сокеты?
Потому что система их уже контролирует оптимально. Если они не открываются/не закрываются, значит так надо. Когда система разберётся с процессами, там всё завершится правильно и сокеты будут готовы к дальнейшей работе.
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB