Advanced Search Techniques for Large Scale Data Analytics
Pavel Zezula and Jan Sedmidubsky
Masaryk University

[Figure: single-node architecture (CPU, memory, disk), the setting for machine learning, statistics, and "classical" data mining]

- 20+ billion web pages x 20 KB = 400+ TB
- 1 computer reads 30-35 MB/sec from disk
  - ~4 months to read the web
- ~1,000 hard drives to store the web
- It takes even more to do something useful with the data!
- Today, a standard architecture for such problems is emerging:
  - Cluster of commodity Linux nodes
  - Commodity network (Ethernet) to connect them

[Figure: cluster architecture. Each rack contains 16-64 nodes (CPU, memory, disk) behind a switch, with 1 Gbps between any pair of nodes in a rack and a 2-10 Gbps backbone between racks.]

In 2011 it was estimated that Google had 1M machines.

- Large-scale computing for data mining problems on commodity hardware
- Challenges:
  - How do you distribute computation?
  - How can we make it easy to write distributed programs?
  - Machines fail:
    - One server may stay up 3 years (1,000 days)
    - If you have 1,000 servers, expect to lose 1 per day
    - People estimated Google had ~1M machines in 2011
    - 1,000 machines fail every day!

- Issue: copying data over a network takes time
- Idea:
  - Bring computation close to the data
  - Store files multiple times for reliability
- MapReduce addresses these problems
  - Google's computational/data manipulation model
  - Elegant way to work with big data
  - Storage infrastructure: file system
    - Google: GFS. Hadoop: HDFS
  - Programming model
    - MapReduce

- Problem:
  - If nodes fail, how to store data persistently?
- Answer:
  - Distributed file system:
    - Provides global file namespace
    - Google GFS; Hadoop HDFS
- Typical usage pattern
  - Huge files (100s of GB to TB)
  - Data is rarely updated in place
  - Reads and appends are common

- Chunk servers
  - File is split into contiguous chunks
  - Typically each chunk is 16-64 MB
  - Each chunk is replicated (usually 2x or 3x)
  - Try to keep replicas in different racks
- Master node
  - a.k.a. Name Node in Hadoop's HDFS
  - Stores metadata about where files are stored
  - Might be replicated
- Client library for file access
  - Talks to the master to find chunk servers
  - Connects directly to chunk servers to access data

- Reliable distributed file system
- Data kept in "chunks" spread across machines
- Each chunk replicated on different machines
  - Seamless recovery from disk or machine failure

[Figure: chunks (C0, C1, C2, C3, C5, D1, ...) replicated across several chunk servers]

Bring computation directly to the data!
Chunk servers also serve as compute servers.

- Case 1:
  - File too large for memory, but all <word, count> pairs fit in memory
- Case 2:
  - Count occurrences of words:
    - words(doc.txt) | sort | uniq -c
    - where words takes a file and outputs the words in it, one per line
- Case 2 captures the essence of MapReduce
  - Great thing is that it is naturally parallelizable

- Sequentially read a lot of data
- Map:
  - Extract something you care about
- Group by key: sort and shuffle
- Reduce:
  - Aggregate, summarize, filter or transform
- Write the result

The outline stays the same; Map and Reduce change to fit the problem.

[Figure: Map step. Each map call turns input key-value pairs into intermediate key-value pairs.]
[Figure: Reduce step. Intermediate key-value pairs are grouped by key into key-value groups, and each reduce call turns one group into output key-value pairs.]

- Input: a set of key-value pairs
- Programmer specifies two methods:
  - Map(k, v) → <k', v'>*
    - Takes a key-value pair and outputs a set of key-value pairs
    - E.g., key is the filename, value is a single line in the file
    - There is one Map call for every (k, v) pair
  - Reduce(k', <v'>*) → <k', v''>*
    - All values v' with the same key k' are reduced together and processed in v' order
    - There is one Reduce function call per unique key k'

The crew of the space shuttle Endeavor recently returned to Earth as ambassadors, harbingers of a new era of space exploration. Scientists at NASA are saying that the recent assembly of the Dextre bot is the first step in a long-term space-based man/machine partnership.
"The work we're doing now -- the robotics we're doing -- is what we're going to need ...

[Figure: word counting with MapReduce over the big document above]

- MAP (provided by the programmer): read input and produce a set of (key, value) pairs
    (The, 1) (crew, 1) (of, 1) (the, 1) (space, 1) (shuttle, 1) (Endeavor, 1) (recently, 1) ...
- Group by key: collect all pairs with the same key
    (crew, 1) (crew, 1) (space, 1) (the, 1) (the, 1) (the, 1) (shuttle, 1) (recently, 1) ...
- Reduce (provided by the programmer): collect all values belonging to the key and output (key, value)
    (crew, 2) (space, 1) (the, 3) (shuttle, 1) (recently, 1) ...

    map(key, value):
        // key: document name; value: text of the document
        for each word w in value:
            emit(w, 1)

    reduce(key, values):
        // key: a word; values: an iterator over counts
        result = 0
        for each count v in values:
            result += v
        emit(key, result)

- MAP: read input and produce a set of key-value pairs
- Group by key: collect all pairs with the same key (hash merge, shuffle, sort, partition)
- Reduce: collect all values belonging to the key and output
- All phases are distributed, with many tasks doing the work

[Figure: data flow. Input 0-2 feed Map 0-2; the shuffle routes map output to Reduce 0-1, which write Out 0-1.]

- Map worker failure
  - Map tasks completed or in progress at the worker are reset to idle (completed map output is stored on the failed worker's local disk, so it is lost with the worker)
  - Reduce workers are notified when a task is rescheduled on another worker
- Reduce worker failure
  - Only in-progress tasks are reset to idle
  - Reduce task is restarted
- Master failure
  - MapReduce task is aborted and the client is notified

- M map tasks, R reduce tasks
- Rule of thumb:
  - Make M much larger than the number of nodes in the cluster
  - One DFS chunk per map is common
  - Improves dynamic load balancing and speeds up recovery from worker failures
- Usually R is smaller than M
  - Because output is spread across R files

- Fine-granularity tasks: map tasks >> machines
  - Minimizes time for fault recovery
  - Can pipeline shuffling with map execution
  - Better dynamic load balancing
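The word-count map and reduce pseudocode above can be tried on a single machine. The following is a minimal Python sketch, not Hadoop's or Google's implementation: the driver `run_mapreduce` and the sample documents are hypothetical names introduced here for illustration, and all three phases run sequentially in one process rather than being distributed.

```python
from collections import defaultdict


def map_fn(key, value):
    """key: document name; value: text of the document."""
    for word in value.split():
        yield (word, 1)


def reduce_fn(key, values):
    """key: a word; values: an iterable of counts for that word."""
    yield (key, sum(values))


def run_mapreduce(inputs, mapper, reducer):
    """Hypothetical single-process driver: map, group by key, reduce."""
    # Map: one mapper call per input (key, value) pair.
    intermediate = []
    for key, value in inputs:
        intermediate.extend(mapper(key, value))

    # Group by key: collect all intermediate values that share a key.
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)

    # Reduce: one reducer call per unique intermediate key.
    output = []
    for key in sorted(groups):
        output.extend(reducer(key, groups[key]))
    return output


# Made-up sample input, loosely based on the big document example.
docs = [
    ("doc1", "the crew of the space shuttle"),
    ("doc2", "the crew returned"),
]
counts = dict(run_mapreduce(docs, map_fn, reduce_fn))
print(counts)
```

Only `map_fn` and `reduce_fn` are specific to word counting; the driver stays the same when they are swapped for another problem, which mirrors the fixed outline of read, map, group by key, reduce, write.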
- Problem
  - Slow workers significantly lengthen the job completion time:
    - Other jobs on the machine
    - Bad disks
    - Weird things
- Solution
  - Near the end of a phase, spawn backup copies of tasks
    - Whichever one finishes first "wins"
- Effect
  - Dramatically shortens job completion time

- Often a Map task will produce many pairs of the form (k, v1), (k, v2), ... for the same key k
  - E.g., popular words in the word count example
- Can save network time by pre-aggregating values in the mapper:
  - combine(k, list(v1)) → v2
  - Combiner is usually the same as the reduce function
- Works only if the reduce function is commutative and associative

- Back to our word counting example:
  - Combiner combines the values of all keys of a single mapper (single machine)
  - Much less data needs to be copied and shuffled!

- Compute the natural join R(A,B) ⋈ S(B,C)
- R and S are each stored in files
- Tuples are pairs (a,b) or (b,c)

    R          S          R ⋈ S
    A   B      B   C      A   C
    a1  b1     b2  c1     a3  c1
    a2  b1     b2  c2     a3  c2
    a3  b2     b3  c3     a4  c3
    a4  b3

- In MapReduce we quantify the cost of an algorithm using
  1. Communication cost = total I/O of all processes
  2. Elapsed communication cost = max of I/O along any path
  3. (Elapsed) computation cost: analogous, but counts only the running time of processes
  - Note that big-O notation is not the most useful here (adding more machines is always an option)

- For a MapReduce algorithm:
  - Communication cost = input file size + 2 × (sum of the sizes of all files passed from Map processes to Reduce processes) + the sum of the output sizes of the Reduce processes. The middle term is counted twice because each such file is written by a Map process and then read by a Reduce process.
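The effect of a combiner on the data passed from Map to Reduce processes, which the communication cost above counts twice, can be illustrated with a small Python sketch. The helper names (`map_words`, `combine`, `reduce_counts`) and the two input splits are assumptions made for this example; each split stands in for the input of one mapper, and sizes are measured in number of key-value pairs rather than bytes.

```python
from collections import Counter


def map_words(text):
    """Map: emit (word, 1) for every word in one mapper's input split."""
    return [(word, 1) for word in text.split()]


def combine(pairs):
    """Combiner: pre-aggregate one mapper's output per key.

    Uses the same logic as the reduce function (summing), which is safe
    because addition is commutative and associative.
    """
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


def reduce_counts(pairs):
    """Reduce: sum all counts per word across every mapper's output."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


# Made-up splits: one string per mapper.
splits = [
    "the crew of the space shuttle the crew",
    "the shuttle the space the crew",
]

raw = [map_words(s) for s in splits]        # mapper output without a combiner
combined = [combine(p) for p in raw]        # mapper output with a combiner

pairs_without = sum(len(p) for p in raw)
pairs_with = sum(len(p) for p in combined)

result_without = reduce_counts([kv for p in raw for kv in p])
result_with = reduce_counts([kv for p in combined for kv in p])

print(pairs_without, pairs_with)            # pairs shuffled in each variant
print(result_without == result_with)        # same final counts either way
```

In this sketch the combiner reduces the shuffled pairs from one per word occurrence to one per distinct word per mapper, while the final counts are unchanged.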
- For a MapReduce algorithm, elapsed communication cost is the sum of the largest input + output for any Map process, plus the same for any Reduce process
- Either the I/O (communication) or processing (computation) cost dominates
  - Ignore one or the other
- Total cost tells what you pay in rent from your friendly neighborhood cloud
- Elapsed cost is wall-clock time using parallelism

For the natural join R(A,B) ⋈ S(B,C):

- Total communication cost = O(|R| + |S| + |R ⋈ S|)
- Elapsed communication cost = O(s)
  - We put a limit s on the amount of input or output that any one process can have. s could be:
    - What fits in main memory
    - What fits on local disk
  - We're going to pick k and the number of Map processes so that the I/O limit s is respected
- With proper indexes, computation cost is linear in the input + output size
  - So computation cost is like communication cost

- Google
  - Not available outside Google
- Hadoop
  - An open-source implementation in Java
  - Uses HDFS for stable storage
  - Download:
- Aster Data
  - Cluster-optimized SQL database that also implements MapReduce

- Hadoop Wiki
  - Introduction
  - Getting Started
  - Map/Reduce Overview
  - Eclipse Environment
- Javadoc
- Releases from Apache download mirrors
- Nightly builds of source
- Source code from subversion
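Returning to the natural join R(A,B) ⋈ S(B,C) and its communication cost: the slides do not spell out the Map and Reduce functions for the join, so the sketch below uses one common approach as an assumption. Map tags each tuple with its relation and keys it by the B-value, and a hash-like function (`bucket`, hypothetical) assigns each key to one of k Reduce processes. It emits (a, c) pairs to match the result table of the join example, and it measures cost in tuples rather than bytes.

```python
from collections import defaultdict

# Relations from the join example.
R = [("a1", "b1"), ("a2", "b1"), ("a3", "b2"), ("a4", "b3")]
S = [("b2", "c1"), ("b2", "c2"), ("b3", "c3")]


def bucket(b, k):
    """Hypothetical deterministic stand-in for a hash from B-values to 0..k-1."""
    return sum(b.encode()) % k


def map_join(r_tuples, s_tuples):
    """Map: key every tuple by its B-value and tag it with its relation."""
    for a, b in r_tuples:
        yield b, ("R", a)
    for b, c in s_tuples:
        yield b, ("S", c)


def mapreduce_join(r_tuples, s_tuples, k):
    """Run the join with k simulated Reduce processes; return (result, cost)."""
    reducers = [defaultdict(list) for _ in range(k)]
    shuffled = 0
    for b, tagged in map_join(r_tuples, s_tuples):
        reducers[bucket(b, k)][b].append(tagged)  # Map -> Reduce transfer
        shuffled += 1

    output = []
    for groups in reducers:
        for b, tagged_values in groups.items():
            a_values = [x for tag, x in tagged_values if tag == "R"]
            c_values = [x for tag, x in tagged_values if tag == "S"]
            # Pair every R-tuple with every S-tuple sharing this B-value.
            output.extend((a, c) for a in a_values for c in c_values)

    # Communication cost = input size + 2 x (Map-to-Reduce data) + output size.
    cost = len(r_tuples) + len(s_tuples) + 2 * shuffled + len(output)
    return sorted(output), cost


result, cost = mapreduce_join(R, S, k=2)
print(result)
print(cost)
```

Each input tuple is shuffled exactly once, so the tally grows with |R| + |S| + |R ⋈ S|, in line with the total communication cost stated above. Choosing a larger k spreads the keys over more Reduce processes, which is one way to keep the input of any single process under the limit s.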