1/41 Další nástroje pro synchronizaci Futexy Vláknové programování část III Lukáš Hejmánek, Petr Holub {xhejtman,hopet}@ics.muni.cz Laboratoř pokročilých síťových technologií PV192 2011–03–17 2/41 Další nástroje pro synchronizaci Futexy Přehled přednášky Další nástroje pro synchronizaci Futexy 3/41 Další nástroje pro synchronizaci Futexy Další nástroje pro synchronizaci 4/41 Další nástroje pro synchronizaci Futexy Spin locks • Klasické zámky (mutexy) používají systémové volání futex(). • Podpora jádra pro NPTL implementaci POSIX threads. • Mutexy používají systémová volání ⇒ nutnost přepnutí kontextu. • Zámky typu spin jsou implementovány kompletně v user space. • Nemusí se přepínat kontext. • Za cenu busy loopu při pokusu zamknout zámek (Vlákno se cyklicky dotazuje, zda je možno zámek zamknout – spinning). • Jsou situace, kdy přepnutí kontextu trvá déle než busy loop pro zamčení. • Kdy je vhodné použít spin locks? • Při velmi krátké kritické sekci (typicky zvýšení/snížení proměnné). • Nedojde-li k přepnutí kontextu jinou cestou (máme-li více vláken než procesorů, spin lock neurychlí běh). • Ne všechny implementace POSIX threads poskytují spin locks! 5/41 Další nástroje pro synchronizaci Futexy Spin locks 1 void spin_lock(volatile int *lock) 2 { 3 __sync_synchronize(); 4 while(! __sync_bool_compare_and_swap(lock, 0, 1)); 5 } 6 7 void spin_unlock(volatile int *lock) 8 { 9 *lock = 0; 10 __sync_synchronize(); 11 } 6/41 Další nástroje pro synchronizaci Futexy Spin locks 1 void spin_lock(volatile int *lock) 2 { 3 __sync_synchronize(); 4 while(! __sync_bool_compare_and_swap(lock, 0, 1)); 5 } 6 7 void spin_unlock(volatile int *lock) 8 { 9 *lock = 0; 10 __sync_synchronize(); 11 } • Je zde ABA problém? 7/41 Další nástroje pro synchronizaci Futexy Spin locks • Datový typ pthread_spin_t. • Inicializace pthread_spin_init (Inicializaci je vhodné provádět ještě před vytvořením vlákna). • Zamykání/odemykání • pthread_spin_lock • pthread_spin_unlock • Zrušení zámku pthread_spin_destroy. 8/41 Další nástroje pro synchronizaci Futexy Příklad 1 #include 2 #include 3 #include 4 5 int x=0; 6 7 pthread_spinlock_t x_lock; 8 9 void * 10 foo(void *arg) 11 { 12 int i; 13 while(x == 0); 14 for(i = 0; i < 100000000; i++) { 15 pthread_spin_lock(&x_lock); 16 x++; 17 pthread_spin_unlock(&x_lock); 18 } 19 printf("%d\n", x); 20 return NULL; 21 } 9/41 Další nástroje pro synchronizaci Futexy Příklad 22 int 23 main(void) 24 { 25 pthread_t t1, t2; 26 27 pthread_spin_init(&x_lock, 0); 28 pthread_create(&t1, NULL, foo, NULL); 29 pthread_create(&t2, NULL, foo, NULL); 30 x=1; 31 pthread_join(t1, NULL); 32 pthread_join(t2, NULL); 33 pthread_spin_destroy(&x_lock); 34 return 0; 35 } 10/41 Další nástroje pro synchronizaci Futexy Doba běhu příkladu • Test na 2 procesorovém systému. • V případě 2 vláken: • Za použití mutexů: 29 sec • Za použití spinů: 11 sec • V případě 3 vláken: • Za použití mutexů: 28 sec • Za použití spinů: 29 sec 11/41 Další nástroje pro synchronizaci Futexy Bariéry • Bariéry jsou v podstatě místa setkání. • Bariéra je místo, kde se vlákna setkají. • Bariéra zablokuje vlákno do doby než k bariéře dorazí všechna vlákna. • Příklad: • Vláknové násobení matic: M × N × O × P • Každé vlákno násobí a sčítá příslušný sloupec a řádek. • Po vynásobení M × N se vlákna setkají u bariéry. • Vynásobí předchozí výsledek ×O, opět se setkají u bariéry. • Dokončí výpočet vynásobením výsledku ×P. • Ne všechny implementace POSIX threads poskytují bariéry! 12/41 Další nástroje pro synchronizaci Futexy Bariéry barrier_init() pthread_create() barrier_wait() barrier_wait() barrier_wait() continue... 13/41 Další nástroje pro synchronizaci Futexy Bariéry • Datový typ pthread_barrier_t. • Inicializace pthread_barrier_init() (Inicializaci je vhodné provádět ještě před vytvořením vlákna). • Při inicializaci specifikujeme, pro kolik vláken bude bariéra sloužit. • Zastavení na bariéře pthread_barrier_wait(). • Zrušení bariéry pthread_barrier_destroy. 14/41 Další nástroje pro synchronizaci Futexy Příklad bariéry 1 #include 2 #include 3 #include 4 5 pthread_barrier_t barrier; 6 7 void * 8 foo(void *arg) { 9 int slp = (int)arg; 10 printf("Working..\n"); 11 sleep(slp); 12 printf("Waiting on barrier\n"); 13 pthread_barrier_wait(&barrier); 14 printf("Synchronized\n"); 15 return NULL; 16 } 15/41 Další nástroje pro synchronizaci Futexy Příklad bariéry 17 int 18 main(void) 19 { 20 pthread_t t1, t2; 21 22 pthread_barrier_init(&barrier, NULL, 2); 23 pthread_create(&t1, NULL, foo, (void*)2); 24 pthread_create(&t2, NULL, foo, (void*)4); 25 26 pthread_join(t1, NULL); 27 pthread_join(t2, NULL); 28 pthread_barrier_destroy(&barrier); 29 return 0; 30 } 16/41 Další nástroje pro synchronizaci Futexy Read Write zámky • Read Write zámky dovolují násobné čtení ale jediný zápis. • Příklad: • Několik vláken čte nějakou strukturu (velmi často!). • Jedno vlákno ji může měnit (velmi zřídka!). • Pozorování: • Je zbytečné strukturu zamykat mezi čtecími vlákny Nemohou ji měnit a netvoří tedy kritickou sekci. • Je nutné strukturu zamknout, mění-li ji zapisovací vlákno V této chvíli nesmí strukturu ani nikdo číst (není změněna atomicky). • Nastupují Read Write zámky. • Pravidla: • Není-li zámek zamčen v režimu Write, může být libovolněkrát zamčen v režimu Read. • Je-li zámek zamčen v režimu Write, nelze jej už zamknout v žádném režimu. • Je-li zámek zamčen v režimu Read, nelze jej zamknout v režimu Write. • Opět ne všechny implementace POSIX threads implementují RW zámky (korektně)! 17/41 Další nástroje pro synchronizaci Futexy RW zámky • Datový typ pthread_rwlock_t. • Inicializace pthread_rwlock_init() (Inicializaci je vhodné provádět ještě před vytvořením vlákna). • Zamknutí v režimu Read pthread_rwlock_rdlock(). • Zamknutí v režimu Write pthread_rwlock_wrlock(). • Opakované zamčení jednoho zámku stejným vláknem skončí chybou EDEADLK. Není možné povýšít Read zámek na Write zámek a naopak. • Odemknutí v libovolném režimu pthread_rwlock_unlock() Pthreads nerozlišují odemknutí dle režimů, některé implementace vláken párují rdlock s příslušným rdunlock, stejně tak pro wrlock. • Zrušení rw zámku pthread_rwlock_destroy. 18/41 Další nástroje pro synchronizaci Futexy Příklad 1 #include 2 #include 3 #include 4 5 struct x_t { 6 int a; 7 int b; 8 pthread_rwlock_t lock; 9 }; 10 11 struct x_t x; 12 13 int quit = 0; 14 15 pthread_barrier_t start; 19/41 Další nástroje pro synchronizaci Futexy Příklad 16 void * 17 reader(void *arg) 18 { 19 int n = (int)arg; 20 pthread_barrier_wait(&start); 21 22 while(!quit) { 23 pthread_rwlock_rdlock(&x.lock); 24 if((x.a + x.b)%n == 0) 25 printf("."); 26 else 27 printf("+"); 28 pthread_rwlock_unlock(&x.lock); 29 fflush(stdout); 30 sleep(1); 31 32 } 33 return NULL; 34 } 20/41 Další nástroje pro synchronizaci Futexy Příklad 35 36 void * 37 writer(void *arg) 38 { 39 int i; 40 pthread_barrier_wait(&start); 41 for(i=0; i < 10; i++) { 42 pthread_rwlock_wrlock(&x.lock); 43 x.a = i; 44 x.b = (i % 2)+1; 45 pthread_rwlock_unlock(&x.lock); 46 sleep(5); 47 } 48 quit = 1; 49 return NULL; 50 } 21/41 Další nástroje pro synchronizaci Futexy Příklad 52 53 int 54 main(void) 55 { 56 pthread_t t1, t2, t3; 57 58 x.a = 1; 59 x.b = 2; 60 pthread_rwlock_init(&x.lock, 0); 61 pthread_barrier_init(&start, NULL, 3); 62 pthread_create(&t1, NULL, reader, (void*)2); 63 pthread_create(&t2, NULL, reader, (void*)3); 64 pthread_create(&t3, NULL, writer, NULL); 65 pthread_join(t1, NULL); 66 pthread_join(t2, NULL); 67 pthread_join(t3, NULL); 68 pthread_rwlock_destroy(&x.lock); 69 pthread_barrier_destroy(&start); 70 return 0; 71 } 22/41 Další nástroje pro synchronizaci Futexy Problémy RW zámků • Nebezpečí stárnutí zámků. • Pokud je zamčená část kódu vykonávána déle než nezamčená, nemusí se nikdy podařit získat některý ze zámků. • V předchozím příkladě nesmí být sleep() v zamčené části kódu! 23/41 Další nástroje pro synchronizaci Futexy Try varianty synchronizace • Pomocí try variant volání lze zjistit, zda vstup do kritické sekce je volný či nikoli. • Funkce atomicky zkusí provést synchronizaci (např. zamknout zámek). • V případě neúspěchu není funkce blokující, ale okamžitě provede návrat. • Neúspěch je signalizován návratovým kódem funkce (dle manuálové stránky, pro jednotlivá volání se může lišit!). • Try varianty: • pthread_mutex_trylock() • pthread_spin_trylock() • pthread_rwlock_tryrdlock() • pthread_rwlock_trywrlock() • sem_trywait() 24/41 Další nástroje pro synchronizaci Futexy Příklad try zámku 1 #include 2 #include 3 #include 4 5 pthread_mutex_t lock; 6 7 void 8 foo(void) 9 { 10 if(pthread_mutex_trylock(&lock) == EBUSY) { 11 printf("Cannot acquire the lock right now\n"); 12 } else { 13 printf("Locked\n"); 14 } 15 } 16 17 int 18 main() 19 { 20 pthread_mutex_init(&lock, NULL); 21 22 foo(); 23 24 foo(); 25 } 25/41 Další nástroje pro synchronizaci Futexy Timed varianty synchronizace • Nástroje zabraňující „věčnému“ čekání. • Příklad: • Jak ukončit vlákno čekající na zámek pomocí globální proměnné? • K většině blokujících synchronizačních rozhraní existují ekvivalentní rozhraní s časovým omezením. • Po vypršení časového omezení je vrácena chyba návratovým kódem (dle manuálové stránky, pro jednotlivá volání se může lišit!). • Timed varianty: • pthread_cond_timedwait() • pthread_mutex_timedlock() • pthread_rwlock_timedrdlock() • pthread_rwlock_timedwrlock() • sem_timedwait() • Ne všechny implementace poskytují timed varianty. 26/41 Další nástroje pro synchronizaci Futexy Příklad timed zámku 1 #include 2 #include 3 #include 4 #include 5 6 pthread_mutex_t lock; 7 8 void 9 foo(void) 10 { 11 struct timespec tsp; 12 13 tsp.tv_sec = time(NULL)+5; 14 tsp.tv_nsec = 0; 15 if(pthread_mutex_timedlock(&lock, &tsp) == ETIMEDOUT) { 16 printf("Timeout expired\n"); 17 } else { 18 printf("Locked\n"); 19 } 20 } 21 22 int 23 main() 24 { 25 pthread_mutex_init(&lock, NULL); 26 foo(); 27 foo(); 28 } 27/41 Další nástroje pro synchronizaci Futexy Pojmenované semafory • Semafor nemusí být nutně jen paměťový objekt. • Semaforem může být „soubor“. • Zůstává zachován datový typ sem_t. • Semafor může být sdílen i mezi procesy. • Založení nebo otevření semaforu sem_open(). • Otevření či založení souboru se podobá volaní open(), akceptuje příznaky O_CREAT, O_EXCL. • Jméno semaforu není soubor ve filesytému, ale je pouze virtuálním jménem. • Zavření semaforu sem_close(). • Zrušení semaforu sem_unlink(). 28/41 Další nástroje pro synchronizaci Futexy Příklad pojmenovaného semaforu 1 #include 2 #include 3 #include 4 #include 5 6 int 7 main() 8 { 9 sem_t* sema = sem_open("/mysem", O_CREAT, 0644, 0); 10 11 if(sema == SEM_FAILED) { 12 perror("Cannot create semaphore /mysem"); 13 return 1; 14 } 15 16 sem_post(sema); 17 sem_wait(sema); 18 19 sem_close(sema); 20 sem_unlink("/mysem"); 21 22 return 0; 23 } 29/41 Další nástroje pro synchronizaci Futexy Jednorázové zavolání funkce • Funkce typu inicializace chceme zavolat jen jednou. • Konstrukce s příznakem, zda inicializace ještě nebyla provedena a následná inicializace je race condition. • Zamykaní zbytečně snižuje paralelismus. • pthread_once() zavolá jednou danou funkci. 30/41 Další nástroje pro synchronizaci Futexy Příklad jednorázového zavolání 1 #include 2 #include 3 4 5 pthread_once_t once_init = PTHREAD_ONCE_INIT; 6 char initialized=0; 7 8 void 9 init(void) 10 { 11 /* do initialization */ 12 printf("Initialized\n"); 13 initialized = 1; 14 } 15 16 int 17 main(void) 18 { 19 if(!initialized) { 20 pthread_once(&once_init, init); 21 } 22 return 0; 23 } 31/41 Další nástroje pro synchronizaci Futexy Futexy • Synchronizační nástroj Linuxu • Systémové volání long sys_futex(void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) • Operace: • FUTEX_WAIT pozastaví vlákno pokud obsah paměťového místa addr1 je stejný jako val1, v opačném případě je vrácena chyba EWOULDBLOCK. Vlákno pokračuje v běhu, obdrželo-li signál. • FUTEX_WAKE Vzbudí jedno nebo více vláken (počet udává proměnná val1). Kernel nesleduje prioritu čekajících vláken. • FUTEX_CMP_REQUEUE Podobně jako WAKE, vzbudí daný počet vláken a daný počet z ostatních vláken přeřadí do fronty čekaní z addr1 na addr2. To vše za předpokladu, že *addr1 má stejnou hodnotu jako val3. • FUTEX_WAKE_OP 32/41 Další nástroje pro synchronizaci Futexy Futexy • FUTEX_WAKE_OP 1 int oldval = *(int*)addr2; 2 *(int*)addr2 = oldval OP OPARG; 3 futex_wake(addr1, val1); 4 if(oldval CMP CMPARG) 5 futex_wake(addr2, (int)timeout); 6 7 /* OP, OPARG, CMP, CMPARG jsou kodovane ve val3 */ 33/41 Další nástroje pro synchronizaci Futexy Mutex pomocí futexu • futex_wait(int *val, int val) { return syscall(SYS_futex, val, FUTEX_WAIT, 1, NULL, NULL, 0) } • futex_wake(int *val, 1) { return syscall(SYS_futex, val, FUTEX_WAKE, 1, NULL, NULL, 0) } • val = 0 nezamčeno • val != 0 zamčeno 34/41 Další nástroje pro synchronizaci Futexy 1 volatile int val=0; 2 3 void lock() { 4 int c; 5 while((c=atomic_inc(val))!=0) 6 futex_wait(&val, c+1); 7 } 8 9 void unlock() { 10 val = 0; 11 futex_wake(&val, 1); 12 } 35/41 Další nástroje pro synchronizaci Futexy Problémy mutexu • unlock() vždy používá syscall • Kdy je to zbytečné? • Co se stane, když 2 vlákna vstoupí do lock() a budou ho provádět paralelně? • Stačí nám proměnná typu int? 36/41 Další nástroje pro synchronizaci Futexy Mutex pomocí futexu verze 2 • cmpxchg(int var, int old, int new): tmp = var; if(var==old) var=new; return tmp; • val = 0 nezamčeno • val = 1 zamčeno a nikdo nečeká • val = 2 zamčeno a někdo čeká 37/41 Další nástroje pro synchronizaci Futexy 1 volatile int val=0; 2 3 void lock() { 4 int c; 5 /* try lock */ 6 if((c = cmpxchg(val, 0, 1))!=0) { 7 /* already locked */ 8 do { 9 /* c==2 -> somebody is waiting already */ 10 if(c==2 || cmpxchg(val, 1, 2)!=0) 11 futex_wait(&val, 2); 12 } while((c = cmpxchg(val, 0, 2))!=0); 13 /* why val?0:val=*2*? */ 14 } 15 } 16 17 void unlock() { 18 if(atomic_dec(val) != 1) { 19 val = 0; 20 futex_wake(&val, 1); 21 } 22 } 38/41 Další nástroje pro synchronizaci Futexy Problémy mutexu • Zámek bez soupeření mutex mutex v2 atomic op 1 1 lock futex syscall 0 0 atomic op 0 1 unlock futex syscall 1 0 • Zámek se soupeřením mutex mutex v2 atomic op 1 + 1 2+1 3+2 lock futex syscall 1 + 1 1 + 1 atomic op 0 1 unlock futex syscall 1 1 39/41 Další nástroje pro synchronizaci Futexy Mutex pomocí futexu verze 3 1 volatile int val=0; 2 3 void lock() { 4 int c; 5 if((c = cmpxchg(val, 0, 1))!=0) { 6 if(c!=2) 7 c = xchg(val,2); 8 while(c!=0) { 9 futex_wait(&val, 2); 10 c = xchg(val, 2); 11 } 12 } 13 } 14 15 void unlock() { 16 if(atomic_dec(val) != 1) { 17 val = 0; 18 futex_wake(&val, 1); 19 } 20 } 40/41 Další nástroje pro synchronizaci Futexy Finální řešení • Zámek bez soupeření mutex mutex v2 mutex v3 atomic op 1 1 1 lock futex syscall 0 0 0 atomic op 0 1 1 unlock futex syscall 1 0 0 • Zámek se soupeřením mutex mutex v2 atomic op 1 + 1 2+1 3+2 1 2 + 1 lock futex syscall 1 + 1 1 + 1 1 + 1 atomic op 0 1 1 unlock futex syscall 1 1 1 41/41 Další nástroje pro synchronizaci Futexy Výkon různých variant zamykání Bez zámků 0.39s lock instrukce 5.72s Futex verze 2 28.93s Futex verze 3 22.22s Pthread mutex (un)lock 52.31s