夜间模式暗黑模式
字体
阴影
滤镜
圆角
Socket Server架构设计与 Python阻塞字典实现
本文最后更新于 312 天前,其中的信息可能已经有所发展或是发生改变。

需求

在项目中遇到了如下需求:

  • 从远端服务器获取字节流数据,按照相关协议拆分字节流并进行校验
  • 之后每一个不同的字节流对应了不同的命令字,需要根据命令字将字节流数据交予不同的handler处理
  • 处理之后的结果通过对应的TCP链接进行send操作

思路

  • handler需要进行大量的字节拆分/计算,我将这部分拆分出来使用了C++进行处理
  • 项目中需要同时管理500~600个活动的TCP链接,于是使用了Python中的selector模块实现基于事件选择的socket server
  • 由于命令字的位置相对于每一条拆分后的字节流是固定的,使用类似“阻塞字典”的模块将任意的任务拆分至对应的命令字中,通过“生产-消费”模型解决需求
架构设计

难点

Python 中并没有直接提供类似 “阻塞字典” 这样的概念,我们可以通过使用 threading 模块提供的 event 操作进行自己实现如下功能:

  • 当进行 get 操作时,如果字典中不存在相应的键,阻塞操作直到该键对应的值出现
  • 当多个线程同时等待 get 操作时,如果数据出现,需要将相同的数据同时返回给它们

解决

  • 使用 event 的原因第一是:event 可以同时阻塞多个线程,而在 set 操作被执行之后可以同时同时多个线程回收数据
  • 使用 event 的原因第二是因为 event 的 wait 操作支持 Timeout,不需要我们单独封装超时
  • 不能实现阻塞字典的 pop 操作,因为会导致数据被优先获取的线程抢占获取之后清除,所以需要将 erase 操作和 get 操作分离

代码

"""一个线程安全的阻塞字典"""

import threading

class ConcurrentDict(object):
    """该字典当要pop的键不存在时, 会阻塞操作线程"""
    def __init__(self):
        self.events = dict() # 用于存储键和查询锁对
        self.data = dict() # 用于存储键值对

    def put(self, key, value):
        """向键值对中加入存储
        该函数会检查 key 是否在 self.events 里
        当存在时则说明有某些查询操作正在被阻塞, 此时需要对该
        锁进行 set 操作, 通知对应的线程们取走数据
        """
        self.data[key] = value
        if key in self.events.keys():
            event = self.events.pop(key)
            if not event.isSet():
                event.set()

    def get(self, key, timeout = None):
        """从存储中返回值
        该函数会首先检查 self.data 中是否有对应的键
        如果有则直接返回
        否则在 self.events 中新建一个 threading.Event
        对象并将其 clear, 之后进行 wait 操作阻塞线程
        当 put 操作加入对应的键值对时, 该 event 对象
        会被 set, 此时对应的阻塞释放, 返回相应的值

        timeout: 超时时长, 超时之后触发 TimeoutError
        """
        if key in self.data.keys():
            return self.data.pop(key)

        # 判断是否已经有 Event 对象被创建, 如有则不创建
        event = None
        if not key in self.events.keys():
            event = threading.Event()
            event.clear()
            self.events[key] = event
        else:
            event = self.events[key]
            
        if timeout != None:
            event.wait(timeout)
        else:
            event.wait()

        # 当超时或者获取到之后
        if key in self.data.keys():
            return self.data[key]
        else:
            raise TimeoutError("key '%s' not found" % key)

    def erase(self, key):
        """从存储中移除对应的键值对"""
        if key in self.data.keys():
            self.data.pop(key)

很高兴 WordPress 自带的代码区块终于有了高亮功能。

评论

  1. 非技术的路过。

    9月前
    2019-9-04 11:20:45
  2. 这个样式比以前顺眼多了。

    10月前
    2019-8-24 20:57:25

发送评论 编辑评论


上一篇
下一篇