Centrum Wiskunde & Informatica
Map/Reduce on the Enron dataset

We are going to use EMR on the Enron email dataset: the dataset contains 1,227,255 emails from Enron employees. The version we use consists of 50 GB of compressed files.

Consider the following scenario: on September 9, 2001 (really), The New York Times ran an article titled "MARKET WATCH; A Self-Inflicted Wound Aggravates Angst Over Enron". Someone (your boss?) wants to find out who frequently talked to the press in the days before. You are handed a dump of the email server. Technically, this task consists of the following steps:

1. Put the dataset into S3 (the ugly part, already done for you).
2. Extract the date/sender/recipient from the email data (this is what is described in detail below).
3. Filter the data to only consider emails sent between 2001-09-05 and 2001-09-08, and only consider messages going from Enron employees to someone not part of the organization.
4. Count the number of outside interactions and only include accounts that have more than one outside contact in that week.

To achieve this, you need to create a set of MapReduce jobs. We are going to implement those in Python, using the Hadoop Streaming feature that is also available in EMR. If you are new to Python, have a look at the linked example of using Python and Hadoop Streaming on EMR (not all of its details are relevant here).

In Hadoop Streaming, Python MapReduce programs are given a part of the input data on the standard input (stdin) and are expected to write tab-separated tables to the standard output (stdout). Here is a working skeleton for a map or reduce function:

#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip().split('\t')
    # do something with line[0], line[1] etc.
    print "Some_Key\tSome Payload"

The reducer counterpart starts very similarly, but has one important difference: all values with the same key from the mapper will follow each other, which allows them to be combined.

First, let's start a (small) cluster. Log into AWS and open the EMR console. This time, the simple configuration is fine, and most of the defaults can stay the way they are. For the hardware configuration, we are going to start with a 2-node cluster of m1.large instances.

Wait until your cluster has started ("Waiting"). While you are waiting, we need to create an S3 bucket for the output of our analysis:

- Click "Create Bucket".
- Select the "US Standard" region and a name for your bucket. This name needs to be globally unique across S3. Then click "Create".

Back to EMR. First, we are going to run the data transformation on a small part of the dataset. On your (hopefully ready by now) cluster page, select "Steps", then "Add Step". Select Step Type "Streaming Program" and give it a name. Further, set

- Mapper to s3://enron-scripts/enron-etl.py
- Reducer to cat
- Input S3 Location to s3://enron-scripts/enron-urls-small.txt
- Output S3 Location to s3://enron-results/t1

(replace enron-results with the name of the S3 bucket you just created). Then click "Add".

You will see the MapReduce job starting, going from "Pending" to "Running" and then hopefully to "Completed". If something goes wrong, inspect the log files! If all went well, it is time to inspect the results in S3. Right-click the file "part-00000" and download it to inspect its contents. You will find three columns, separated by a tab (\t) character, containing (1) a timestamp, (2) a sender email address, and (3) a recipient email address. In other words, the "enron-etl.py" you just ran extracted from the raw data exactly the information required for the analysis described above, i.e., for your task.
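This three-column table is exactly the input for the job you will build in the next part. To give an idea of its shape, here is a rough, unofficial sketch of what a mapper.py and reducer.py could look like, written in the same Python 2 style as the rest of this handout. The sketch assumes that Enron accounts can be recognized by an address ending in @enron.com, that the ISO timestamps can be compared as strings, and that "more than one outside contact" means more than one distinct outside recipient; adapt it as you see fit, or structure your own solution differently.

#!/usr/bin/env python
# sketch of mapper.py: keep only mails sent between 2001-09-05 and 2001-09-08
# from an Enron address to a non-Enron address, keyed by the sender
import sys

for line in sys.stdin:
    fields = line.strip().split('\t')
    if len(fields) != 3:
        continue
    date, sender, recipient = fields
    # the first column is an ISO timestamp, so the day can be compared as a string
    if not ('2001-09-05' <= date[:10] <= '2001-09-08'):
        continue
    if sender.endswith('@enron.com') and not recipient.endswith('@enron.com'):
        print sender + '\t' + recipient

#!/usr/bin/env python
# sketch of reducer.py: count distinct outside recipients per sender and
# print only the senders with more than one outside contact
import sys

def emit(sender, contacts):
    if sender and len(contacts) > 1:
        print sender + '\t' + str(len(contacts))

current_sender = ""
contacts = set()

for line in sys.stdin:
    fields = line.strip().split('\t')
    if len(fields) != 2:
        continue
    sender, recipient = fields
    if sender != current_sender:
        emit(current_sender, contacts)
        current_sender = sender
        contacts = set()
    contacts.add(recipient)

emit(current_sender, contacts)

Because Hadoop sorts the mapper output by key, all lines for one sender arrive at the reducer consecutively; the key-change check in the reducer relies on exactly that.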
In fact, AWS/EMR/Hadoop might choose to use more than one reducer (check the "View jobs" and "View tasks" pages described below for details), and then the result will be distributed over more than one file. In my latest test, AWS/EMR/Hadoop used two reducers for the job, resulting in two files, "part-00000" and "part-00001". This file (or these files) will be the input for your next MapReduce job as described above.

(Tip: if you specify as "Input S3 Location" not a single file (such as "s3://enron-scripts/enron-urls-small.txt" in the example above) but a directory (folder), e.g., the "s3://enron-results/t1" result folder you used above, AWS/EMR/Hadoop will automatically iterate over all files in that directory, i.e., you do not need to concatenate them yourself in any way.)

Create a mapper.py and a reducer.py script, upload them to your S3 bucket, point to them when creating a "Streaming Program" step, and run them. See the skeleton further up for an example. The mapper is expected to output a key and values separated by a tab (\t) character. As mentioned in the slides, the mapper typically filters records and outputs them with a common key, and the reducers read the records sharing a common key and output an aggregation. Here are example Hadoop Streaming mapper and reducer scripts for Wordcount (plain text files of both scripts are available for download):

Mapper

#!/usr/bin/env python
import sys

for line in sys.stdin:
    words = line.split(' ')
    for word in words:
        print word.strip() + "\t1"

Reducer

#!/usr/bin/env python
import sys

current_count = 0
current_word = ""

for line in sys.stdin:
    line = line.strip().split('\t')
    if len(line) != 2:
        continue
    key = line[0]
    if key != current_word:
        if current_count > 0:
            print current_word + '\t' + str(current_count)
        current_count = 0
        current_word = key
    current_count += 1

if current_count > 0:
    print current_word + '\t' + str(current_count)

If anything goes wrong (which is likely in the beginning), inspect the log files provided for the EMR step. It can take a few minutes for them to appear in the web interface. Also check the logs of failing tasks! Finally, make sure each job's output directory does not exist yet in S3, otherwise the job will fail.

Larger Dataset

To run the ETL (and your subsequent job) on the larger dataset, create a step as follows: select Step Type "Custom JAR" and give it a name. Set the JAR location to

command-runner.jar

and set the Arguments to

hadoop-streaming -Dmapred.map.tasks=100 -files s3://enron-scripts/enron-etl.py -mapper enron-etl.py -reducer cat -input s3://enron-scripts/enron-urls.txt -output s3://enron-results/f1

(again, replace enron-results with the name of the S3 bucket you created). Note: this is a normal Hadoop Streaming job, too, but for complicated reasons we need to set a custom MapReduce parameter here. Then click "Add".

After the step has started, inspect its mapper tasks: scroll down and you will see a large number of them. In its current state, your cluster will take a long time to finish all of those. But since this is the cloud, we can simply request more firepower: on your cluster details page, select "Resize" and increase the "Core Instance Group" to a count of 5. Once the additional nodes are available, the step will process much faster.
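While the full ETL step is running, it can pay off to sanity-check your mapper.py and reducer.py locally before submitting them as the next step. The harness below is only a suggestion (the sample lines are made up, and mapper.py/reducer.py are the names of the scripts you created above): it pushes a few lines in the ETL output format through the mapper, a sort, and the reducer, which is roughly what Hadoop Streaming does between the two scripts.

#!/usr/bin/env python
# local sanity check (sketch): run mapper.py and reducer.py on a few
# hand-made lines in the (timestamp, sender, recipient) format that
# enron-etl.py produces
import subprocess

sample = '\n'.join([
    '2001-09-06T10:15:00Z\talice@enron.com\treporter@example.com',
    '2001-09-06T11:20:00Z\talice@enron.com\teditor@example.org',
    '2001-09-01T09:00:00Z\tbob@enron.com\tcarol@enron.com',
]) + '\n'

mapper = subprocess.Popen(['python', 'mapper.py'],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
mapped, _ = mapper.communicate(sample)

# Hadoop sorts the mapper output by key before handing it to the reducer
shuffled = '\n'.join(sorted(mapped.splitlines())) + '\n'

reducer = subprocess.Popen(['python', 'reducer.py'],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
reduced, _ = reducer.communicate(shuffled)

print reduced.strip()

If your scripts behave like the sketch shown earlier, only alice@enron.com should come out of this run, with two distinct outside contacts.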
After the ETL step on the full dataset has completed, run your MapReduce job on its (larger) results. Once finished, again make sure to shut down your EMR cluster!

ETL script for reference (also available for download as a plain text file):

#!/usr/bin/env python
# this turns the Enron email archive into tuples (date, from, to)
import sys
import zipfile
import tempfile
import email
import email.utils
import datetime
import os
import urllib

# stdin is a list of URLs to data files
for u in sys.stdin:
    u = u.strip()
    if not u:
        continue
    # download the zip archive to a temporary file
    tmpf = tempfile.mkstemp()
    urllib.urlretrieve(u, tmpf[1])
    try:
        zip = zipfile.ZipFile(tmpf[1], 'r')
    except:
        continue
    # only the .txt members of the archive contain email messages
    txtf = [i for i in zip.infolist() if i.filename.endswith('.txt')]
    for f in txtf:
        msg = email.message_from_file(zip.open(f))
        tostr = msg.get("To")
        fromstr = msg.get("From")
        datestr = msg.get("Date")
        if tostr is None or fromstr is None or datestr is None:
            continue
        toaddrs = [email.utils.parseaddr(a) for a in tostr.split(',')]
        fromaddr = email.utils.parseaddr(fromstr)[1].replace('\'', '').strip().lower()
        try:
            # datetime hell, convert custom time zone stuff to UTC
            dt = datetime.datetime.strptime(datestr[:25].strip(), '%a, %d %b %Y %H:%M:%S')
            dt = dt + datetime.timedelta(hours=int(datestr[25:].strip()[:3]))
        except ValueError:
            continue
        if '@' not in fromaddr or '/' in fromaddr:
            continue
        for a in toaddrs:
            if '@' not in a[1] or '/' in a[1]:
                continue
            ta = a[1].replace('\'', '').strip().lower()
            print dt.isoformat() + 'Z\t' + fromaddr + '\t' + ta
    zip.close()
    os.remove(tmpf[1])
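Finally, a note on that second job, the one you run on the larger ETL output in s3://enron-results/f1: it can be added as another "Streaming Program" step, configured along the following lines. This is only a sketch; the script locations are placeholders for wherever you uploaded your own mapper.py and reducer.py, and as before you should replace enron-results with the name of your own bucket.

- Mapper to s3://enron-results/mapper.py
- Reducer to s3://enron-results/reducer.py
- Input S3 Location to s3://enron-results/f1
- Output S3 Location to s3://enron-results/f2 (a folder that does not exist yet)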