1/28 Pokročilé vlasnosti Javy Vláknové programování část V Lukáš Hejmánek, Petr Holub {xhejtman,hopet}@ics.muni.cz Laboratoř pokročilých síťových technologií PV192 2010­03­23 2/28 Pokročilé vlasnosti Javy Přehled přednášky Pokročilé vlasnosti Javy Atomické typy Concurrent Collections Explicitní zamykání Executors, Thread Pools a Futures 3/28 Pokročilé vlasnosti Javy Atomické typy čekání na synchronized monitor vede na přeplánování vlákna atomické proměnné to zvládnou bez přepínání kontextu vyšší výkon pro nízkou až střední míru soutěžení o zámek (lock contention) tzv. wait-free synchronizace Zdroj: Goetz B., Peierls T., Bloch J., Bowbeer J., Holmes D., Lea D. Java Concurrency in Practice. Addison Wesley Professional, 2006 4/28 Pokročilé vlasnosti Javy Atomické typy čekání na synchronized monitor vede na přeplánování vlákna atomické proměnné to zvládnou bez přepínání kontextu vyšší výkon pro nízkou až střední míru soutěžení o zámek (lock contention) tzv. wait-free synchronizace Zdroj: Goetz B., Peierls T., Bloch J., Bowbeer J., Holmes D., Lea D. Java Concurrency in Practice. Addison Wesley Professional, 2006 5/28 Pokročilé vlasnosti Javy Atomické typy podpora v HW compare-and-swap (CAS) CAS(x, y) funkce: porovnej obsah paměti s x a pokud je identický, nahraď jej za y návratová hodnota: úspěch změny (buď jako boolean nebo jako hodnota, kterou má paměť před provedením instrukce) podpora: IA-32, Sparc double compare-and-swap (DCAS/CAS2) funkce: výměna hodnot na dvou místech v paměti na základě původních hodnot jednoduchá implementace atomické Deque lze emulovat pomocí CAS (pomalá) podpora u Motorol 68k double-wide compare-and-swap funkce: výměna hodnot na dvou přilehlých místech v paměti podpora: CMPXCHG8B a CMPXCHG16B na novějších x86 Single compare, double swap funkce: výměna hodnot na dvou místech v paměti v závislosti na jedné původní hodnotě podpora: cmp8xchg16 u Ithania 6/28 Pokročilé vlasnosti Javy Atomické typy podpora v HW load-link/store-conditional (LL/SC) funkce: (1) LL načte hodnotu paměti, (2) SC ji změní pouze pokud se původní hodnota od operace LL nezměnila, jinak selže silnější než CAS ­ řeší i problém ABA podpora: ldl_l/stl_c a ldq_l/stq_c (Alpha), lwarx/stwcx (PowerPC), ll/sc (MIPS), ldrex/strex (ARM version 6 avyšší) fetch-and-add funkce: atomická inkrementace obsahu paměti návratová hodnota: původní hodnota paměti podpora: x86 od 8086 (ADD s prvním operandem speci kujícím místo v paměti, nicméně nevrací původní hodnotu ­ s LOCK pre xem atomické i u více procesorů), XADD od 486 vrací původní hodnotu 7/28 Pokročilé vlasnosti Javy Atomické typy AtomicX z java.util.concurrent AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference Zajištěné atomické aktualizace Podpora od Java 5.0 HW optimalizace CAS instrukce (IA-32, Sparc) podpora v JVM od 5.0 8/28 Pokročilé vlasnosti Javy Využítí atomických typů Návrh algoritmu buď vyžaduje pouze jednu atomickou změnu nebo z první změny musí být odvoditelné ostatní a musí je být schopen dokončit ,,kdokoli`` Kolekce ConcurrentLinkedQueue WaitFreeReadQueue http://www.rtsj.org/specjavadoc/javax/realtime/ WaitFreeReadQueue.html WaitFreeWriteQueue http://www.rtsj.org/specjavadoc/javax/realtime/ WaitFreeWriteQueue.html 9/28 Pokročilé vlasnosti Javy Neblokující seznam: Michael-Scott, 1996 10/28 Pokročilé vlasnosti Javy Neblokující seznam: Michael-Scott, 1996 1 import java.util.concurrent.atomic.AtomicReference; // dle http://www.javaconcurrencyinpractice.com/listings/LinkedQueue.java 3 public class AtomickySeznam { 5 private static class Node { final E polozka; 7 final AtomicReference> next; 9 public Node(E polozka, AtomickySeznam.Node next) { this.polozka = polozka; 11 this.next = new AtomicReference>(next); 13 } } 15 private final AtomickySeznam.Node dummy = 17 new AtomickySeznam.Node(null, null); private final AtomicReference> hlava 19 = new AtomicReference>(dummy); private final AtomicReference> konec 21 = new AtomicReference>(dummy); 11/28 Pokročilé vlasnosti Javy Neblokující seznam: Michael-Scott, 1996 22 public boolean put(E polozka) { 24 AtomickySeznam.Node newNode = new AtomickySeznam.Node(polozka, null); 26 while (true) { AtomickySeznam.Node curkonec = konec.get(); 28 AtomickySeznam.Node konecNext = curkonec.next.get(); if (curkonec == konec.get()) { 30 if (konecNext != null) { // dokoncime rozpracovany stav - posuneme konec 32 konec.compareAndSet(curkonec, konecNext); } else { 34 // pokusime se vlozit if (curkonec.next.compareAndSet(null, newNode)) { 36 // pri uspechu se pokusime posunout konec konec.compareAndSet(curkonec, newNode); 38 return true; } 40 } } 42 } } 44 } 12/28 Pokročilé vlasnosti Javy Problém ABA Problém, jak detekovat změnu A B A podpora HW: LL/SC ,,verzování``: počítadlo změn AtomicStampedReference odkaz + int počítadlo změn AtomicMarkedReference odkaz + boolean indikátor některé algoritmy používají indikátor k označení uzlu v seznamu jako smazaného 13/28 Pokročilé vlasnosti Javy Concurrent Collections optimalizace kolekcí na výkon při paralelních přístupech CopyOnWriteArrayList, CopyOnWriteArraySet optimalizované pro režim čti-často-měň-zřídka CopyOnWriteArraySet obdoba HashSet CopyOnWriteArrayList obdoba ArrayList, na rozdíl od Vector poskytuje složené operace iterace poskytuje pohled na objekt v době konstrukce iterátoru 1 import java.util.concurrent.*; 3 public class CoW { CopyOnWriteArraySet cowAS = new CopyOnWriteArraySet(); 5 CopyOnWriteArrayList cowAL = new CopyOnWriteArrayList(); public void narabaj() { 7 cowAS.addAll(kolekce); cowAS.contains(o); 9 cowAS.clear(); 11 cowAL.addAllAbsent(kolekce); cowAL.addIfAbsent(o); 13 cowAL.retainAll(kolekce); } 15 } 14/28 Pokročilé vlasnosti Javy Concurrent Collections ConcurrentHashMap kolekce optimalizovaná na vyhledávání prvků mnohem lepší výkon v porovnání se synchronizedMap a Hashtable Threads ConcurrentHashMap Hashtable 1 1,00 1,03 2 2,59 32,40 4 5,58 78,23 8 13,21 163,48 16 27,58 341,21 32 57,27 778,41 http://www.ibm.com/developerworks/java/library/j-jtp07233.html úspěšná operace get() obvykle proběhne bez zamykání na iteraci se nezamyká celá kolekce mírně relaxovaná sémantika při získávání prvků je možné najít i prvek, jehož vkládání ještě není dokončeno (nikdy však nesmysl) iterátor může ale nemusí re ektovat změny od té doby, co byl vytvořen synchronizedMap a Hashtable lze nahradit tam, kde se nespoléhá na zamykání celé tabulky 15/28 Pokročilé vlasnosti Javy Concurrent Collections ConcurrentHashMap 1 import java.util.concurrent.ConcurrentHashMap; 3 public class CHT { ConcurrentHashMap cht = new ConcurrentHashMap(10); 5 public void narabaj() { 7 cht.put(klic, objekt); cht.putAll(mapa); 9 cht.putIfAbsent(klic, objekt); cht.containsKey(klic); 11 cht.containsValue(objekt); // take contains() cht.entrySet(); 13 cht.keySet(); cht.values(); 15 cht.clear(); } 17 } 16/28 Pokročilé vlasnosti Javy Explicitní zamykání potřeba jemnějšího zamykání zvýšení výkonu ­ např. paralelizace read-only přístupů potřeba rozšířené funkcionality ReentrantLock ekvivalent synchronized, pouze explicitní rozšířené schopnosti (např. gettery) nezapomenou správně odemknout 1 import java.util.concurrent.locks.ReentrantLock; 3 public class RELock { public static void main(String[] args) { 5 ReentrantLock relock = new ReentrantLock(); relock.lock(); 7 try { Thread.sleep(1000); 9 // kod } catch (InterruptedException e) { 11 } finally { relock.unlock(); 13 } } 15 } 17/28 Pokročilé vlasnosti Javy Explicitní zamykání ReentrantReadWriteLock paralelizace na čtení, exkluzivní přístup na zápis reentrantní zámek jak pro čtení, tak pro zápis politiky: writer preference | fair speci kací v konstruktoru downgrade zámku: získání read zámku před uvolněním write zámku neumožňuje upgrade zámku instrumentace pro monitoring (informace o držení zámků) ­ nikoli pro synchronizaci! možno si naimplementovat vlastní zámky, např. RW zámek s podporou upgrade http://www.jtoolkit.org/articles/ ReentrantReadWriteLock-upgrading.html upgrade je nevýhodný z pohledu výkonu 18/28 Pokročilé vlasnosti Javy Explicitní zamykání 1 import java.util.concurrent.locks.ReentrantReadWriteLock; 3 public class RWLock { boolean cacheValid = false; 5 public void pouzijCache() { // rwlock s fair politikou 7 ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(true); rwlock.readLock().lock(); 9 if (!cacheValid) { rwlock.readLock().unlock(); 11 rwlock.writeLock().lock(); if (!cacheValid) { // znovu zkontroluj, 13 // neumime upgrade bez preruseni // uloz data do cache 15 cacheValid = true; } 17 // rucni downgrade zamku rwlock.readLock().lock(); // jeste drzim na zapis 19 rwlock.writeLock().unlock(); } 21 // pouzij data rwlock.readLock().unlock(); 23 } } 19/28 Pokročilé vlasnosti Javy Explicitní zamykání Conditions čekání na splnění podmínky interface Condition { void await() throws IE; boolean await(long time, TimeUnit unit) throws IE; long awaitNanos(long nanosTimeout) throws IE; void awaitUninterruptibly() boolean awaitUntil(Date deadline) throws IE; void signal(); void signalAll(); } výhody oproti wait()/notify() více podmínek per zámek absolutní a relativní timeouty po návratu se dozvíme, proč jsme se vrátili možnost nepřerušitelného čekání 20/28 Pokročilé vlasnosti Javy Explicitní zamykání 1 import java.util.concurrent.locks.*; 3 public class OmezenyBuffer { Lock lock = new ReentrantLock(); 5 Condition notFull = lock.newCondition(); Condition notEmpty = lock.newCondition(); 7 Object[] items = new Object[100]; int putptr, takeptr, count; 9 public void put(Object x) throws InterruptedException { lock.lock(); 11 try { while (count == items.length) notFull.await(); 13 items[putptr] = x; if (++putptr == items.length) putptr = 0; 15 ++count; notEmpty.signal(); 17 } finally { lock.unlock(); 19 } } 21 public Object take() throws InterruptedException { lock.lock(); 23 try { while (count == 0) notEmpty.await(); 25 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; 27 --count; notFull.signal(); 29 return x; } finally { 31 lock.unlock(); } 33 } } http://www.cs.umd.edu/class/spring2005/cmsc433/lectures/util-concurrent-new.pdf 21/28 Pokročilé vlasnosti Javy Executors, Thread Pools Koncept vykonavatelů kódu: Executors vykonávají se objekty implementující Runnable různé typy Executors ExecutorService přidává schopnost zastavit vykonávání schopnost vykonávat Callable, nikoli pouze Runnable() vracet objekty representované jako Future ThreadPoolExecutor všeobecně použitelný executor, jednoduché API minimální i maximální počet vláken recyklace vláken likvidace nepoužívaných vláken použití si ukážeme u FutureTask 22/28 Pokročilé vlasnosti Javy Runnable vs. Callable Interface Runnable implementuje ,,střeva`` vlákna lze použít s konstruktorem třídy Thread konceptuálně čistější přístup: nerozšiřujeme třídu, kterou vlastně rozšiřovat nechceme použití i v hlavním vlákně public class PrikladRunnable { 2 static class RunnableVlakno implements Runnable { public void run() { 4 System.out.println("Tu je vlakno."); } 6 } 8 public static void main(String[] args) { System.out.print("Startuji vlakno: "); 10 new Thread(new RunnableVlakno()).start(); System.out.println("hotovo."); 12 System.out.println("Spoustim primo v hlavnim vlakne: "); new RunnableVlakno().run(); 14 } } 23/28 Pokročilé vlasnosti Javy Runnable vs. Callable Interface Callable na rozdíl od Runnable může vracet výsledek (typu V) a vyhodit výjimku 1 import java.util.concurrent.Callable; 3 public class PrikladCallable { static class CallableVlakno implements Callable { 5 public String call() throws Exception { return "Retezec z Callable"; 7 } } 9 public static void main(String[] args) { 11 try { String s = new CallableVlakno().call(); 13 System.out.println(s); } catch (Exception e) { 15 System.out.println("Chytil jsem vyjimku"); } 17 } } 24/28 Pokročilé vlasnosti Javy Executors Typy Executorů SingleThreadExecutor sekvenční vykonávání úloh pokud vlákno selže, pokračuje se vykonáváním následujícího ScheduledThreadPool zpožděné či opakované vykonávání vláken FixedThreadPool použivá pevný počet vláken CachedThreadPool vytváří nová vlákna dle potřeby opakovaně používá existující uvolněná vlákna ScheduledExecutorService implmentace spouštění s de novaným zpožděním a opakovaného spouštění http://java.sun.com/j2se/1.5.0/docs/api/java/util/ concurrent/ScheduledExecutorService.html Executors factory implementace vlastních typů Executorů http://java.sun.com/j2se/1.5.0/docs/api/java/util/ concurrent/Executors.html 25/28 Pokročilé vlasnosti Javy Executors import java.util.concurrent.*; 2 import java.util.Random; 4 public class TPE { public static void main(String[] args) { 6 final Random random = new Random(); // forkbomba: ;-) 8 // ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newFixedThreadPool( 10 Runtime.getRuntime().availableProcessors()-1); for (int i = 0; i < 100; i++) { 12 executor.execute(new Runnable() { public void run() { 14 int max = random.nextInt(); for(int j = 0; j < max; j++) { j += 2; j--; } 16 System.out.println("Dobehlo vlakno s max = " + max); } 18 }); } 20 try { Thread.sleep(10000); 22 executor.shutdown(); executor.awaitTermination(1000, TimeUnit.SECONDS); 24 } catch (InterruptedException e) { } 26 } } 26/28 Pokročilé vlasnosti Javy Futures Princip: někdy v budoucnu bude volající potřebovat výsledek výpočtu X v době, kdy si volající řekne o výsledek výpočtu X: (a) výsledek je okamžitě vrácen, pokud je již k dispozici, nebo (b) volající se zablokuje, výsledek se dopočítá a vrátí, volající se odblokuje H. Baker, C. Hewitt, "The Incremental Garbage Collection of Processes". Proceedings of the Symposium on Arti cial Intelligence Programming Languages, SIGPLAN Notices 12. August 1977. podobný koncept D. Friedman. "CONS should not evaluate its arguments". S. Michaelson and R. Milner, editors, Automata, Languages and Programming, pages 257-284. Edinburgh University Press, Edinburgh. Also available as Indiana University Department of Computer Science Technical Report TR44. 1976 27/28 Pokročilé vlasnosti Javy Futures a ThreadPoolExecutor 1 import java.util.concurrent.*; 3 public class Futures { public static class StringCallable implements Callable { 5 public String call() throws Exception { System.out.println("FT: Pocitam."); 7 Thread.sleep(5000); System.out.println("FT: Vypocet hotov."); 9 return "12345"; } 11 } public static void main(String[] args) { 13 ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()); 15 FutureTask ft = new FutureTask(new StringCallable()); System.out.println("main: Poustim vypocet."); 17 tpe.execute(ft); // alternativa: Future ft = tpe.submit(new StringCallable()); 19 try { System.out.println("main: Chci vysledek."); 21 String s = (String) ft.get(); System.out.println("main: Mam vysledek: " + s); 23 tpe.shutdown(); tpe.awaitTermination(1, TimeUnit.MINUTES); 25 } catch (InterruptedException e) {} catch (ExecutionException e) {} 27 } } 28/28 Pokročilé vlasnosti Javy Programování v reálném čase http://www.rtsj.org/ P. Dibble: Real-Time Java Platform Programming http://www.sun.com/books/catalog/dibble.xml Interoperability with non-RT code, tradeo s in real-time development, and RT issues for the JVM software Garbage collection, non-heap access, physical and ,,immortal`` memory, and constant-time allocation of non-heap memory Priority scheduling, deadline scheduling, and rate monotonic analysis Closures, asynchronous transfer of control, asynchronous events, and timers