Summary for Big Data
18 Nov 2016

Hadoop notes:
Create a directory: $ hdfs dfs -mkdir /user/new_directory
Copy a local file to HDFS: $ hdfs dfs -put local_file /user/new_directory
Run ls: $ hdfs dfs -ls /user/new_directory
Copy an HDFS file to local: $ hdfs dfs -get /user/new_directory/file_name .
Delete a file: $ hdfs dfs -rm /user/new_directory/file_name
Basically, these are similar to the usual Linux file commands, but prefixed with “hdfs dfs -”.
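The same commands can also be scripted from Python with subprocess; below is a minimal sketch (it assumes the hdfs client is on the PATH, and the hdfs() helper and paths are just illustrative):

import subprocess

# Hypothetical helper: run an "hdfs dfs" subcommand and return its output.
def hdfs(*args):
    return subprocess.check_output(['hdfs', 'dfs'] + list(args))

hdfs('-mkdir', '-p', '/user/new_directory')        # create the directory
hdfs('-put', 'local_file', '/user/new_directory')  # upload a local file
print hdfs('-ls', '/user/new_directory')           # list its contents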
Pig Latin:
The Grunt shell provides an interactive shell to submit Pig commands or run Pig scripts. To run a Pig script in local mode: pig -x local script.pig
To run it in Hadoop (MapReduce) mode: pig -x mapreduce script.pig
Mahout:
Mahout is a machine learning library that is commonly used as a recommendation engine, employing what are known as collaborative filtering algorithms.
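To illustrate the idea behind collaborative filtering, here is a toy Python sketch (not Mahout's implementation; the ratings below are made up) that predicts a rating from the ratings of similar users:

from math import sqrt

# Toy user -> {item: rating} data, made up for illustration.
ratings = {
    'alice': {'item1': 5, 'item2': 3, 'item3': 4},
    'bob':   {'item1': 4, 'item2': 1, 'item4': 5},
    'carol': {'item2': 2, 'item3': 5, 'item4': 4},
}

# Cosine similarity between two users, computed over the items they both rated.
def similarity(u, v):
    common = set(ratings[u]) & set(ratings[v])
    if not common:
        return 0.0
    dot = sum(ratings[u][i] * ratings[v][i] for i in common)
    norm_u = sqrt(sum(r * r for r in ratings[u].values()))
    norm_v = sqrt(sum(r * r for r in ratings[v].values()))
    return dot / (norm_u * norm_v)

# Predict a user's rating for an unseen item as a similarity-weighted average.
def predict(user, item):
    scores = [(similarity(user, other), r[item])
              for other, r in ratings.items()
              if other != user and item in r]
    total = sum(s for s, _ in scores)
    return sum(s * r for s, r in scores) / total if total else 0.0

print predict('alice', 'item4')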
Apache Oozie:
As a workflow engine, Oozie enables you to run a set of Hadoop applications in a specified sequence known as a workflow.
Hive:
Hive provides a SQL-like query language (HiveQL) for data stored in HDFS, translating queries into MapReduce jobs.
Running Hadoop WordCount in Python
The mapper code is shown below. It is stored in a file called mapper.py, and does not even contain a function. All it needs to do is receive data on its stdin input and output data on its stdout.
#!/usr/bin/env python
import sys

#--- get all lines from stdin ---
for line in sys.stdin:
    #--- remove leading and trailing whitespace ---
    line = line.strip()
    #--- split the line into words ---
    words = line.split()
    #--- output tuples [word, 1] in tab-delimited format ---
    for word in words:
        print '%s\t%s' % (word, "1")
Typically the reducer gets the tuples generated by the mapper, after the shuffle and sort phases. The code is stored in a file called reducer.py.
#!/usr/bin/env python
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        continue
    try:
        word2count[word] = word2count[word] + count
    except KeyError:
        word2count[word] = count

# write the tuples to stdout
# Note: they are unsorted
for word in word2count.keys():
    print '%s\t%s' % (word, word2count[word])
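Before submitting to the cluster, the pair can be tested locally. Here is a minimal Python sketch that pipes a sample file through the mapper, a sort, and the reducer, mimicking what Hadoop Streaming does (it assumes mapper.py, reducer.py and a sample ulysses.txt are in the current directory):

import subprocess

# Emulate the streaming pipeline: cat ulysses.txt | mapper.py | sort | reducer.py
with open('ulysses.txt') as f:
    mapped = subprocess.check_output(['python', 'mapper.py'], stdin=f)

# The shuffle/sort phase groups identical words together for the reducer.
sorted_pairs = '\n'.join(sorted(mapped.splitlines())) + '\n'

reducer = subprocess.Popen(['python', 'reducer.py'],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
counts, _ = reducer.communicate(sorted_pairs)
print counts[:200]  # show the first few word counts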
Running on the Hadoop Cluster
Let’s run the Python code on the Ulysses.txt file.
We’ll assume that the Python code is stored in ~hadoop/352/dft/python
We’ll assume that the streaming Java library is in ~hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar
We’ll also assume that ulysses.txt is in dft and that we want the output in dft-output:
cd 352/dft/python
hadoop dfs -rmr dft-output
hadoop jar /home/hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar \
    -file ./mapper.py -mapper ./mapper.py \
    -file ./reducer.py -reducer ./reducer.py \
    -input dft -output dft-output
Spark:
Apache Spark is a relatively new data processing engine, implemented in Scala and Java, that can run on a cluster to process and analyze large amounts of data. Spark performance is particularly good if the cluster has sufficient main memory to hold the data being analyzed. Several sub-projects run on top of Spark and provide graph analysis (GraphX), a Hive-based SQL engine (Shark), machine learning algorithms (MLlib), and real-time streaming (Spark Streaming). Spark has also recently been promoted from incubator status to a top-level Apache project.
First we create a Spark Resilient Distributed Dataset (RDD) containing each line from the files in the HDFS folder:
>>> lines = sc.textFile('hdfs://ubuntu1:54310/user/dev/gutenberg')
PySpark provides two kinds of operations on RDDs: transformations, which produce new RDDs, and actions, which return results. To filter out empty lines we can use the following filter transformation.
>>> lines_nonempty = lines.filter( lambda x: len(x) > 0 )
At this point, no actual data has been processed. Spark/PySpark evaluates lazily, so it's not until we extract result data from an RDD (or a chain of RDDs) that any actual processing is done. The following action returns the number of non-empty lines:
>>> lines_nonempty.count()
1240997
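To see the laziness directly, the same pattern can be tried on a small in-memory RDD (a quick sketch, assuming the same PySpark shell and its SparkContext sc): the filter returns a new RDD immediately without touching the data, and only the count() action launches a job.

>>> nums = sc.parallelize(range(100))
>>> evens = nums.filter(lambda n: n % 2 == 0)   # returns instantly, nothing computed yet
>>> evens.count()                               # the action triggers the actual computation
50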
To run the traditional wordcount example, see python/examples/wordcount.py in the Spark installation. You can also run the following from the PySpark interpreter to find the 10 most commonly occurring words with their associated frequencies (not surprisingly, these are the usual stopwords):
>>> words = lines_nonempty.flatMap(lambda x: x.split())
>>> wordcounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x,y:x+y).map(lambda x:(x[1],x[0])).sortByKey(False)
>>> wordcounts.take(10)
[(463684, u'the'), (281800, u'and'), (281055, u'of'), (185953, u'to'), (138053, u'a'), (123173, u'in'), (91436, u'that'), (88530, u'I'), (65400, u'with'), (63112, u'he')]
We start by writing the whole transformation as a single chained invocation, with a few changes to deal with some punctuation characters and to convert the text to lower case.
>>> wordcounts = sc.textFile('hdfs://ubuntu1:54310/user/dev/gutenberg') \
.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \
.flatMap(lambda x: x.split()) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[1],x[0])) \
.sortByKey(False)
>>> wordcounts.take(10)
[(500662, u'the'), (331864, u'and'), (289323, u'of'), (196741, u'to'),
(149380, u'a'), (132609, u'in'), (100711, u'that'), (92052, u'i'),
(77469, u'he'), (72301, u'for')]
To understand what's going on, it's best to consider this program as a pipeline of transformations. Apart from the initial call to the textFile method of the variable sc (SparkContext), which creates the first resilient distributed dataset (RDD) by reading lines from each file in the specified directory on HDFS, each subsequent call transforms its input RDD into a new output RDD. We'll consider a simple example where we start by creating an RDD with just two lines using sc.parallelize, rather than reading the data from files with sc.textFile, and trace what each step in our wordcount program does. The lines are a quote from a Dr. Seuss story.
>>> lines = sc.parallelize(['Its fun to have fun,','but you have to know how.'])
>>> wordcounts = lines.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \
.flatMap(lambda x: x.split()) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y:x+y) \
.map(lambda x:(x[1],x[0])) \
.sortByKey(False)
>>> wordcounts.take(10)
[(2, 'to'), (2, 'fun'), (2, 'have'), (1, 'its'), (1, 'know'), (1, 'how'), (1, 'you'), (1, 'but')]
map( )
map returns a new RDD containing values created by applying the supplied function to each value in the original RDD. Here we use a lambda function which replaces some common punctuation characters with spaces and converts the text to lower case, producing a new RDD:
>>> r1 = lines.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower())
>>> r1.take(10)
['its fun to have fun ', 'but you have to know how ']
flatMap( )
flatMap applies a function which takes each input value and returns a list. Each value of the list becomes a new, separate value in the output RDD. In our example, the lines are split into words and then each word becomes a separate value in the output RDD:
>>> r2 = r1.flatMap(lambda x: x.split())
>>> r2.take(20)
['its', 'fun', 'to', 'have', 'fun', 'but', 'you', 'have', 'to', 'know', 'how']
>>>
map( )
In this second map invocation, we use a function which replaces each original value in the input RDD with a 2-tuple containing the word in the first position and the integer value 1 in the second position:
>>> r3 = r2.map(lambda x: (x, 1))
>>> r3.take(20)
[('its', 1), ('fun', 1), ('to', 1), ('have', 1), ('fun', 1), ('but', 1), ('you', 1), ('have', 1), ('to', 1), ('know', 1), ('how', 1)]
>>>
reduceByKey(function)
reduceByKey expects the input RDD to contain tuples of the form (key, value). It produces a new RDD with one tuple per distinct key, where the second element is the result of repeatedly applying the supplied function to combine all of the values associated with that key:
>>> r4 = r3.reduceByKey(lambda x,y:x+y)
>>> r4.take(20)
[('fun', 2), ('to', 2), ('its', 1), ('know', 1), ('how', 1), ('you', 1), ('have', 2), ('but', 1)]
map( )
Here we map a lambda function over the data to swap the first and second values in each tuple, so that the word count now appears in the first position and the word in the second position:
>>> r5 = r4.map(lambda x:(x[1],x[0]))
>>> r5.take(20)
[(2, 'fun'), (1, 'how'), (1, 'its'), (1, 'know'), (2, 'to'), (1, 'you'), (1, 'but'), (2, 'have')]
sortByKey(ascending=True)
sortByKey sorts the input RDD by the key value (the value at the first position in each tuple). In this example the first position stores the word count, so this sorts the words so that the most frequently occurring words come first in the RDD; passing ascending=False sets the sort order to descending (the default, ascending=True, would put the least frequent words first).
>>> r6 = r5.sortByKey(ascending=False)
>>> r6.take(20)
[(2, 'fun'), (2, 'to'), (2, 'have'), (1, 'its'), (1, 'know'), (1, 'how'), (1, 'you'), (1, 'but')]
Spark supports the efficient parallel application of map and reduce operations by dividing the data up into multiple partitions. In the example above, each file will by default generate one partition. What Spark adds to existing frameworks like Hadoop is the ability to add multiple map and reduce tasks to a single workflow. There are some useful ways to look at the distribution of objects across the partitions of our RDD:
>>> lines = sc.textFile('hdfs://ubuntu1:54310/user/dev/gutenberg')
>>> def countPartitions(id, iterator):
...     c = 0
...     for _ in iterator:
...         c += 1
...     yield (id, c)
>>> lines.mapPartitionsWithSplit(countPartitions).collectAsMap()
{0: 566, 1: 100222, 2: 124796, 3: 3735, ..., 96: 6690, 97: 3921, 98: 16271, 99: 1138}
Spark achieves fault tolerance through lineage: a lost partition can be recomputed from the transformations that produced it, so failure of a single worker should not cause an RDD to become unavailable. Many operations, including map and flatMap, can be applied independently to each partition, running as concurrent tasks based on the number of available cores; typically these operations preserve the number of partitions. When processing reduceByKey, Spark will create a number of output partitions based on the default parallelism, which depends on the numbers of nodes and cores available to Spark. Data is effectively reshuffled so that input data from different input partitions with the same key value is passed to the same output partition and combined there using the specified reduce function. sortByKey is another operation which transforms N input partitions into M output partitions.
>>> sc.defaultParallelism
4
>>> wordcounts = sc.textFile('hdfs://ubuntu1:54310/user/dev/gutenberg') \
.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \
.flatMap(lambda x: x.split()) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y:x+y)
>>> wordcounts.mapPartitionsWithSplit(countPartitions).collectAsMap()
{0: 122478, 1: 122549, 2: 121597, 3: 122587}
The number of partitions generated by the reduce stage can be controlled by supplying the desired number of partitions as an extra parameter to reduceByKey:
>>> wordcounts = sc.textFile('hdfs://ubuntu1:54310/user/dev/gutenberg') \
.map( lambda x: x.replace(',',' ').replace('.',' ').replace('-',' ').lower()) \
.flatMap(lambda x: x.split()) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda x,y:x+y,numPartitions=2)
>>> wordcounts.mapPartitionsWithSplit(countPartitions).collectAsMap()
{0: 244075, 1: 245136}
Apache Spark Examples
Word Count
In this example, we use a few transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
Pi Estimation
Spark can also be used for compute-intensive tasks. This code estimates π by “throwing darts” at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be π / 4, so we use this to get our estimate.
from random import random

def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
          .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
DataFrame API Examples
In Spark, a DataFrame is a distributed collection of data organized into named columns. Users can use the DataFrame API to perform various relational operations on both external data sources and Spark's built-in distributed collections without providing specific procedures for processing the data. Also, programs based on the DataFrame API will be automatically optimized by Spark's built-in optimizer, Catalyst.
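One quick way to see Catalyst at work is to ask a DataFrame for its query plans with explain(); a minimal sketch (the rows below are made up, and sqlContext is assumed to be the SQL context from the PySpark shell):

from pyspark.sql import Row

# Build a tiny DataFrame from an in-memory collection of toy rows.
people = sqlContext.createDataFrame([Row(name='alice', age=34),
                                     Row(name='bob', age=29)])

# explain(True) prints the parsed, analyzed, optimized and physical plans
# that Catalyst produces for this query before it runs.
people.filter(people.age > 30).select('name').explain(True)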
Text Search
In this example, we search through the error messages in a log file.
from pyspark.sql import Row
from pyspark.sql.functions import col

textFile = sc.textFile("hdfs://...")
# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
Simple Data Operations
In this example, we read a table stored in a database and calculate the number of people for every age. Finally, we save the calculated result to S3 in JSON format. A simple MySQL table “people” is used in the example; it has two columns, “name” and “age”.
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
.read \
.format("jdbc") \
.option("url", url) \
.option("dbtable", "people") \
.load()
# Looks at the schema of this DataFrame.
df.printSchema()
# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()
# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
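If S3 is not available for testing, the same DataFrame writer can target a local path instead (the path below is hypothetical):

# Saves countsByAge to a local directory in the JSON format.
countsByAge.write.format("json").save("file:///tmp/counts_by_age")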
Machine Learning Example
MLlib, Spark’s Machine Learning (ML) library, provides many distributed ML algorithms. These algorithms cover tasks such as feature extraction, classification, regression, clustering, recommendation, and more. MLlib also provides tools such as ML Pipelines for building workflows, CrossValidator for tuning parameters, and model persistence for saving and loading models.
Prediction with Logistic Regression
In this example, we take a dataset of labels and feature vectors. We learn to predict the labels from the feature vectors using the Logistic Regression algorithm.
from pyspark.ml.classification import LogisticRegression

# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])
# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)
# Fit the model to the data.
model = lr.fit(df)
# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
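The snippet above leaves data undefined; here is a minimal sketch of how it might be constructed with toy values (the labels and feature vectors are made up, and the import assumes Spark 2.x — on older releases Vectors lives in pyspark.mllib.linalg):

from pyspark.ml.linalg import Vectors

# Toy (label, features) pairs; real data would come from a feature pipeline.
data = [(1.0, Vectors.dense([0.0, 1.1, 0.1])),
        (0.0, Vectors.dense([2.0, 1.0, -1.0])),
        (0.0, Vectors.dense([2.0, 1.3, 1.0])),
        (1.0, Vectors.dense([0.0, 1.2, -0.5]))]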
References:
- http://www.mccarroll.net/blog/pyspark/index.html
- http://www.mccarroll.net/blog/pyspark2/index.html
- http://spark.apache.org/examples.html