1/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Vláknové programování část VIII Lukáš Hejmánek, Petr Holub {xhejtman,hopet}@ics.muni.cz Laboratoř pokročilých síťových technologií PV192 2010­03­23 2/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Přehled přednášky Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited 3/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Úlohy a vlákna Úloha vs. vlákno úloha ­ co se vykonává (Runnable, Callable) vlákno ­ kdo úlohu vykonává (Executor/Future/TPE/...) Oddělení úloh od vláken úloha nesmí předpokládat nic o chování vlákna, které ji vykonává Politika ukončení vs. politika přerušení (příklady povětšinou převzaty z JCiP, Goetz) 4/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Futures vs. CompletionService Problém: máme řadu odložených úloh (Future) a potřebujeme je v pořadí dokončení, nikoli zaslání 1. opakované procházení seznamu a používání get(0, TimeUnit.SECONDS); 2. použijeme CompletionService CompletionService kombinuje Executor a BlockingQueue submit() ­ vkládáme úlohy pomocí take() a poll() ­ vybíráme dokončené úlohy při prázdné frontě dokončných úloh se take() blokuje, poll() vrací null 5/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Futures vs. CompletionService ArrayList stahniSoubory(ArrayList list) { 2 ArrayList ald = new ArrayList(); CompletionService completionService = 4 new ExecutorCompletionService( new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, 6 new LinkedBlockingQueue())); for (final String s : list) { 8 completionService.submit(new Callable() { public FileData call() throws Exception { 10 FileData fd = new FileData(); fd.s = s; fd.data = getFile(s); 12 return fd; } 14 }); } 16 try { for (int i = 0, size = list.size(); i < size; i++) { 18 Future f = completionService.take(); ald.add(f.get()); 20 } } catch (InterruptedException e) { 22 Thread.currentThread().interrupt(); } catch (ExecutionException e) { launderThrowable(e.getCause()); } 24 return ald; 6/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Futures vs. CompletionService public static RuntimeException launderThrowable(Throwable t) { 2 if (t instanceof RuntimeException) return (RuntimeException) t; 4 else if (t instanceof Error) throw (Error) t; 6 else throw new IllegalStateException("Not unchecked", t); 8 } 7/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Ukončování a přerušování pro pokročilé Kooperativní ukončování vláken příznakem proměnné přerušením ­ interrupt Thread.stop ­ deprecated Důvody ukončení vlákna uživatelem vyvolané (GUI, JMX) časově omezené úlohy události uvnitř ­ několik vláken hledá řešení paralelně, jedno ho najde externí chyby ukončení aplikace 8/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Ukončování a přerušování pro pokročilé Politika ukončování (cancellation policy) vývojářem speci kováno pro každou úlohu (JavaDoc) jak? ­ jak se vyvolává ukončení? kdy? ­ kdy je možné vlákno ukončit? co? ­ co bude třeba udělat před ukončením? Ukončování příznakem a/nebo přerušením? 9/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Přerušení ­ interrupt Mechanismus zasílání zprávy mezi vlákny sémanticky de nováno jen jako signalizace mezi vlákny nastavení příznaku 1 public class Thread { public void interrupt() {...} 3 public boolean isInterrupted() {...} public static boolean interrupted() {...} 5 } Pozor na metodu interrupted() vrátí a vymaže stav příznaku Zpracování přerušení vyhození výjimky InterruptedException předání příznaku dále polknutí příznaku Typické metody na InterruptedException wait, sleep, join blokující operace na omezených frontách (BlockingQueue x.put) 10/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Přerušení ­ interrupt Politiky přerušení speci kováno vývojářem pro každé vlákno standardní chování: ukliď, dej vědět vlastníkovi (TPE) a zmiz nestandardrní chování: není vhodné pro normální úlohy vlákno může potřebovat předat stav interrupted svému TPE úloha by neměla předpokládat nic o politice vlákna, v němž běží předat stav dál buď throw new InterruptedException(); nebo Thread.currentThread().interrupt(); např. pokud je úloha Runnable vlákno/TPE může následně interrupted příznak potřebovat speci kace: kdy?, jak?, další předání? 11/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Přerušení ­ interrupt Kombinace blokujících operací s politikou přerušení a úlohy s ukončením až na konci public Task getNextTask(BlockingQueue queue) { 2 boolean interrupted = false; try { 4 while (true) { try { 6 return queue.take(); } catch (InterruptedException e) { 8 interrupted = true; } 10 } } finally { 12 if (interrupted) Thread.currentThread().interrupt(); } 14 } nesmíme příznak interrupted nastavit před voláním take(), protože by volání hned skončilo 12/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Omezený běh ­ Futures Future má metodu cancel(boolean mayInterruptIfRunnig) mayInterruptIfRunnig = true znamená, že se má běžící úloha přerušit mayInterruptIfRunnig = false znamená, že se pouze nemá spustit, pokud ještě neběží vrací, zda se ukončení povedlo Kdy můžeme použít mayInterruptIfRunnig = true? pokud známe politiku přerušení vlákna pro standardní implementace Executor to je známé a bezpečné 13/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Omezený běh ­ Futures public class FutureCancel { 2 ThreadPoolExecutor taskExec = new ThreadPoolExecutor(1,10,60, TimeUnit.SECONDS, new LinkedBlockingQueue()); 4 public void timedRun (Runnable r, long timeout, TimeUnit unit) throws InterruptedException { 6 Future task = taskExec.submit(r); try { 8 task.get(120, TimeUnit.SECONDS); } catch (ExecutionException e) { 10 throw new RuntimeException(e.getMessage()); } catch (TimeoutException e) { 12 // uloha bude ukoncena nize } 14 finally { // neskodne, pokud ukloha skoncila, 16 // jinak interrupt task.cancel(true); 18 } } 14/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Nepřerušitelná blokování Existují blokování, která nereagují na interrupt Příklady: synchronní soketové I/O v java.io problém: metody read a write na InputStream a OutputStream nereagují na interrupt řešení: zavřít socket, visící čtení/zápis vyhodí SocketException čekání na získání monitoru (intrinsic lock) problém: vlákno čekající na monitor (synchornized) nereaguje na interrupt řešení: neexistuje ,,násilné`` řešení pro monitory, musí se dočkat obejití: explicitní zámky Lock podporují metodu lockInterruptibly 15/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Nepřerušitelná blokování Další vychytávky: synchronní I/O v java.nio přerušení vyhází u všech zablokovaných vláken ClosedByInterruptException, pokud je kanál typu InterruptibleChannel zavření vyhází u všech zablokovaných vláken AsynchronousCloseException, pokud je kanál typu InterruptibleChannel asynchronní I/O při použítí Selector Selector.select vyhodí výjimku ClosedSelectorException, pokud obdrží interrupt 16/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Nepřerušitelná blokování Využití ThreadPoolExecutor.newTaskFor(callable) dostupné od Java 6 vrací RunnableFuture pro danou úlohu přepsání newTaskFor umožňuje vlastní tvorbu RunnableFuture a tudíž přepsat metodu cancel() uzavření synchronních socketů pro java.io statistiky, debugování, atd. lze napsat tak, že si Callable/Runnable dodá vlastní implementaci cancel() http://www.javaconcurrencyinpractice.com/ listings/SocketUsingTask.java 17/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb Problém dlouho běžících vláken vlákna v exekutorech často běží déle, než tvůrce executorů Vlákno by měl zastavovat jeho ,,vlastník`` vlastník vláken není de nován formálně bere se ten, kdo ho vytvořil vlastnictví není transitivní (jako u objektů ­ princip zapouzdření) vlastník by měl poskytovat metody na řízení životního cyklu požadavek na ukončení by měl být signalizován vlastníkovi 18/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb public class LogWriter { 2 private final BlockingQueue queue; private final LoggerThread logger; 4 private volatile boolean shutdownRequested = false; 6 public LogWriter() throws FileNotFoundException { this.queue = new LinkedBlockingQueue(); 8 this.logger = new LoggerThread(new PrintWriter("mujSoubor")); logger.start(); 10 } 12 private class LoggerThread extends Thread { private final PrintWriter writer; 14 private LoggerThread(PrintWriter writer) { 16 super("Logger Thread"); this.writer = writer; 18 } 20 public void run() { try { 22 while (true) writer.println(queue.take()); 24 } catch (InterruptedException ignored) { } finally { 26 writer.close(); } 28 } } 19/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb 1 public void stop() { shutdownRequested = true; 3 logger.interrupt(); } 5 public void log (String msg) throws InterruptedException { 7 queue.put (msg); } Potřeba ukončovat konzumenty i producenty konzument: run() producent: log(String msg) 20/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb public void logLepe (String msg) throws InterruptedException { 2 if (!shutdownRequested) queue.put (msg); 4 else throw new IllegalStateException("logger se ukoncuje"); 6 } Ukončení producenta jakpak zjistíme jeho vlákno? nijak ;-) už je to správně? 21/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb 1 public void logLepe (String msg) throws InterruptedException { if (!shutdownRequested) 3 queue.put (msg); else 5 throw new IllegalStateException("logger se ukoncuje"); } ... není! Race condition složené testování podmínky a volání metody! Složené zamykání testování a rezervace v jednom synchronized bloku konzument testuje, že zpracoval všechny rezervace 22/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb public class SafeLogWriter { 2 private final BlockingQueue queue; private final LoggerThread logger; 4 @GuardedBy("this") private volatile boolean shutdownRequested = false; 6 @GuardedBy("this") private int reservations; ... 1 public void run() { try { 3 while (true) { synchronized (this) { 5 if (shutdownRequested && reservations == 0) break; 7 } String msg = queue.take(); 9 synchronized (this) {--reservations;}; writer.println(msg); 11 } } catch (InterruptedException ignored) { 13 } finally { writer.close(); 15 } } 23/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb public void log (String msg) throws InterruptedException { 2 synchronized (this) { if (shutdownRequested) 4 throw new IllegalStateException("logger se ukoncuje"); ++reservations; 6 } queue.put (msg); 8 } 24/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb ExecutorService proč nepoužít, co je hotovo? shutdown() pohodové ukončení dokončí se zařazené úlohy shutdownNow() vrací seznam úloh, které ještě nenastartovaly problém, jak se dostat k seznamu úloh, které nastartovaly, ale byly ukončeny nemá metodu, která by umožnila dokončit bežící úlohy a nové už nestartovala zapouzdření do vlastního ukončování: exec.shutdown(); exec.awaitTermination(timeout, unit); využití i pro jednoduchá vlákna: newSingleThreadExecutor() 25/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb public class TrackingExecutor extends AbstractExecutorService { 2 private final ExecutorService exec; private final Set tasksCancelledAtShutdown = 4 Collections.synchronizedSet(new HashSet()); ... public List getCancelledTasks() { 2 if (!exec.isTerminated()) throw new IllegalStateException(/*...*/); 4 return new ArrayList(tasksCancelledAtShutdown); } 6 public void execute(final Runnable runnable) { 8 exec.execute(new Runnable() { public void run() { 10 try { runnable.run(); 12 } finally { if (isShutdown() 14 && Thread.currentThread().isInterrupted()) tasksCancelledAtShutdown.add(runnable); 16 } } 18 }); } 26/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Zastavování vláknových služeb Vzor ­ jedovaté sousto ukončování systému producent ­ konzument jedovaté sousto ­ jeden konkrétní typ zprávy funguje pro známý počet producentů konzument umře po požití Nprod otrávených soust lze rozšířit i na více konzumentů každý producent musí do fronty zapsat Nkonz otrávených soust problém s počtem zpráv Nprod Nkonz 27/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Ošetření abnormálního ukončení vlákna Zachytávání RunTimeException normálně se nedělá, měla by vyustít v stacktrace potřeba zpracovat, pokud vlákno vykonává úplně cizí kód strategie: zachytit, uložit, pokračovat try {...} catch (...) {...} v případě, že se vlákno o sebe musí postarat samo ukončit a dát vědět vlastníkovi try {...} finally {...} možnost předat Throwable Throwable thrown = null; 2 try {runTask(getTaskFromQueue());} catch (Throwable e) {thrown = e;} 4 finally { threadExited (this, thrown);} 28/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Ošetření abnormálního ukončení vlákna UncaughtExceptionHandler aplikace si může nastavit vlastní zpracování nezachycených výjimek pokud není nastaven, vypisuje se stacktrace na System.err 1. Thread.setUncaughtExceptionHandler Java 5.0 per vlákno 2. ThreadGroup Java < 5.0 zavolá se pouze první pro TPE se nastavuje pomocí vlastní ThreadFactory přes konstruktor TPE standardní TPE nechá po nezachycené výjimce ukončit dané vlákno bez UncaughtExceptionHandler mohou vlákna tiše mizet možnost task obalit do dalšího Runnable/Callable vlastní TPE s alternativním afterExecute Propagace nezachycených výjimek do UncaughtExceptionHandler se dostanou pouze úlohy zaslané přes execute() submit() vrací výjimku jakou součást návratové hodnoty/stavu ­ Future.get() 29/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Ukončování JVM Normální ukončení (orderly termination) ukončení posledního nedémonického vlákna volání System.exit(); platformově závislé ukončení (SIGINT, Ctrl-C) Abnormální ukončení (abrupt termination) volání Runtime.halt(); platformově závislé ukončení (SIGKILL) Háčky při ukončení (shutdown hooks) Runtime.addShutdownHook předává se implementace vlákna JVM negarantuje pořadí pokud v době ukončování běží jiná vlákna, poběží paralelně s háčky háčky musí být thread-safe: synchronizace např. signalizace ukončení jiným vláknům, mazání dočasných souborů, ... pokud nějaké vlákno počítá se signalizací ukončení při ukončování JVM, může si samo zaregistrovat háček (ale ne z konstruktoru!) použití jednoho velkého háčku: odpadá problém se synchronizací, možnost zajištění de novaného pořadí ukončování komponent 30/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Ukončování JVM Démonická vlákna metoda setDaemon() démonický stav se dědí ukončování JVM: pokud běží jen démonická vlákna, JVM se normálně ukončí neprovedou se bloky finally neprovede se vyčištění zásobníku příklad: garbage collection, čištění dočasné paměťové cache nepoužívat z lenosti! Finalizers týká se objektů s netriviální metodou finalize() obtížné napsat správně musí být synchronizovány není garantováno pořadí výkonnostní penalta obvykle jde nahradit pomocí bloku finally a explicitního uvolnění zdrojů po doběhnutí háčku se spustí nalizers pokud runFinalizersOnExit == true vyhýbat se jim! 31/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Typy úloh pro TPE Nezávislé úlohy ­ ideální Problémy závislost/komunikace úloh zaslaných do jednoho TPE ohraničená velikost TPE jednovláknový executor TPE úlohy citlivé na latenci odpovědi ohraničená velikost TPE dlouho běžící úlohy problém s úlohami využívajícími ThreadLocal recyklace vláken nestejně velké úlohy v jednom TPE 32/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Typy úloh pro TPE Je tohle správně? static ExecutorService exec = Executors.newSingleThreadExecutor(); 2 public static class RenderPageTask implements Callable { 4 public String call() throws Exception { Future header, footer; 6 header = exec.submit(new LoadFileTask("header.html")); footer = exec.submit(new LoadFileTask("footer.html")); 8 String page = renderBody(); return header.get() + page + footer.get(); 10 } 12 private String renderBody() { return " body "; 14 } } 33/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Typy úloh pro TPE ANO 1 ExecutorService mainExec = Executors.newSingleThreadExecutor(); Future task = mainExec.submit(new RenderPageTask()); 3 try { System.out.println("Vysledek: " + task.get()); 5 } catch (InterruptedException e) { e.printStackTrace(); 7 } catch (ExecutionException e) { e.printStackTrace(); 9 } exec.shutdown(); 11 mainExec.shutdown(); 34/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Typy úloh pro TPE NE 1 Future task = exec.submit(new RenderPageTask()); try { 3 System.out.println("Vysledek: " + task.get()); } catch (InterruptedException e) { 5 e.printStackTrace(); } catch (ExecutionException e) { 7 e.printStackTrace(); } 9 exec.shutdown(); 35/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Typy úloh pro TPE Nezávislé úlohy ­ ideální Problémy závislost/komunikace úloh zaslaných do jednoho TPE ohraničená velikost TPE jednovláknový executor TPE úlohy citlivé na latenci odpovědi ohraničená velikost TPE dlouho běžící úlohy problém s úlohami využívajícími ThreadLocal recyklace vláken nestejně velké úlohy v jednom TPE 36/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Velikost TPE Doporučení Javy: NCPU + 1 pro výpočení úlohy Obecněji Nvlken = NCPU UCPU 1 + W C kde UCPU je cílové využití CPU, W je čas čekání, C je výpočetní čas Runtime.getRuntime().availableProcessors(); 37/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Vytváření a ukončování vláken v TPE corePoolSize ­ cílová velikost zásobárny vláken startují se, až jsou potřeba (default policy) prestartCoreThread() ­ nastaruje jedno core vlákno a vrátí boolean, zda se povedlo prestartAllCoreThreads() ­ nastartuje všechna core vlákna a vrátí jejich počet maximumPoolSize ­ maximální velikost zásobárny vláken keepAliveTime ­ doba lelkujícího života od Javy 6: allowCoreThreadTimeOut ­ dovoluje timeout i core vláknům 38/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Správa front v TPE Kdy se množí vlákna v TPE? pokud je fronta plná co se stane, pokud corePoolSize = 0 a používáme neomezenou frontu? Použití synchronní fronty SynchronousQueue není fronta v pravém slova smyslu! synchronní předávání dat mezi úlohami pokud žádné vlákno na předání úlohy nečeká, TPE natvoří nové při dosažení limitu se postupuje podle saturační politiky lze použít při neomezeném počtu vláken (Executors.newCachedThreadPool) nebo pokud je akceptovatelné použití saturační politiky efektivní (čas i zdroje) ­ Executors.newCachedThreadPool je efektivnější než Executors.newCachedThreadPool, který využívá LinkedBlockingQueue implementováno pomocí neblokujícího algoritmu v Java 6, 3× větší výkon než Java 5 39/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Správa front v TPE Použití prioritní fronty task musí implementovat Comparable (přirozené pořadí) nebo Comparator Saturační politiky nastupuje v okamžiku zaplnění fronty nastavuje se pomocí setRejectedExecutionHandler nebo konstruktoru TPE AbortPolicy ­ default, úloha dostane RejectedExecutionException CallerRunsPolicy ­ využítí volajícího vlákna řízení formou zpětné vazby DiscardPolicy ­ vyhodí nově zaslanou úlohu DiscardOldestPolicy ­ vyhodí ,,nejstarší`` úlohu vyhazuje z hlavy front nevhodné pro použití s prioritními frontami pomáhá vytlačit problém do vnějších vrstev: např. pro web server nemůže zavolat další accept ­ spojení čekají v TCP stacku 40/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Správa front v TPE ThreadPoolExecutor tpe = 2 new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(100)); 4 tpe.setRejectedExecutionHandler (new ThreadPoolExecutor.CallerRunsPolicy()); Implementace omezení plnění fronty pomocí semaforu semafor se nastaví na požadovanou velikost fronty + počet běžících úloh 1 @ThreadSafe public class BoundedExecutor { 3 private final Executor exec; private final Semaphore semaphore; 5 public BoundedExecutor(Executor exec, int bound) { 7 this.exec = exec; this.semaphore = new Semaphore(bound); 9 } 41/41 Úlohy a vlákna Ukončování a přerušování ThreadPoolExecutors Revisited Správa front v TPE public void submitTask(final Runnable command) 2 throws InterruptedException { semaphore.acquire(); 4 try { exec.execute(new Runnable() { 6 public void run() { try { 8 command.run(); } finally { 10 semaphore.release(); } 12 } }); 14 } catch (RejectedExecutionException e) { semaphore.release(); 16 } }