RHIPE Tutorial


Overview

The setting has three components: a remote computer, one or more Unix R-session servers, and a Unix Hadoop cluster. The last two components run R and RHIPE. You work on the remote computer, say your laptop, and log in to an R-session server. This is home base, where you do all of your programming of R and RHIPE R commands. The R commands you write for division, application of analytic methods, and recombination that are destined for Hadoop on the cluster are passed along by RHIPE R commands.

The remote computer is typically yours to maintain. The R-session servers require IT staff to install software, configure, and maintain. However, you install packages on the R-session servers yourself, just as you do when you want to use an R CRAN package in R. There is an extra task though: you want the packages you install to be pushed up to the Hadoop cluster so they can be used there too. Except for this push by you, the Hadoop cluster is the domain of the systems administrators who must, among other tasks, install Hadoop.

The R-Session Server and RStudio

Now the R-session server can be separate from the Hadoop cluster, handling only R sessions, or it can be one of the servers on the Hadoop cluster. If it is on the Hadoop cluster, some precautions must be taken in the Hadoop configuration to protect the programming of the R session. This is needed because the RHIPE Hadoop jobs compete with the R sessions for resources. There are never full guarantees though, so the "safe mode" is separate R-session servers. The last thing you want is for R sessions to get bogged down. If the cluster option is chosen, then you want to mount a file server on the cluster that contains the files associated with the R session, such as .RData and files read into R or written by R.

There is a vast segment of the R community that uses RStudio, for good reason. RStudio can join the setting. You have RStudio Server installed on the R-session servers by the system administrators. RStudio Server provides the RStudio interface, which you access in a web browser on your remote device via the remote login.

The Remote Computer

The remote computer is just a communication device, and does not carry out data analysis, so it can run any operating system, such as Windows. This is especially important for teaching, since Windows labs are typically plentiful at academic institutions, but Unix labs much less so. Whatever the operating system, a common communication protocol to use is SSH. SSH is typically used to log into a remote machine and execute commands or to transfer files. But a critical capability for our purposes here is that it supports both your R-session command-line window, showing input and output, and a separate window for graphics.

Where Are the Data Analyzed

Obviously, much data analysis is carried out by Hadoop on the Hadoop cluster. Your R commands are given to RHIPE, passed along to Hadoop, and the outputs are written by Hadoop to the HDFS.

But in many analyses of larger and more complex data, it is common that (1) the outputs of a recombination method constitute a relatively small dataset, and (2) those outputs are further analyzed as part of the overall analysis. If they are small enough to be readily analyzed in your R session, then that is surely where you want to be. RHIPE commands allow you to write the recombination outputs from the HDFS to the R global environment of your R session. They become a dataset in .RData. While programming R and RHIPE is easy, it is not as easy as plain old serial R. The point is that a lot of data analysis can be carried out in just R even when the data are large and complex.

A Few Basic Hadoop Features

The two principal computational operations of Hadoop are Map and Reduce. The first runs parallel computations on subsets without communication among them. The second can compute across subset outputs. So Map carries out the analytic method computation. Reduce takes the outputs from Map and runs the recombination computation. A division is typically carried out both by Map and Reduce, sometimes each used several times, and can occur as part of the reading of the data into R at the start of the analysis.

Usage of Map and Reduce involves the critical Hadoop element of key-value pairs. We give one instance here. The Map operation, instructed by the analyst's R code, puts a key on each subset output. This forms a key-value pair with the output as the value. Each output can have a unique key, or each key can be given to many outputs, or all outputs can have the same key. When Reduce is given the Map outputs, it assembles the key-value pairs by key, which forms groups, and then the R recombination code is applied to the values of each group independently; so the running of the code on the different groups is embarrassingly parallel. This framework provides substantial flexibility for the recombination method.
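In the R representation used by RHIPE (seen later when we read results back into the R session), a key-value pair is a list of two elements, the key and then the value, and a collection of pairs is a list of such lists. Here is a minimal sketch, with a made-up county key and a one-row value:

kvpair <- list(
  "01001",                                  # key: a county FIPS code
  data.frame(date = 1, units = 27,          # value: one month of observations
             listing = 96.6, selling = 99.1)
)
pairs <- list(kvpair)                       # a collection of key-value pairs
pairs[[1]][[1]]                             # the key of the first pair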

Hadoop attempts to optimize computation in a number of ways. One example is Map. Typically, there are vastly more subsets than cores on the cluster. When Map finishes the application of the analytic method to a subset on a core, Hadoop seeks to assign a subset on the same node as the core to avoid transmission of the subset across the network connecting the nodes, which is more time consuming.

Background

You will likely want to install packages on your R session server, for example, R CRAN packages. And you want these packages to run on the Hadoop cluster as well. The mechanism for doing this is much like what you have been using for packages in R, but adds a push of the packages to the cluster nodes since you will want to use them there too. It is all quite simple.

Standard R practice for a server with many R users is for a system administrator to install R for use by all. However, you can override this by installing your own version. It makes sense to follow this practice in this setting too, and have the systems administrators install R and RHIPE on the R session server and the Hadoop cluster. (The RHIPE installation manual for system administrators is available in these pages in the QuickStart section.) But you can override this and install your own RHIPE and R, and push them to the cluster along with any other packages you installed. You do need to be careful to check versions of R, RHIPE, and Hadoop for compatibility. The Tessera GitHub site has this information.

Now suppose you are using RMR on the Amazon cloud or Vagrant, both discussed in our QuickStart section. Then installation of R and RHIPE on the R session server and the push to the cluster have been taken care of for you. But if you want to install R CRAN packages or packages from other sources, you will need to understand the installation mechanism.

There are some other installation matters that are the sole domain of the system administrator. Obviously Linux and Hadoop are. But also protocol buffers must be installed on the Hadoop cluster to enable RHIPE communication. In addition, if you want to use RStudio on the R session server, the system administrator will need to install RStudio Server on it. Now there is one caution here for both users and system administrators to consider. You are best served if the Linux versions you run are the same on the R server and the cluster nodes, and also if the hardware is the same. The first is more critical, but the second is a nice bonus. Part of the reason is that Java plays a critical role in RHIPE, and Java likes homogeneity.

Install and Push

To install Rhipe on the R session server, you first download the package file from within R:

system("wget http://ml.stat.purdue.edu/rhipebin/Rhipe_0.74.0.tar.gz")

This puts the package in your R session directory. There are other versions of Rhipe; you will need to go to GitHub to find out about them. To install the package on your R session server, run

install.packages("testthat")
install.packages("rJava")
install.packages("Rhipe_0.74.0.tar.gz", repos=NULL, type="source")

The first two R CRAN packages are used only for the RHIPE installation. You do not need them again until you reinstall. RHIPE is now installed. Each time you start up an R session and you want RHIPE to be available, you run

library(Rhipe)
rhinit()

Next, you push all the R packages you have installed on the R session server, including RHIPE, onto the cluster HDFS. First, you need the system administrator to configure the HDFS so you can do both this and other analysis tasks where you need to write to the HDFS. You need a directory on the HDFS where you have write permission. A common convention is for the administrator to set up for you the directory /yourloginname, named after your login name, and to do the same for other users. We will assume that has happened.

Suppose in /yourloginname you want to create a directory bin on the HDFS where you will push your installations on the R session server. You can do this and carry out the push by

rhmkdir("/yourloginname/bin")
hdfs.setwd("/yourloginname/bin")
bashRhipeArchive("R.Pkg")

rhmkdir() creates your directory bin in the directory /yourloginname. hdfs.setwd() declares /yourloginname/bin to be the directory that holds your installations. bashRhipeArchive() creates the actual archive of your installations and names it R.Pkg.

Each time your R code requires the installations on the HDFS, you must run the following in your R session:

library(Rhipe)
rhinit()
rhoptions(zips = "/yourloginname/bin/R.Pkg.tar.gz")
rhoptions(runner = "sh ./R.Pkg/library/Rhipe/bin/RhipeMapReduce.sh")
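If you want a quick check that the archive was written, the RHIPE function rhls(), shown later in this tutorial, lists an HDFS directory:

rhls("/yourloginname/bin")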

The Data

The housing data consist of 7 monthly variables on housing sales from Oct 2008 to Mar 2014, which is 66 months. The measurements are for 2883 counties in 48 U.S. states, excluding Hawaii and Alaska, and also for the District of Columbia which we treat as a state with one county. The data were derived from sales of housing units from Quandl's Zillow Housing Data (www.quandl.com/c/housing). A housing unit is a house, an apartment, a mobile home, a group of rooms, or a single room that is occupied or intended to be occupied as a separate living quarter.

The variables are

  • FIPS: FIPS county code, a unique identifier for each U.S. county
  • county: county name
  • state: state abbreviation
  • date: time of sale measured in months, from 1 to 66
  • units: number of units sold
  • listing: monthly median listing price (dollars per square foot)
  • selling: monthly median selling price (dollars per square foot)

Many observations of the last three variables are missing: units 68%, listing 7%, and selling 68%.

The number of measurements, including missing ones, is 7 x 66 x 2883 = 1,331,946. So this is in fact a small dataset that could be analyzed in standard serial R. However, we can use it to illustrate how RHIPE R commands implement Divide and Recombine. We simply pretend the data are large and complex, break them into subsets, and continue on with D&R. The small size lets you easily pick up the data, follow along using the R commands in the tutorial, and explore RHIPE yourself with other RHIPE R commands.

"housing.txt" is available in our Tesseradata Github repository of the RHIPE documentation here. The file is a table with 190,278 rows (66 months x 2883 counties) and 7 columns (the variables). The fields in each row are separated by a comma, and there are no headers in the first line. Here are the first few lines of the file:

01001,Autauga,AL,1,27,96.616541353383,99.1324
01001,Autauga,AL,2,28,96.856993190152,95.8209
01001,Autauga,AL,3,16,98.055555555556,96.3528
01001,Autauga,AL,4,23,97.747480735033,95.2189
01001,Autauga,AL,5,22,97.747480735033,92.7127
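For comparison only, and because the file is small, here is a sketch of reading it with standard serial R; the column names and the file path on the R session server are assumptions based on the descriptions above and below:

housing <- read.table("/home/yourloginname/housing/housing.txt",
  sep = ",", header = FALSE,
  col.names = c("FIPS", "county", "state", "date", "units", "listing", "selling"),
  colClasses = c("character", "character", "character",
                 "numeric", "numeric", "numeric", "numeric"))
str(housing)   # 190,278 observations of 7 variables

The rest of the tutorial does not use this serial copy; it works with the data on the HDFS.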

Write housing.txt to the HDFS

To get started, we need to make housing.txt available as a text file within the HDFS file system. This puts it in a place where it can be read into R, subsets formed, and the subsets written to the HDFS. This is similar to what we do using R in the standard serial way; if we have a text file to read into R, we put it in a place where we can read it into R, for example, in the working directory of the R session.

To set this up, the system administrator must do two tasks. On the R session server, set up a login directory where you have write permission; let's call it yourloginname in, say, /home. In the HDFS, the administrator does a similar thing: creates, say, /yourloginname in the root directory.

Your first step, as for the standard R case, is to copy housing.txt to a directory on the R-session server where your R session is running. Suppose in your login directory you have created a directory housing for your analysis of the housing data. You can copy housing.txt to

"/home/yourusername/housing/"

The next step is to get housing.txt onto the HDFS as a text file, so we can read it into R on the cluster. There are Hadoop commands that could be used directly to copy the file, but our promise to you is that you never need to use Hadoop commands. There is a RHIPE function, rhput(), that will do it for you.

rhput("/home/yourloginname/housing/housing.txt", "/yourloginname/housing/housing.txt")

The rhput() function takes two arguments. The first is the path name of the R server file to be copied. The second is the HDFS path name where the file will be written. Note that on the HDFS, in the directory /yourloginname, there is a directory housing. You might have created housing already with the command

rhmkdir("/yourloginname/housing")

If not, then rhput() creates the directory for you.

We can confirm that the housing data text file has been written to the HDFS with the rhexists() function.

rhexists("/yourloginname/housing/housing.txt")
[1] TRUE

We can use rhls() to get more information about files on the HDFS. It is similar to the Unix command ls. For example,

rhls("/yourloginname/housing")

  permission         owner      group     size          modtime                                file
1 -rw-rw-rw- yourloginname supergroup 7.683 mb 2014-09-17 11:11 /yourloginname/housing/housing.txt

Read and Divide by County

Our division method for the housing data will be to divide by county, so there will be 2883 subsets. Each subset will be a data.frame object with 4 column variables: date, units, listing, and selling. FIPS, state, and county are not column variables because each has only one value for each county; their values are added to the data.frame as attributes.

The first step is to read each line of the file housing.txt into R. By convention, RHIPE takes each line of a text file to be a key-value pair. The line number is the key. The value is the data for the line, in our case the 7 observations of the 7 variables for one month and one county.

Each line is read as part of the Map R code written by the user. The Map input key-value pairs are the line key-value pairs above. Each line also produces a Map output key-value pair. The key identifies the county. FIPS alone would have been enough to do this, but the key is specified as a character vector with three elements: the values of FIPS, county, and state. This is done so that later all three can be added to the subset data.frame. The output value for each output key is the observations of date, units, listing, and selling from the line for that key.

The Map output key-value pairs are the input key-value pairs for the Reduce R code written by the user. Reduce assembles these into groups by key, that is, by county. Then the Reduce R code is applied to the output values of each group collectively to create the subset data.frame object for each county. Each row is the value of one Reduce input key-value pair: the observations of date, units, listing, and selling for one month. FIPS, state, and county are added to the data.frame as attributes. Finally, Reduce writes each subset data.frame object to a directory in the HDFS specified by the user. The subsets are written as Reduce output key-value pairs. The output keys are the values of FIPS. The output values are the county data.frame objects.

The RHIPE Manager: rhwatch()

We begin with the RHIPE R function rhwatch(). It runs the R code you write to specify Map and Reduce operations, takes your specification of input and output files, and manages key-value pairs for you.

The code for the county division is

mr1 <- rhwatch(
  map      = map1,
  reduce   = reduce1,
  input    = rhfmt("/yourloginname/housing/housing.txt", type = "text"),
  output   = rhfmt("/yourloginname/housing/byCounty", type = "sequence"),
  mapred   = list(mapred.reduce.tasks = 10),
  readback = FALSE
)

Arguments map and reduce take your Map and Reduce R code, which will be described below. input specifies the input to be the text file in the HDFS that we put there earlier using rhput(); the file supplies the input key-value pairs for the Map code. output specifies the HDFS file into which the final output key-value pairs of the Reduce code are written. rhwatch() creates this file if it does not exist, or overwrites it if it does. The argument mapred passes configuration parameters to Hadoop; here the Hadoop parameter mapred.reduce.tasks is given the value 10 to set the number of reducers.

In our division by county here, the Reduce recombination outputs are the 2883 county data.frame R objects. They are a list object that describes the key-value pairs: FIPS key and data.frame value. There is one list element per pair; that element is itself a list with two elements, the FIPS key and then the data.frame value.

The Reduce list output can also be written to the R global environment of the R session. One use of this is analytic recombination in the R session when the outputs are a small enough dataset. You do this with the argument readback. If TRUE, the list is also written to the global environment. If FALSE, it is not; it can be read later into the R session using the RHIPE R function rhread().

countySubsets <- rhread("/yourloginname/housing/byCounty")

Suppose you just want to look over the byCounty file on the HDFS to see if all is well, and that this can be done by looking at a small number of key-value pairs, say 10. The code for this is

countySubsets <- rhread("/yourloginname/housing/byCounty", max = 10)
Read 10 objects(31.39 KB) in 0.04 seconds

Then you can look at the list of length 10 in various ways, such as

keys <- unlist(lapply(countySubsets, "[[", 1))
keys
 [1] "01013" "01031" "01059" "01077" "01095" "01103" "01121" "04001" "05019" "05037"
attributes(countySubsets[[1]][[2]])
$names
[1] "date"             "units"            "listing"             "selling"

$row.names
 [1]  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
[33] 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 
[65] 65 66

$state
[1] "AL"

$FIPS
[1] "01013"

$county
[1] "Butler"

$class
[1] "data.frame"

Map R Code

The Map R code for the county division is

map1 <- expression({
  lapply(seq_along(map.keys), function(r) {
    line = strsplit(map.values[[r]], ",")[[1]]
    outputkey <- line[1:3]
    outputvalue <- data.frame(
      date = as.numeric(line[4]),
      units =  as.numeric(line[5]),
      listing = as.numeric(line[6]),
      selling = as.numeric(line[7]),
      stringsAsFactors = FALSE
    )
    rhcollect(outputkey, outputvalue)
  })
})

Map has input key-value pairs, and output key-value pairs. Each pair has an identifier, the key, and numeric-categorical information, the value. The Map R code is applied to each input key-value pair, producing one output key-value pair. Each application of the Map code to a key-value pair is carried out by a mapper, and there are many mappers running in parallel without communication (embarrassingly parallel) until the Map job completes.

RHIPE creates input key-value pair list objects, map.keys and map.values, based on information that it has. Let r be an integer from 1 to the number of input key-value pairs. map.values[[r]] is the value for key map.keys[[r]]. The housing data inputs come from a text file in the HDFS, housing.txt. By RHIPE convention, for a text file, each Map input key is a text file line number, and the corresponding Map input value is the data in the line, read into R as a single text string. In our case each line value is the observations of the 7 county variables for the line.

This Map code is really a for loop with r as the looping variable, but it is written with lapply() because that is in general faster than for (r in 1:length(map.keys)). The loop proceeds through the input keys, specified by the first argument of lapply(). The second argument defines the Map expression as a function of r, an index for the Map keys and values.

The function strsplit() splits each character-string line input value into the individual observations of the text line; it returns a list of length one whose element is a character vector of those observations, and the [[1]] extracts that vector as line. In our case, line is a character vector of length 7, in order: FIPS, county, state, date, units, listing, selling.

Next we turn to the Map output key-value pairs. outputkey for each text line is a character vector of length 3 with FIPS, county, and state. outputvalue is a data.frame with one row and 4 columns, the observations of date, units, listing, and selling, each a numeric object.

The data.frame argument stringsAsFactors is given the value FALSE. This leaves character vectors in the data.frame as is, and does not convert them to factors.

The RHIPE function rhcollect() forms a Map output key-value pair for each line, and writes the results to the HDFS as a key-value pair list object.
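If you want to see what the Map code does before running it on the cluster, you can evaluate it serially in your R session. The following is a minimal sketch, not part of RHIPE: it simulates the map.keys and map.values objects that RHIPE would create and replaces rhcollect() with a stub that saves the pairs in a list. You may want to remove these test objects with rm() afterward.

map.keys   <- as.list(1:2)                  # line numbers as keys
map.values <- list(                         # two lines of housing.txt
  "01001,Autauga,AL,1,27,96.616541353383,99.1324",
  "01001,Autauga,AL,2,28,96.856993190152,95.8209"
)
collected <- list()
rhcollect <- function(key, value) {         # stub standing in for RHIPE's rhcollect()
  collected[[length(collected) + 1]] <<- list(key, value)
}
eval(map1)                                  # run the Map expression serially
collected[[1]][[1]]                         # "01001" "Autauga" "AL"
collected[[1]][[2]]                         # one-row data.frame: date, units, listing, selling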

Reduce R Code

The Reduce R code for the county division is

reduce1 <- expression(
  pre = {
    reduceoutputvalue <- data.frame()
  },
  reduce = {
    reduceoutputvalue <- rbind(reduceoutputvalue, do.call(rbind, reduce.values))
  },
  post = {
    reduceoutputkey <- reduce.key[1]
    attr(reduceoutputvalue, "location") <- reduce.key[1:3]
    names(attr(reduceoutputvalue, "location")) <- c("FIPS","county","state")
    rhcollect(reduceoutputkey, reduceoutputvalue)
  }
)

The output key-value pairs of Map are the input key-value pairs to Reduce. The first task of Reduce is to group its input key-value pairs by unique key. The Reduce R code is applied to the key-value pairs of each group by a reducer. The number of groups varies in applications from just one, with a single Reduce output, to many. For multiple groups, the reducers run in parallel, without communication, until the Reduce job completes.

RHIPE creates two list objects, reduce.key and reduce.values. reduce.key holds the key for one group, and reduce.values holds the values of that group, to which the Reduce code is applied. In our case, the key identifies a county and the values are the observations of date, units, listing, and selling for all months for that county.

Note that the Reduce code has a certain structure: expressions pre, reduce, and post. In our case pre initializes reduceoutputvalue to an empty data.frame. reduce assembles the county data.frame as the reducer receives the values, through rbind(reduceoutputvalue, do.call(rbind, reduce.values)); this uses rbind() to add rows to the data.frame object. post operates further on the result of reduce. In our case it first assigns the FIPS value as the output key. Then it adds FIPS, county, and state to the data.frame as the location attribute. Finally, the RHIPE function rhcollect() forms a Reduce output key-value pair, and writes it to the HDFS.
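Continuing the serial sketch from the Map section, and again outside of RHIPE, you can evaluate the three Reduce expressions on one group by supplying reduce.key and reduce.values yourself:

reduce.key    <- collected[[1]][[1]]        # c(FIPS, county, state) for Autauga
reduce.values <- lapply(collected, "[[", 2) # both simulated lines are for Autauga
final <- list()
rhcollect <- function(key, value) {         # stub again, collecting the Reduce output
  final[[length(final) + 1]] <<- list(key, value)
}
eval(reduce1[[1]])                          # the pre expression
eval(reduce1[[2]])                          # the reduce expression
eval(reduce1[[3]])                          # the post expression
final[[1]][[1]]                             # "01001"
attr(final[[1]][[2]], "location")           # FIPS, county, state for the subset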

Compute County Min, Median, Max

With the county division subsets now in the HDFS we will illustrate using them to carry out D&R with a very simple recombination procedure based on a summary statistic for each county of the variable listing. We do this for simplicity of explanation of how RHIPE works. However, we emphasize that in practice, initial analysis would almost always involve comprehensive analysis of both the detailed data for all subset variables and summary statistics based on the detailed data.

Our summary statistic consists of the minimum, median, and maximum of listing, one summary for each county. Map R code computes the statistic. The output key of Map, and therefore the input key for Reduce is state. The Reduce R code creates a data.frame for each state where the columns are FIPS, county, min, median, and max. So our example illustrates a scenario where we create summary statistics, and then analyze the results. This is an analytic recombination. In addition, we suppose that in this scenario the summary statistic dataset is small enough to analyze in the standard serial R. This is not uncommon in practice even when the raw data are very large and complex.

The RHIPE Manager: rhwatch()

Here is the code for rhwatch().

CountyStats <- rhwatch(
  map      = map2,
  reduce   = reduce2,
  input    = rhfmt("/yourloginname/housing/byCounty", type = "sequence"),
  output   = rhfmt("/yourloginname/housing/CountyStats", type = "sequence"),
  mapred   = list(mapred.reduce.tasks = 10),
  readback = TRUE
)

Our Map and Reduce code, map2 and reduce2, is given to the arguments map and reduce. The code will be discussed below.

The input key-value pairs for Map, given to the argument input, are our county subsets which were written to the HDFS directory /yourloginname/housing as the key-value pairs list object byCounty. The final output key-value pairs for Reduce, specified by the argument output, will be written to the list object CountyStats in the same directory as the subsets. The keys are the states, and the values are the data.frame objects for the states.

The argument readback is given the value TRUE, which means CountyStats is also written to the R global environment of the R session. We do this because our scenario is that analytic recombination is done in R.

The Hadoop parameter mapred.reduce.tasks is given the value 10 through the mapred argument, as in our use of it to create the county subsets.

The Map R Code

The Map R code is

map2 <- expression({
  lapply(seq_along(map.keys), function(r) {
    outputvalue <- data.frame(
      FIPS = map.keys[[r]],
      county = attr(map.values[[r]], "location")["county"],
      min = min(map.values[[r]]$listing, na.rm = TRUE),
      median = median(map.values[[r]]$listing, na.rm = TRUE),
      max = max(map.values[[r]]$listing, na.rm = TRUE),
      stringsAsFactors = FALSE
    )
    outputkey <- attr(map.values[[r]], "location")["state"]
    rhcollect(outputkey, outputvalue)
  })
})

map.keys is the Map input keys, the county subset identifiers FIPS. map.values is the Map input values, the county subset data.frame objects. The lapply() loop goes through all subsets, and the looping variable is r. Each stage of the loop creates one output key-value pair, outputkey and outputvalue. outputkey is the observation of state. outputvalue is a data.frame with one row that has the variables FIPS, county, min, median, and max for county FIPS. rhcollect(outputkey, outputvalue) emits the pairs to reducers, becoming the Reduce input key-value pairs.

The Reduce R Code

The Reduce R code for the listing summary statistic is

reduce2 <- expression(
  pre = {
    reduceoutputvalue <- data.frame()
  },
  reduce = {
    reduceoutputvalue <- rbind(reduceoutputvalue, do.call(rbind, reduce.values))
  },
  post = {
    rhcollect(reduce.key, reduceoutputvalue)
  }
)

The first task of Reduce is to group its input key-value pairs by unique key, in this case by state. The Reduce R code is applied to the key-value pairs of each group by a reducer.

The expression pre initializes reduceoutputvalue to an empty data.frame. reduce assembles the state data.frame as the reducer receives the values, through rbind(reduceoutputvalue, do.call(rbind, reduce.values)); this uses rbind() to add rows to the data.frame object. post operates further on the result of reduce; rhcollect() forms a Reduce output key-value pair for each state. RHIPE then writes the Reduce output key-value pairs to the HDFS.

Recall that we told RHIPE in rhwatch() to write the Reduce output to CountyStats in the R global environment of the R session, as well as to the HDFS. In the R session we can have a look at the results to make sure all is well. We can look at a summary:

str(CountyStats)
List of 49
 $ :List of 2
  ..$ : Named chr "AL"
  .. ..- attr(*, "names")= chr "state"
  ..$ :'data.frame':    64 obs. of  5 variables:
  .. ..$ FIPS  : chr [1:64] "01055" "01053" "01051" "01049" ...
  .. ..$ county: chr [1:64] "Etowah" "Escambia" "Elmore" "DeKalb" ...
  .. ..$ min   : num [1:64] 62.1 60.4 94.7 59.2 41.2 ...
  .. ..$ median: num [1:64] 67.6 66.2 99.2 71.9 50.6 ...
  .. ..$ max   : num [1:64] 77.8 79.8 102.2 82.3 60.4 ...
 $ :List of 2
  ..$ : Named chr "AR"
  .. ..- attr(*, "names")= chr "state"
  ..$ :'data.frame':    71 obs. of  5 variables:
  .. ..$ FIPS  : chr [1:71] "05025" "05023" "05021" "05019" ...
  .. ..$ county: chr [1:71] "Cleveland" "Cleburne" "Clay" "Clark" ...
  .. ..$ min   : num [1:71] 46.2 99.9 28.1 61.6 58.5 ...
  .. ..$ median: num [1:71] 60.2 108.2 38.7 67.3 82.1 ...
  .. ..$ max   : num [1:71] 73.5 125 48.8 72.7 117.4 ...
......

We can look at the first key-value pair

CountyStats[[1]][[1]]
state 
 "AL" 

We can look at the data.frame for state "AL"

head(CountyStats[[1]][[2]])
    FIPS     county       min    median       max
1  01055     Etowah  62.07526  67.64964  77.80488
2  01053   Escambia  60.44186  66.23173  79.83193
3  01051     Elmore  94.66667  99.20582 102.23077
4  01049     DeKalb  59.20484  71.89464  82.32628
5  01047     Dallas  41.20072  50.60164  60.37621
6  01045       Dale  65.04065  73.40946  81.80147
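
From here, analytic recombination proceeds in the R session with ordinary serial R. For example, here is a small sketch that stacks the per-state data.frames of CountyStats into one data.frame for further analysis:

countyTable <- do.call(rbind, lapply(CountyStats, "[[", 2))
countyTable$state <- rep(
  unlist(lapply(CountyStats, "[[", 1)),              # the state keys
  sapply(CountyStats, function(kv) nrow(kv[[2]]))    # one row per county
)
dim(countyTable)               # 2883 counties, now 6 variables
summary(countyTable$median)    # distribution of county median listing price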