Socket Server架构设计与 Python阻塞字典实现

/ 2评 / 2

需求

在项目中遇到了如下需求:

思路

架构设计

难点

Python 中并没有直接提供类似 “阻塞字典” 这样的概念,我们可以通过使用 threading 模块提供的 event 操作进行自己实现如下功能:

解决

代码

"""一个线程安全的阻塞字典"""

import threading

class ConcurrentDict(object):
    """该字典当要pop的键不存在时, 会阻塞操作线程"""
    def __init__(self):
        self.events = dict() # 用于存储键和查询锁对
        self.data = dict() # 用于存储键值对

    def put(self, key, value):
        """向键值对中加入存储
        该函数会检查 key 是否在 self.events 里
        当存在时则说明有某些查询操作正在被阻塞, 此时需要对该
        锁进行 set 操作, 通知对应的线程们取走数据
        """
        self.data[key] = value
        if key in self.events.keys():
            event = self.events.pop(key)
            if not event.isSet():
                event.set()

    def get(self, key, timeout = None):
        """从存储中返回值
        该函数会首先检查 self.data 中是否有对应的键
        如果有则直接返回
        否则在 self.events 中新建一个 threading.Event
        对象并将其 clear, 之后进行 wait 操作阻塞线程
        当 put 操作加入对应的键值对时, 该 event 对象
        会被 set, 此时对应的阻塞释放, 返回相应的值

        timeout: 超时时长, 超时之后触发 TimeoutError
        """
        if key in self.data.keys():
            return self.data.pop(key)

        # 判断是否已经有 Event 对象被创建, 如有则不创建
        event = None
        if not key in self.events.keys():
            event = threading.Event()
            event.clear()
            self.events[key] = event
        else:
            event = self.events[key]
            
        if timeout != None:
            event.wait(timeout)
        else:
            event.wait()

        # 当超时或者获取到之后
        if key in self.data.keys():
            return self.data[key]
        else:
            raise TimeoutError("key '%s' not found" % key)

    def erase(self, key):
        """从存储中移除对应的键值对"""
        if key in self.data.keys():
            self.data.pop(key)

很高兴 WordPress 自带的代码区块终于有了高亮功能。

知识共享许可协议
本作品采用知识共享署名 4.0 国际许可协议进行许可。

2条回应:“Socket Server架构设计与 Python阻塞字典实现”

  1. repostone说道:

    非技术的路过。

  2. Feng说道:

    这个样式比以前顺眼多了。

发表评论

电子邮件地址不会被公开。 必填项已用*标注