# pragma mypy relaxed from typing import \ Coroutine, Generator, Iterator, List, Tuple, Any, Optional from queue import PriorityQueue from time import time, sleep Task = Coroutine[ object, object, object ] class IOPlex: def __init__( self, factory: Any ) -> None: self.tasks: dict[ int, Any ] = {} self.factory = factory self.reply: Optional[ str ] = None self.counter = 0 self.queue: Any = PriorityQueue() class read: def __await__( self ): x = yield (); return x def schedule( self, i: int ) -> None: try: task = self.tasks[ i ] delay = next( task ) item = ( time() + delay / 1000, i ) self.queue.put( item ) except StopIteration: pass def write( self, data: str ) -> None: self.reply = data def connect( self ) -> int: ident = self.counter self.counter += 1 self.tasks[ ident ] = \ self.factory( self.read, self.write ).__await__() next( self.tasks[ ident ] ) return ident def close( self, ident: int ) -> None: del self.tasks[ ident ] def send( self, ident: int, data: str ) -> Optional[ str ]: if ident not in self.tasks: return None try: self.tasks[ ident ].send( data ) except StopIteration: return None r, self.reply = self.reply, None return r import run_tests