#python

python parallel

developer

Python 并发并行

协程 async/await

用 async def 定义的函数是协程函数;调用它不会立即执行函数体,而是返回一个协程对象,需要交给事件循环(或被 await)才会运行

await 后面必须是可等待对象(协程、Task、Future),通常是耗时的 I/O 操作;执行到 await 时当前协程让出控制权,事件循环可以先去运行其他协程

async def coro1():
    """Print a marker, wait one second without blocking the loop, return 1."""
    print("coro1")
    # Hand control back to the event loop; it resumes this coroutine once
    # the one-second sleep has finished.
    await asyncio.sleep(1)
    return 1

async def coro2():
    """Print a marker, wait two seconds without blocking the loop, return 2."""
    print("coro2")
    # Suspends here; the event loop is free to run other coroutines meanwhile.
    await asyncio.sleep(2)
    return 2

# gather写法
async def main1():
    """Run coro1 and coro2 concurrently via asyncio.gather."""
    print("main1")
    # Build both coroutine objects first, then let gather schedule them
    # together and wait until both have finished.
    pending = (coro1(), coro2())
    await asyncio.gather(*pending)


asyncio.run(main1())

# 标准写法
async def main2():
    """Run coro1 and coro2 concurrently via explicitly created tasks."""
    print("main2")
    # create_task schedules each coroutine on the running loop right away.
    tasks = [asyncio.create_task(coro) for coro in (coro1(), coro2())]
    # Await them in creation order.
    for scheduled in tasks:
        await scheduled
    
# TaskGroup写法
async def main3():
    """Run coro1 and coro2 concurrently inside an asyncio.TaskGroup."""
    print("main3")
    # Leaving the ``async with`` block waits for every task in the group.
    async with asyncio.TaskGroup() as group:
        for make_coro in (coro1, coro2):
            group.create_task(make_coro())
    
1. 你(协程):"Python,帮我执行这个 I/O 操作,完成了叫我"
2. Python(事件循环):"好的,我发起了请求,你先休息,我去处理其他协程"
3. 操作系统/网络库:在后台处理 I/O(发送网络请求、读写文件等)
4. I/O 完成:操作系统通知 Python
5. Python:"喂,你要的结果好了,继续执行吧!"
6. 你(协程):从 await 的下一行继续执行

线程池 ThreadPoolExecutor

线程池的基础是:把任务提交到池子里,每次提交都会得到一个 future 对象。一个 future 对象就代表一个已提交的任务:提交时给出函数名和函数参数,超时时间等任务参数则在获取结果时指定;之后可以通过它查询状态、获取结果或异常。

初始化线程池

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def task(n, delay=1):
    """Simulate a blocking job: sleep, report completion, return ``n * 2``.

    Args:
        n: Task identifier; also the value that gets doubled.
        delay: Seconds to sleep before finishing. Defaults to 1, the
            duration that used to be hard-coded.

    Returns:
        ``n * 2``.
    """
    time.sleep(delay)
    print(f"task{n} done")
    return n * 2

# Create a thread pool (max_workers caps the number of worker threads).
# A pool created this way is not cleaned up automatically: call
# executor.shutdown() once no more work will be submitted.
executor = ThreadPoolExecutor(max_workers=3)

# Create the pool with a ``with`` statement instead; the pool is shut down
# when the block exits.
with ThreadPoolExecutor(max_workers=5) as t:
    pass  # placeholder: submit work to ``t`` here

提交任务

  • submit(),单个提交,手动逐个传参
# submit 返回 Future 对象
with ThreadPoolExecutor(max_workers=3) as executor:
    # Queue three independent calls; each submit() hands back its Future.
    future1, future2, future3 = (executor.submit(task, n) for n in (1, 2, 3))

    # result() blocks until that call has finished; prints 2, 4, 6.
    for pending in (future1, future2, future3):
        print(pending.result())
  • map(),批量提交,批量传参
numbers = range(1, 6)
with ThreadPoolExecutor(max_workers=3) as executor:
    # map() distributes the inputs over the pool and yields the results
    # in input order, so this prints 2, 4, 6, 8, 10.
    for result in executor.map(task, numbers):
        print(result)

获取结果

  • result(),阻塞等待,按调用 result() 的顺序(通常即提交顺序)得到结果
with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 2)

    # No deadline: block until the task has finished.
    result = future.result(timeout=None)

    # With a deadline: wait at most 5 seconds, then give up.
    try:
        result = future.result(timeout=5)
    except TimeoutError:
        print("任务超时")
  • as_completed(),按完成顺序得到结果
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit several tasks.
    futures = [executor.submit(task, n) for n in [3, 1, 2]]

    # Handle results in completion order (not submission order).
    for future in as_completed(futures):
        result = future.result()
        print(result)

    # Every task sleeps for the same duration, so the order in which they
    # finish -- and therefore the order of the printed 6, 2, 4 -- is not
    # fixed and may differ between runs.
# 截断超时的任务
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in [3, 1, 2]]

    # Give the whole batch 5 seconds; iteration raises TimeoutError if some
    # futures are still unfinished when the deadline passes.
    try:
        for finished in as_completed(futures, timeout=5):
            print(finished.result())
    except TimeoutError:
        print("某些任务超时了")
  • wait(),灵活决定什么时候结束
from concurrent.futures import wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, n) for n in [1, 2, 3]]

    # Wait on the futures; return_when selects the stop condition and must be
    # one of FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    done, not_done = wait(futures, return_when=ALL_COMPLETED)

    print(f"完成数量: {len(done)}")
    print(f"未完成数量: {len(not_done)}")

    # Collect the results of the finished futures.
    for future in done:
        print(future.result())
# 1. FIRST_COMPLETED - return as soon as any one task finishes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)

# 2. FIRST_EXCEPTION - return on the first exception, or when all finish
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)

# 3. ALL_COMPLETED - return once every task has finished (the default)
done, not_done = wait(futures, return_when=ALL_COMPLETED)

# 4. With a timeout: wait at most 5 seconds
done, not_done = wait(futures, timeout=5, return_when=ALL_COMPLETED)

Future对象的其他用法

with ThreadPoolExecutor() as executor:
    future = executor.submit(task, 5)
    
    # 1. Inspect the state
    print(future.running())    # is the call currently running?
    print(future.done())       # has it finished?
    print(future.cancelled())  # has it been cancelled?
    
    # 2. Cancel the task (only possible if it has not started yet)
    # NOTE(review): if the cancel succeeds, the result()/exception() calls
    # below would presumably raise CancelledError -- confirm against the
    # concurrent.futures documentation.
    future.cancel()
    
    # 3. Fetch the result, waiting at most 10 seconds
    result = future.result(timeout=10)
    
    # 4. Fetch the exception, waiting at most 10 seconds
    exception = future.exception(timeout=10)
    
    # 5. Add a callback (invoked automatically when the task completes)
    def callback(fut):
        print(f"任务完成,结果: {fut.result()}")
    
    future.add_done_callback(callback)

例子

实时进度

items = [3, 1, 4, 1, 5]
with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit every item and remember which item each future belongs to.
    # NOTE(review): ``process`` is not defined in the visible part of this
    # file; presumably a user-supplied worker function.
    futures = {executor.submit(process, item): item for item in items}

    # Variant 1: with error handling
    for future in as_completed(futures):
        item = futures[future]  # look up the original argument
        try:
            result = future.result()
            print(f"✓ {result}")
        except Exception as e:
            print(f"✗ 项目 {item} 失败: {e}")

    # Variant 2: with a progress bar. The total is the number of submitted
    # futures.
    # NOTE(review): ``tqdm`` is not imported in the visible part of this
    # file; presumably ``from tqdm import tqdm`` is required.
    for future in tqdm(as_completed(futures), total=len(futures)):
        result = future.result()

只要第一个

servers = ["server1.com", "server2.com", "server3.com"]
with ThreadPoolExecutor(max_workers=len(servers)) as executor:
    # Probe every server at the same time, one worker per server.
    futures = [executor.submit(try_server, host) for host in servers]

    # Return as soon as any one probe has finished.
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)

    # Take one finished future as the fastest responder.
    fastest = next(iter(done))
    print(f"最快响应: {fastest.result()}")

    # Ask the remaining probes to cancel.
    # NOTE(review): Future.cancel() presumably cannot stop a call that is
    # already running, and leaving the ``with`` block still waits for those
    # calls -- confirm against the concurrent.futures documentation.
    for slower in not_done:
        slower.cancel()

concurrent.futures

ThreadPoolExecutor 和 ProcessPoolExecutor 都在 concurrent.futures 模块中

  • ThreadPoolExecutor
    • 线程池,用于处理 I/O 密集型任务(如网络请求、文件操作等)。
    • 可能受 GIL 限制,无法显著提升 CPU 密集型任务的性能。
  • ProcessPoolExecutor
    • 进程池,用于处理计算密集型任务(如数学运算、大量数据处理等)。
    • 进程间通信成本较高,适合处理较大的任务,而非大量小任务。

多进程 multiprocessing

多线程 threading