MUNI FI

MapReduce: Simplified Data Processing on Large Clusters
PA154 Language Modeling (7.3)
Pavel Rychlý
pary@fi.muni.cz
March 30, 2023

Source: Jeff Dean, Sanjay Ghemawat, Google, Inc., December 2004
https://research.google/pubs/pub62/

Motivation: Large Scale Data Processing

Many tasks: process lots of data to produce other data
Want to use hundreds or thousands of CPUs
■ ... but this needs to be easy
MapReduce provides:
■ Automatic parallelization and distribution
■ Fault tolerance
■ I/O scheduling
■ Status and monitoring

Programming model

Input & Output: each a set of key/value pairs
Programmer specifies two functions:
map(in_key, in_value) -> list(out_key, intermediate_value)
■ Processes input key/value pair
■ Produces set of intermediate pairs
reduce(out_key, list(intermediate_value)) -> list(out_value)
■ Combines all intermediate values for a particular key
■ Produces a set of merged output values (usually just one)
Inspired by similar primitives in LISP and other languages

Example: Count word occurrences

map(String input_key, String input_value):
  // input_value: document contents
  for each word w in input_value:
    EmitIntermediate(w, "1");

reduce(String output_key, Iterator intermediate_values):
  // output_key: a word
  // intermediate_values: a list of counts
  int result = 0;
  for each v in intermediate_values:
    result += ParseInt(v);
  Emit(AsString(result));

Pseudocode: See appendix in paper for real code

Model is Widely Applicable

[Figure: MapReduce programs in Google source tree]

Example uses:
distributed grep          term-vector per host           document clustering
distributed sort          web access log stats           machine learning
web link-graph reversal   inverted index construction    statistical machine translation

Implementation Overview

Typical cluster:
■ 100s/1000s of 2-CPU x86 machines, 2-4 GB of memory
■ Limited bisection bandwidth
■ Storage is on local IDE disks
■ GFS: distributed file system manages data (SOSP'03)
■ Job scheduling system: jobs made up of tasks, scheduler assigns tasks to machines
Implementation is a C++ library linked into user programs

Task Granularity And Pipelining

Fine granularity tasks: many more map tasks than machines
■ Minimizes time for fault recovery
■ Can pipeline shuffling with map execution
■ Better dynamic load balancing
Often use 200,000 map tasks and 5,000 reduce tasks on 2,000 machines

[Figure: execution timeline. The user program calls MapReduce() and waits; the master assigns tasks to worker machines; Worker 1 runs Map 1 and Map 3; Worker 2 runs Map 2; Worker 3 reads 1.1, 1.3, 1.2, then runs Reduce 1; Worker 4 reads 2.1, 2.2, 2.3, then runs Reduce 2]

MapReduce status: MR_Indexer-beta6-large-2003_10_28_00_03
Started: Fri Nov 7 09:51:07 2003
(Status-page snapshots over the run; ? marks a value that is illegible in the source.)

Up 0 hr 00 min; 323 workers; 0 deaths
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   0      323     878934.6   1314.4    717.0
Shuffle  500     0      323     717.0      0.0       0.0
Reduce   500     0      0       0.0        0.0       0.0
Rates: Mapped 72.5 MB/s; Shuffle 0.0 MB/s; Output 0.0 MB/s
Counters: doc-index-hits 145825686; docs-indexed 506631; dups-in-index-merge 0; operator-calls 508192; operator-outputs 506631

Up 0 hr 05 min; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   1857   1707    878934.6   191995.8  113936.6
Shuffle  500     0      500     113936.6   57113.7   57113.7
Reduce   500     0      0       57113.7    0.0       0.0
Rates: Mapped 699.1 MB/s; Shuffle 349.5 MB/s; Output 0.0 MB/s
Counters: doc-index-hits 5004411944; docs-indexed 17290135; dups-in-index-merge 0; operator-calls 17331371; operator-outputs 17290135

Up 0 hr 10 min; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   5354   1707    878934.6   406020.1  241058.2
Shuffle  500     0      500     241058.2   196362.5  196362.5
Reduce   500     0      0       196362.5   0.0       0.0

Up 0 hr 15 min; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   8841   1707    878934.6   621608.5  369459.8
Shuffle  500     0      500     369459.8   326986.8  326986.8
Reduce   500     0      0       326986.8   0.0       0.0

Up 0 hr 29 min 45 sec; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     195    305     523499.2   523389.6  523389.6
Reduce   500     0      195     523389.6   2685.2    2742.6

Up 0 hr 31 min 34 sec; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     500    0       523499.2   523499.5  523499.5
Reduce   500     0      500     523499.5   133837.8  136929.6

Up 0 hr 33 min 22 sec; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     500    0       523499.2   523499.5  523499.5
Reduce   500     0      500     523499.5   263283.3  269351.2

Up 0 hr 35 min; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     500    0       523499.2   523499.5  523499.5
Reduce   500     ?      ?       ?          ?         ?

Up 0 hr 37 min 01 sec; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     500    0       523499.2   520468.6  520468.6
Reduce   500     406    94      520468.6   512265.?  514373.3

Up 0 hr 38 min; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     500    0       523499.2   519721.8  519721.8
Reduce   500     49?    2       519721.8   519394.7  519440.7
Rates: Mapped 0.0 MB/s; Shuffle 0.0 MB/s; Output 9.4 MB/s
Counters: doc-index-hits 0; docs-indexed 0; dups-in-index-merge 0; mr-merge-calls 394792; mr-merge-outputs 394792

Up 0 hr 40 min 43 sec; 1707 workers; 1 death
Type     Shards  Done   Active  Input(MB)  Done(MB)  Output(MB)
Map      13853   13853  0       878934.6   878934.6  523499.2
Shuffle  500     500    0       523499.2   519774.3  519774.3
Reduce   500     499    1       519774.3   519735.2  519764.0
Rates: Mapped 0.0 MB/s; Shuffle 0.0 MB/s; Output ? MB/s
Counters: doc-index-hits 0; dups-in-index-merge 0; mr-merge-calls 73442; mr-merge-outputs 73442

Fault tolerance: Handled via re-execution

■ On worker failure:
  ■ Detect failure via periodic heartbeats
  ■ Re-execute completed and in-progress map tasks
  ■ Re-execute in-progress reduce tasks
  ■ Task completion committed through master
■ Master failure:
  ■ Could handle, but don't yet (master failure unlikely)
Robust: lost 1600 of 1800 machines once, but finished fine
Semantics in presence of failures: see paper

Refinement: Redundant Execution

Slow workers significantly lengthen completion time
■ Other jobs consuming resources on machine
■ Bad disks with soft errors transfer data very slowly
■ Weird things: processor caches disabled (!!)
Solution: Near end of phase, spawn backup copies of tasks
■ Whichever one finishes first "wins"
Effect: Dramatically shortens job completion time

Refinement: Locality Optimization

Master scheduling policy:
■ Asks GFS for locations of replicas of input file blocks
■ Map tasks typically split into 64 MB (== GFS block size)
■ Map tasks scheduled so GFS input block replicas are on same machine or same rack
Effect: Thousands of machines read input at local disk speed
■ Without this, rack switches limit read rate

Refinement: Skipping Bad Records

Map/Reduce functions sometimes fail for particular inputs
■ Best solution is to debug & fix, but not always possible
■ On seg fault:
  ■ Send UDP packet to master from signal handler
  ■ Include sequence number of record being processed
■ If master sees two failures for same record:
  ■ Next worker is told to skip the record
Effect: Can work around bugs in third-party libraries

Other Refinements (see paper)

■ Sorting guarantees within each reduce partition
■ Compression of intermediate data
■ Combiner: useful for saving network bandwidth
■ Local execution for debugging/testing
■ User-defined counters

Performance

Tests run on cluster of 1800 machines:
■ 4 GB of memory
■ Dual-processor 2 GHz Xeons with Hyperthreading
■ Dual 160 GB IDE disks
■ Gigabit Ethernet per machine
■ Bisection bandwidth approximately 100 Gbps
Two benchmarks:
MR_Grep   Scan 10^10 100-byte records to extract records matching a rare pattern (92K matching records)
MR_Sort   Sort 10^10 100-byte records (modeled after TeraSort benchmark)

MR_Grep

Locality optimization helps:
■ 1800 machines read 1 TB of data at peak of ~31 GB/s
■ Without this, rack switches would limit to 10 GB/s
Startup overhead is significant for short jobs
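
As a rough check on the MR_Grep figures, the Python sketch below works out the input size and the minimum scan time at the two quoted read rates. It assumes decimal units (1 TB = 10^12 bytes) and treats the peak rate as if it were sustained, so the results are lower bounds and not measured job times; the helper name scan_seconds is made up for this illustration.

```python
# Back-of-the-envelope check of the MR_Grep figures from the slides.
# Assumptions (not from the slides): decimal units, and the quoted peak
# read rate is treated as sustained, so these are lower bounds only.

RECORDS = 10**10       # number of input records
RECORD_BYTES = 100     # bytes per record
GB = 10**9
TB = 10**12


def scan_seconds(total_bytes: int, rate_gb_per_s: float) -> float:
    """Time to read total_bytes at a constant aggregate rate in GB/s."""
    return total_bytes / (rate_gb_per_s * GB)


total_bytes = RECORDS * RECORD_BYTES
print(f"input size: {total_bytes / TB:.1f} TB")                 # 1.0 TB
print(f"at 31 GB/s: {scan_seconds(total_bytes, 31):.0f} s")     # about 32 s
print(f"at 10 GB/s: {scan_seconds(total_bytes, 10):.0f} s")     # 100 s
```

At the peak rate the scan alone needs only about half a minute, so any fixed startup cost is a noticeable fraction of such a short job.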
MR_Sort

■ Backup tasks reduce job completion time significantly
■ System deals well with failures

[Figure: MR_Sort throughput plots; one panel is labeled "200 processes killed"]

Experience: Rewrite of Production Indexing System

Rewrote Google's production indexing system using MapReduce
■ Set of 10, 14, 17, 21, 24 MapReduce operations
■ New code is simpler, easier to understand
■ MapReduce takes care of failures, slow machines
■ Easy to make indexing faster by adding more machines

Usage: MapReduce jobs run in August 2004

Number of jobs                     29,423
Average job completion time        634 secs
Machine days used                  79,186 days
Input data read                    3,288 TB
Intermediate data produced         758 TB
Output data written                193 TB
Average worker machines per job    157
Average worker deaths per job      1.2
Average map tasks per job          3,351
Average reduce tasks per job       55
Unique map implementations         395
Unique reduce implementations      269
Unique map/reduce combinations     426

Related Work

Programming model inspired by functional language primitives
Partitioning/shuffling similar to many large-scale sorting systems
■ NOW-Sort ['97]
Re-execution for fault tolerance
■ BAD-FS ['04] and TACC ['97]
Locality optimization has parallels with Active Disks/Diamond work
■ Active Disks ['01], Diamond ['04]
Backup tasks similar to Eager Scheduling in Charlotte system
■ Charlotte ['96]
Dynamic load balancing solves similar problem as River's distributed queues
■ River ['99]

Conclusions

MapReduce has proven to be a useful abstraction
Greatly simplifies large-scale computations at Google
Fun to use: focus on problem, let library deal with messy details

Thanks to Josh Levenberg, who has made many significant improvements, and to everyone else at Google who has used and helped to improve MapReduce.
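
For readers who want to run the word-count example from the programming-model slides, here is a minimal single-process Python sketch. It is not the C++ library described in the slides: the driver run_mapreduce, the whitespace tokenization, and the in-memory grouping are assumptions made purely for illustration, and nothing here is parallel, distributed, or fault tolerant.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple


def map_fn(input_key: str, input_value: str) -> Iterator[Tuple[str, str]]:
    # input_value: document contents; emit (word, "1") for every occurrence.
    # Splitting on whitespace is an assumption of this sketch.
    for word in input_value.split():
        yield word, "1"


def reduce_fn(output_key: str, intermediate_values: Iterable[str]) -> Iterator[str]:
    # output_key: a word; intermediate_values: a list of counts as strings.
    result = 0
    for v in intermediate_values:
        result += int(v)
    yield str(result)


def run_mapreduce(
    inputs: Iterable[Tuple[str, str]],
    mapper: Callable[[str, str], Iterable[Tuple[str, str]]],
    reducer: Callable[[str, Iterable[str]], Iterable[str]],
) -> Dict[str, List[str]]:
    # Hypothetical sequential driver: run every map call, group the
    # intermediate values by key, then run one reduce call per key.
    groups: Dict[str, List[str]] = defaultdict(list)
    for key, value in inputs:
        for out_key, intermediate in mapper(key, value):
            groups[out_key].append(intermediate)
    return {k: list(reducer(k, groups[k])) for k in sorted(groups)}


docs = [("doc1", "the quick brown fox"), ("doc2", "the lazy dog and the fox")]
counts = run_mapreduce(docs, map_fn, reduce_fn)
print(counts["the"], counts["fox"], counts["dog"])  # ['3'] ['2'] ['1']
```

The grouping step in run_mapreduce stands in for the shuffle phase; in the real system that step moves data between machines, which is where the locality and backup-task refinements matter.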