PySpark
Installing
Docker
Easiest solution: use Docker.
Use the Jupyter PySpark notebook Docker container: https://hub.docker.com/r/jupyter/pyspark-notebook/
This comes bundled with Apache Mesos, which is a cluster resource management framework. This enables you to connect to a Mesos-managed cluster and use compute resources on that cluster.
This Docker image is provided courtesy of the Jupyter project on Github: https://github.com/jupyter/docker-stacks
Nice explanation of how to set it up with either a standalone (single node) or Mesos cluster in the PySpark notebook image's README: https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook
Basically, here are the first few lines of a standalone notebook:
import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)
Mac
Ensure you have the following software installed:
- Python 3.x distribution
- Jupyter notebook
- Java 8 JDK (link: https://www.java.com/en/download/faq/java_mac.xml)
Installing with Homebrew
Install Apache Spark using Homebrew:
$ brew install apache-spark
This should put pyspark on your path:
$ which pyspark
/usr/local/bin/pyspark
I was still having problems importing pyspark, so I also ended up running:
$ pip3 install pyspark
Linux
Have the following software installed:
- Python 3.x distribution
- Jupyter notebook
- Java 8 JDK (link: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)
Download spark from this page: http://spark.apache.org/downloads.html
Now install the Scala build tool (sbt) via apt (see https://stackoverflow.com/questions/35529913/how-to-install-sbt-on-ubuntu-debian-with-apt-get):
$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823 $ sudo apt-get update $ sudo apt-get install sbt
Now unzip the Spark source, enter the directory, and run:
$ sbt assembly
Ensure Spark was built correctly by running this command from the same directory:
$ bin/pyspark
Now set the $SPARK_HOME environment variable to wherever your Spark lives:
export SPARK_HOME="/path/to/unzipped/spark-2.2"
Testing Out PySpark
Test it out by running the pyspark command. This should look a bit like Python, but with a Spark splash message:
$ pyspark
Python 2.7.10 (default, Feb  7 2017, 00:08:15)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/26 17:53:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/26 17:53:16 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/09/26 17:53:16 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/09/26 17:53:17 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.10 (default, Feb  7 2017 00:08:15)
SparkSession available as 'spark'.
>>>
Test that it's ok by checking if the sc variable is holding a Spark context:
>>> sc
<SparkContext master=local[*] appName=PySparkShell>
Set Up PySpark With Jupyter
To use PySpark through a Jupyter notebook, instead of through the command line, first make sure your Jupyter is up to date:
$ pip3 install --upgrade jupyter
You may also need to install pyspark using pip:
$ pip3 install --upgrade pyspark
Start up Jupyter and create a new notebook:
$ jupyter notebook
Make a cell that tests out pyspark:
import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)
Error with conflicting Python versions
When I ran the above test, I saw this error, complaining that the workers were running Python 2 while the driver was running Python 3:
(For background: the "python" command on my system pointed to Python 2, but I had created a Python 3 notebook. The workers were therefore using the default Python pointed to by "python" (version 2), while the notebook, running the PySpark driver, was using Python 3.)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-0fdbaf1f4e92> in <module>()
      1 # do something to prove it works
      2 rdd = sc.parallelize(range(1000))
----> 3 rdd.takeSample(False, 5)

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in takeSample(self, withReplacement, num, seed)
    477             return []
    478
--> 479         initialCount = self.count()
    480         if initialCount == 0:
    481             return []

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in count(self)
   1039         3
   1040         """
-> 1041         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1042
   1043     def stats(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in sum(self)
   1030         6.0
   1031         """
-> 1032         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1033
   1034     def count(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
    904         # zeroValue provided to each partition is unique from the one provided
    905         # to the final reduce call
--> 906         vals = self.mapPartitions(func).collect()
    907         return reduce(op, vals, zeroValue)
    908

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/usr/local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
I ended up running
$ export PYSPARK_PYTHON="python3"
$ jupyter notebook
This resolved the issue. You should see:
Out[1] : [285, 777, 376, 101, 737]
RDDs
RDD = Resilient Distributed Dataset
RDDs are a central concept in Spark. They provide a way of splitting a larger data set into chunks that are distributed across the cluster. (Think of an RDD as Spark's largely in-memory answer to the way Hadoop splits data across the HDFS file system.)
RDDs keep track of the transformations applied to each chunk, speeding up computations. If any chunks are lost, the transformations can be re-applied.
Internal Workings
RDDs operate in parallel - they are distributed in parallel, executed in parallel, transformed in parallel.
Transformations are lazy - they are accumulated but not performed until the user calls an action. This laziness also lets Spark optimize the execution plan.
Example of an inefficient workflow:
- Count the number of occurrences of each word
- Select the word counts of all words starting with the letter "T"
- Print the results
The inefficiency in this workflow comes from counting all of the words that don't start with the letter T, even though they will never be returned. Spark's execution model is designed to optimize away this kind of wasted work.
To illustrate, let's examine how the inefficient workflow is implemented in Spark code:
First, tell Spark to pair each word with a count of 1:
.map(lambda w : (w,1))
Then, tell Spark to filter for words starting with T:
.filter( lambda w : w.startswith('T') )
Then, we call the reduce by key:
.reduceByKey(operator.add)
This reduces the data set and adds (counts) the number of occurrences of each key.
Finally, these results are collected using the collect method:
.collect()
Calling collect is the user telling Spark to go into action - up until that point, the maps, filters, and reduce operations are just accumulated. When collect is called, the actions are actually carried out.
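Putting those pieces together, here is a minimal sketch of the full pipeline; the input word list and variable names are made up for illustration, and note the operator import needed for operator.add:
import operator

# a hypothetical RDD of words
words = sc.parallelize(['The', 'quick', 'brown', 'fox', 'The', 'Tiger'])

result = (words
          .map(lambda w: (w, 1))                     # pair each word with a count of 1
          .filter(lambda wc: wc[0].startswith('T'))  # keep only pairs whose word starts with 'T'
          .reduceByKey(operator.add)                 # sum the counts for each remaining word
          .collect())                                # trigger execution and gather the results

# result should look something like [('The', 2), ('Tiger', 1)] (order not guaranteed)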
Creating RDDs
There are two ways to create an RDD:
- Parallelize a list or array of elements
- Load a text file in parallel
Example of parallelizing an array:
data = sc.parallelize( [ ('A',1), ('B',2), ('C',3), ('D',4) ] )
The data object is a ParallelCollectionRDD object.
Example of loading a text file in parallel:
fileData = sc.textFile('/path/to/file/stuff.txt' , 2)
(The second argument specifies how many partitions to divide the dataset into.)
Spark can also load compressed (gzipped) files directly:
compressedFileData = sc.textFile('/path/to/file/stuff.txt.gz' , 2)
These two objects are MapPartitionsRDD objects.
Spark can read from multiple file system types: local disks, network filesystems, Amazon S3, Google Cloud, Hadoop HDFS, Cassandra, and others.
Spark can also read data in different formats - text, json, Hive, SQL query results from RDBs.
RDDs have no schema, and can therefore be heterogeneous. For example, you can parallelize a list that consists of a tuple, and a dict, and a list, and Spark is okay with that. When you collect the results again (which returns all of the data back to the driver, or master, node), the resulting data set will function as any list containing a tuple, a dict, and a list.
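For instance, a quick (made-up) illustration of a schema-less, heterogeneous RDD:
# an RDD whose elements are a tuple, a dict, and a list - Spark does not object
mixed = sc.parallelize([('a', 1), {'key': 'value'}, [1, 2, 3]])

# collect() brings everything back to the driver as an ordinary Python list
mixed.collect()   # [('a', 1), {'key': 'value'}, [1, 2, 3]]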
Local Mode vs Cluster Mode
PySpark can be run in local mode or in cluster mode.
Local Mode
In local mode, things work like your "standard" Python script - there is one namespace, and if any process (on a worker or master node) changes data, those changes are visible to every worker. This creates a lot of communication overhead, so it isn't sustainable for a large cluster, but it might be useful if you have, say, two or three nodes. This reduces the cognitive overhead of switching your entire script to work for a cluster.
Cluster Mode
In cluster mode, the user submits a job for execution to the driver (master) node. The master creates a directed acyclic graph (see Graphs/DAGs) to decide which workers will perform the task. The master prepares each task's "closure" (materials, data, variables, methods to make sure the worker node actually has everything required to finish the task). It then copies that information over to the worker.
In cluster mode, the data is static - that's what we mean when we say copies are distributed to each worker. Each worker node can modify the data without affecting other workers' copies or the master node's copy.
Example
An example of a calculation in cluster mode, to illustrate:
Suppose we are numerically integrating a function over an interval using PySpark. Then we would divide the entire interval into pieces, and assign each piece of the interval to a different worker node. Each worker node runs the same code, with the same variables - delta_x, start_x, end_x, and running_sum - but each worker has a separate copy of these variables. When the task is complete and each worker has accumulated the sum of its approximating areas in running_sum, these disparate running_sum values are all collected into a single return value.
By contrast, if we were running this in local mode, we would have to proceed in serial - starting at the beginning of the interval, and proceeding to the end of the interval. (Or, alternatively, we would need multiple delta_x, start_x, end_x, and running_sum variables.)
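A rough sketch of what that integration might look like in PySpark; the function being integrated, the interval, and the partition count are all made up for illustration:
def f(x):
    return x * x           # the (made-up) function to integrate

start_x, end_x = 0.0, 1.0  # interval endpoints
n = 100000                 # number of rectangles
delta_x = (end_x - start_x) / n

# each of the 8 partitions computes the areas of its own slice of rectangles
areas = sc.parallelize(range(n), 8) \
          .map(lambda i: f(start_x + (i + 0.5) * delta_x) * delta_x)

# the per-partition sums are combined into a single value on the driver
running_sum = areas.sum()  # should be close to 1/3 for x^2 on [0, 1]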
Transformations
There are a LOT of transformations available. The full list is in the PySpark RDD documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
Lists
A list of the most important/common transforms:
- map
- flatMap
- distinct
- sample
- leftOuterJoin
- repartition
A more comprehensive list of transforms can be found in the PySpark RDD API documentation linked above.
Map Transform
The map transform is probably the most common; it applies a function to each element of the RDD.
Example: suppose we have a list of strings, and we want to turn them into integers. Then we can use map(), together with a lambda function, to apply the int() function to each string:
data_str = sc.parallelize( ['15','25','38','42','127','384', ..., '1025'] )
data_int = data_str.map( lambda x : int(x) )
Now if we run data_int.take(5), we'll see the first 5 values as a list of integers:
[15, 25, 38, 42, 127]
We can also build more complex lambda functions that return, e.g., tuples:
data_tuple = data_str.map( lambda x : (x, int(x)) )
which would return tuples with the first element being the string form of the number and the second element being the integer form of the number:
[ ('15', 15), ('25', 25), ('38', 38), ('42', 42), ('127', 127)]
Filter Transformation
The filter transformation is a way of filtering out data according to boolean criteria.
Provide a lambda function that returns a boolean. Filter will only return values for the RDD for which the boolean function returned True.
data_filt = data_str.map( lambda x : int(x) ).filter( lambda x : (x > 28 and x < 100) )
which would return
[38, 42]
FlatMap Transformation
Flat map is like map, in that it applies a function to each element of an RDD. However, it flattens anything that results. So if a function returns, say, a list of tuples, or a list of lists, flatMap will flatten those to a single, flat, shallow list. Compare:
data_notflat = data_str.map( lambda x : (x, int(x)) )
which resulted in a list of tuples. Compare with the flatMap call:
data_flat = data_str.flatMap( lambda x : (x, int(x)) )
which returns a single flattened list:
['15', 15, '25', 25, '38', 38, '42', 42, '127', 127, ... ]
Distinct Transformation
The distinct transformation is like NumPy's unique() function: it returns all of the unique values found in an RDD.
Suppose you have a file that contains information about people, and the fifth column contains an entry for gender. To obtain all unique values for this column (and remembering lists are zero-indexed):
distinct_gender = file_data.map( lambda row : row[4] ).distinct()
distinct_gender.collect()
would return:
['O', 'M', 'F']
male/female/other.
NOTE: the distinct method is computationally expensive, as it parses all of the data.
Sample Transformation
This transformation samples the full data set and returns the sample. Think of it as picking out M representatives from a population of N things.
The parameters are:
- Boolean - whether the samples should be drawn with replacement
- Float - what fraction of the data should be sampled
- Integer - what seed to use for the RNG
Example:
# Sample without replacing the values (remove them from the population)
# Sample 20% of the population
# Seed the random number generator with 1337 b/c we are 1337
fraction = 0.2
data_sample = file_data.sample( False, fraction, 1337)
Now we've basically split our data set into a 20% piece and an 80% piece.
One more method that's useful in this context is count() (technically an action, covered below):
print("Population count: %d / Sample count: %d" % (file_data.count(), data_sample.count()) )
LeftOuterJoin Transformation
This operation is inspired by SQL: a left outer join joins two data sets based on their keys. "Outer" means keys that appear in only one of the two data sets can be kept; "left" means unmatched keys from the left data set are kept, while unmatched keys from the right data set are dropped.
rdd1 = sc.parallelize( [ ('a', 1), ('b', 4), ('c', 9), ('d', 16) ] )
rdd2 = sc.parallelize( [ ('a', 2), ('b', 3), ('b', 18), ('c', 'FOUR'), ('e', 6) ] )
There are three shared keys: a, b, c
There is one key only in rdd1: d
There is one key only in rdd2: e
When we perform a leftOuterJoin, where left is rdd1, all keys from rdd1 will be kept, so we will see a, b, c, d. Keys in right collection, rdd2, that are not in rdd1 will be discarded, so we will not see e.
The result of combining the values will be a tuple. So, when we combine the two tuples ('a',1) and ('a', 2), the result will be ('a', (1, 2) ).
The result of combining the two (repeated) values (b) will be two tuples. When we combine the tuples ('b', 4) and ('b', 3), the result is ('b', (4,3)). Then, when we combine the tuples ('b', 4) and ('b', 18), the result is ('b', (4, 18)). So overall, the result is two tuples, ('b', (4,3)) and ('b', (4,18)).
Running the left outer join:
rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()
results in:
[ ('a', (1, 2) ), ('c', (9, 'FOUR')), ('b', (4, 3)), ('d', (16, None)), ('b', (4, 18)) ]
The tuples are not returned in order. The two b keys result in two tuples, as mentioned. The left-only key still returns a tuple, but uses null (None) for the value from the right.
Join Transformation
The join transformation, like SQL joins, discards any keys that are not in both data sets. If we use the same example RDDs as before:
rdd1 = sc.parallelize( [ ('a', 1), ('b', 4), ('c', 9), ('d', 16) ] )
rdd2 = sc.parallelize( [ ('a', 2), ('b', 3), ('b', 18), ('c', 'FOUR'), ('e', 6) ] )
we should only see results for the three shared keys: a, b, c
Keys d and e only occur in one or the other, and so will not make it to the join result.
Running the join:
rdd3 = rdd1.join(rdd2)
rdd3.collect()
results in:
[ ('b', (4, 18) ), ('a', (1, 2) ), ('c', (9, 'FOUR') ), ('b', (4, 3) ) ]
The tuples are not returned in order, and the repeated key b again results in two tuples. Keys that appear in only one of the RDDs (d and e) are dropped entirely, so no None values appear.
Intersection Transformation
Intersection performs a set intersection of two RDDs:
rdd1 = sc.parallelize( [ ('a', 1), ('b', 4), ('c', 9), ('d', 16) ] )
rdd2 = sc.parallelize( [ ('a', 1), ('b', 3), ('c', 18), ('d', 45) ] )
Running intersection:
rdd3 = rdd1.intersection(rdd2)
rdd3.collect()
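Note that intersection compares whole elements (the complete key-value pairs), not just keys. With the two example RDDs above, only ('a', 1) appears in both, so the collected result should be:
[ ('a', 1) ]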
Repartition Transformation
This function will change the number of partitions into which the data set is distributed.
rdd1 = rdd1.repartition(8)
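To confirm that the repartitioning took effect, something like the following should work (the initial partition count of 4 is just an example):
rdd1 = sc.parallelize(range(1000), 4)
rdd1.getNumPartitions()    # 4

rdd1 = rdd1.repartition(8)
rdd1.getNumPartitions()    # 8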
Actions
Unlike transformations, which PySpark lazily accumulates without executing, actions tell PySpark to actually carry out the accumulated transformations and do something with the results.
Take Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take
The take() method is as useful and common as the map method; it returns the first N elements of the RDD.
top_three = rdd1.take(3)
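For a concrete (made-up) RDD, that might look like:
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)])
top_three = rdd1.take(3)   # [('a', 1), ('b', 2), ('c', 3)]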
Take Sample Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeSample
Like take, the takeSample() method returns a specified number of records, but it samples randomly.
Three arguments:
- Boolean - should sampling be done with replacement
- Integer - number of records to return
- Integer - seed for RNG
# Get a copy of one random record (sampling with replacement)
random_record = rdd1.takeSample(True, 1, 1337)
Collect Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect
The collect() method returns all of the elements of the RDD to the master node. This is potentially an intensive operation.
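A small (made-up) illustration of why collect() can be expensive compared to take():
rdd = sc.parallelize(range(1000000), 8)
everything = rdd.collect()   # ships all 1,000,000 elements back to the driver
a_few = rdd.take(10)         # only ships 10 elements back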
Reduce Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduce
The reduce() method will reduce elements of an RDD according to a function that is provided. This reduce operation might, for example, be a sum of all elements in the RDD:
rdd1.reduce(lambda x, y : x + y)
(Note that x and y here refer to the prior value and the next value.)
The function passed to reduce must satisfy two properties:
- associative (how the elements are grouped does not matter)
- commutative (the order of the elements does not matter)
Addition satisfies both properties; division satisfies neither.
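A quick sketch of why this matters; the numbers and partition counts here are arbitrary, and the exact grouping depends on how the elements land in the partitions:
# addition: the same answer regardless of how the data is partitioned
sc.parallelize([1.0, 2.0, 4.0, 8.0], 1).reduce(lambda x, y : x + y)   # 15.0
sc.parallelize([1.0, 2.0, 4.0, 8.0], 2).reduce(lambda x, y : x + y)   # 15.0

# division: the grouping matters, so the answer changes with the partitioning
sc.parallelize([1.0, 2.0, 4.0, 8.0], 1).reduce(lambda x, y : x / y)   # ((1/2)/4)/8 = 0.015625
sc.parallelize([1.0, 2.0, 4.0, 8.0], 2).reduce(lambda x, y : x / y)   # (1/2)/(4/8) = 1.0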
Reduce By Key Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey
The reduceByKey() method similarly reduces the elements of an RDD using a provided function, but works by applying the reduce function only to elements with common keys.
Start with an example RDD:
rdd1 = sc.parallelize( [ ('a',4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3) ], 4 )
Now we reduce by key, where the key is the first element of the tuple:
rdd1.reduceByKey( lambda x, y : x + y ).collect()
This results in:
[ ( 'b', 4 ), ( 'c', 2 ), ( 'a', 12 ), ( 'd', 5 ) ]
(Note that x and y in the lambda function do NOT refer to the key and value of a single tuple; they refer to the accumulated and current values for a given key.)
Count Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.count
The count method just counts the number of elements in the RDD.
rdd1 = sc.parallelize( [ ('a',4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3) ], 4 )
rdd1.count()
will produce 7.
The advantage of using count, instead of collecting the whole data set and finding its length, is that count performs the count on each worker node, then gathers the results of the count - so there is no need to copy all of the data back to the master node.
That means the count method is pretty fast.
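As a rough illustration of the difference in data movement (both lines give 7 for the rdd1 above):
n_fast = rdd1.count()          # each worker counts its own partition; only the tallies travel
n_slow = len(rdd1.collect())   # every element is shipped back to the driver first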
Count By Key Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countByKey
Suppose we want to count the number of elements, but grouped by key. We can do that using the countByKey() method:
rdd1 = sc.parallelize( [ ('a',4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3) ], 4 )
rdd1.countByKey()          # <-- returns a dictionary (more convenient)
rdd1.countByKey().items()  # <-- returns a list of tuples (easier to print)
The last call (the list of items) returns something easier to print:
[ ('a', 2), ('b', 2), ('c', 1), ('d', 2) ]
Save As Text File Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsTextFile
This, obviously, saves the RDD into a text file. It uses whatever string representation of the elements is defined.
sc.parallelize(['wuz', 'foo', 'baz', 'bar', 'fizz']).saveAsTextFile('/path/to/temp/file.txt')
Blank lines are okay:
sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile('/path/to/temp/file.txt')
A more fancy example provided in the documentation shows the use of a NamedTemporaryFile object to create a temporary file in which to put the RDD:
First, the temporary file is created and deleted. This is just to give us a temporary file name.
from tempfile import NamedTemporaryFile

tempFile = NamedTemporaryFile(delete=True)
tempFile.close()
Next, we create an RDD, and save it to the temporary text file we created:
sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
Now, because we have parallelized the RDD, Spark actually saves the data in multiple part files under the path we gave it, so we have to add "/part-0000*" to the end to examine the entire data set. Here, we use glob to get the list of part files, read them into memory, sort their contents, and join them into a single string:
from fileinput import input
from glob import glob

''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
The output from this is:
'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
For Each Method
Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foreach
This performs a function on each element of the RDD, one record at a time. While this is similar to map(), which applies a function to each element and returns a new RDD of the results, foreach() is an action: it is run for its side effects and returns nothing. The foreach() method is more appropriate when the algorithm needs to perform an action on each record rather than produce a new data set.
Example: suppose you're inserting records into a database, and the records are inserted in order. You don't want to jam everything into the database all at once and risk losing/duplicating/corrupting data. In that case, use foreach().
def f(x):
    print(x)

rdd1.foreach(f)