网站内部相关文章:

websockets

解决问题

围绕 HTTP 的请求和响应在低延时的交互场景中使用如下的技术:

  • 短轮询: Client 请求数据, Server 立刻返回数据并断开连接, 不管数据是否发生变化
  • 长轮询: Client 请求数据, Server 不会立刻断开连接, 保活连接一段时间(例如 1min), 直到超时断开, 之后重复上述流程

但是基于 HTTP 的长轮询和短轮询都有他们各自的缺点以及无法在低延时场景解决需求的问题:

  • 短轮询: 大量的短轮询造成服务器需要并发处理大量的请求, 对服务器造成极大的并发压力, 极大的占用网络带宽
  • 长轮询: 连接的挂起导致资源的极大浪费, 同时可能导致 TCP 连接(长连接)不能复用, 需要服务器横向扩展并发能力
  • 共同特点: 短轮询和长轮询仍然是基于 HTTP 协议, Client 和 Server 之间的通信有着非常大的开销, 不适用于低延时场景

websockets 协议

websockets 在维基上的定义如下:

WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI 模型应用层。WebSocket 协议在 2011 年由IETF标准化为RFC 6455,后由RFC 7936补充规范。Web IDL中的 WebSocket API 由W3C标准化.

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手(在 HTTP 层面的一次握手, TCP 三次握手仍然需要),两者之间就可以创建持久性的连接,并进行双向数据传输.

一个可用的 websockets 连接 URL 如下:

1
2
3
4
# 1. 非加密的websockets连接
ws://127.0.0.1:8003
# 2. 加密的websockets连接
wss://127.0.0.1:8003

websockets 是一种不同于 HTTP 的通信协议, 由定义可知, 其也位于应用层, 依赖于 TCP 协议. 虽然, websockets 不同于 HTTP 协议, 但为了兼容 http, 其使用 80 和 443 端口来进行通信, 并且使用 HTTP upgrade 来完成一次握手协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 1. 客户端请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

# 2. 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

在完成上述一个简单的握手协议之后, client 和 server 就可以基于新协议互相发送接收数据包较小短消息, 降低了网络带宽, 降低了网络延时, 从而更好的在一些场景使用, 那么 websocket 的特点有哪些? websocket 都适用于哪些场景呢?

  • 较少的控制开销, 一次握手之后交互消息的头部大小一般为 2~10 字节
  • 更强的实时性, 全双工协议, 任何一端都可以主动的下发数据
  • 保持连接
  • 二进制支持
  • 更好的压缩支持和压缩效果

使用场景

  1. 社交订阅:订阅方可以在线即时的了解对方的信息, 从而更快的做出反馈
  2. 多玩家游戏
  3. 协同编程和编辑
  4. 点击流数据: 分析用户与你网站的互动, 从而更好的提升网站, 通过 websocket, 将客户端的详细数据不断的发送到服务器然后进行分析, 当然, 这个需要得到用户的授权以及避免恶意攻击
  5. 体育实况更新
  6. 股票基金报价
  7. 多媒体聊天
  8. 基于位置的应用

websocket 服务

使用websockets包构建一个非常简单的 websocket 服务, 并使用 postman 来验证 Client 和 Server 的通信. 首先, 让我们看下服务器端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
""" websocket 简单示例--ssh
python版本: 3.8
"""
import time
import asyncio
import websockets


_g_connect_number = 0
MAX_MSG_IDNEX = 10000


async def hello(websocket, path):
""" 基本echo 服务 """
global _g_connect_number # pylint: disable=global-statement

_name = await websocket.recv()
_g_connect_number += 1
print(f'< 收到消息:{_name}, 序号:{_g_connect_number}')

max_msg_number = 100
for i in range(max_msg_number):
greeting = f'你好{_name}, 你是第{_g_connect_number}号用户, 目前回传消息序号:{i + 1}'
await websocket.send(greeting)
print(f'> {greeting}')
time.sleep(1)
print('------------等待新的连接------------')


start_server = websockets.serve(hello, '0.0.0.0', 8003) # pylint: disable=no-member
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

之后, 在 postman 中使用 websocket 进行连接: ws://127.0.0.1:8003:

  • 每次 connect 成功之后, client 和 server 都不会有任何打印信息, 但是连接一直保持着
  • client 发送一个消息: bifeng, 此时在 postman 中会持续性的打印出服务器发送过来的消息
  • 注意, 每一次 connect 都是一个新的连接请求, 所以服务器需要处理好不同的连接请求并返回对应的值
1
2
3
4
5
6
7
8
9
10
11
# 1. 客户端
Disconnected from ws://127.0.0.1:8003
你好bifeng, 你是第3号用户, 目前回传消息序号:100
...
你好bifeng, 你是第3号用户, 目前回传消息序号:1

# 2. 服务器端
> 你好bifeng, 你是第3号用户, 目前回传消息序号:1
...
> 你好bifeng, 你是第3号用户, 目前回传消息序号:100
------------等待新的连接------------

flask sockets 服务

使用flask_sockets包构建一个仅仅支持原生 websocket 服务, 仅仅实现一个非常简单的功能, 代码参考官方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
""" 注意, flask_sockets不支持2.0+版本 """
import time
from flask import Flask
from flask_sockets import Sockets


app = Flask(__name__)
sockets = Sockets(app)


@sockets.route('/echo')
def echo_socket(ws):
while not ws.closed:
message = ws.receive()
print(f'收到客户端发送过来的消息:{message}')
for i in range(100):
ws.send(f'Hello, {message}, index:{i + 1}')
time.sleep(0.1)


@app.route('/')
def hello():
print('Receive a new http request')
return 'Hello World!'


if __name__ == "__main__":
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()

后续使用 postman 就可以访问: http://127.0.0.1:5000, ws://127.0.0.1:5000/echo, 输出如下:

1
2
3
4
5
< Hello, bifeng, index:4
< Hello, bifeng, index:3
< Hello, bifeng, index:2
< Hello, bifeng, index:1
> bifeng

NOTE: 这里有一个非常坑的问题, flask_sockets是一个很多年前写的一个非常简单的模块, 其不兼容 Flask2.0 以上的版本, 否则会报错, 这点非常重要.

socketio

socket.io 协议

在第二章, 我们介绍过 websocket 协议, 在此基础上该章节将介绍 socket.io 协议. Socket.IO 是一种传输协议(我觉的不应该叫协议, 不过官方文档这样定义的), 基于 event 的实时双向通信框架.

Socket.IO 客户端和服务器的标准组件是由 javascript 实现, 底层封装了 websockets 和 http 长轮询(long-polling). 到目前为止, Socket.IO 总共有 5 个版本, 其中 v1, v2 并未有真正的发布正式的 release 协议版本. 这里我们描述的 socket.io 协议都是基于 V5 版本来进行说明的, 下面简单的了解下 socket.io 协议的一些基本术语:

  • Engine.IO: 一个基于 Websocket 和 HTTP long-polling 更底层的基础传输系统, 这是 V4 版本提出的一个概念, socket.io 就是基于 Engine.io 的封装
  • IO 多路复用
  • Packet 包: 传输信息的包基本单元

那么, Packet 包的格式是怎么样的, 其是怎么进行交互的呢? 一个基本的 packet 包如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type:  类型
0--CONNECT, 客户端连接一个命名空间, 此时会携带上一个鉴权的payload
1--DISCONNECT, 断开连接
2--EVENT, 发送数据
3--ACk, 接收到EVENT 事件或者带有 ACK ID的binary_event事件时
4--CONNECT_ERROR, 服务器端拒绝一个NS的连接时
5--BINARY_EVENT, 解析二进制数据
6--BINARY_ACK, 而进行接收事件

namespace: 名称空间, 实际上就是 URL, 例如: "nsp": "/admin", "nsp": "/"

data: packet内容(对象, 数组), 可选字段, 例如:
{
"data": {
"token": "123"
}
}
id: ACK的 ID, 可选字段

下面就是一些基本的包例子:

  1. 连接, 其中 token 就是鉴权, 服务器会响应一个携带 socket id 的 payload 信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"type": 0,
"nsp": "/admin",
"data": {
"token": "123"
}
}

{
"type": 0,
"nsp": "/admin",
"data": {
"sid": "CjdVH4TQvovi1VvgAC5Z"
}
}
  1. 消息, 可能还包含 ack id 信息
1
2
3
4
5
6
7
8
9
10
11
{
"type": 2,
"nsp": "/",
"data": ["hello", 1]
}
{
"type": 2,
"nsp": "/admin",
"data": ["project:delete", 123],
"id": 456
}

到这里, 我们已经对 websocket 中的 packet 包有一个基本的了解了, 那么 Client 和 Server 之间按照什么规则来进行 packet 包的交互的呢? 实际上整个过程就是 Connect, Event, Event, Disconnect.

1
2
3
4
5
Client > { type: CONNECT, nsp: "/admin" }
# 连接成功
Server > { type: CONNECT, nsp: "/admin", data: { sid: "wZX3oN0bSVIhsaknAAAI" } }
# 连接失败
Server > { type: CONNECT_ERROR, nsp: "/admin", data: { message: "Not authorized" } }

socket.io 使用

4.1 节我们已经简单介绍过 socket.IO 协议通信过程中各个包的基本组成以及类型, 现在我们开始介绍其具体的实现和用法.

  1. 基于事件

Socket.IO 是基于事件来进行交互的, 连接事件, 断开时间, 发送消息事件等等, 每一个 event 都有 name 和 args, 客户端和服务器可以使用 event 和 on 装饰器来 register 事件处理函数, 例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 1. 客户端
import socketio
sio = socketio.Client()

@sio.event
def connect():
""" 连接事件回调函数 """
print('connection established')

@sio.on('my_client_message')
def on_message(data):
global client_index
print('message received with ', data)
client_index += 1
time.sleep(1)
sio.emit('my_message', {'name': 'bifeng', 'index': client_index}) # 发送消息


# 2. 服务器
import eventlet
import socketio
sio = socketio.Server()
app = socketio.WSGIApp(sio, static_files={
'/': {'content_type': 'text/html', 'filename': 'index.html'}
})
@sio.event
def connect(sid, environ):
""" 连接事件回调函数 """
print('connect ', sid)

@sio.event
def my_message(sid, data):
global server_index
print('message ', data)
server_index += 1
time.sleep(1)
# 注意首参名字
sio.emit('my_client_message', {'response': 'server', 'index': server_index})
  1. 连接和发送

每次客户端通过 connect 连接服务器, server 都会为客户端分配一个唯一的的SID, 表示会话标识符, 这个在客户端包中被隐藏了, 可以通过请求 URL 看到该值. 客户端和服务器之间则通过封装函数emit来传递数据, 见如下例子:

1
2
3
4
5
# 1. 客户端连接服务器
sio.connect('http://127.0.0.1:5000')
# 打印sio中的sid值
print(sio.sid)
sio.emit('my_message', {'message': 1})
  1. 名称空间

在 4.1 节我们讲解过名称空间的概念, 类似 URL 的一个概念, socket.io 协议支持多个逻辑连接(不同的 URL), 所有逻辑连接都复用在同一物理连接上, 例如:

1
2
3
4
5
6
7
8
9
10
11
# 1. 客户端连接
sio.connect('http://127.0.0.1:5000', namespace=['/chat'])

# 2. 服务器设置名称空间(仅仅使用/chat名称空间处理)
@sio.event(namespace='/chat')
def my_message(sid, data):
pass

@sio.on('connect', namespace='/chat')
def on_connect():
print("I'm connected to the /chat namespace!")

另外, 可以通过继承socketio.ClientNamespace来实现基于类的名称空间, 例如:

1
2
3
4
5
6
7
8
9
10
11
class MyCustomNamespace(socketio.ClientNamespace):
def on_connect(self):
pass

def on_disconnect(self):
pass

def on_my_event(self, data):
self.emit('my_response', data)

sio.register_namespace(MyCustomNamespace('/chat'))
  1. 房间

除了名称空间之外, socket.io 还提供了类似消息队列的组概念, 从而使服务器向某一个客户端组发送事件, 使该房间内的多个组同时收到消息, 实际上组内的任何一个人通过 emit 发送的消息, 默认会广播到其他组内用户, 可以通过skip_sid来跳过指定的 sid. 默认情况下emit的 room 参数为 sid, 即为每一个连接创建一个房间, 此时房间都是独立的, 但是如果显示指定了 room, 则可能存在多个客户端连接的情况, 例如:

1
2
3
4
5
6
7
8
9
10
11
# 1. 客户端加入房间, 离开房间
@sio.event
def begin_chat(sid):
sio.enter_room(sid, 'chat_users')

@sio.event
def exit_chat(sid):
sio.leave_room(sid, 'chat_users')

# 2. 广播消息
sio.emit('my message', data, room='chat_users', skip_sid=sid)

实例

下面是一个简单的基于标准而非 async 实现的官方客户端和服务器案例. 另外, 不知道为何不能通过 postman 模拟出来 socket.io 客户端, 可能对该协议的理解不够深入, 抓包分析的时候不够深入, 后续有时间再看看.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#1. 客户端
""" socketio客户端标准版 """
import time
import socketio


sio = socketio.Client()
client_index = 0


@sio.event
def connect():
print('connection established')


@sio.on('my_client_message')
def on_message(data):
global client_index
print('message received with ', data)
client_index += 1
time.sleep(1)
sio.emit('my_message', {'name': 'bifeng', 'index': client_index}) # 发送消息


@sio.event
def disconnect():
print('disconnected from server')


sio.connect('http://127.0.0.1:5000')
sio.emit('my_message', {'name': 'bifeng'}) # 发送消息
print('本次会话 ID:', sio.sid)
sio.wait()


# 2. 服务器
""" socketio服务器标准版, 实际上官方源代码examples中有几种版本的案例 """
import time
import eventlet
import socketio


sio = socketio.Server()
app = socketio.WSGIApp(sio, static_files={
'/': {'content_type': 'text/html', 'filename': 'index.html'}
})
server_index = 0


@sio.event
def connect(sid, environ):
print('connect ', sid)


@sio.event
def my_message(sid, data):
global server_index
print('message ', data)
server_index += 1
time.sleep(1)
sio.emit('my_client_message', {'response': 'server', 'index': server_index})


@sio.event
def disconnect(sid):
print('disconnect ', sid)


if __name__ == '__main__':
eventlet.wsgi.server(eventlet.listen(('0.0.0.0', 5000)), app)

flask socketio

注意, flask_socketio不支持 Flask2.0 版本, 其他测试例子见官方文档, 其他整体逻辑就是:

  • a. 访问 HTTP 请求获取一个基础页面
  • b. 通过页面实现 socket.io 的交互逻辑

例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from flask import Flask, render_template, request
from flask_gzip import Gzip
from flask_socketio import SocketIO, emit

app = Flask(__name__, template_folder='./templates', static_folder="",
static_url_path="", )

app.config['SECRET_KEY'] = 'secret!'
gzip = Gzip(app)
socketio = SocketIO(app)

count = 0

@app.route('/')
def index():
return render_template('index.html')

@socketio.on('connect', namespace='/test')
def test_connect():
print('connected')
global count
count += 1
emit('my_response', {"count": count, 'data': 'Connected'})


@socketio.on('disconnect', namespace='/test')
def test_disconnect():
print('Client disconnected')


@socketio.on('my_ping', namespace='/test')
def ping_pong():
print("----ping消息----")
emit('my_pong')


if __name__ == '__main__':
socketio.run(app, port=8888, host='0.0.0.0', debug=True)

测试的时候, 通过浏览器访问http://127.0.0.1:8888/获取页面并进行后续的 websocket 交互逻辑.

引用: