在项目中遇到了如下需求:
- 从远端服务器获取字节流数据,按照相关协议拆分字节流并进行校验
- 之后每一个不同的字节流对应了不同的命令字,需要根据命令字将字节流数据交予不同的handler处理
- 处理之后的结果通过对应的TCP链接进行send操作
我们逐个解决
思路
- handler需要进行大量的字节拆分/计算,我将这部分拆分出来使用了C++进行处理
- 项目中需要同时管理500~600个活动的TCP链接,于是使用了Python中的selector模块实现基于事件选择的socket server
- 由于命令字的位置相对于每一条拆分后的字节流是固定的,使用类似“阻塞字典”的模块将任意的任务拆分至对应的命令字中,通过“生产-消费”模型解决需求
难点
Python 中并没有直接提供类似 “阻塞字典” 这样的概念,我们可以通过使用 threading 模块提供的 event 操作进行自己实现如下功能:
- 当进行 get 操作时,如果字典中不存在相应的键,阻塞操作直到该键对应的值出现
- 当多个线程同时等待 get 操作时,如果数据出现,需要将相同的数据同时返回给它们
解决
- 使用 event 的原因第一是:event 可以同时阻塞多个线程,而在 set 操作被执行之后可以同时同时多个线程回收数据
- 使用 event 的原因第二是因为 event 的 wait 操作支持 Timeout,不需要我们单独封装超时
- 不能实现阻塞字典的 pop 操作,因为会导致数据被优先获取的线程抢占获取之后清除,所以需要将 erase 操作和 get 操作分离
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| """一个线程安全的阻塞字典"""
import threading
class ConcurrentDict(object): """该字典当要pop的键不存在时, 会阻塞操作线程""" def __init__(self): self.events = dict() self.data = dict()
def put(self, key, value): """向键值对中加入存储 该函数会检查 key 是否在 self.events 里 当存在时则说明有某些查询操作正在被阻塞, 此时需要对该 锁进行 set 操作, 通知对应的线程们取走数据 """ self.data[key] = value if key in self.events.keys(): event = self.events.pop(key) if not event.isSet(): event.set()
def get(self, key, timeout = None): """从存储中返回值 该函数会首先检查 self.data 中是否有对应的键 如果有则直接返回 否则在 self.events 中新建一个 threading.Event 对象并将其 clear, 之后进行 wait 操作阻塞线程 当 put 操作加入对应的键值对时, 该 event 对象 会被 set, 此时对应的阻塞释放, 返回相应的值
timeout: 超时时长, 超时之后触发 TimeoutError """ if key in self.data.keys(): return self.data.pop(key)
event = None if not key in self.events.keys(): event = threading.Event() event.clear() self.events[key] = event else: event = self.events[key] if timeout != None: event.wait(timeout) else: event.wait()
if key in self.data.keys(): return self.data[key] else: raise TimeoutError("key '%s' not found" % key)
def erase(self, key): """从存储中移除对应的键值对""" if key in self.data.keys(): self.data.pop(key)
|
很高兴 Wordpress 自带的代码区块终于有了高亮功能。
Author: 桂小方
Permalink:
https://init.blog/1746/
文章许可协议:
如果你觉得文章对你有帮助,可以 支持我
Comments