Socket Server架构设计与 Python阻塞字典实现

tech

This article was last updated on <span id="expire-date"></span> days ago, the information described in the article may be outdated.

在项目中遇到了如下需求:

  • 从远端服务器获取字节流数据,按照相关协议拆分字节流并进行校验
  • 之后每一个不同的字节流对应了不同的命令字,需要根据命令字将字节流数据交予不同的handler处理
  • 处理之后的结果通过对应的TCP链接进行send操作

我们逐个解决

思路

  • handler需要进行大量的字节拆分/计算,我将这部分拆分出来使用了C++进行处理
  • 项目中需要同时管理500~600个活动的TCP链接,于是使用了Python中的selector模块实现基于事件选择的socket server
  • 由于命令字的位置相对于每一条拆分后的字节流是固定的,使用类似“阻塞字典”的模块将任意的任务拆分至对应的命令字中,通过“生产-消费”模型解决需求

难点

Python 中并没有直接提供类似 “阻塞字典” 这样的概念,我们可以通过使用 threading 模块提供的 event 操作进行自己实现如下功能:

  • 当进行 get 操作时,如果字典中不存在相应的键,阻塞操作直到该键对应的值出现
  • 当多个线程同时等待 get 操作时,如果数据出现,需要将相同的数据同时返回给它们

解决

  • 使用 event 的原因第一是:event 可以同时阻塞多个线程,而在 set 操作被执行之后可以同时同时多个线程回收数据
  • 使用 event 的原因第二是因为 event 的 wait 操作支持 Timeout,不需要我们单独封装超时
  • 不能实现阻塞字典的 pop 操作,因为会导致数据被优先获取的线程抢占获取之后清除,所以需要将 erase 操作和 get 操作分离

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""一个线程安全的阻塞字典"""

import threading

class ConcurrentDict(object):
"""该字典当要pop的键不存在时, 会阻塞操作线程"""
def __init__(self):
self.events = dict() # 用于存储键和查询锁对
self.data = dict() # 用于存储键值对

def put(self, key, value):
"""向键值对中加入存储
该函数会检查 key 是否在 self.events 里
当存在时则说明有某些查询操作正在被阻塞, 此时需要对该
锁进行 set 操作, 通知对应的线程们取走数据
"""
self.data[key] = value
if key in self.events.keys():
event = self.events.pop(key)
if not event.isSet():
event.set()

def get(self, key, timeout = None):
"""从存储中返回值
该函数会首先检查 self.data 中是否有对应的键
如果有则直接返回
否则在 self.events 中新建一个 threading.Event
对象并将其 clear, 之后进行 wait 操作阻塞线程
当 put 操作加入对应的键值对时, 该 event 对象
会被 set, 此时对应的阻塞释放, 返回相应的值

timeout: 超时时长, 超时之后触发 TimeoutError
"""
if key in self.data.keys():
return self.data.pop(key)

# 判断是否已经有 Event 对象被创建, 如有则不创建
event = None
if not key in self.events.keys():
event = threading.Event()
event.clear()
self.events[key] = event
else:
event = self.events[key]

if timeout != None:
event.wait(timeout)
else:
event.wait()

# 当超时或者获取到之后
if key in self.data.keys():
return self.data[key]
else:
raise TimeoutError("key '%s' not found" % key)

def erase(self, key):
"""从存储中移除对应的键值对"""
if key in self.data.keys():
self.data.pop(key)

很高兴 Wordpress 自带的代码区块终于有了高亮功能。

Author: 桂小方

Permalink: https://init.blog/1746/

文章许可协议:

如果你觉得文章对你有帮助,可以 支持我

Comments