
Installing

Docker

Easiest solution: use Docker.

Use the Jupyter PySpark notebook Docker container: https://hub.docker.com/r/jupyter/pyspark-notebook/

This comes bundled with Apache Mesos, which is a cluster resource management framework. This enables you to connect to a Mesos-managed cluster and use compute resources on that cluster.

This Docker image is provided courtesy of the Jupyter project on Github: https://github.com/jupyter/docker-stacks

There is a nice explanation of how to set it up with either a standalone (single-node) Spark context or a Mesos cluster in the PySpark notebook image's README: https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook

Basically, here are the first few lines of a standalone notebook:

import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

Mac

Ensure you have Homebrew and a Java runtime installed.

Installing with Homebrew

Install Apache Spark using Homebrew:

$ brew install apache-spark

This should put pyspark on your path:

$ which pyspark
/usr/local/bin/pyspark

I was still having problems importing pyspark, so I also ended up running:

$ pip3 install pyspark

Linux

Ensure you have a Java JDK installed.

Download Spark from this page: http://spark.apache.org/downloads.html

Now add the Scala build tool (sbt) to apt's sources and install it (see https://stackoverflow.com/questions/35529913/how-to-install-sbt-on-ubuntu-debian-with-apt-get):

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

Now extract the Spark source, enter the directory, and run:

$ sbt assembly

Ensure Spark was built correctly by running this command from the same directory:

$ bin/pyspark

Now set the $SPARK_HOME environment variable to wherever your Spark lives:

export SPARK_HOME="/path/to/unzipped/spark-2.2"
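
If pyspark still can't be imported from a plain Python session, one option (a sketch, assuming the third-party findspark package is installed with pip) is to let findspark locate the Spark installation from $SPARK_HOME:

import findspark
findspark.init()    # picks up the location from $SPARK_HOME

import pyspark
sc = pyspark.SparkContext('local[*]')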


Testing Out Pyspark

Test it out by running the pyspark command. This should look a bit like Python, but with a Spark splash message:

$ pyspark
Python 2.7.10 (default, Feb  7 2017, 00:08:15)
[GCC 4.2.1 Compatible Apple LLVM 8.0.0 (clang-800.0.34)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/09/26 17:53:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/26 17:53:16 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/09/26 17:53:16 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/09/26 17:53:17 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.10 (default, Feb  7 2017 00:08:15)
SparkSession available as 'spark'.
>>>

Test that it's ok by checking if the sc variable is holding a Spark context:

>>> sc
<SparkContext master=local[*] appName=PySparkShell>

Set Up PySpark With Jupyter

To use PySpark through a Jupyter notebook, instead of through the command line, first make sure your Jupyter is up to date:

$ pip3 install --upgrade jupyter

You may also need to install pyspark using pip:

$ pip3 install --upgrade pyspark

Start up Jupyter and create a new notebook:

$ jupyter notebook

Make a cell that tests out pyspark:

import pyspark
sc = pyspark.SparkContext('local[*]')

# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

Error with conflicting Python versions

When I ran the above test, I got an error because the workers were running Python 2 while the driver was running Python 3:

(For background: the "python" command on my system pointed to Python 2, but I had created a Python 3 notebook. The workers therefore used the default interpreter pointed to by "python" (version 2), while the notebook, running the PySpark driver, used Python 3.)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-0fdbaf1f4e92> in <module>()
      1 # do something to prove it works
      2 rdd = sc.parallelize(range(1000))
----> 3 rdd.takeSample(False, 5)

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in takeSample(self, withReplacement, num, seed)
    477             return []
    478 
--> 479         initialCount = self.count()
    480         if initialCount == 0:
    481             return []

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in count(self)
   1039         3
   1040         """
-> 1041         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1042 
   1043     def stats(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in sum(self)
   1030         6.0
   1031         """
-> 1032         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   1033 
   1034     def count(self):

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in fold(self, zeroValue, op)
    904         # zeroValue provided to each partition is unique from the one provided
    905         # to the final reduce call
--> 906         vals = self.mapPartitions(func).collect()
    907         return reduce(op, vals, zeroValue)
    908 

/usr/local/lib/python3.6/site-packages/pyspark/rdd.py in collect(self)
    807         """
    808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    810         return list(_load_from_socket(port, self._jrdd_deserializer))
    811 

/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 5, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 123, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more

I ended up setting PYSPARK_PYTHON before launching the notebook:

$ export PYSPARK_PYTHON="python3"
$ jupyter notebook

This resolved the issue. You should see:

Out[1] : [285, 777, 376, 101, 737]
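
An alternative sketch (not what I did, but it should behave the same way) is to set the variable from inside the notebook, before the SparkContext is created:

import os

# must be set before the SparkContext is constructed, otherwise the workers
# will still launch with whatever "python" points to
os.environ['PYSPARK_PYTHON'] = 'python3'

import pyspark
sc = pyspark.SparkContext('local[*]')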

RDDs

RDD = Resilient Distributed Dataset

RDDs are a central concept in Spark. They provide a way of splitting a larger data set into chunks that are distributed across the cluster. (Think of them as Spark's in-memory counterpart to data stored in Hadoop's HDFS file system.)

RDDs keep track of the transformations applied to each chunk, speeding up computations. If any chunks are lost, the transformations can be re-applied.

Internal Workings

RDDs operate in parallel - they are distributed in parallel, executed in parallel, transformed in parallel.

Transformations are lazy - they are accumulated but not performed until the user calls for an action. This laziness also lets Spark optimize the overall execution.

Example of an inefficient workflow:

  • Count the number of occurrences of each word
  • Select the word counts of all words starting with the letter "T"
  • Print the results

The inefficiency in this workflow comes from counting all of the words that don't start with the letter T, which will never be returned. Spark's model is designed to optimize these calculations.

To illustrate, let's examine how the inefficient workflow is implemented in Spark code:

First, tell Spark to pair each word with a count of 1:

.map(lambda w : (w,1))

Then, tell Spark to filter for words starting with T:

.filter( lambda w : w.startswith('T') )

Then, we call the reduce by key:

.reduceByKey(operator.add)

This reduces the data set and adds (counts) the number of occurrences of each key.

Finally, these results are collected using the collect method:

.collect()

Calling collect is the user telling Spark to go into action - up until that point, the maps, filters, and reduce operations are just accumulated. When collect is called, the actions are actually carried out.
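
Putting those pieces together, here is a minimal sketch of the whole pipeline (the input path is hypothetical, and the file is assumed to contain one word per line):

import operator

lines = sc.textFile('/path/to/words.txt')    # hypothetical input file

counts = ( lines.map( lambda w : (w, 1) )                        # pair each word with a count of 1
                .filter( lambda pair : pair[0].startswith('T') ) # keep only words starting with T
                .reduceByKey( operator.add ) )                   # add up the counts for each word

counts.collect()    # nothing actually runs until this action is called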

Creating RDDs

There are two ways to create an RDD:

  • Parallelize a list or array of elements
  • Load a text file in parallel

Example of parallelizing an array:

data = sc.parallelize( [ ('A',1), ('B',2), ('C',3), ('D',4) ] )

The data object is a ParallelCollectionRDD object.

Example of parallelizing a file:

fileData = sc.textFile('/path/to/file/stuff.txt' , 2)

(The last number in this argument specifies how many partitions to divide the dataset into).

Spark can also load compressed (gzipped) files directly:

compressedFileData = sc.textFile('/path/to/file/stuff.txt.gz' , 2)

These two objects are MapPartitionsRDD objects.

Spark can read from multiple file system types: local disks, network filesystems, Amazon S3, Google Cloud, Hadoop HDFS, Cassandra, and others.

Spark can also read data in different formats - text, json, Hive, SQL query results from RDBs.

RDDs have no schema, and can therefore be heterogeneous. For example, you can parallelize a list that consists of a tuple, a dict, and a list, and Spark is okay with that. When you collect the results (which returns all of the data to the driver, or master, node), the resulting data set behaves like any ordinary Python list containing a tuple, a dict, and a list.
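
For example, a quick sketch of such a schema-less RDD:

mixed = sc.parallelize( [ ('a', 1), {'key': 'value'}, [1, 2, 3] ] )
mixed.collect()
# [('a', 1), {'key': 'value'}, [1, 2, 3]]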

Local Mode vs Cluster Mode

PySpark can be run in local mode or in cluster mode.

Local Mode

In local mode, things work like your "standard" Python script - there is one namespace, and if any process (on a worker or master node) changes data, those changes are visible to every worker. This creates a lot of communication overhead, so it isn't sustainable for a large cluster, but it might be useful if you have, say, two or three nodes. This reduces the cognitive overhead of switching your entire script to work for a cluster.

Cluster Mode

In cluster mode, the user submits a job for execution to the driver (master) node. The master creates a directed acyclic graph (see Graphs/DAGs) to decide which workers will perform the task. The master prepares each task's "closure" (materials, data, variables, methods to make sure the worker node actually has everything required to finish the task). It then copies that information over to the worker.

In cluster mode, the data is static - that's what we mean when we say copies are distributed to each worker. Each worker node can modify the data without affecting other workers' copies or the master node's copy.

Example

An example of a calculation in cluster mode, to illustrate:

Suppose we are numerically integrating a function over an interval using PySpark. Then we would divide the entire interval into pieces, and assign each piece of the interval to a different worker node. Each worker node runs the same code, with the same variables - delta_x, start_x, end_x, and running_sum - but each worker has a separate copy of these variables. When the task is complete and each worker has found the sum of its approximating areas, those areas are accumulated in running_sum, and the disparate running_sum values are all collected into a single return value.

By contrast, if we were running this in local mode, we would have to proceed in serial - starting at the beginning of the interval, and proceeding to the end of the interval. (Or, alternatively, we would need multiple delta_x, start_x, end_x, and running_sum variables.)
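
A minimal PySpark sketch of the integration example above (the function, interval, and number of partitions here are all hypothetical choices, just for illustration):

import math

def f(x):
    return math.sin(x)

a, b, n = 0.0, math.pi, 100000
delta_x = (b - a) / n

# one record per subinterval midpoint, spread across 8 partitions
midpoints = sc.parallelize( [ a + (i + 0.5) * delta_x for i in range(n) ], 8 )

# each worker computes the areas for its chunk of the interval in parallel,
# then the partial sums are combined into a single value
running_sum = midpoints.map( lambda x : f(x) * delta_x ).sum()
# for sin(x) on [0, pi] this should come out very close to 2.0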

Transformations

There are a LOT of transformations available. The full list is in the PySpark RDD documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

Lists

A list of the most important/common transforms:

  • map
  • flatMap
  • distinct
  • sample
  • leftOuterJoin
  • repartition

A pretty comprehensive list of RDD methods (transformations and actions):

  • aggregate
  • aggregateByKey
  • cache
  • cartesian
  • checkpoint
  • coalesce
  • cogroup
  • collect
  • collectAsMap
  • combineByKey
  • context
  • count
  • countApprox
  • countApproxDistinct
  • countByKey
  • countByValue
  • distinct
  • filter
  • first
  • flatMap
  • flatMapValues
  • fold
  • foldByKey
  • foreach
  • foreachPartition
  • fullOuterJoin
  • getCheckpointFile
  • getNumPartitions
  • getStorageLevel
  • glom
  • groupBy
  • groupByKey
  • groupWith
  • histogram
  • id
  • intersection
  • isCheckpointed
  • isEmpty
  • isLocallyCheckpointed
  • join
  • keyBy
  • keys
  • leftOuterJoin
  • localCheckpoint
  • lookup
  • map
  • mapPartitions
  • mapValues
  • max
  • mean
  • name
  • persist
  • pipe
  • randomSplit
  • reduce
  • reduceByKey
  • reduceByKeyLocally
  • repartition
  • rightOuterJoin
  • sample
  • sampleByKey
  • sampleStdev
  • sampleVariance
  • saveAsHadoopDataset
  • saveAsPickleFile
  • saveAsTextFile
  • setName
  • sortBy
  • stats
  • stdev
  • subtract
  • sum
  • sumApprox
  • take
  • takeSample
  • top
  • treeAggregate
  • treeReduce
  • union
  • unpersist
  • values
  • variance
  • zip

Map Transform

The map transform is probably the most common; it applies a function to each element of the RDD.

Example: suppose we have a list of strings, and we want to turn them into integers. Then we can use map(), together with a lambda function, to apply the int() function to each string:

data_str = sc.parallelize(['15','25','38','42','127','384', ..., '1025' ] )
data_int = data_str.map( lambda x : int(x) )

Now if we run data_int.take(5), we'll see the first 5 values as a list of integers:

[15, 25, 38, 42, 127]

We can also build more complex lambda functions that return, e.g., tuples:

data_tuple = data_str.map( lambda x : (x, int(x)) )

which would return tuples with the first element being the string form of the number and the second element being the integer form of the number:

[ ('15', 15),
  ('25', 25),
  ('38', 38),
  ('42', 42),
  ('127', 127)]

Filter Transformation

The filter transformation is a way of filtering out data according to boolean criteria.

Provide a lambda function that returns a boolean. Filter will only return values for the RDD for which the boolean function returned True.

data_filt = data_str.map( lambda x : int(x) ).filter( lambda x : (x > 28 and x < 100) )

which would return

[38, 42]

FlatMap Transformation

Flat map is like map, in that it applies a function to each element of an RDD. However, it flattens anything that results. So if a function returns, say, a list of tuples, or a list of lists, flatMap will flatten those to a single, flat, shallow list. Compare:

data_notflat = data_str.map( lambda x : (x, int(x)) )

which resulted in a list of tuples. Compare with the flatMap call:

data_flat = data_str.flatMap( lambda x : (x, int(x)) )

which returns a flat map:

['15', 15, '25', 25, '38', 38, '42', 42, '127', 127, ... ]

Distinct Transformation

The distinct transformation is like NumPy's unique() function: it returns all of the unique values found in an RDD.

Suppose you have a file that contains information about people, and the fifth column contains an entry for gender. To obtain all unique values for this column (and remembering lists are zero-indexed):

distinct_gender = file_data.map( lambda row : row[4] ).distinct()
distinct_gender.collect()

would return:

['O', 'M', 'F']

male/female/other.

NOTE: the distinct method is computationally expensive, as it parses all of the data.
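
If you only need the number of distinct values, not the values themselves, countApproxDistinct() is a cheaper alternative (a side note, not strictly needed here; relativeSD controls the accuracy of the estimate):

# approximate count of distinct values in the gender column
file_data.map( lambda row : row[4] ).countApproxDistinct(relativeSD=0.05)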

Sample Transformation

This transformation samples the full data set and returns the sample. Think of it as picking out M representatives from a population of N things.

The parameters are:

  • Boolean - whether to sample with replacement
  • Float - what fraction of the data should be sampled
  • Integer - what seed to use for the RNG

Example:

# Sample without replacing the values (remove them from the population)
# Sample 20% of the population
# Seed the random number generator with 1337 b/c we are 1337
fraction = 0.2
data_sample = file_data.sample( False, fraction, 1337)

Now we've basically pulled out a 20% sample of the data set; the full 100% is still in file_data.

One more method that's useful in this context is count():

print("Population count: %d / Sample count: %d" % (file_data.count(), data_sample.count()) )

LeftOuterJoin Transformation

Inspired by SQL, a left outer join joins two data sets based on their keys. "Outer join" means keys that appear in only one of the data sets are still kept. "Left outer join" means keys that appear only in the left data set are kept, while keys that appear only in the right data set are dropped.

rdd1 = sc.parallelize( [ ('a', 1), ('b', 4), ('c', 9), ('d', 16) ] )
rdd2 = sc.parallelize( [ ('a', 2), ('b', 3), ('b', 18), ('c', 'FOUR'), ('e', 6) ] )

There are three shared keys: a, b, c

There is one key only in rdd1: d

There is one key only in rdd2: e

When we perform a leftOuterJoin, where left is rdd1, all keys from rdd1 will be kept, so we will see a, b, c, d. Keys in right collection, rdd2, that are not in rdd1 will be discarded, so we will not see e.

The result of combining the values will be a tuple. So, when we combine the two tuples ('a',1) and ('a', 2), the result will be ('a', (1, 2) ).

The result of combining the two (repeated) values (b) will be two tuples. When we combine the tuples ('b', 4) and ('b', 3), the result is ('b', (4,3)). Then, when we combine the tuples ('b', 4) and ('b', 18), the result is ('b', (4, 18)). So overall, the result is two tuples, ('b', (4,3)) and ('b', (4,18)).

Running the left outer join:

rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.collect()

results in:

[ ('a', (1, 2) ), ('c', (9, 'FOUR')), ('b', (4, 3)), ('d', (16, None)), ('b', (4, 18)) ]

The tuples are not returned in order. The two b keys result in two tuples, as mentioned. The left-only key still returns a tuple, but uses null (None) for the value from the right.


Join Transformation

The join transformation, like a SQL inner join, discards any keys that are not in both data sets. If we use the same example RDDs as before:

rdd1 = sc.parallelize( [ ('a', 1), ('b', 4), ('c', 9), ('d', 16) ] )
rdd2 = sc.parallelize( [ ('a', 2), ('b', 3), ('b', 18), ('c', 'FOUR'), ('e', 6) ] )

we should only see results for the three shared keys: a, b, c

Keys d and e only occur in one or the other, and so will not make it to the join result.

Running the join:

rdd3 = rdd1.join(rdd2)
rdd3.collect()

results in:

[ ('b', (4, 18) ), ('a', (1, 2) ), ('c', (9, 'FOUR') ), ('b', (4, 3) ) ]

The tuples are not returned in order, and the two b keys again result in two tuples. Since only shared keys survive a join, there are no None values this time.

Intersection Transformation

Intersection performs a set intersection of two RDDs:

rdd1 = sc.parallelize( [ ('a', 1), ('b', 4), ('c', 9), ('d', 16) ] )
rdd2 = sc.parallelize( [ ('a', 1), ('b', 3), ('c', 18), ('d', 45) ] )

Running intersection:

rdd3 = rdd1.intersection(rdd2)
rdd3.collect()
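
For these two RDDs, the only element that appears (with both key and value matching) in both is ('a', 1), so the collected result should be:

[('a', 1)]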

Repartition Transformation

This function will change the number of partitions into which the data set is distributed.

rdd1 = rdd1.repartition(8)
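
You can verify the new partition count with getNumPartitions():

rdd1.getNumPartitions()    # should now return 8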

Actions

Unlike transformations, which PySpark lazily accumulates without executing, actions tell PySpark to actually carry out the accumulated transformations and do something with the results.

Take Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take

As useful and common as the map method, the take() method returns the first N elements of the RDD.

top_three = rdd1.take(3)

Take Sample Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeSample

Like take, the takeSample() method returns a specified number of records, but it samples randomly.

Three arguments:

  • Boolean - should sampling be done with replacement
  • Integer - number of records to return
  • Integer - seed for RNG

# Get a copy of one random record
random_record = rdd1.takeSample(True, 1, 1337)

Collect Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect

The collect() method returns all of the elements of the RDD to the master node. This is potentially an intensive operation.
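
Usage is just (reusing rdd1 from the earlier examples):

all_elements = rdd1.collect()    # brings every element back to the driver as a Python list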

Reduce Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduce

The reduce() method will reduce elements of an RDD according to a function that is provided. This reduce operation might, for example, be a sum of all elements in the RDD:

rdd1.reduce(lambda x, y : x + y)

(Note that x and y here refer to the prior value and the next value.)

Reduce functions must satisfy two properties:

  • associative (how the operations are grouped does not matter)
  • commutative (the order of the elements does not matter)

Addition satisfies both properties; division does not.
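
A quick sketch of why those properties matter (the values and partition count are arbitrary):

rdd = sc.parallelize( [1, 2, 3, 4, 5], 3 )

# addition is associative and commutative, so the result does not depend
# on how the elements are split across partitions
rdd.reduce( lambda x, y : x + y )    # 15

# subtraction is neither, so the result depends on the partitioning --
# this is NOT a valid reduce function
rdd.reduce( lambda x, y : x - y )    # partition-dependent value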

Reduce By Key Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey

The reduceByKey() method similarly reduces the elements of an RDD using a provided function, but works by applying the reduce function only to elements with common keys.

Start with an example RDD:

rdd1 = sc.parallelize( [ ('a',4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3) ], 4 )

Now we reduce by key, where the key is the first element of the tuple:

rdd1.reduceByKey( lambda x, y : x + y ).collect()

This results in:

[ ( 'b', 4 ), ( 'c', 2 ), ( 'a', 12 ), ( 'd', 5 ) ]

(Note that x and y in the lambda function do NOT refer to the key and value of a tuple; they refer to the accumulated and next values that share the same key.)

Count Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.count

The count method just counts the number of elements in the RDD.

rdd1 = sc.parallelize( [ ('a',4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3) ], 4 )
rdd1.count()

will produce 7.

The advantage of using count, instead of collecting the whole data set and finding its length, is that count performs the count on each worker node, then gathers the results of the count - so there is no need to copy all of the data back to the master node.

That means the count method is pretty fast.

Count By Key Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countByKey

Suppose we want to count the number of elements, but grouped by key. We can do that using the countByKey() method:

rdd1 = sc.parallelize( [ ('a',4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3) ], 4 )
rdd1.countByKey() # <-- returns a dictionary (more convenient)
rdd1.countByKey().items() # <-- returns a list of tuples (easier to print)

The last call (the list of items) returns something easier to print:

[ ('a', 2), ('b', 2), ('c', 1), ('d', 2) ]

Save As Text File Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.saveAsTextFile

This, obviously, saves the RDD as a text file (actually a directory of part files, one per partition). It uses whatever string representation of the elements is defined.

sc.parallelize(['wuz', 'foo', 'baz', 'bar', 'fizz']).saveAsTextFile('/path/to/temp/file.txt')

Blank lines are okay:

sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile('/path/to/temp/file.txt')

A fancier example provided in the documentation shows the use of a NamedTemporaryFile object to create a temporary file in which to put the RDD:

First, the temporary file is created and deleted. This is just to give us a temporary file name.

from tempfile import NamedTemporaryFile

tempFile = NamedTemporaryFile(delete=True)
tempFile.close()

Next, we create an RDD, and save it to the temporary text file we created:

sc.parallelize(range(10)).saveAsTextFile(tempFile.name)

Now, because the RDD is partitioned, it actually saves the data in multiple parts. We passed it a temporary file name, but we have to add "/part-0000*" to the end to examine the entire output. Here, we use glob to get the list of part files, read them into memory, sort their contents, and join them into a single string:

from fileinput import input
from glob import glob
''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))

The output from this is:

'0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'

For Each Method

Documentation: https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.foreach

This performs a function for each element in the RDD iteratively. While this is similar to map(), which applies a function to each element of the RDD, map() works in parallel, with everything happening at once. The foreach() method is more appropriate when the algorithm needs to perform actions one record at a time.

Example: suppose you're inserting records into a database, and the records are inserted in order. You don't want to jam everything into the database all at once and risk losing/duplicating/corrupting data. In that case, use foreach().

def f(x):
    print(x)

rdd1.foreach(f)
