#define _POSIX_C_SOURCE 200809L #include /* read, write, close */ #include /* socket, connect */ #include /* getaddrinfo */ #include /* openat, O_* */ #include /* strlen */ #include /* calloc, free */ #include #include /* atomic_int */ #include #include #include /* Uvažme situaci, kdy potřebujeme provést větší přenos dat * z nějakého vzdáleného systému. To typicky zabere nějaký čas, a * budeme-li data stahovat synchronně, náš program (bez dalších * opatření) nebude schopen reagovat na žádné jiné události. * * V podobných případech se může hodit podobnou dlouhotrvající úlohu * delegovat do samostatného vlákna, aby byla provedena asynchronně * – to bude fungovat jak pro úlohy vstupně/výstupní, tak pro úlohy * výpočetní. Podobně jako v předchozí kapitole budeme podobné * asynchronní úkoly reprezentovat pomocí neprůhledného ukazatele. * Úkol vytvoříme podprogramem ‹fetch_start› a na jeho dokončení * vyčkáme pomocí ‹fetch_await›. */ /* Podprogram ‹fetch_start› spustí stahování dat ze vzdáleného * systému (zadaného jménem počítače a číslem nebo jménem portu) – * podprogram bude používat protokol HTTP, ale není těžké ho * přizpůsobit nějakému jinému. Požadavek bude podprogramu * ‹fetch_start› předán jako připravený řetězec. S implementací ale * ještě chvíli počkáme – nejprve potřebujeme definovat strukturu * ‹fetch_handle›. */ void *fetch_start( const char *host, const char *port, const char *request, int dir_fd, const char *file ); struct fetch_handle { /* Identifikátor vlákna, kterému tato řídicí struktura patří. Je * zejména potřebný pro synchronizaci ukončení pracovního * vlákna službou ‹pthread_join›. */ pthread_t tid; /* Dále si do struktury uložíme informace o požadavku, které * volající předal podprogramu ‹fetch_start› – prakticky veškeré * zpracování bude probíhat asynchronně, v pracovním vlákně. */ const char *host, *port, *request, *file; int dir_fd; /* Konečně si zde nachystáme položky pro uložení výsledku. Krom * informace o úspěchu (nebo neúspěchu) zde máme příležitost * uložit podrobnější informace o případné chybě. Pro účely * ukázky si zde uložíme hodnotu ‹errno›, i když v realistickém * programu bychom potřebovali informací o něco více. Pozor: * položka struktury se nesmí jmenovat ‹errno›, protože se může * jednat o makro. */ int result; int error; }; void *fetch_thread( void * ); void *fetch_start( const char *host, const char *port, const char *request, int dir_fd, const char *file ) { /* V případě vláken je situace oproti procesům poněkud jiná – * úklid se nijak zásadně nekomplikuje ani v situaci, kdy * alokace zdrojů proběhne až v samotném pracovním vlákně. * Naopak má takové uspořádání důležitou výhodu – některé * inicializační operace mohou trvat relativně dlouho, např. * proto, že komunikují po síti. Odsunem těchto činností do * pracovního vlákna se tak zlepší latence hlavního programu. * * Nicméně jednu inicializační akci musíme provést v hlavním * vlákně – totiž musíme alokovat řídicí strukturu budoucího * vlákna. */ struct fetch_handle *fh = calloc( 1, sizeof( struct fetch_handle ) ); fh->host = host; fh->port = port; fh->request = request; fh->dir_fd = dir_fd; fh->file = file; fh->result = -1; /* Nyní je řídicí struktura vyplněna a můžeme nastartovat * pracovní vlákno. Nepodaří-li se to, vrátíme nulový ukazatel, * všechny další chyby již bude řešit samotné pracovní vlákno * a/nebo podprogram ‹fetch_await›. */ if ( pthread_create( &fh->tid, NULL, fetch_thread, fh ) != 0 ) { free( fh ); return NULL; } return fh; } /* Podprogram ‹fetch_thread› představuje pracovní vlákno, které * provede veškerou práci spojenou s překladem jména, navázáním * spojení, vytvořením souboru a konečně samotnou komunikací se * vzdáleným serverem. Všechny tyto části již znáte z dřívějších * kapitol, nebude ale na škodu si je zopakovat. */ void *fetch_thread( void *handle ) { struct fetch_handle *fh = handle; struct addrinfo *res = NULL; int server_fd = -1, file_fd = -1; /* Nejprve nastavíme výsledek na hodnotu -1, aby se nám lépe * zapisoval kód, který ošetřuje chyby. Případný úspěch * poznamenáme až na závěr. */ fh->result = -1; /* Adresu vzdáleného stroje přeložíme známým knihovním * podprogramem ‹getaddrinfo›. Na přesném typu adresy nám * nezáleží – výběr ponecháme na operačním systému. */ if ( getaddrinfo( fh->host, fh->port, NULL, &res ) != 0 ) return NULL; /* Podle výsledků přichystáme socket a navážeme spojení. */ if ( ( server_fd = socket( res->ai_family, SOCK_STREAM, 0 ) ) == -1 ) goto err; if ( connect( server_fd, res->ai_addr, res->ai_addrlen ) == -1 ) goto err; /* Na vytvořeném spojení odešleme požadavek. Soubor prozatím * otevírat nebudeme, protože se jedná o potenciálně * destruktivní akci – je lepší provést co nejvíce práce * v režimu, kdy můžeme bez větších důsledků z celé operace * „vycouvat“ – prozatím jsme pouze spotřebovali nějaké zdroje, * ale jinak jsme žádné nezvratné akce neprovedli. */ if ( send( server_fd, fh->request, strlen( fh->request ), 0 ) == -1 ) goto err; /* Začneme přijímat odpověď – pro jednoduchost se nebudeme * zabývat obsahem hlaviček a budeme zapisovat pouze tělo * odpovědi. Samozřejmě by bylo více realistické hlavičky * zpracovat, tím jsme se ale zabývali jinde. */ /* Musíme nicméně nalézt oddělovač – sekvenci "\r\n\r\n" – po * kterém až následuje tělo, které chceme zapsat. To provedeme * v relativně jednoduchém cyklu, který není závislý na * velikosti jednotlivých čtení ze socketu – data zpracovává po * jednom bajtu. Cyklus skončí jakmile oddělovač nalezneme * (‹matches == 4›) nebo narazíme na chybu nebo konec spojení * (což také považujeme za chybu, protože oddělovač je povinný, * i kdyby bylo tělo prázdné). */ int matches = 0; int index, bytes; char buffer[ 13 ]; while ( matches < 4 && ( bytes = read( server_fd, buffer, sizeof buffer ) ) > 0 ) { for ( index = 0; index < bytes && matches < 4; ++index ) if ( ( matches % 2 == 0 && buffer[ index ] == '\r' ) || ( matches % 2 == 1 && buffer[ index ] == '\n' ) ) ++ matches; else matches = 0; } if ( matches < 4 ) /* bytes == -1 implies matches < 4 */ goto err; /* malformed reply */ /* Nyní konečně vytvoříme nebo přepíšeme soubor, do kterého * budeme zbývající data ukládat. Zejména v případě, že tento * soubor již existoval, se jedná o nezvratnou akci. Do * vytvořeného souboru zapíšeme zbytek posledního přenosového * okna (po nalezeném oddělovači) a pak už pouze přímočarým * způsobem kopírujeme data. */ int flags = O_APPEND | O_TRUNC | O_WRONLY | O_CREAT; if ( ( file_fd = openat( fh->dir_fd, fh->file, flags, 0666 ) ) == -1 ) goto err; if ( write( file_fd, buffer + index, bytes - index ) == -1 ) goto err; while ( ( bytes = read( server_fd, buffer, sizeof buffer ) ) > 0 ) if ( write( file_fd, buffer, bytes ) == -1 ) goto err; /* Tím je asynchronní operace ukončena. Uvolníme lokální zdroje, * úspěch (nebo neúspěch) poznamenáme do struktury * ‹fetch_handle› a ukončíme vlákno příkazem ‹return›. Řízení * bude pokračovat podprogramem ‹fetch_await›, jakmile jej * hlavní vlákno aktivuje. */ fh->result = 0; err: fh->error = errno; if ( res ) freeaddrinfo( res ); if ( server_fd != -1 ) close( server_fd ); if ( file_fd != -1 && close( file_fd ) == -1 ) fh->result = -1; return NULL; } /* Poslední částí skládačky je podprogram ‹fetch_await›, který vyčká * na ukončení pracovního vlákna popsaného předanou strukturou * ‹fetch_handle›. Zároveň z této vyzvedne informace o výsledku * (úspěch nebo neúspěch a případně hodnotu ‹errno›) a uvolní jak * zdroje spojené s vláknem, tak paměť ‹handle›. */ int fetch_await( void *handle ) { struct fetch_handle *fh = handle; int rv = -1; if ( pthread_join( fh->tid, NULL ) != 0 ) goto err; rv = fh->result; errno = fh->error; err: free( handle ); return rv; } /* Použití demonstrujeme stažením webové stránky * ‹http://neverssl.com› do souboru ‹zt.neverssl.txt›. Soubor si * můžete otevřít a srovnat s tím, co vidíte v internetovém * prohlížeči. Můžete si také zkusit stáhnout několik souborů * souběžně (násobným voláním ‹fetch_start› s různými parametry). */ int main( void ) /* demo */ { const char *file = "zt.neverssl.txt"; int dir_fd, file_fd; void *fh; char buffer[ 8 ]; if ( ( dir_fd = open( ".", O_DIRECTORY ) ) == -1 ) err( 1, "opening working directory" ); if ( !( fh = fetch_start( "neverssl.com", "80", "GET / HTTP/1.0\r\n\r\n", dir_fd, file ) ) ) err( 1, "starting asynchronous download" ); if ( fetch_await( fh ) == -1 ) err( 1, "waiting for asynchronous download" ); if ( ( file_fd = openat( dir_fd, file, O_RDONLY ) ) == -1 ) err( 1, "opening %s", file ); if ( read( file_fd, buffer, sizeof( buffer ) ) == -1 ) err( 1, "reading %s", file ); assert( memcmp( buffer, "", 6 ) == 0 ); close( file_fd ); close( dir_fd ); return 0; }