#python
python parallel
developer
Python 并发并行
协程 async/await
用 async def 定义的函数是协程函数,调用它只会返回一个协程对象,不会立即执行
await 后面必须是可等待对象(协程、Task、Future),通常用来等待耗时的 I/O 操作,等待期间事件循环可以去执行其他协程
import asyncio


async def coro1():
    """Print a marker, wait one second without blocking the loop, return 1."""
    print("coro1")
    # await suspends this coroutine until the sleep is over; the event loop is
    # free to run other coroutines in the meantime.
    await asyncio.sleep(1)
    return 1
async def coro2():
    """Print a marker, wait two seconds without blocking the loop, return 2."""
    print("coro2")
    await asyncio.sleep(2)
    return 2
# gather style
async def main1():
    """Run coro1 and coro2 concurrently with asyncio.gather."""
    print("main1")
    # gather combines the two coroutines and waits for both of them;
    # their return values are discarded here.
    await asyncio.gather(coro1(), coro2())


# Guarded so that importing this code does not start the event loop.
if __name__ == "__main__":
    asyncio.run(main1())
# Standard style: explicit tasks
async def main2():
    """Run coro1 and coro2 concurrently as explicitly created tasks."""
    print("main2")
    # create_task schedules each coroutine on the running loop, so both are
    # already scheduled before the first await below.
    task1 = asyncio.create_task(coro1())
    task2 = asyncio.create_task(coro2())
    await task1
    await task2
# TaskGroup style (asyncio.TaskGroup requires Python 3.11+)
async def main3():
    """Run coro1 and coro2 concurrently inside an asyncio.TaskGroup."""
    print("main3")
    # Leaving the async with block waits for every task created in the group.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(coro1())
        tg.create_task(coro2())
1. 你(协程):"Python,帮我执行这个 I/O 操作,完成了叫我"
2. Python(事件循环):"好的,我发起了请求,你先休息,我去处理其他协程"
3. 操作系统/网络库:在后台处理 I/O(发送网络请求、读写文件等)
4. I/O 完成:操作系统通知 Python
5. Python:"喂,你要的结果好了,继续执行吧!"
6. 你(协程):从 await 的下一行继续执行
线程池 ThreadPoolExecutor
线程池里放的是一组可复用的工作线程。每提交一个任务(函数 + 参数),就会得到一个 Future 对象,它代表这个任务尚未完成的结果,可以用来查询状态、获取返回值或异常。
初始化线程池
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED, ALL_COMPLETED
import time
def task(n, delay=1):
    """Simulate a blocking job: sleep, report completion, return ``n * 2``.

    Args:
        n: Task identifier; also the value that gets doubled.
        delay: Seconds to sleep before finishing. Defaults to 1, the value
            that used to be hard-coded.

    Returns:
        ``n * 2``.
    """
    time.sleep(delay)
    print(f"task{n} done")
    return n * 2
# Create a thread pool (max_workers sets the maximum number of threads).
# A pool created by hand has to be shut down explicitly.
executor = ThreadPoolExecutor(max_workers=3)
try:
    pass  # submit work here
finally:
    executor.shutdown(wait=True)

# Created with a with-statement: the pool is shut down automatically on exit.
with ThreadPoolExecutor(max_workers=5) as t:
    pass  # submit work here
提交任务
- submit(),单个提交,手动逐个传参
# submit() returns a Future object
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit several tasks.
    future1 = executor.submit(task, 1)
    future2 = executor.submit(task, 2)
    future3 = executor.submit(task, 3)
    # Fetch the results (each call blocks until that task has finished).
    print(future1.result())  # 2
    print(future2.result())  # 4
    print(future3.result())  # 6
- map(),批量提交,批量传参
with ThreadPoolExecutor(max_workers=3) as executor:
    # map() distributes the inputs across the pool and returns an iterator of
    # results; the results come back in the same order as the inputs.
    results = executor.map(task, [1, 2, 3, 4, 5])
    for result in results:
        print(result)  # printed in input order: 2, 4, 6, 8, 10
获取结果
- result(),阻塞等待该任务完成;按调用 result() 的顺序(通常就是提交顺序)得到结果
# Before Python 3.11, concurrent.futures.TimeoutError is a different class
# from the builtin TimeoutError, so import the concurrent.futures one explicitly.
from concurrent.futures import TimeoutError as FutureTimeoutError

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 2)
    # Block until the task finishes (no timeout: waits indefinitely).
    result = future.result()
    # With a timeout: wait at most 5 seconds.
    try:
        result = future.result(timeout=5)
    except FutureTimeoutError:
        print("任务超时")
- as_completed(),按完成顺序得到结果
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit several tasks.
    futures = [executor.submit(task, n) for n in [3, 1, 2]]
    # Handle results in completion order, not submission order.
    for future in as_completed(futures):
        result = future.result()
        print(result)
    # NOTE: task() sleeps for the same time whatever n is, so these three
    # tasks finish at about the same moment and the printed order is not
    # guaranteed. The order 1, 2, 3 would only be expected if each task's
    # duration grew with n.
# Stop waiting for tasks that take too long.
# Before Python 3.11, concurrent.futures.TimeoutError is a different class
# from the builtin TimeoutError, so import the concurrent.futures one explicitly.
from concurrent.futures import TimeoutError as FutureTimeoutError

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in [3, 1, 2]]
    try:
        # The 5 seconds are counted from this as_completed() call.
        for future in as_completed(futures, timeout=5):
            result = future.result()
            print(result)
    except FutureTimeoutError:
        # Only the waiting stops here: unfinished tasks keep running, and the
        # with block still waits for them when it exits.
        print("某些任务超时了")
- wait(),灵活决定什么时候结束
from concurrent.futures import (
    wait,
    ALL_COMPLETED,
    FIRST_COMPLETED,
    FIRST_EXCEPTION,
)

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in [1, 2, 3]]
    # Block until the return_when condition is met. ALL_COMPLETED is the
    # default; the other options are listed below.
    done, not_done = wait(futures, return_when=ALL_COMPLETED)
    print(f"完成数量: {len(done)}")
    print(f"未完成数量: {len(not_done)}")
    # Fetch the results.
    for future in done:
        print(future.result())

# Possible values of return_when:
# 1. FIRST_COMPLETED - return as soon as any one task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
# 2. FIRST_EXCEPTION - return on the first exception, or when all complete
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)
# 3. ALL_COMPLETED - return when every task has completed (the default)
done, not_done = wait(futures, return_when=ALL_COMPLETED)
# 4. With a timeout (in seconds)
done, not_done = wait(futures, timeout=5, return_when=ALL_COMPLETED)
Future对象的其他用法
with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    # 1. Inspect the state.
    print(future.running())    # is it currently running?
    print(future.done())       # has it finished?
    print(future.cancelled())  # was it cancelled?
    # 2. Cancel the task (only a task that has not started can be cancelled).
    future.cancel()
    # 3. Get the result. If the cancel above succeeded, this raises
    #    CancelledError instead of returning a value.
    result = future.result(timeout=10)
    # 4. Get the exception raised by the task, if any.
    exception = future.exception(timeout=10)

    # 5. Add a callback (called automatically when the task completes).
    def callback(fut):
        print(f"任务完成,结果: {fut.result()}")

    future.add_done_callback(callback)
例子
实时进度
# NOTE(review): `process` and `tqdm` are not defined in these notes.
# Presumably `process` is the caller's own worker function and `tqdm` comes
# from the third-party tqdm package (`from tqdm import tqdm`); confirm.
items = [3, 1, 4, 1, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit every task, mapping each Future back to the item it was built from.
    futures = {executor.submit(process, item): item for item in items}
    # With exception handling
    for future in as_completed(futures):
        item = futures[future]  # recover the original argument
        try:
            result = future.result()
        except Exception as e:
            print(f"✗ 项目 {item} 失败: {e}")
        else:
            print(f"✓ {result}")
    # With a progress bar; total is the number of submitted futures.
    for future in tqdm(as_completed(futures), total=len(futures)):
        result = future.result()
只要第一个
# NOTE(review): `try_server` is not defined in these notes; presumably it is
# the caller's own function that contacts one server. Confirm.
servers = ["server1.com", "server2.com", "server3.com"]
with ThreadPoolExecutor(max_workers=len(servers)) as executor:
    # Try all servers at the same time.
    futures = [executor.submit(try_server, s) for s in servers]
    # Wait for the first task to complete. FIRST_COMPLETED also fires when a
    # task fails, so the first one done is not necessarily a success.
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # Take the fastest one; result() re-raises if that task failed.
    fastest = next(iter(done))
    print(f"最快响应: {fastest.result()}")
    # Cancel the others. cancel() only stops tasks that have not started yet.
    # With one worker per server the remaining tasks are most likely already
    # running, in which case this has no effect and the with block still
    # waits for them on exit.
    for future in not_done:
        future.cancel()
concurrent.futures
ThreadPoolExecutor 和 ProcessPoolExecutor 都在 concurrent.futures 模块中,两者的接口(submit、map 等)相同
- ThreadPoolExecutor:
- 线程池,用于处理 I/O 密集型任务(如网络请求、文件操作等)。
- 可能受 GIL 限制,无法显著提升 CPU 密集型任务的性能。
- ProcessPoolExecutor:
- 进程池,用于处理计算密集型任务(如数学运算、大量数据处理等)。
- 进程间通信成本较高,适合处理较大的任务,而非大量小任务。