from asyncio.subprocess import PIPE
import asyncio
from typing import List


async def counters(
    queue: asyncio.Queue[list[int]],
    sleeps: list[float],
    iterations: int,
) -> None:
    """Count dots printed by shell subprocesses and report the totals.

    One shell subprocess is started per entry of *sleeps*; each runs an
    endless loop that echoes a single ``.`` line and then sleeps for that
    entry's number of seconds.  A monitor coroutine per subprocess counts
    the lines arriving on its stdout.  Concurrently, a reporter waits one
    second, then *iterations* times puts the current counters on *queue*
    and sleeps one more second; afterwards it kills every subprocess and
    waits for it to exit.

    Args:
        queue: Receives one ``list[int]`` per iteration, holding the
            number of lines counted so far for each subprocess, in the
            same order as *sleeps*.  Each item is an independent
            snapshot taken at the moment it is enqueued.
        sleeps: Delay in seconds between dots, one per subprocess.
        iterations: Number of reports to put on *queue*.

    Raises:
        AssertionError: If a subprocess has no stdout stream or emits a
            line other than ``b".\\n"``.
    """
    ctr = [0] * len(sleeps)
    procs = [
        await asyncio.create_subprocess_shell(
            f"while true; do echo .; sleep {delay}; done",
            stdin=PIPE,
            stdout=PIPE,
        )
        for delay in sleeps
    ]

    async def monitor(idx: int) -> None:
        """Count the lines produced by subprocess number *idx*."""
        out = procs[idx].stdout
        assert out is not None
        # Iteration ends once the stream is exhausted.
        async for line in out:
            assert line == b".\n"
            ctr[idx] += 1

    async def printer() -> None:
        """Publish the counters periodically, then stop the subprocesses."""
        await asyncio.sleep(1)
        for _ in range(iterations):
            # Enqueue a copy: the monitors keep mutating ``ctr``, so
            # sharing the list itself would make every queued report
            # change after the fact and alias all reports to one object.
            await queue.put(list(ctr))
            await asyncio.sleep(1)
        for p in procs:
            p.kill()
            await p.wait()

    await asyncio.gather(
        printer(),
        *[monitor(i) for i in range(len(procs))],
    )


# NOTE(review): project-local module imported at the very end of the file,
# presumably for its import-time side effects — confirm.
import run_tests