MUNI FI
MapReduce: Simplified Data Processing on Large Clusters
PA154 Language Modeling (8.3)
Pavel Rychlý
pary@fi.muni.cz
April 16, 2024
Source: Jeff Dean, Sanjay Ghemawat, Google, Inc., December 2004
https://research.google/pubs/pub62/
Pavel Rychlý • MapReduce • April 16, 2024 1/32

Motivation: Large Scale Data Processing
Many tasks: Process lots of data to produce other data
Want to use hundreds or thousands of CPUs
■ ... but this needs to be easy
MapReduce provides:
■ Automatic parallelization and distribution
■ Fault-tolerance
■ I/O scheduling
■ Status and monitoring

Programming model
Input & Output: each a set of key/value pairs
Programmer specifies two functions:
map(in_key, in_value) -> list(out_key, intermediate_value)
■ Processes input key/value pair
■ Produces set of intermediate pairs
reduce(out_key, list(intermediate_value)) -> list(out_value)
■ Combines all intermediate values for a particular key
■ Produces a set of merged output values (usually just one)
Inspired by similar primitives in LISP and other languages

Example: Count word occurrences
  map(String input_key, String input_value):
    // input_key: document name
    // input_value: document contents
    for each word w in input_value:
      EmitIntermediate(w, "1");

  reduce(String output_key, Iterator intermediate_values):
    // output_key: a word
    // output_values: a list of counts
    int result = 0;
    for each v in intermediate_values:
      result += ParseInt(v);
    Emit(AsString(result));
Pseudocode: See appendix in paper for real code

Model is Widely Applicable
[Chart: MapReduce Programs In Google Source Tree, March 2003 to September 2004]
Example uses: distributed grep, distributed sort, web link-graph reversal, term-vector per host, web access log stats, inverted index construction, document clustering, machine learning, statistical machine
translation

Implementation Overview
Typical cluster:
■ 100s/1000s of 2-CPU x86 machines, 2-4 GB of memory
■ Limited bisection bandwidth
■ Storage is on local IDE disks
■ GFS: distributed file system manages data (SOSP'03)
■ Job scheduling system: jobs made up of tasks, scheduler assigns tasks to machines
Implementation is a C++ library linked into user programs

Execution
[Diagram: Input -> Intermediate (k1:v k1:v k2:v k1:v k3:v k4:v k4:v k5:v k4:v k1:v k3:v) -> Group by Key -> Grouped (k1: v,v,v,v; k2: v; k3: v,v; k4: v,v,v; k5: v) -> Output]

Parallel Execution
[Diagram: Map Task 1, Map Task 2 and Map Task 3 each pass their intermediate k:v pairs through a Partitioning Function. After Sort and Group, Reduce Task 1 receives k2: v; k4: v,v,v; k5: v and Reduce Task 2 receives k1: v,v,v,v; k3: v,v]

Task Granularity And Pipelining
Fine granularity tasks: many more map tasks than machines
■ Minimizes time for fault recovery
■ Can pipeline shuffling with map execution
■ Better dynamic load balancing
Often use 200,000 map tasks and 5,000 reduce tasks on 2,000 machines
[Timeline diagram, one row per process, time running left to right:]
User Program: MapReduce() ... wait ...
Master: Assign tasks to worker machines...
Worker 1: Map 1, Map 3
Worker 2: Map 2
Worker 3: Read 1.1, Read 1.3, Read 1.2, Reduce 1
Worker 4: Read 2.1, Read 2.2, Read 2.3, Reduce 2

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 00 min 18 sec
323 workers; 0 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853      0     323   878934.6     1314.4       717.0
Shuffle     500      0     323      717.0        0.0         0.0
Reduce      500      0       0        0.0        0.0         0.0
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)              72.5
Shuffle (MB/s)              0.0
Output (MB/s)               0.0
doc-index-hits        145325686
docs-indexed             506631
dups-in-index-merge           0
mr-operator-calls        508192
mr-operator-outputs      506631

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 05 min 07 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853   1857    1707   878934.6   191995.8    113936.6
Shuffle     500      0     500   113936.6    57113.7     57113.7
Reduce      500      0       0    57113.7        0.0         0.0
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)             699.1
Shuffle (MB/s)            349.5
Output (MB/s)               0.0
doc-index-hits       5004411944
docs-indexed           17290135
dups-in-index-merge           0
mr-operator-calls      17331371
mr-operator-outputs    17290135

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 10 min 18 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853   5354    1707   878934.6   406020.1    241058.2
Shuffle     500      0     500   241058.2   196362.5    196362.5
Reduce      500      0       0   196362.5        0.0         0.0
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)             704.4
Shuffle (MB/s)            371.9
Output (MB/s)               0.0
doc-index-hits       5000364228
docs-indexed           17300709
dups-in-index-merge           0
mr-operator-calls      17342493
mr-operator-outputs    17300709

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 15 min 31 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853   8841    1707   878934.6   621608.5    369459.8
Shuffle     500      0     500   369459.8   326986.8    326986.8
Reduce      500      0       0   326986.8        0.0         0.0
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)             706.5
Shuffle (MB/s)            419.2
Output (MB/s)               0.0
doc-index-hits       4982870667
docs-indexed           17229926
dups-in-index-merge           0
mr-operator-calls      17272056
mr-operator-outputs    17229926

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 29 min 45 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    195     305   523499.2   523389.6    523389.6
Reduce      500      0     195   523389.6     2685.2      2742.6
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.3
Shuffle (MB/s)              0.5
Output (MB/s)              45.7
doc-index-hits          2313178
docs-indexed               7936
dups-in-index-merge           0
mr-merge-calls          1954105
mr-merge-outputs        1954105

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 31 min 34 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    500       0   523499.2   523499.5    523499.5
Reduce      500      0     500   523499.5   133837.8    136929.6
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.0
Shuffle (MB/s)              0.1
Output (MB/s)            1238.3
doc-index-hits                0
docs-indexed                  0
dups-in-index-merge           0
mr-merge-calls         51738599
mr-merge-outputs       51738599

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 33 min 22 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    500       0   523499.2   523499.5    523499.5
Reduce      500      0     500   523499.5   263233.3    269351.2
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.0
Shuffle (MB/s)              0.0
Output (MB/s)            1225.1
doc-index-hits                0
docs-indexed                  0
dups-in-index-merge           0
mr-merge-calls         51842100
mr-merge-outputs       51842100

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 35 min 08 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    500       0   523499.2   523499.5    523499.5
Reduce      500      0     500   523499.5   390447.6    399457.2
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.0
Shuffle (MB/s)              0.0
Output (MB/s)            1222.0
doc-index-hits                0
docs-indexed                  0
dups-in-index-merge           0
mr-merge-calls         51640600
mr-merge-outputs       51640600

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 37 min 01 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    500       0   523499.2   520468.6    520468.6
Reduce      500    406      94   520468.6   512265.2    514373.3
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.0
Shuffle (MB/s)              0.0
Output (MB/s)             849.5
doc-index-hits                0
docs-indexed                  0
dups-in-index-merge           0
mr-merge-calls         35083350
mr-merge-outputs       35083350

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 38 min 56 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    500       0   523499.2   519781.8    519781.8
Reduce      500    498       2   519781.8   519394.7    519440.7
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.0
Shuffle (MB/s)              0.0
Output (MB/s)               9.4
doc-index-hits                0
docs-indexed                  0
dups-in-index-merge           0
mr-merge-calls           394792
mr-merge-outputs         394792

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003 -- up 0 hr 40 min 43 sec
1707 workers; 1 deaths
Type     Shards   Done  Active  Input(MB)   Done(MB)  Output(MB)
Map       13853  13853       0   878934.6   878934.6    523499.2
Shuffle     500    500       0   523499.2   519774.3    519774.3
Reduce      500    499       1   519774.3   519735.2    519764.0
[Chart: x-axis Reduce Shard, y-axis 0 to 100]
Counters
Variable                 Minute
Mapped (MB/s)               0.0
Shuffle (MB/s)              0.0
Output (MB/s)               1.9
doc-index-hits                0
docs-indexed                  0
dups-in-index-merge           0
mr-merge-calls            73442
mr-merge-outputs          73442

Fault tolerance: Handled via re-execution
■ On worker failure:
  ■ Detect failure via periodic heartbeats
  ■ Re-execute completed and in-progress map tasks
  ■ Re-execute in-progress reduce tasks
  ■ Task completion committed through master
■ Master failure:
  ■ Could handle, but don't yet (master failure unlikely)
Robust: lost 1600 of 1800 machines once, but finished fine
Semantics in presence of failures: see paper

Refinement: Redundant Execution
Slow workers significantly lengthen completion time
■ Other jobs consuming resources on machine
■ Bad disks with soft errors transfer data very slowly
■ Weird things: processor caches disabled (!!)
Solution: Near end of phase, spawn backup copies of tasks
■ Whichever one finishes first "wins"
Effect: Dramatically shortens job completion time

Refinement: Locality Optimization
Master scheduling policy:
■ Asks GFS for locations of replicas of input file blocks
■ Map tasks typically split into 64 MB (== GFS block size)
■ Map tasks scheduled so GFS input block replicas are on same machine or same rack
Effect: Thousands of machines read input at local disk speed
■ Without this, rack switches limit read rate

Refinement: Skipping Bad Records
Map/Reduce functions sometimes fail for particular inputs
■ Best solution is to debug & fix, but not always possible
■ On seg fault:
  ■ Send UDP packet to master from signal handler
  ■ Include sequence number of record being processed
■ If master sees two failures for same record:
  ■ Next worker is told to skip the record
Effect: Can work around bugs in third-party libraries
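The skipping policy above can be sketched as a small single-process simulation. This is only an illustration under stated assumptions, not the library's implementation: the names Master, run_task and run_with_skipping are hypothetical, a Python exception stands in for the seg fault, and a direct method call stands in for the UDP packet sent from the signal handler.

```python
from collections import Counter


class Master:
    """Hypothetical master: counts failures per record sequence number."""

    def __init__(self, max_failures=2):
        self.failures = Counter()
        self.max_failures = max_failures

    def report_failure(self, seq):
        # Stands in for the UDP packet carrying the record's sequence number.
        self.failures[seq] += 1

    def records_to_skip(self):
        # Records that have failed at least max_failures times.
        return {s for s, n in self.failures.items() if n >= self.max_failures}


def run_task(records, user_fn, master):
    """One task attempt; returns None if the attempt crashed."""
    skip = master.records_to_skip()
    output = []
    for seq, record in enumerate(records):
        if seq in skip:
            continue  # the master told this attempt to skip the record
        try:
            output.append(user_fn(record))
        except Exception:
            master.report_failure(seq)
            return None  # the attempt dies; the task is re-executed
    return output


def run_with_skipping(records, user_fn, max_attempts=10):
    """Re-execute the task until an attempt completes."""
    master = Master()
    for _ in range(max_attempts):
        result = run_task(records, user_fn, master)
        if result is not None:
            return result, sorted(master.records_to_skip())
    raise RuntimeError("task did not complete")


if __name__ == "__main__":
    # Record 2 ("x") makes int() fail; it is skipped on the third attempt.
    print(run_with_skipping(["1", "2", "x", "4"], int))
```

In this sketch the first two attempts both crash on the same record, which is what lets the third attempt skip it and finish.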
Other Refinements (see paper)
■ Sorting guarantees within each reduce partition
■ Compression of intermediate data
■ Combiner: useful for saving network bandwidth
■ Local execution for debugging/testing
■ User-defined counters

Performance
Tests run on cluster of 1800 machines:
■ 4 GB of memory
■ Dual-processor 2 GHz Xeons with Hyperthreading
■ Dual 160 GB IDE disks
■ Gigabit Ethernet per machine
■ Bisection bandwidth approximately 100 Gbps
Two benchmarks:
MR_Grep: Scan 10^10 100-byte records to extract records matching a rare pattern (92K matching records)
MR_Sort: Sort 10^10 100-byte records (modeled after TeraSort benchmark)

MR_Grep
Locality optimization helps:
■ 1800 machines read 1 TB of data at peak of ≈ 31 GB/s
■ Without this, rack switches would limit to 10 GB/s
Startup overhead is significant for short jobs

MR_Sort
Backup tasks reduce job completion time significantly
System deals well with failures
[Figure: three panels (Normal; No backup tasks; 200 processes killed), each with three stacked plots, y-axis 0 to 20000, x-axis Seconds from 0 to 1200]
Normal: Done: 839 s
No backup tasks: Done: 1235 s
200 processes killed: Done: 886 s

Experience: Rewrite of Production Indexing System
Rewrote Google's production indexing system using MapReduce
■ Set of 24 MapReduce operations
■ New code is simpler, easier to understand
■ MapReduce takes care of failures, slow machines
■ Easy to make indexing faster by adding more machines

Usage: MapReduce jobs run in August 2004
Number of jobs                        29,423
Average job completion time           634 secs
Machine days used                     79,186 days
Input data read                       3,288 TB
Intermediate data produced            758 TB
Output data written                   193 TB
Average worker machines per job       157
Average worker deaths per job         1.2
Average map tasks per job             3,351
Average reduce tasks per job          55
Unique map implementations            395
Unique reduce implementations         269
Unique map/reduce combinations        426

Related Work
■ Programming model inspired by functional language primitives
■ Partitioning/shuffling similar to many large-scale sorting systems
  ■ NOW-Sort ['97]
■ Re-execution for fault tolerance
  ■ BAD-FS ['04] and TACC ['97]
■ Locality optimization has parallels with Active Disks/Diamond work
  ■ Active Disks ['01], Diamond ['04]
■ Backup tasks similar to Eager Scheduling in Charlotte system
  ■ Charlotte ['96]
■ Dynamic load balancing solves similar problem as River's distributed queues
  ■ River ['99]

Conclusions
■ MapReduce has proven to
be a useful abstraction
■ Greatly simplifies large-scale computations at Google
■ Fun to use: focus on problem, let library deal w/ messy details
Thanks to Josh Levenberg, who has made many significant improvements, and to everyone else at Google who has used and helped to improve MapReduce.
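Worked example: as a recap of the programming model, the word-count pseudocode from the deck can be sketched as a single-process Python program. This is a minimal sketch under assumptions, not the C++ library's API: run_mapreduce and partition are hypothetical names, the CRC32-based partitioning function is an illustrative choice, and everything runs sequentially on one machine, so there is no parallelism and no fault tolerance.

```python
import zlib
from collections import defaultdict


def map_fn(input_key, input_value):
    # input_key: document name; input_value: document contents
    for word in input_value.split():
        yield word, "1"


def reduce_fn(output_key, intermediate_values):
    # output_key: a word; intermediate_values: its counts as strings
    yield str(sum(int(v) for v in intermediate_values))


def partition(key, num_reduce_tasks):
    # Assumed partitioning function: deterministic hash of the key mod R.
    return zlib.crc32(key.encode("utf-8")) % num_reduce_tasks


def run_mapreduce(inputs, map_fn, reduce_fn, num_reduce_tasks=2):
    # Map phase: route each intermediate pair to its reduce partition.
    partitions = [defaultdict(list) for _ in range(num_reduce_tasks)]
    for in_key, in_value in inputs:
        for out_key, value in map_fn(in_key, in_value):
            partitions[partition(out_key, num_reduce_tasks)][out_key].append(value)
    # Reduce phase: within each partition, keys are processed in sorted order.
    output = {}
    for part in partitions:
        for key in sorted(part):
            output[key] = list(reduce_fn(key, part[key]))
    return output


if __name__ == "__main__":
    docs = [("a.txt", "the quick fox"), ("b.txt", "the lazy dog the end")]
    counts = run_mapreduce(docs, map_fn, reduce_fn)
    print(counts["the"])  # ['3']
```

Grouping values per key inside each partition plays the role of the "Sort and Group" step; a real run would spread the map and reduce calls across many worker machines.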