MapReduce: Simplified Data Processing on Large Clusters
PA154 Jazykové modelování (Language Modeling) (9.2)
Pavel Rychlý
pary@fi.muni.cz
April 27, 2021

Source: Jeff Dean, Sanjay Ghemawat, Google, Inc., December 2004
https://research.google/pubs/pub62/

Motivation: Large Scale Data Processing
Many tasks: process lots of data to produce other data
Want to use hundreds or thousands of CPUs
■ ... but this needs to be easy
MapReduce provides:
■ Automatic parallelization and distribution
■ Fault tolerance
■ I/O scheduling
■ Status and monitoring

Programming model
Input & Output: each a set of key/value pairs
Programmer specifies two functions:
map(in_key, in_value) -> list(out_key, intermediate_value)
■ Processes an input key/value pair
■ Produces a set of intermediate pairs
reduce(out_key, list(intermediate_value)) -> list(out_value)
■ Combines all intermediate values for a particular key
■ Produces a set of merged output values (usually just one)
Inspired by similar primitives in LISP and other languages

Example: Count word occurrences
    map(String input_key, String input_value):
      // input_key: document name
      // input_value: document contents
      for each word w in input_value:
        EmitIntermediate(w, "1");

    reduce(String output_key, Iterator intermediate_values):
      // output_key: a word
      // intermediate_values: a list of counts
      int result = 0;
      for each v in intermediate_values:
        result += ParseInt(v);
      Emit(AsString(result));
Pseudocode: see the appendix in the paper for real code.

Model is Widely Applicable
[Figure: MapReduce Programs In Google Source Tree]
Example uses:
■ distributed grep
■ distributed sort
■ web link-graph reversal
■ term-vector per host
■ web access log stats
■ inverted index construction
■ document clustering
■ machine learning
■ statistical machine translation

Implementation Overview
Typical cluster:
■ 100s/1000s of 2-CPU x86 machines, 2-4 GB of memory
■ Limited bisection bandwidth
■ Storage is on local IDE disks
■ GFS: distributed file system manages data (SOSP'03)
■ Job scheduling system: jobs made up of tasks, scheduler assigns tasks to machines
Implementation is a C++ library linked into user programs

Execution
[Figure: the input is processed by map tasks (M), which produce the intermediate pairs k1:v, k1:v, k2:v, k1:v, k3:v, k4:v, k4:v, k5:v, k4:v, k1:v, k3:v; these are grouped by key into k1:v,v,v,v; k2:v; k3:v,v; k4:v,v,v; k5:v, and reduce tasks (R) turn each group into output.]

Parallel Execution
[Figure: Map Task 1, Map Task 2, and Map Task 3 each emit intermediate pairs and pass them through a partitioning function, which routes every key to one of the reduce tasks (Reduce Task 1, Reduce Task 2); each reduce task receives all values for its keys, e.g. k1:v,v,v,v.]

Task Granularity And Pipelining
Fine-granularity tasks: many more map tasks than machines
■ Minimizes time for fault recovery
■ Can pipeline shuffling with map execution
■ Better dynamic load balancing
Often use 200,000 map / 5,000 reduce tasks / 2,000 machines
[Figure: timeline of processes over time. User Program: MapReduce() ... wait ...; Master: assigns tasks to worker machines; Worker 1: Map 1, Map 3; Worker 2: Map 2; Worker 3: Read 1.1, Read 1.3, Read 1.2, Reduce 1; Worker 4: Read 2.1, Read 2.2, Read 2.3, Reduce 2.]
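The programming model and the word-count pseudocode above can be tried out as a small single-process Python sketch. `run_mapreduce` is a hypothetical in-memory driver, not the API of the C++ library: it runs map over every input pair, groups the intermediate values by key, and calls reduce once per key. The map and reduce functions mirror the pseudocode, including emitting the count as the string "1" and parsing it back in reduce.

```python
from collections import defaultdict
from typing import Callable, Iterable, Iterator


def run_mapreduce(
    inputs: Iterable[tuple[str, str]],
    map_fn: Callable[[str, str], Iterator[tuple[str, str]]],
    reduce_fn: Callable[[str, Iterator[str]], Iterator[str]],
) -> dict[str, list[str]]:
    # Hypothetical in-memory driver (NOT the real MapReduce library API).
    # Map phase: collect every intermediate (key, value) pair, grouped by key.
    intermediate: dict[str, list[str]] = defaultdict(list)
    for in_key, in_value in inputs:
        for out_key, value in map_fn(in_key, in_value):
            intermediate[out_key].append(value)
    # Reduce phase: one reduce call per distinct key, in sorted key order.
    return {
        key: list(reduce_fn(key, iter(values)))
        for key, values in sorted(intermediate.items())
    }


def word_count_map(input_key: str, input_value: str) -> Iterator[tuple[str, str]]:
    # input_key: document name; input_value: document contents
    for w in input_value.split():
        yield (w, "1")  # EmitIntermediate(w, "1")


def word_count_reduce(output_key: str, intermediate_values: Iterator[str]) -> Iterator[str]:
    # output_key: a word; intermediate_values: the counts emitted for it
    result = 0
    for v in intermediate_values:
        result += int(v)  # ParseInt(v)
    yield str(result)  # Emit(AsString(result))


if __name__ == "__main__":
    docs = [("doc1", "the quick brown fox"), ("doc2", "the lazy dog the end")]
    print(run_mapreduce(docs, word_count_map, word_count_reduce))
```

Running it prints `{'brown': ['1'], 'dog': ['1'], 'end': ['1'], 'fox': ['1'], 'lazy': ['1'], 'quick': ['1'], 'the': ['3']}`. The driver deliberately leaves out what the library provides: splitting the input, distributing tasks across machines, and recovering from failures.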
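Two of the listed uses, inverted index construction and web link-graph reversal, fit the same two-function shape. The sketch below roughly follows how the MapReduce paper describes these two uses; `run_local` is a hypothetical single-process helper, and the document names, URLs, and the choice to lowercase and deduplicate words are illustrative assumptions.

```python
from collections import defaultdict
from typing import Callable, Iterable, Iterator

Pair = tuple[str, str]


def run_local(
    inputs: Iterable[Pair],
    map_fn: Callable[[str, str], Iterator[Pair]],
    reduce_fn: Callable[[str, list[str]], list[str]],
) -> dict[str, list[str]]:
    # Hypothetical helper (not a real library API): map every input,
    # group the values by key, then reduce each key once.
    groups: dict[str, list[str]] = defaultdict(list)
    for key, value in inputs:
        for out_key, out_value in map_fn(key, value):
            groups[out_key].append(out_value)
    return {k: reduce_fn(k, vs) for k, vs in sorted(groups.items())}


# Inverted index: word -> sorted list of the documents that contain it.
def index_map(doc_id: str, contents: str) -> Iterator[Pair]:
    for word in set(contents.lower().split()):  # each word once per document
        yield (word, doc_id)


def index_reduce(word: str, doc_ids: list[str]) -> list[str]:
    return sorted(set(doc_ids))


# Link-graph reversal: target URL -> the pages that link to it.
def reverse_links_map(source: str, outlinks: str) -> Iterator[Pair]:
    # outlinks: whitespace-separated target URLs found on page `source`
    for target in outlinks.split():
        yield (target, source)


def reverse_links_reduce(target: str, sources: list[str]) -> list[str]:
    return sorted(sources)


if __name__ == "__main__":
    docs = [("d1", "MapReduce runs on GFS"), ("d2", "GFS stores the data")]
    print(run_local(docs, index_map, index_reduce)["gfs"])
    pages = [("a.html", "b.html c.html"), ("b.html", "c.html")]
    print(run_local(pages, reverse_links_map, reverse_links_reduce)["c.html"])
```

Only the map and reduce functions change between the two jobs; the grouping in between is the part the library supplies.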
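The partitioning function in the parallel-execution figure decides which reduce task receives each intermediate key. The MapReduce paper's default is hash(key) mod R. This sketch uses `zlib.crc32` as the hash, an assumption made because Python's built-in `hash` of strings is randomized per process, so map tasks running in different processes would disagree. The three map-task outputs are modeled on the figure; `partition` and `split_map_output` are hypothetical names.

```python
import zlib
from collections import defaultdict


def partition(key: str, num_reduce_tasks: int) -> int:
    # Deterministic stand-in for hash(key) mod R: every map task, on any
    # machine, sends a given key to the same reduce task.
    return zlib.crc32(key.encode("utf-8")) % num_reduce_tasks


def split_map_output(
    pairs: list[tuple[str, str]], num_reduce_tasks: int
) -> list[list[tuple[str, str]]]:
    # One bucket per reduce task (R regions on the map worker's local disk
    # in the paper's design; plain lists here).
    buckets: list[list[tuple[str, str]]] = [[] for _ in range(num_reduce_tasks)]
    for key, value in pairs:
        buckets[partition(key, num_reduce_tasks)].append((key, value))
    return buckets


if __name__ == "__main__":
    R = 2
    map_outputs = [
        [("k1", "v"), ("k1", "v"), ("k2", "v")],                          # Map Task 1
        [("k1", "v"), ("k3", "v"), ("k4", "v")],                          # Map Task 2
        [("k4", "v"), ("k5", "v"), ("k4", "v"), ("k1", "v"), ("k3", "v")],  # Map Task 3
    ]
    # Reduce task r collects bucket r from every map task.
    reduce_inputs: dict[int, list[tuple[str, str]]] = defaultdict(list)
    for pairs in map_outputs:
        for r, bucket in enumerate(split_map_output(pairs, R)):
            reduce_inputs[r].extend(bucket)
    for r in range(R):
        print(r, sorted(reduce_inputs[r]))
```

Because the hash depends only on the key, all values for a key (e.g. the four k1:v pairs) end up at the same reduce task, whichever map task produced them.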
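Before calling reduce, a reduce task has to bring all values for the same key together (the "Group by Key" step in the execution figure). The MapReduce paper does this by sorting the intermediate data by key. A minimal sketch with the standard library's `sorted` and `itertools.groupby`, reproducing the figure's grouping; `group_by_key` is a hypothetical name:

```python
from itertools import groupby
from operator import itemgetter
from typing import Iterator


def group_by_key(pairs: list[tuple[str, str]]) -> Iterator[tuple[str, list[str]]]:
    # Sort by key so equal keys become adjacent, then collect each run.
    # sorted() is stable, so values keep their arrival order within a key.
    ordered = sorted(pairs, key=itemgetter(0))
    for key, run in groupby(ordered, key=itemgetter(0)):
        yield key, [value for _, value in run]


if __name__ == "__main__":
    # Intermediate pairs in the order shown in the execution figure.
    intermediate = [
        ("k1", "v"), ("k1", "v"), ("k2", "v"), ("k1", "v"), ("k3", "v"),
        ("k4", "v"), ("k4", "v"), ("k5", "v"), ("k4", "v"), ("k1", "v"),
        ("k3", "v"),
    ]
    for key, values in group_by_key(intermediate):
        print(f"{key}:{','.join(values)}")
    # k1:v,v,v,v  k2:v  k3:v,v  k4:v,v,v  k5:v (one per line)
```

Sorting rather than hashing into an in-memory table also lets a real reduce task handle more intermediate data than fits in memory, since sorted runs can be merged from disk.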
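The claim that many more map tasks than machines gives better dynamic load balancing can be checked with a toy simulation. It assumes a hypothetical greedy master that hands the next task to whichever worker becomes free first, equal-sized tasks, and made-up machine speeds (one machine 4x slower); it ignores data locality, shuffling, and failures. `makespan` and `split_work` are illustrative names.

```python
import heapq


def makespan(task_sizes: list[float], worker_speeds: list[float]) -> float:
    # Greedy master: each task in turn goes to the worker that is free earliest.
    free_at = [(0.0, w) for w in range(len(worker_speeds))]
    heapq.heapify(free_at)
    finish = 0.0
    for size in task_sizes:
        t, w = heapq.heappop(free_at)
        done = t + size / worker_speeds[w]
        finish = max(finish, done)
        heapq.heappush(free_at, (done, w))
    return finish  # time at which the last task completes


def split_work(total: float, num_tasks: int) -> list[float]:
    # Divide the same total amount of input into equally sized tasks.
    return [total / num_tasks] * num_tasks


if __name__ == "__main__":
    speeds = [1.0, 1.0, 1.0, 0.25]  # hypothetical: one machine is 4x slower
    coarse = makespan(split_work(400.0, 4), speeds)    # one task per machine
    fine = makespan(split_work(400.0, 400), speeds)    # 100x more tasks than machines
    print(coarse, fine)
```

With these inputs it prints `400.0 124.0`: with one task per machine the slow machine finishes last at time 400, while with 400 small tasks the three fast machines absorb most of the work and the job ends at 124, close to the ideal 400 / 3.25 ≈ 123.1. Small tasks shorten fault recovery for a similar reason: a dead worker's tasks can be re-executed in parallel on many other machines.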
MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
[Figures: a series of status pages for this job, started Fri Nov 7 09:51:07 2003, captured at successive uptimes from 0 hr 00 min 18 sec to beyond 0 hr 37 min. Each page has a table with one row per phase (Map: 13,853 shards; Shuffle: 500; Reduce: 500) and the columns Shards, Done, Active, Input (MB), Done (MB), and Output (MB), followed by per-minute rates (Mapped, Shuffle, and Output, in MB/s) and job counters such as doc-index-hits, docs-indexed, dups-in-index-merge, and mr-operator-calls. The job starts with 323 workers and 0 deaths and then runs on 1,707 workers with 1 death. The 878,934.6 MB of map input is mapped at about 700 MB/s once all workers are running; all 13,853 map shards are done by 0 hr 29 min 45 sec, after which reduce output grows from 2,742.6 MB to 514,373.3 MB at 0 hr 37 min 01 sec.]