This documentation is outdated. Please see ROGER: The CyberGIS Supercomputer for updated documentation. 

Introduction

The Hadoop framework on ROGER is configured to run MapReduce programs using Java and Apache Pig. It also offers the Hadoop Streaming API to create and run MapReduce programs with any executable or script as the mapper and/or the reducer, e.g., Python, R, or bash scripts. In addition, the Hadoop environment is configured to allow Apache Spark to interact with the Hadoop Distributed File System (HDFS) and to manage Spark jobs with YARN.

The Hadoop cluster deployed in ROGER consists of 11 nodes:  

  • cg-hm08.ncsa.illinois.edu to cg-hm18.ncsa.illinois.edu

In particular, cg-hm08 is both the namenode and the login node. The Hadoop cluster provides multiple web interfaces that allow users to keep track of node usage and jobs.

The two most useful web interfaces are:

  1. HDFS monitoring web interface, which shows the availability of data storage and the current files on HDFS (http://cg-hm08.ncsa.illinois.edu:50070)

  2. Job monitoring web interface, which shows the progress of each job along with statistics on resource and time usage (http://cg-hm09.ncsa.illinois.edu:8088/cluster)

ROGER-Hadoop Cluster login

To log in to the Hadoop cluster on ROGER, a user first needs to log in to ROGER, for example:

ssh username@roger-login.ncsa.illinois.edu

After entering the password, log in to the Hadoop login node (i.e., cg-hm08.ncsa.illinois.edu):

ssh cg-hm08

Data access on Hadoop

After logging in to the Hadoop cluster, a user can interact with HDFS through a set of commands.

To check the version of the Hadoop software:

 hadoop version 

or to list the existing nodes in the cluster:

yarn node -list 


Before writing MapReduce code, the following steps make sure the settings are properly configured. The first thing to do is to check the existing files on HDFS:

hdfs dfs -ls PATH (e.g., /user/account_name/folder_name)
hdfs dfs -ls /user (to check existing users in the Hadoop cluster)

If this is the first time a user uses the Hadoop cluster, a user folder with the same name as the cluster account name needs to be created. To make a directory:

hdfs dfs -mkdir /user/account_name (for first time usage)
hdfs dfs -mkdir folder_name (it will create a folder /user/account_name/folder_name)

To copy your data from local disks to HDFS:

hdfs dfs -copyFromLocal  PATH_TO_LOCAL_FILE /user/account_name/folder_name

which is equivalent to:

hdfs dfs -copyFromLocal  PATH_TO_LOCAL_FILE folder_name

Conversely, to collect data from HDFS to local storage:

 hdfs dfs -getmerge PATH_TO_HDFS_FILE PATH_TO_LOCAL_FILE

To remove a file (or, with the -r option, a directory) from HDFS we can use:

hdfs dfs -rm [-r] PATH_TO_HDFS_FILE

Furthermore, options such as the replication factor and block size can be specified using the -D option.

hdfs dfs -Ddfs.replication=1 -copyFromLocal PATH_TO_LOCAL_FILE PATH_TO_HDFS_FILE

A detailed guide to HDFS commands can be found at the following link:

https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html

Writing MapReduce Programs

Interacting with Hadoop using Pig scripts:

  1. To run a Pig script, type directly in the Linux console: pig pig_script.pig

  2. or enter the Pig console directly: pig

Note that the Pig script is submitted via YARN, which will be introduced later.

Writing a MapReduce program using Java:

  1. Write the MapReduce program in Java.

  2. It is recommended to compile the Java source code using "Ant", where all the dependencies and libraries are configured in the build.xml (see the Appendix for details).

  3. A MapReduce program usually consists of three main classes: one Mapper class, one Reducer class, and one runner class that configures and runs the Hadoop job (a Combiner class is optional).

MapReduce with Hadoop Streaming API:

The Hadoop Streaming API allows any executable or script to serve as the mapper and/or the reducer; in this case, Python is used. A minimal mapper/reducer sketch is shown after the steps below.

  1. Create a folder named "app"

  2. Create a Python script to handle the mapper, e.g., mapper.py

  3. Create a Python script to handle the reducer, e.g., reducer.py

  4. If third-party scripts are imported, put their source code in the same folder.
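
For reference, a minimal word-count mapper and reducer in Python might look like the following sketch. The word-count logic is only an illustration; the part that matters to Hadoop Streaming is reading records from standard input and writing tab-separated key-value pairs to standard output.

mapper.py:

#!/usr/bin/env python
# Emit "word<TAB>1" for every word read from standard input.
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print("%s\t%d" % (word, 1))

reducer.py:

#!/usr/bin/env python
# Sum the counts for each word. Hadoop Streaming sorts the mapper output by key,
# so all lines for the same word arrive consecutively.
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print("%s\t%d" % (current_word, current_count))
        current_word, current_count = word, int(count)
if current_word is not None:
    print("%s\t%d" % (current_word, current_count))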

Writing Spark program

  1. Currently, Spark supports Scala (native), Python and R (in this case, we will use Python as an example)

  2. Spark can be launched directly on the local node by typing: pyspark

  3. It can also execute a Python script (a minimal example script is shown after this list):

    1. pyspark script.py (on the local node)

    2. spark-submit --master yarn-client script.py (as a job running in the cluster)
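
As a point of reference, a minimal spark_script.py might look like the following sketch (the application name and HDFS paths are placeholders, not part of the ROGER setup):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("wordcount_example")
sc = SparkContext(conf=conf)

# Read a text file from HDFS, count words, and write the result back to HDFS.
lines = sc.textFile("/user/account_name/folder_name/input.txt")
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
# The output directory must not already exist on HDFS.
counts.saveAsTextFile("/user/account_name/folder_name/wordcount_output")

sc.stop()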

Submitting Jobs (via YARN)

Hadoop jobs are managed by YARN, which queues jobs and assigns computing resources to them.

  • Submit Java programs:

hadoop jar dist/your_compiled_program.jar [input parameters]

Input parameters will then be handled by the main runner class of the program. They usually include the input and output paths on HDFS, as well as some configuration of the job.

  • Submit MapReduce jobs with Hadoop Streaming API (Python)

If your source code only contains mapper.py and reducer.py:

yarn jar /usr/hdp/2.3.2.0-2602/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2602.jar \
-D mapred.reduce.tasks=5 \
-mapper "mapper.py" \
-file mapper.py \
-reducer "reducer.py" \
-file reducer.py \
-input HDFS_INPUT_PATH -output HDFS_OUTPUT_PATH

If your source code utilizes third-party libraries:

yarn jar /usr/hdp/2.3.2.0-2602/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2602.jar \
-D mapred.reduce.tasks=5 \
-files app \
-mapper "app/mapper.py" \
-reducer "app/reducer.py" \
-input HDFS_INPUT_PATH -output HDFS_OUTPUT_PATH

Note: the "-file" option uploads the specified Python file to the entire cluster so that every node has the file and can execute the script, whereas "-files" packs up all the data and code in the given folder and uploads it to the cluster.

  • Submit Spark jobs

Spark provides a suite of interfaces for reading and writing files in different storage systems, such as the local disk or HDFS. To take advantage of HDFS for large files, Spark jobs are recommended to be submitted via YARN. There are several parameters to configure to tune performance:

spark-submit --master yarn-client \
 --num-executors 18 \
--executor-cores 2 \
--executor-memory 4g \
spark_script.py

In particular, using "--master yarn-client" requires that the computing environment on each node is configured the same; otherwise there will be errors. For example, if the program needs the numpy package, which is available on the namenode via "module load anaconda", the "--master" option cannot be set to "yarn-client" because anaconda is not available on the other nodes, so the command will be:

spark-submit --num-executors 18 \
--executor-cores 2 \
--executor-memory 4g \
spark_script.py

In addition, to use third-party scripts or files, these files should be explicitly added in the program; they will be uploaded to the cluster and distributed to each node, for example:

sc.addPyFile("some_library.py")
sc.addFile(“some_files.txt")
sc.addFile("folder/some_file")

Also, the program needs to find the path to these files using the SparkFiles.get() function, e.g.:

mShp = SparkFiles.get("some_files.txt")
mTree = SparkFiles.get("some_file")
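
Putting the two pieces together, the pattern might look like the following sketch (the file names are the same placeholders used above; SparkFiles is imported from pyspark):

from pyspark import SparkConf, SparkContext, SparkFiles

sc = SparkContext(conf=SparkConf().setAppName("addfile_example"))

# Ship a data file and a helper module to every node in the cluster.
sc.addFile("some_files.txt")
sc.addPyFile("some_library.py")

# Resolve the local copy of a shipped file on whichever node the code runs.
local_path = SparkFiles.get("some_files.txt")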

Appendix

An example build.xml

<?xml version="1.0" encoding="UTF-8"?>
<project name="extractGeoSpaceUser" default="main" basedir=".">
<property name="src.dir" location="src" />
<property name="classes.dir" location="classes" />
<property name="dist.dir" location="dist" />
<property name="dist.jarfile" value="aggregate.jar" />
<property name="lib.dir"  location="/usr/hdp/2.3.2.0-2602/hadoop"/>
<property name="my.lib" location="lib"/>
<path id="mylibs">
       <fileset dir="${lib.dir}">
               <include name="lib/**/**.jar"/>
               <include name="**/**.jar"/>
       </fileset>
       <fileset dir="${my.lib}"><include name="**/**.jar"/></fileset>
</path>

<target name="clean" description="Clean up output directories.">
       <delete dir="${classes.dir}" />
</target>

<target name="makedir">
       <mkdir dir="${classes.dir}" />
       <mkdir dir="${dist.dir}" />
</target>

<target name="compile" depends="clean, makedir" description="Compile all sources.">  
       <property name="myclasspath" refid="mylibs"/>
       <echo message="Classpath = ${myclasspath}"/>
       <javac srcdir="${src.dir}" destdir="${classes.dir}" classpathref="mylibs" debug="on"/>
</target>

<target name="jar" depends="compile" description="Creates the binary distribution.">
       <jar basedir="${classes.dir}" destfile="${dist.dir}/${dist.jarfile}" >
               <zipgroupfileset dir="${my.lib}" includes="**/*.jar"/>
               <manifest>
                       <attribute name="Main-Class" value="runhadoop.RunHadoop" />
               </manifest>
       </jar>
</target>

<target name="main" depends="compile, jar">
       <description>Main target</description>
</target>
</project>

Spark parameters directly set in programs

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

conf = (SparkConf()
        .setMaster("yarn-client")
        .setAppName("flow_generator")
        .set("spark.executor.memory", "4g")
        .set("spark.executor.instances", 50))
sc = SparkContext(conf=conf)
