RHIPE: R function reference


Package Information

R and Hadoop Integrated Programming Environment

Author: Saptarshi Guha

Version: 0.74.0

Date: 2014-02-19

License:

Description

RHIPE is a merger of R and Hadoop. R is the widely used, highly acclaimed interactive language and environment for data analysis. Hadoop consists of the Hadoop Distributed File System (HDFS) and the MapReduce distributed compute engine. RHIPE allows an analyst to carry out D&R analysis of complex big data wholly from within R. RHIPE communicates with Hadoop to carry out the big, parallel computations.

rhchmod

Change Permissions of Files on HDFS

Usage

rhchmod(path, permissions)

Arguments

path
The location of the file or directory on the HDFS whose permissions are to be changed. It is expanded using rhabsolute.hdfs.path

permissions
a character string such as '777'

Description

Change permissions of files on HDFS

Author

Ryan Hafen

See also

rhwrite, rhmkdir
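
Example

A minimal sketch (the HDFS path here is hypothetical):

rhchmod("/tmp/rhipe/output", "777")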

rhcp

Copy files (or folders) on the HDFS

Usage

rhcp(ifile, ofile, delete = FALSE)

Arguments

ifile
Absolute path of the file (or folder) on the HDFS to be copied, or the output from rhwatch(.., read=FALSE).

ofile
Absolute path to place the copies on the HDFS.

delete
Should ifile be deleted once the copy is complete?

Description

Copies the file (or folder) ifile on the HDFS to the destination ofile, also on the HDFS.

Author

Saptarshi Guha

See also

rhget, rhput, rhdel, rhread, rhwrite, rhsave
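
Example

A minimal sketch (the HDFS paths here are hypothetical):

rhcp("/tmp/rhipe/input", "/tmp/rhipe/input-copy")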

rhdel

HDFS File Deletion

Usage

rhdel(folder)

Arguments

folder
The absolute path on the HDFS to the directory (or directories) to be deleted.

Description

This function deletes the directories given in the character vector folder, which are located on the HDFS. The deletion is recursive, so all subdirectories are deleted too. Nothing is returned.

Author

Saptarshi Guha

See also

rhput, rhls, rhread, rhwrite, rhsave, rhget
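
Example

A minimal sketch (the HDFS path here is hypothetical). The deletion is recursive:

rhdel("/tmp/rhipe/old-output")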

rherrors

Retrieves errors from a job when debug='collect' in rhwatch

Usage

rherrors(job, prefix = "rhipe_debug", num.file = 1)

Arguments

job
Is the result of rhwatch (when read = FALSE)

prefix
Is the prefix string of the debug files

num.file
Is the number of debug files to read

Description

Retrieves errors from a job when debug='collect' in rhwatch

Author

Saptarshi Guha
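
Example

A minimal sketch: the map expression and paths are hypothetical, and stop() is used only to force an error that the debug files will capture.

map <- rhmap({ stop("boom") })
job <- rhwatch(map = map, input = 10, output = "/tmp/rhipe/buggy",
               debug = "collect", readback = FALSE)
errs <- rherrors(job)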

rhex

Execute a MapReduce Job On Hadoop

Usage

rhex(conf, async = TRUE, mapred, ...)

Arguments

conf
conf is a list returned from rhwatch describing the MapReduce job.

async
When async is TRUE, the function returns immediately, leaving the job running in the background on Hadoop.

mapred
See Details.

...
Additional parameters for system call.

Value

When async is TRUE, the function returns an object of class jobtoken.

Description

Submits a MapReduce job (created using rhwatch) to the Hadoop MapReduce framework. The argument mapred serves the same purpose as the mapred argument to rhwatch; settings given here override those in the object returned from rhwatch. By default the function returns when the job ends (success, failure, or termination by the user; see rhkill). When async is TRUE, the function returns immediately, leaving the job running in the background on Hadoop.

Details

When async is TRUE, the function returns an object of class jobtoken. The generic function print.jobtoken displays the start time, duration (in seconds), and percent progress. This object can be used in calls to rhstatus, rhjoin, and rhkill. Otherwise it returns a list of counters and the job state.

Author

Saptarshi Guha

See also

rhwatch, rhstatus, rhkill
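
Example

A minimal sketch: build the job object with rhwatch(noeval = TRUE), then submit it with rhex. The map expression and output path are hypothetical.

job <- rhwatch(map = rhmap({ rhcollect(k, r) }), input = 10,
               output = "/tmp/rhipe/identity", noeval = TRUE)
z <- rhex(job, async = TRUE)   # returns a jobtoken
rhstatus(z)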

rhexists

Check if File Exists on HDFS

Usage

rhexists(path)

Arguments

path
Is the location of the file to be checked. It is expanded using rhabsolute.hdfs.path

Description

Check if file exists on HDFS

Author

Ryan Hafen

See also

rhls, rhwrite, rhmkdir
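
Example

A minimal sketch (the HDFS path here is hypothetical):

if (!rhexists("/tmp/rhipe/output")) rhmkdir("/tmp/rhipe/output")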

rhfmt

A function that returns a function to specify input/output formats

Usage

rhfmt(type, ...)

Arguments

type
The name of the function handler

...
Arguments passed to the function

Description

Returns a function that specifies the input/output format.

Details

The returned handler must take three arguments: the lines (data), the direction ('input' or 'output'), and the call signature. See rhoptions()$ioformats for examples of how to write your own.
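
Example

A minimal sketch following the form used in the rhwatch documentation, rhfmt(path, 'text'); the paths and map expression are hypothetical, and the exact argument names expected by each handler can be checked in rhoptions()$ioformats.

map <- rhmap({ rhcollect(k, nchar(r)) })   # with text input, r is a line of text
z <- rhwatch(map = map,
             input  = rhfmt("/tmp/rhipe/logs", type = "text"),
             output = "/tmp/rhipe/line-lengths")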

rhget

Copy files from the HDFS to a local directory.

Usage

rhget(src, dest)

Arguments

src
Absolute path to file or directory on HDFS to get.

dest
Path to file or directory on local filesystem to create as the new copy.

Description

Copies the files (or folder) at src, located on the HDFS to the destination dest located on the local filesystem. If a file or folder of the same name as dest exists on the local filesystem, it will be deleted. The dest can contain ~ which will be expanded. The original copy of the file or folder is left on the HDFS after this operation.

Author

Saptarshi Guha

See also

rhput, rhdel, rhread, rhwrite, rhsave
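
Example

A minimal sketch (the HDFS and local paths here are hypothetical):

rhget("/tmp/rhipe/results", "~/results")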

rhinit

Initializes the RHIPE subsystem

Usage

rhinit()

Description

Initializes the RHIPE subsystem
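
Example

A minimal sketch of a typical session start:

library(Rhipe)
rhinit()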

rhIterator

Iterates Through the Records of Sequence Files

Usage

rhIterator(files, type = "sequence", chunksize = 1000, chunk = "records",
  skip = rhoptions()$file.types.remove.regex, mc = lapply,
  textual = FALSE)

Arguments

files
Path to file or directory containing sequence files. This can also be the output from rhwatch(read = FALSE).

type
Is it 'sequence' or 'map'. Ideally should be auto-determined.

chunksize
Number of records or bytes to read. Depends on 'chunk'.

chunk
either "records" or "bytes"

skip
Files to skip while reading the hdfs. Various installs of Hadoop add additional log info to HDFS output from MapReduce. Attempting to read these files is not what we want to do. To get around this we specify pieces of filenames to grep and remove from the read. skip is a vector argument just to have sensible defaults for a number of different systems. You can learn which if any files need to be skipped by using rhls on the target directory.

mc
Set to lapply by default. User can change this to mclapply for parallel lapply.

textual
If the keys and values are Hadoop Text objects

Description

Can be used to iterate through the records of a Sequence File (or collection thereof)
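
Example

A sketch, assuming the returned handle is a function that yields successive chunks of key/value pairs and an empty list when exhausted; the path is hypothetical.

it <- rhIterator("/tmp/rhipe/output", chunksize = 500)
while (length(kv <- it()) > 0) {
  # process this chunk of key/value pairs
}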

rhJobInfo

Very Detailed Job Information

Usage

rhJobInfo(job)

Arguments

job
The parameter job can either be a string with the format job_datetime_id (e.g. job_201007281701_0274) or the value returned from rhex with the async option set to TRUE.

Description

Returns detailed job information across all TaskIDs
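
Example

A minimal sketch using the job identifier form shown above:

info <- rhJobInfo("job_201007281701_0274")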

rhkill

Kill A MapReduce Job

Usage

rhkill(job)

Arguments

job
The parameter job can either be a string with the format job_datetime_id or the value returned from rhex with the async option set to TRUE.

Description

This kills the MapReduce job with the job identifier given by job. The parameter job can either be a string with the format job_datetime_id or the value returned from rhex with the async option set to TRUE.

Author

Saptarshi Guha

See also

rhstatus, rhwatch, rhjoin, rhex
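
Example

A minimal sketch using the job identifier form described above:

rhkill("job_201007281701_0274")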

rhls

List Files On HDFS

Usage

rhls(folder = NULL, recurse = FALSE, nice = "h")

Arguments

folder
Path of directory on HDFS or output from rhwatch(readback=FALSE)

recurse
If TRUE list all files and directories in sub-directories.

nice
One of 'g','m','b' or 'h' (gigabytes, megabytes, bytes, human readable).

Value

A data.frame of file and directory information (see Details).

Description

List all files and directories contained in a directory on the HDFS.

Details

Returns a data.frame of filesystem information for the files located at folder. If recurse is TRUE, rhls will recursively travel the directory tree rooted at folder. The returned object is a data.frame consisting of the columns: permission, owner, group, size (which is numeric), modification time, and the file name. folder may optionally end in '*', which is the wildcard and will match any character(s).

Author

Saptarshi Guha

See also

rhput, rhdel, rhread, rhwrite, rhsave, rhget
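
Example

A minimal sketch (the HDFS path here is hypothetical); sizes are reported in megabytes:

rhls("/tmp/rhipe", recurse = TRUE, nice = "m")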

rhload

Load an RData from the HDFS.

Usage

rhload(file, envir = parent.frame())

Arguments

file
Path to the .RData file on the HDFS.

envir
Environment in which to load the .RData file.

Value

Data loaded from the HDFS.

Description

Calls the function load after fetching an RData file from the HDFS.

Author

Saptarshi Guha

See also

rhsave, rhsave.image
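
Example

A minimal sketch (the HDFS path here is hypothetical):

rhload("/tmp/rhipe/params.RData")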

rhmap

Macro to Wrap Boilerplate Around RHIPE Map Code

Usage

rhmap(expr = NULL, before = NULL, after = NULL)

Arguments

expr
Any R expression that operates on the current elements of map.keys and map.values and the current index (available as k, r, and .index respectively).

before
An R expression to run before the loop across map.values, map.keys, and .index. If map.values is shortened, make map.keys the same length!

after
An R expression to run after the loop. The result of the loop is contained in result.

Description

Returns an expression corresponding to the given input, with the RHIPE map boilerplate wrapped around it.

See also

rhwatch
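
Example

A minimal sketch: emit each input value squared. The output path is hypothetical, and with input = 10 the keys and values are assumed to be the integers 1 to 10 (lapply-style input).

m <- rhmap({ rhcollect(k, r^2) })   # k and r are the current key and value
z <- rhwatch(map = m, input = 10, output = "/tmp/rhipe/squares")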

rhmapfile

Creates a Handle to a Mapfile

Usage

rhmapfile(paths)

Arguments

paths
Absolute path to map file on HDFS or the output from rhwatch.

Description

Creates a Handle to a Mapfile

rhmkdir

Creates Directories on the HDFS

Usage

rhmkdir(path, permissions)

Arguments

path
Is the location of the directory to be created. It is expanded using rhabsolute.hdfs.path

permissions
Either in integer form, e.g. 777, or string form, e.g. 'rwx'. If missing, the default is used.

Value

Logical: TRUE on success.

Description

Equivalent of mkdir -p in Unix

Author

Saptarshi Guha

See also

rhcp, rhmv
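
Example

A minimal sketch (the HDFS path here is hypothetical):

rhmkdir("/tmp/rhipe/newdir", permissions = 777)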

rhmv

Move files (or folders) on the HDFS (delete original)

Usage

rhmv(ifile, ofile)

Arguments

ifile
Absolute path of the file (or folder) on the HDFS to be moved, or the output from rhwatch(.., read=FALSE).

ofile
Absolute path on the HDFS to which the file (or folder) is moved.

Description

Moves the file (or folder) ifile on the HDFS to the destination ofile, also on the HDFS. The original is deleted.

Author

Saptarshi Guha

See also

rhget, rhput, rhdel, rhread, rhwrite, rhsave
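
Example

A minimal sketch (the HDFS paths here are hypothetical):

rhmv("/tmp/rhipe/output", "/tmp/rhipe/archive/output")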

rhofolder

Returns the output folder from a previous RHIPE job

Usage

rhofolder(job)

Arguments

job
Can be a character string giving the path to a folder, or the result of a call to rhwatch. For the latter, this works only if read is FALSE (because if read is TRUE, the output itself is returned).

Description

Returns the output folder from a previous RHIPE job. Take a look at hdfs.getwd for more information.

Author

Saptarshi Guha

rhoptions

Get or Set Rhipe Options

Usage

rhoptions(li = NULL, ...)

Arguments

li
A list of options. Names of elements indicate the option being modified. Values of elements indicate the value the option is to take on. If NULL then a list of current rhoptions is returned.
See details.

...
Options may be assigned values as key=value. See examples.

Value

rhoptions() returns a list of current options.

Description

Used to set Rhipe options (called by rhinit for example). Most often called by a user to set a runner that starts the embedded R Rhipe executable.

Details

One may set the following options specific to Rhipe and many options specific to mapred (not listed here but present in the Hadoop documentation).

Note

Default values can be observed by simply typing rhoptions()

The runner option is the launcher for the embedded R RHIPE executable used in MapReduce jobs. Other options are often easiest to set in the mapred argument of rhwatch.

Author

Saptarshi Guha

See also

rhinit
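
Example

A minimal sketch of reading and setting options (readback is one of the options documented under rhwatch):

opts <- rhoptions()          # list of all current options
rhoptions(readback = FALSE)  # set an option as key = value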

rhput

Put a file onto the HDFS

Usage

rhput(src, dest, deletedest = TRUE)

Arguments

src
Path to the local file to be copied to the HDFS.

dest
Path to the file on the HDFS. rhput creates the file at dest.

deletedest
If TRUE, this function attempts to delete the destination on the HDFS before copying to that location.

Description

Copies the local file called src (not a folder) to the destination dest on the HDFS. Uses path.expand to expand the src parameter.

Note

Local filesystem copy remains after the operation is complete.

Author

Saptarshi Guha

See also

rhget, rhdel, rhread, rhwrite, rhsave
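
Example

A minimal sketch (the local and HDFS paths here are hypothetical):

rhput("~/data/airline.csv", "/tmp/rhipe/airline.csv")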

rhread

Read Key/Value Pairs From The HDFS.

Usage

rhread(files, type = c("sequence"), max = -1L,
  skip = rhoptions()$file.types.remove.regex, mc = lapply,
  textual = FALSE, verbose = TRUE, ...)

Arguments

files
Path to file or directory containing map, sequence, or text file to be read on the HDFS. This can also be the output from rhwatch provided readback=FALSE.

type
Type of file on HDFS. Must be sequence, map, or text.

max
Maximum number of key/value pairs to read for map and sequence files. Maximum number of lines to read for text files.

mc
Set to lapply by default. User can change this to mclapply for parallel lapply.

textual
If the keys and values are Hadoop Text objects

skip
Files to skip while reading the HDFS. Various installs of Hadoop add additional log info to HDFS output from MapReduce. Attempting to read these files is not what we want to do in rhread. To get around this we specify pieces of filenames to grep and remove from the read. skip is a vector argument just to have sensible defaults for a number of different systems. You can learn which if any files need to be skipped by using rhls on target directory.

verbose
Logical - print messages about what is being done.

...
Additional arguments passed to the function

Value

For map and sequence files, a list of key/value pairs of up to length max. For text files, a matrix of lines, each row a line from the text files.

Description

Reads all or a limited number of key/value pairs from HDFS files.

Details

Reads the key/value pairs from the files pointed to by files. The source files can end in a wildcard * e.g. /path/input/p* will read all the key/value pairs contained in files starting with p in the folder /path/input/.

The parameter type specifies the format of files. This can be one of text, map or sequence which imply a Text file, MapFile or a SequenceFile respectively.

The parameter max specifies the maximum number of entries to read, by default all the key/value pairs will be read. Specifying max for text files, limits the number of lines read.

mc is by default lapply. The user can change this to mclapply for faster throughput.

Data written by rhwrite can be read using rhread.

Author

Saptarshi Guha

See also

rhget, rhput, rhdel, rhwrite, rhsave
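
Example

A minimal sketch (the HDFS path here is hypothetical); each element of the result is a key/value pair:

res  <- rhread("/tmp/rhipe/output", max = 100)
keys <- lapply(res, "[[", 1)
vals <- lapply(res, "[[", 2)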

rhsave

Save .RData to HDFS.

Usage

rhsave(..., file, envir = parent.frame())

Arguments

...
Objects to be saved, and other parameters passed to save.

file
Absolute path to file on HDFS. Creates the file or overwrites.

envir
Environment to search for the objects to be saved.

Description

Puts the result of a save call onto the HDFS. It is useful if you have variables in the current environment that you want to work with in a MapReduce job as shared objects.

Author

Saptarshi Guha

See also

rhsave.image, rhload
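
Example

A minimal sketch (the HDFS path here is hypothetical):

x <- rnorm(100)
rhsave(x, file = "/tmp/rhipe/x.RData")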

rhsave.image

save.image to HDFS

Usage

rhsave.image(..., file)

Arguments

...
Additional parameters for save.image

file
Path to file on HDFS. Creates the file or overwrites it.

Description

Puts the result of a save.image call onto the HDFS. Useful if you have variables in the current environment that you want to work with in a MapReduce job as shared objects.

Author

Saptarshi Guha

See also

rhsave, rhload
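
Example

A minimal sketch (the HDFS path here is hypothetical):

rhsave.image(file = "/tmp/rhipe/session.RData")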

rhstatus

Report Status Of A MapReduce Job

Usage

rhstatus(job, mon.sec = 5, autokill = TRUE, showErrors = TRUE,
  verbose = FALSE, handler = rhoptions()$statusHandler)

Arguments

job
The parameter job can either be a string with the format job_datetime_id (e.g. job_201007281701_0274) or the value returned from rhex with the async option set to TRUE.

mon.sec
If mon.sec is greater than 0, a small data frame indicating the progress will be returned every mon.sec seconds. If 0, it will return immediately. If Inf, it will wait until the job is over.

autokill
If autokill is TRUE, then any R errors caused by the map/reduce code will cause the job to be killed.

verbose
If verbose is TRUE, also provided is the state of the job, the duration in seconds, a data frame with columns for the Map and Reduce phase. This data frame summarizes the number of tasks, the percent complete, and the number of tasks that are pending, running, complete or have failed. In addition the list has an element that consists of both user defined and Hadoop MapReduce built in counters (counters can be user defined with a call to rhcounter).

showErrors
If TRUE, display errors from R in MapReduce tasks.

handler
A function that receives the counters and job-related information; it can be used to kill the job. Returning FALSE stops monitoring.

Value

A list describing the current state of the job.

Description

Returns the status of a running MapReduce job.

Note

This function behaves differently depending on whether it is called from within a MapReduce expression during a MapReduce task or from the user's R session. In a MapReduce task, use this function to REPORT the status of your job to Hadoop.

From the user side, this displays the status of a running MapReduce job and reports information accumulated about the Hadoop job.

The parameter job can either be a string with the format job_datetime_id (e.g. job_201007281701_0274) or the value returned from rhex with the async option set to TRUE.

Author

Saptarshi Guha

See also

rhex, rhkill
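
Example

A minimal sketch using the job identifier form described above:

rhstatus("job_201007281701_0274", mon.sec = 10)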

rhuz

Serialize and Unserialize. See Details.

Usage

rhuz(r)

Arguments

r
An object to serialize or unserialize

Value

A serialized or unserialized object.

Description

The function rhsz serializes an object using RHIPE's binary serialization. This will return the raw bytes corresponding to the serialized object. If the object cannot be serialized, it will be converted to NULL and serialized. rhuz takes the bytes and un-serializes them, throwing an error if it cannot. These two functions are also available at the R console. RHIPE uses the internals of these functions in rhcollect and rhread. The maximum size of the serialized object that can be read is 256MB. Larger objects will be written successfully, but when read RHIPE will throw an error. These functions are useful to get an approximate idea of how large an object will be.

Details

The R serialization is verbose. Serialized objects have 22 bytes of header and booleans are serialized to integers. Best performance is achieved in Hadoop when the size of the data exchanged is as small as possible. RHIPE implements its own serialization using Google's Protocol Buffers. A benefit of using this is that the data produced by RHIPE can be read in languages such as Python, C and Java using the wrappers provided on the Google website.

However, a drawback of RHIPE's serialization is that not all R objects can be seamlessly serialized. RHIPE can serialize the following

  • Scalar vectors: integers, characters (including UTF8 strings), numerics, logicals, complex and raw. NA values are accepted.

  • Lists of the above.

  • Attributes of objects.

RHIPE can serialize data frames, factors, and matrices (including others like time series objects) since these are the above data structures with attributes.

Closures, environments, and promises cannot be serialized. You may often get around this limitation by using the built-in serialization present in R first, and then serializing that string via rhsz (or indirectly via rhcollect). For example, to serialize the output of xyplot, wrap it in a call to serialize.

Author

Saptarshi Guha
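
Example

A minimal round-trip sketch:

b <- rhsz(list(a = 1:3, b = "x"))  # serialize to raw bytes
rhuz(b)                            # recover the original object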

rhwatch

Prepares, Submits, and Monitors MapReduce Jobs

Usage

rhwatch(map = NULL, reduce = NULL, combiner = FALSE, setup = NULL,
  cleanup = NULL, input = NULL, output = NULL, orderby = "bytes",
  mapred = NULL, shared = c(), jarfiles = c(), zips = c(),
  partitioner = NULL, copyFiles = FALSE, jobname = "",
  parameters = NULL, job = NULL, mon.sec = 5,
  readback = rhoptions()$readback, debug = NULL, noeval = FALSE, ...)

Arguments

map
map is an R expression (created using the R command expression) that is evaluated by RHIPE during the map stage. For each task, RHIPE will call this expression multiple times (see details). This can also be a function of two arguments, the key and the value. Using an expression provides one more opportunity for vector operations.

reduce
reduce is an R expression (created using the R command expression) that is evaluated by RHIPE during the reduce stage, or a vector of expressions with names pre, reduce, and post, for example reduce = expression(pre={...}, reduce={...}, post={...}). reduce is optional; if not specified, a default identity reduce is performed, i.e. the map output keys are sorted and shuffled and saved to disk. Setting reduce to 0 or another integer is equivalent to mapred.reduce.tasks=reduce.

combiner
If set to TRUE, RHIPE will run the reduce expression on the output of the map expression locally, i.e. on the same computer that is running the associated map, after io.sort.mb megabytes of key/value pairs. See details.

WARNING: setup/cleanup may not run when you think when used with a combiner. We recommend only advanced users try to use both a combiner and setup/cleanup expressions.

input
Specifies the type of input. If a character vector, Sequence file input is assumed. If a single number N, lapply-style input is used, i.e. the keys run from 1 to N. If a pair of numbers, the keys range from 1 to N[1] and mapred.map.tasks is set to N[2]. See rhfmt for more examples and rhoptions()$ioformats. To get text input, specify rhfmt(path, 'text').

output
Similar to input. To get a map output format (after which the user can call rhmapfile to query using rhgetkey), specify rhfmt(path, 'map').

setup
An expression of R code to be run before map and reduce. Alternatively, an expression with elements map and reduce, e.g. setup=expression(map=,reduce=), each of which is run, respectively, before the map and reduce phases. See details.

cleanup
As in setup except cleanup runs after the map and reduce phases.

orderby
This is one of bytes, integer, numeric, or character. The intermediate keys will be ordered assuming the output key of the map is of that type; if it is not of that type, an exception will be thrown. Tuples can be sorted too; see Tuple Sorting in the online documentation.

mapred
Specify Hadoop and RHIPE options in this parameter (a list). See details; for Hadoop options, consult the Hadoop documentation.

shared
This is a character vector of files located on the HDFS. At the beginning of the MapReduce job, these files will be copied to the local hard disks of the Tasktrackers (cluster computers on which the compute nodes/cores are located). See details.

jarfiles
Optional JARs that need to be used during Hadoop MapReduce. This is used in the case when a user provides a custom InputFormat. Specify the JAR file to handle this InputFormat using this argument and specify the name of the InputFormat in the mapred argument.

zips
Distributed cache file on the HDFS to unzip and distribute to each MapReduce task. See Distributed Cache.

partitioner
A list of two named elements: lims and type. See details.

parameters
A named list of parameters to be passed to the MapReduce job. It can also be the string 'all', in which case all variables whose size is at most rhoptions()$copyObjects$maxsize (bytes) and whose names are not in rhoptions()$copyObjects$exclude will be copied automatically. If rhoptions()$copyObjects$auto is TRUE (the default), RHIPE will make an attempt (via codetools) to determine the called variables/functions and copy them automatically.

copyFiles
Should files created by the R code (e.g. PDF output) be copied to the output folder?

jobname
The name of the job, which is visible on the Jobtracker website. If not provided, Hadoop MapReduce uses the default name job_date_time_number e.g. job_201007281701_0274.

job
The parameter job can either be a string with the format job_datetime_id (e.g. job_201007281701_0274)

mon.sec
If mon.sec is greater than 0, a small data frame indicating the progress will be returned every mon.sec seconds.

readback
If FALSE, results will not be read back; instead, the results from rhstatus are returned.

noeval
If TRUE do not run, just return the job object

debug
Can be one of 'count', which counts the number of errors but does not stop the job; 'stop', which stops the job (setting debug to NULL, the default, is the same and is much faster); and 'collect', which collects 20 errors per split, saves them in files (inside the output folder) prefixed with rhipe.debug, and does not kill the job. The maximum number of errors to collect can be set in the parameters argument to rhwatch, e.g. rh.max.errors=10 collects a maximum of 10 errors.

...
Extra parameters passed to rhstatus.

Value

If the state is SUCCEEDED and the total output size (in MB) is less than rhoptions()$max.read.in.size, the data is read back, with a warning if the number of records is more than rhoptions()$reduce.output.records.warn. If rhoptions()$rhmr.max.records.to.read.in is not NA, that many records are read. This only works for Sequence output. Otherwise, an object of length two is returned: the first element is the data returned from rhstatus and the second is the data returned by the internal rhwatch() function.

Description

Creates the R object that contains all the information required by RHIPE to run a MapReduce job via a call to rhex (see details).

Details

Buffer Size
If a task consists of W key/value pairs, the expression map will be called ceiling(W / rhipe_map_buffsize) times. The default value of rhipe_map_buffsize is 10,000 and is user configurable. Each time map is called, the vectors map.keys and map.values contain rhipe_map_buffsize keys and values respectively. If the objects are large, it is advisable to reduce the value of rhipe_map_buffsize so that the total amount of memory used by a task is well controlled.
For particularly large map.values, the authors have used rhipe_map_buffsize as low as 10.

Setup
In RHIPE, each task is a sequence of many thousands of key/value pairs. Before running the map and reduce expressions (and before any key/value pairs have been read), RHIPE will evaluate the expressions in setup and cleanup. Each of these may contain the names map and reduce, e.g. setup=expression(map=,reduce=), specific to the map and reduce expressions. If just an expression is provided, it will be evaluated before both the Map phase and the Reduce phase. The same is true for cleanup. Variables created and packages loaded in the setup expression will be visible in the map and in the reduce expression, but not shared between them, since the two phases are evaluated in different R sessions (except when using a combiner).

Sorting and Shuffling
To turn off sorting and shuffling and instead write the map output to disk directly, set mapred.reduce.tasks to zero in mapred. In this case, the output keys are not sorted and the output format should not be map (since a map file expects sorted keys).

Using a Combiner
If combiner is TRUE, the reduce expression will be invoked during the local combine, in which case the output is intermediate and not saved as final output. The reduce expression will also be invoked during the final reduce phase, in which case it will receive all the values associated with the key (note, these are the values output when reduce is invoked as a combiner) and the output will be committed to the destination folder. To determine in which state reduce is running, read the environment variable rhipe_iscombining, which is '1' (and the R symbol rhipe_iscombining is TRUE) during the combine and '0' during the final reduce. WARNING: setup and cleanup may not run when you think when used with a combiner. We recommend only advanced users try to use both a combiner and setup/cleanup expressions.

Using Shared Files
shared is a character vector of files located on the HDFS. At the beginning of the MapReduce job, these files will be copied to the local hard disks of the Tasktrackers (cluster computers on which the compute nodes/cores are located). User-provided R code can read these files from the current directory (which is located on the local hard disk). For example, if /path/to/file.Rdata is located on the HDFS and shared, it is possible to read it in the R expressions as load('file.Rdata'). Note, there is no need for the full path; the file is copied to the current directory of the R process.

sequence
The keys and values can be arbitrary R objects. All the information of the object will be preserved. To extract a single key,value pair from a sequence file, either the user has to read the entire file or compose a MapReduce job to subset that key,value pair.

text
The keys, and values are stored as lines of text. If the input is of text format, the keys will be byte offsets from beginning of the file and the value is a line of text without the trailing newline. R objects written to a text output format are written as one line. Characters are quoted and vectors are separated by mapred.field.separator (default is space). The character used to separate the key from the value is specified in the mapred argument by setting mapred.textoutputformat.separator (default is tab). To not output the key, set mapred.textoutputformat.usekey to FALSE.

map
A map file is actually a folder consisting of a sequence file and an index file. A small percentage of the keys in the sequence file are stored in the index file. Using the index file, Hadoop can very quickly return a value corresponding to a key (using rhgetkey). To create such an output format, use map. Note, the keys have to be saved in sorted order. The keys are sent to the reduce expression in sorted order, hence if the user does not modify reduce.key, a queryable map file will be created. If reduce.key is modified, the sorted guarantee does not hold and RHIPE will either throw an error or querying the output for a key might return empty results. MapFiles cannot be created if orderby is not bytes.

Custom Partitioning
A partitioner is specified as a list of two named elements: lims and type. A partitioner forces all keys sharing the same property to be processed by one reducer. Thus, for these keys, the output of the reduce phase will be saved in one file. For example, if the keys were IP addresses, e.g. c(A,B,C,D) where the components are integers, then with the default partitioner the space of keys will be uniformly distributed across the number of reduce tasks. If it is desired to store all IP addresses with the same first three ordinates in one file (and processed by one R process), use a partitioner such as list(lims=c(1:3), type='integer'). RHIPE implements partitioners when the key is an atomic vector of the following types: integer, string, and real. The value of lims specifies the ordinates (beginning and end) of the key to partition on. The numbers must be positive. lims can be a single number.

Avoid Time Outs
To avoid time outs during long map or reduce expressions, your MapReduce expressions should report status messages via calls to rhstatus. In the absence of rhstatus and if mapred.task.timeout is non-zero (by default it is 10 minutes) Hadoop will eventually kill a lengthy R process.

List of Important Options for the mapred argument
These are all set with mapred = list( name=value, name=value,...).

  • rhipe_map_buffsize: Number of elements in the map buffer (not size in bytes!). Control the amount of memory your map task employs using this.

  • rhipe_reduce_buffsize: Number of elements in the reduce.values buffer (not size in bytes!). Control the amount of memory your reduce task employs using this.

  • rhipe_stream_buffer: .

  • mapred.task.timeout: If non-zero, the number of milliseconds before a task times out.

  • mapred.reduce.tasks: If zero, then no reducer is run and the map output is placed directly on disk without shuffling or sorting. If non-zero, the number of simultaneous reduce tasks to launch.

  • mapred.map.tasks: The number of simultaneous map tasks to launch.
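
Example

A minimal sketch of a complete job: square the numbers 1 to 100 using lapply-style input. The output path and buffer size are hypothetical, and the values are assumed to be the integers 1 to 100 under this input type.

map <- expression({
  # map.keys and map.values hold the current buffer of inputs
  lapply(seq_along(map.values), function(i) {
    rhcollect(map.keys[[i]], map.values[[i]]^2)
  })
})
result <- rhwatch(map = map, input = 100, output = "/tmp/rhipe/squares",
                  mapred = list(rhipe_map_buffsize = 1000))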

rhwrite

Write R data to the HDFS.

Usage

rhwrite(object, file, numfiles = 1, chunk = 1, passByte = 1024 * 1024 *
  20, kvpairs = TRUE, verbose = TRUE)

Arguments

object
A list of objects whose elements are written.

file
Where to write (it is overwritten).

numfiles
Number of files to write to.

chunk
An integer specifying how to chunk data frames into rows or lists into sublists.

passByte
Buffer size for writing (see details).

kvpairs
If TRUE, object should be a list of key-value pairs. Otherwise, it should be a data frame or matrix (in which case NULL keys will be written with arbitrary chunking of the data).

verbose
Logical - print messages about what is being done

Description

Takes a list of objects, found in object, and writes them to the folder pointed to by file, which will be located on the HDFS. The file will be in a format interpretable by RHIPE, i.e. it can be used as input to a MapReduce job. The values of the list are written as key/value pairs in a SequenceFileFormat format. numfiles specifies the number of files to write the values to. For example, if numfiles is 1, the entire list will be written to one file in the folder file. Computations across small files do not parallelize well on Hadoop. If the file is small, it will be treated as one split and the user does not gain any (hoped for) parallelization. Distinct files are treated as distinct splits. It is better to split objects across a number of files. If the list consists of a million objects, it is prudent to split them across a few files. Thus if numfiles is 10 and the list contains 1,000,000 values, each of the 10 files (located in the directory file) will contain 100,000 values.

Details

This function will chunk a data frame (or matrix) or list into sub-objects, defined by chunk, and write them to the HDFS across numfiles files. Thus if chunk is 10 and numfiles is 20, a data frame is divided into sub data frames of 10 rows each and written across 20 files. To improve the R-Java switch, writing is buffered; the buffer size is defined by passByte (bytes).
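
Example

A minimal sketch (the HDFS path here is hypothetical): write 1,000 key/value pairs across 10 files.

kv <- lapply(1:1000, function(i) list(i, rnorm(5)))   # key/value pairs
rhwrite(kv, file = "/tmp/rhipe/kv", numfiles = 10)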