最近在用python的一款异步web框架sanic搭建web服务,遇到一个需要加特定锁的场景:同一用户并发处理订单时需要排队处理,但不同用户不需要排队。
如果仅仅使用async with asyncio.Lock()的话。会使所有请求都排队处理。
import asyncio import datetime lock = asyncio.Lock() async def place_order(user_id, order_id): async with lock: # 模拟下单处理 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}") await asyncio.sleep(1) # 假设处理需要 1 秒 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done") # 定义一个测试函数 async def test(): # 创建四个任务,模拟两个 UserID 的并发请求 tasks = [ asyncio.create_task(place_order(1, 101)), asyncio.create_task(place_order(1, 102)), asyncio.create_task(place_order(2, 201)), asyncio.create_task(place_order(2, 202)), ] # 等待所有任务完成 await asyncio.gather(*tasks) if __name__ == '__main__': # 运行测试函数 asyncio.run(test())
这显然不是想要的结果,第二种方案是定义一个字典,key使用user_id,value为asyncio.Lock(),每次执行前从字典里面获取lock,相同的user_id将会使用同一个lock,那就实现了功能。
import asyncio import datetime locks = {} async def place_order(user_id, order_id): if user_id not in locks: locks[user_id] = asyncio.Lock() async with locks[user_id]: # 模拟下单处理 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}") await asyncio.sleep(1) # 假设处理需要 1 秒 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done") # 定义一个测试函数 async def test(): # 创建四个任务,模拟两个 UserID 的并发请求 tasks = [ asyncio.create_task(place_order(1, 101)), asyncio.create_task(place_order(1, 102)), asyncio.create_task(place_order(2, 201)), asyncio.create_task(place_order(2, 202)), ] # 等待所有任务完成 await asyncio.gather(*tasks) if __name__ == '__main__': # 运行测试函数 asyncio.run(test())
但是这个方案会有缺点是,user_id执行完成之后没有释放资源,当请求的user_id变多之后,势必会造成占用过多的资源。继续改进方案,将locks的value加一个计数器,当获取lock时计数器加1,使用完之后计数器-1,当计数器变为小于等于0时,释放locks对应的key。最后将这个功能封装为一个类方便其他地方调用。
import asyncio mutex_locks = {} class MutexObj: def __init__(self): self.lock = asyncio.Lock() self.count = 0 class Mutex: def __init__(self, key: str): if key not in mutex_locks: mutex_locks[key] = MutexObj() self.__mutex_obj = mutex_locks[key] self.__key = key def lock(self): """ 获取锁 :return: """ self.__mutex_obj.count += 1 return self.__mutex_obj.lock def release(self): """ 释放锁 :return: """ self.__mutex_obj.count -= 1 if self.__mutex_obj.count <= 0: del mutex_locks[self.__key]
import asyncio import datetime from utils.mutex import Mutex, mutex_locks locks = {} async def place_order(user_id, order_id): mutex = Mutex(user_id) async with mutex.lock(): try: # 模拟下单处理 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}") await asyncio.sleep(1) # 假设处理需要 1 秒 print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done") except Exception as ex: print(ex) finally: mutex.release() print('=====================') print(mutex_locks) print('=====================') # 定义一个测试函数 async def test(): # 创建四个任务,模拟两个 UserID 的并发请求 tasks = [ asyncio.create_task(place_order(1, 101)), asyncio.create_task(place_order(1, 102)), asyncio.create_task(place_order(2, 201)), asyncio.create_task(place_order(2, 202)), ] # 等待所有任务完成 await asyncio.gather(*tasks) if __name__ == '__main__': # 运行测试函数 asyncio.run(test())
至此实现了我的需求,此方案只考虑了单应用场景,如果是分布式部署,需要更换方案如redis锁,这里暂且不考虑。如果有其他实现方式,欢迎留言交流。
1.本站内容仅供参考,不作为任何法律依据。用户在使用本站内容时,应自行判断其真实性、准确性和完整性,并承担相应风险。
2.本站部分内容来源于互联网,仅用于交流学习研究知识,若侵犯了您的合法权益,请及时邮件或站内私信与本站联系,我们将尽快予以处理。
3.本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
4.根据《计算机软件保护条例》第十七条规定“为了学习和研究软件内含的设计思想和原理,通过安装、显示、传输或者存储软件等方式使用软件的,可以不经软件著作权人许可,不向其支付报酬。”您需知晓本站所有内容资源均来源于网络,仅供用户交流学习与研究使用,版权归属原版权方所有,版权争议与本站无关,用户本人下载后不能用作商业或非法用途,需在24个小时之内从您的电脑中彻底删除上述内容,否则后果均由用户承担责任;如果您访问和下载此文件,表示您同意只将此文件用于参考、学习而非其他用途,否则一切后果请您自行承担,如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。
5.本站是非经营性个人站点,所有软件信息均来自网络,所有资源仅供学习参考研究目的,并不贩卖软件,不存在任何商业目的及用途
暂无评论内容