Apache Spark and PySpark on CentOS/RHEL 7.x

What is Apache Spark

You may have noticed that wherever there is talk about big data, the name Apache Spark eventually comes up. In the simplest terms, it is a large-scale data processing engine. Apache Spark is a fast data processing framework that provides APIs to connect to and perform big data processing. Spark, being the largest open-source data processing engine, has been adopted by large companies – Yahoo, eBay, and Netflix have massive-scale Spark deployments, processing multiple petabytes of data on clusters of over 8,000 nodes.
Apache Spark can be started as a standalone cluster (which we’ll be doing for this tutorial), or using Mesos or YARN as the cluster manager. Spark can work with data from various sources: AWS S3, HDFS, Cassandra, Hive (structured data), HBase, or any other Hadoop data source. Above all, what makes Spark high in demand is its included libraries (MLlib, SQL and DataFrames, GraphX, and Spark Streaming), which cater to the main data processing use-cases and can all be combined in the same application.

What is PySpark?

Apache Spark provides various APIs for services to perform big data processing on its engine. PySpark is the Python API, exposing the Spark programming model to Python applications. PySpark is also available out of the box as an interactive Python shell, which links to the Spark core and starts the Spark context.

Since Spark 2.2.0, PySpark is also available as a Python package on PyPI, which can be installed using pip. In Spark 2.1 it was available as a Python package, but not on PyPI; one had to install it manually by executing the setup.py in <spark-directory>/python, and once installed it was also necessary to add the path to the PySpark lib to the PATH. Python (2.6 or higher) and Apache Spark are the requirements for PySpark.


Apache Spark Standalone cluster Setup

Requirements

  • Java (JRE) 7+
  • Python 2.6 or higher

Step 0: Preliminaries

Run yum update

$ sudo yum update

Stop the firewall

To make the ports accessible, i.e. for the master and slave(s) to communicate.
$ sudo systemctl stop firewalld
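If you would rather leave the firewall running, a minimal sketch of opening just the default Spark ports with firewall-cmd (assuming the defaults: 7077 for the master, 8080/8081 for the web UIs) would be:

$ sudo firewall-cmd --permanent --add-port=7077/tcp
$ sudo firewall-cmd --permanent --add-port=8080-8081/tcp
$ sudo firewall-cmd --reload

Note that Spark also picks random ephemeral ports for driver/worker communication unless they are pinned in the configuration, which is why this tutorial simply stops the firewall.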

Disable SELinux

$ sudo setenforce 0

The above command will disable SELinux for the session i.e. until next reboot – to permanently disable it set SELINUX=disabled in /etc/selinux/config file.
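For the permanent change, a one-liner such as the following should do it (assuming the stock /etc/selinux/config, which ships with SELINUX=enforcing):

$ sudo sed -i 's/^SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config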


Step 1: Install Java

There’s a separate blog post – install Java 8 on CentOS/RHEL 7.x
If you already have it installed, you can verify the version.

[nahmed@localhost ~]$ java -version
openjdk version "1.8.0_111"
OpenJDK Runtime Environment (build 1.8.0_111-b15)
OpenJDK 64-Bit Server VM (build 25.111-b15, mixed mode)
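If Java is not installed yet, one quick route (using the OpenJDK 1.8 package from the base repositories) is:

$ sudo yum install -y java-1.8.0-openjdk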


Step 2: Install Apache Spark

As mentioned above, Spark can be deployed using YARN or Mesos as the cluster manager, but it also allows a simple standalone deployment.
Spark can be installed either by downloading and compiling the source code for your environment, or simply by using the pre-built binaries. For simplicity we’ll be using the pre-built binaries. For a standalone Spark cluster, all we need to do is place the pre-built Spark binary on each node of the cluster. There are 2 main requirements for Apache Spark: a Java JRE and Scala.
Note: the pre-built Spark binaries ship with the Scala libraries they need, so we just need to make sure that Java and Python are present. Starting with version 2.0, Spark is built with Scala 2.11 by default.

Download the Spark binaries

Execute the following commands on all the nodes, or download the binary on one node and copy it to the others. I have used the latest available binary, 2.2.1, pre-built for Apache Hadoop 2.7 or later.
cd /opt
wget http://www-eu.apache.org/dist/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz

Untar the binary

tar -xzf spark-2.2.1-bin-hadoop2.7.tgz

Create a symlink

This way we can keep multiple Spark versions around, and switch by pointing the symlink at the one we want to use.
ln -s /opt/spark-2.2.1-bin-hadoop2.7  /opt/spark



Step 3: Launch standalone Spark cluster

The standalone Spark cluster can be started manually, i.e. by executing the start script on each node, or simply by using the available launch scripts. For testing we can run the master and slave daemons on the same machine (not recommended for production).

Option 1: Starting the Cluster manually

Start the master

While in the Spark directory of the node you want to set as the master:
./sbin/start-master.sh
Once the master has been started, you’ll see it print out the master URL for other services or slaves to connect to, i.e. spark://HOST:PORT. There’s also a management web portal, by default running at http://localhost:8080 – you can also find the Spark master URL there.
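A quick way to confirm the master daemon is running (assuming a JDK is installed, so jps is available) is to check the Java process list for a Master process:

$ jps

Once workers are started on other nodes, jps on those nodes should similarly list a Worker process.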

Start the slave

To start a slave process on the second node, run the following from inside the Spark directory:
./sbin/start-slave.sh <master-spark-URL>
Similarly, you can start as many slaves as you want – get the binary onto each node, start the slaves, and make them connect to the master. Each newly started slave worker will appear on the master’s dashboard.
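As a concrete sketch, assuming the master is reachable at a host named master-host (a hypothetical hostname) on the default port 7077:

./sbin/start-slave.sh spark://master-host:7077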

Option 2: Start cluster using the launch scripts

For launching the Spark standalone cluster using the launch scripts, you are required to create a conf/slaves file inside the Spark directory, which must contain the hostnames of all the machines that you want to be part of the cluster, i.e. where you intend to start a Spark worker (an example conf/slaves is shown after the list of scripts below).
Note: the master node accesses each of the worker machines via ssh. In case you don’t have your key copied to the slave nodes, you can alternatively provide the passwords serially by setting the SPARK_SSH_FOREGROUND environment variable.
If conf/slaves does not exist, the launch scripts default to a single machine (localhost), which is useful for testing.
Once you have conf/slaves ready, you can launch or stop your cluster by executing the following shell scripts (from spark-directory/sbin) on the master node:
sbin/start-master.sh - Starts a master instance on the machine the script is executed on.
sbin/start-slaves.sh - Starts a slave instance on each machine specified in the conf/slaves file.
sbin/start-slave.sh - Starts a slave instance on the machine the script is executed on.
sbin/start-all.sh - Starts both a master and a number of slaves as described above.
sbin/stop-master.sh - Stops the master that was started via the sbin/start-master.sh script.
sbin/stop-slaves.sh - Stops all slave instances on the machines specified in the conf/slaves file.
sbin/stop-all.sh - Stops both the master and the slaves as described above.
Note that these scripts must be executed on the machine you want to run the Spark master on, not your local machine.
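As an example, a two-worker conf/slaves file would just list the worker hostnames (worker1/worker2 below are placeholders), after which the whole cluster can be brought up from the master node with a single script:

# conf/slaves
worker1.example.com
worker2.example.com

./sbin/start-all.sh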

Add SPARK_HOME

Finally, update your $PATH by adding the path to the Spark project root as SPARK_HOME – add the following lines to your ~/.bashrc (or ~/.zshrc) file:
export SPARK_HOME=/opt/spark
export PATH=$SPARK_HOME/bin:$PATH

You can further configure the Spark cluster (optional) by setting various environment variables in the conf/spark-env.sh file. To get started, copy conf/spark-env.sh.template to create your env file, and copy it to all your worker machines for the settings to take effect.
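A minimal sketch of a conf/spark-env.sh, assuming you want to pin the master host and cap each worker’s resources (the values below are placeholders to adjust for your machines):

cp conf/spark-env.sh.template conf/spark-env.sh

# conf/spark-env.sh
export SPARK_MASTER_HOST=master-host
export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=4g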


Step 4: Install PySpark

PySpark Shell

Apache Spark provides an interactive Python shell out of the box, which is the Python API to access the Spark core (it initializes the SparkContext). You can launch it by executing the following command – the script automatically adds the PySpark package to the PYTHONPATH.
./bin/pyspark
The SparkContext lets users handle the managed Spark cluster resources, so they can read, tune, and configure the cluster. The SparkContext also initializes the driver program, and since the PySpark shell has a SparkContext already available as sc, the shell itself acts as the driver program.
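For a quick sanity check inside the shell, you can run a trivial job through sc (the sum of 0..99 below is deterministic, so the result should match):

>>> sc.parallelize(range(100)).sum()
4950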

PySpark from PyPI

PySpark from PyPI does not have the full Spark functionality; it works on top of an already launched Spark process or cluster, i.e. it provides an interface to an existing Spark cluster (standalone, or managed by Mesos or YARN). The main dependency of the PySpark package is Py4J, which gets installed automatically. I’d recommend setting up a virtualenv and installing pyspark in it.

pip install pyspark
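A minimal sketch of that setup, assuming virtualenv is available and the standalone master from the previous step is reachable at spark://master-host:7077 (a placeholder URL):

$ virtualenv pyspark-env
$ source pyspark-env/bin/activate
(pyspark-env)$ pip install pyspark

With the package installed, a small script can attach to the existing cluster:

# test-pypi-pyspark.py: connect the pip-installed PySpark to the standalone cluster
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("pypi-pyspark-test").setMaster("spark://master-host:7077")
sc = SparkContext(conf=conf)
print(sc.parallelize(range(10)).count())  # should print 10
sc.stop()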


Step 5: An example

Directly using the PySpark shell

>>> seqs = sc.textFile('file:///home/nahmed/dna-seq.txt')
>>>
>>> seqs.collect()
[u'ATATCCCCGGGAT', u'ATCGATCGATAT']
>>>
>>> ones = seqs.flatMap(lambda x : [(c, 1) for c in list(x)])
>>> ones.collect()
[(u'A', 1), (u'T', 1), (u'A', 1), (u'T', 1), (u'C', 1), (u'C', 1), (u'C', 1), (u'C', 1), (u'G', 1), (u'G', 1), (u'G', 1), (u'A', 1), (u'T', 1), (u'A', 1), (u'T', 1), (u'C', 1), (u'G', 1), (u'A', 1), (u'T', 1), (u'C', 1), (u'G', 1), (u'A', 1), (u'T', 1), (u'A', 1), (u'T', 1)]
>>>
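Continuing in the same shell, the per-character counts can be completed with a reduceByKey (the ordering of the returned pairs may differ from run to run):

>>> counts = ones.reduceByKey(lambda a, b: a + b)
>>> counts.collect()
[(u'A', 7), (u'C', 6), (u'T', 7), (u'G', 5)]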

You can also execute a complete Python script with the spark-submit script, provided out of the box:

./bin/spark-submit <python-file>
Create an example Python file – example-app.py
"""example-app.py"""
from pyspark import SparkContext
logFile = "/opt/spark/README.md"  # or some file on your system
sc = SparkContext("local", "Example App")logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
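To run it, either locally as written or against the standalone cluster (master-host again being a placeholder), something along these lines:

./bin/spark-submit example-app.py
./bin/spark-submit --master spark://master-host:7077 example-app.py

Note that example-app.py hardcodes "local" as the master when constructing the SparkContext, and a master set in code takes precedence over the --master flag, so to actually target the cluster you would drop that argument (or build the context from a SparkConf, as in the PyPI example above).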
