Package Information
R and Hadoop Integrated Programming Environment
Author: Saptarshi Guha
Version: 0.74.0
Date: 2014-02-19
License:
Description
RHIPE is a merger of R and Hadoop. R is the widely used, highly acclaimed interactive language and environment for data analysis. Hadoop consists of the Hadoop Distributed File System (HDFS) and the MapReduce distributed compute engine. RHIPE allows an analyst to carry out D&R analysis of complex big data wholly from within R. RHIPE communicates with Hadoop to carry out the big, parallel computations.
rhchmod
Change Permissions of Files on HDFS
Usage
rhchmod(path, permissions)
Arguments
path
The location of the file or directory on the HDFS whose permissions are to be changed. It is expanded using rhabsolute.hdfs.path.
permissions
A character string such as '777'.
Description
Change permissions of files on HDFS
Author
Ryan Hafen
See also
rhwrite, rhmkdir
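Examples
A minimal sketch with a hypothetical HDFS path; assumes an initialized RHIPE session (see rhinit):
rhchmod("/tmp/results", "777")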
rhcp
Copy files (or folders) on the HDFS
Usage
rhcp(ifile, ofile, delete = FALSE)
Arguments
ifile
Absolute path on the HDFS to the file (or folder) to be copied, or the output from rhwatch(.., read=FALSE).
ofile
Absolute path on the HDFS at which to place the copy.
delete
Should ifile be deleted when done?
Description
Copies the file (or folder) ifile on the HDFS to the destination ofile, also on the HDFS.
Author
Saptarshi Guha
See also
rhget, rhput, rhdel, rhread, rhwrite, rhsave
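Examples
A sketch with hypothetical paths:
rhcp("/tmp/results", "/tmp/results-backup")
# pass delete = TRUE to remove the source after copying
rhcp("/tmp/scratch", "/tmp/archive", delete = TRUE)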
rhdel
HDFS File Deletion
Usage
rhdel(folder)
Arguments
folder
The absolute path(s) on the HDFS to the directory(s) to be deleted.
Description
This function deletes the directories given in the character vector folder, which are located on the HDFS. The deletion is recursive, so all subfolders will be deleted too. Nothing is returned.
Author
Saptarshi Guha
See also
rhput, rhls, rhread, rhwrite, rhsave, rhget
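Examples
A sketch with a hypothetical path; recall that deletion is recursive:
rhdel("/tmp/results-backup")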
rherrors
Retrieves errors from a job when debug=='collect' in rhwatch
Usage
rherrors(job, prefix = "rhipe_debug", num.file = 1)
Arguments
job
The result of rhwatch (when read = FALSE).
prefix
The prefix string of the debug files.
num.file
The number of debug files to read.
Description
Retrieves errors from a job when debug=='collect' in rhwatch
Author
Saptarshi Guha
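Examples
A sketch, assuming z was returned by rhwatch(..., read = FALSE) for a job run with debug = 'collect':
errs <- rherrors(z)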
rhex
Execute a MapReduce Job On Hadoop
Usage
rhex(conf, async = TRUE, mapred, ...)
Arguments
conf
A list returned from rhwatch describing the MapReduce job.
async
When async is TRUE, the function returns immediately, leaving the job running in the background on Hadoop.
mapred
See Details.
...
Additional parameters for the system call.
Value
When async is TRUE, the function returns an object of class jobtoken.
Description
Submits a MapReduce job (created using rhwatch) to the Hadoop MapReduce framework. The argument mapred serves the same purpose as the mapred argument to rhwatch and will override the settings in the object returned from rhwatch. When async is FALSE, the function returns when the job ends (success/failure or because the user terminated it; see rhkill). When async is TRUE, the function returns immediately, leaving the job running in the background on Hadoop.
Details
When async is TRUE, the function returns an object of class jobtoken. The generic function print.jobtoken displays the start time, duration (in seconds), and percent progress. This object can be used in calls to rhstatus, rhjoin, and rhkill. Otherwise it returns a list of counters and the job state.
Author
Saptarshi Guha
See also
rhwatch, rhstatus, rhkill
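Examples
A hedged sketch: build a job with rhwatch(..., noeval = TRUE) so nothing runs yet, then submit it yourself. Here z is that hypothetical job object:
jt <- rhex(z, async = TRUE)   # returns a jobtoken immediately
rhstatus(jt)                  # monitor it later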
rhexists
Check if File Exists on HDFS
Usage
rhexists(path)
Arguments
path
The location of the file to be checked. It is expanded using rhabsolute.hdfs.path.
Description
Check if file exists on HDFS
Author
Ryan Hafen
See also
rhls, rhwrite, rhmkdir
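Examples
A sketch with a hypothetical path:
if (!rhexists("/tmp/results")) rhmkdir("/tmp/results")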
rhfmt
A function that returns a function to specify input/output formats
Usage
rhfmt(type, ...)
Arguments
type
The name of the format handler.
...
Arguments passed to the handler function.
Description
Returns a function that specifies the input/output format.
Details
The returned function must take three arguments: 'lines', the direction ('input' or 'output'), and the call signature. See rhoptions()$ioformats for examples of how to write your own.
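Examples
A hedged sketch following the rhfmt(path, 'text') form shown in the rhwatch section (paths hypothetical; map is a user-supplied map expression):
z <- rhwatch(map = map,
             input = rhfmt("/tmp/logs", "text"),
             output = rhfmt("/tmp/parsed", "sequence"))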
rhget
Copying from the HDFS: copies files from the HDFS to a local directory.
Usage
rhget(src, dest)
Arguments
src
Absolute path to file or directory on HDFS to get.
dest
Path to file or directory on local filesystem to create as the new copy.
Description
Copies the files (or folder) at src, located on the HDFS, to the destination dest located on the local filesystem. If a file or folder of the same name as dest exists on the local filesystem, it will be deleted. dest can contain ~, which will be expanded. The original copy of the file or folder is left on the HDFS after this operation.
Author
Saptarshi Guha
See also
rhput, rhdel, rhread, rhwrite, rhsave
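Examples
A sketch with hypothetical paths; ~ is expanded:
rhget("/tmp/results", "~/results-local")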
rhinit
Initializes the RHIPE subsystem
Usage
rhinit()
Description
Initializes the RHIPE subsystem
rhIterator
Iterates Through the Records of Sequence Files
Usage
rhIterator(files, type = "sequence", chunksize = 1000, chunk = "records",
skip = rhoptions()$file.types.remove.regex, mc = lapply,
textual = FALSE)
Arguments
files
Path to file or directory containing sequence files. This can also be the output from rhwatch(read = FALSE).
type
Either 'sequence' or 'map'. Ideally this should be auto-determined.
chunksize
Number of records or bytes to read, depending on 'chunk'.
chunk
Either "records" or "bytes".
skip
Files to skip while reading the hdfs. Various installs of Hadoop add additional log info to
HDFS output from MapReduce. Attempting to read these files is not what we want to do. To get around
this we specify pieces of filenames to grep and remove from the read. skip
is a vector argument
just to have sensible defaults for a number of different systems. You can learn which if any files
need to be skipped by using rhls
on the target directory.
mc
Set to lapply by default. The user can change this to mclapply for a parallel lapply.
textual
Set to TRUE if the keys and values are Hadoop Text objects.
Description
Can be used to iterate through the records of a Sequence File (or a collection thereof).
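Examples
A heavily hedged sketch (hypothetical path). The iteration protocol is not documented in this section; this assumes the returned handle is a closure that yields the next chunk of key/value pairs, and an empty list at the end:
it <- rhIterator("/tmp/results", type = "sequence", chunksize = 500)
while (length(chunk <- it()) > 0) {
  # each element of chunk is a key/value pair
  print(length(chunk))
}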
rhJobInfo
Very Detailed Job Information
Usage
rhJobInfo(job)
Arguments
job
The parameter job
can either be a string with the format job_datetime_id
(e.g. job_201007281701_0274) or the value returned from rhex
with the async
option set to TRUE.
Description
Returns detailed job information across all TaskIDs
rhkill
Kill A MapReduce Job
Usage
rhkill(job)
Arguments
job
The parameter job
can either be string with the format job_datetime_id
or the value
returned from rhex
with the async
option set to TRUE
.
Description
This kills the MapReduce job with job identifier given by job. The parameter job can either be a string with the format job_datetime_id or the value returned from rhex with the async option set to TRUE.
Author
Saptarshi Guha
See also
rhstatus, rhwatch, rhjoin, rhex
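Examples
A sketch with a hypothetical job id; jt is a jobtoken from rhex(..., async = TRUE):
rhkill("job_201007281701_0274")
rhkill(jt)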
rhls
List Files On HDFS
Usage
rhls(folder = NULL, recurse = FALSE, nice = "h")
Arguments
folder
Path of a directory on the HDFS, or the output from rhwatch(readback=FALSE).
recurse
If TRUE, list all files and directories in sub-directories.
nice
One of 'g', 'm', 'b', or 'h' (gigabytes, megabytes, bytes, human readable).
Value
A data.frame of file and directory information (see Details).
Description
List all files and directories contained in a directory on the HDFS.
Details
Returns a data.frame of filesystem information for the files located at path
. If recurse
is
TRUE
, rhls
will recursively travel the directory tree rooted at path
. The returned object
is a data.frame consisting of the columns: permission, owner, group, size (which is numeric),
modification time, and the file name. path
may optionally end in '*' which is the wildcard and
will match any character(s).
Author
Saptarshi Guha
See also
rhput, rhdel, rhread, rhwrite, rhsave, rhget
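Examples
A sketch with a hypothetical path:
df <- rhls("/tmp", recurse = TRUE, nice = "m")   # sizes in megabytes
df$file                                          # just the file names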
rhload
Load an RData from the HDFS.
Usage
rhload(file, envir = parent.frame())
Arguments
file
Path to the .RData file on the HDFS.
envir
Environment in which to load the .RData file.
Value
Data from the HDFS.
Description
Calls the function load after fetching an RData file from the HDFS.
Author
Saptarshi Guha
See also
rhsave, rhsave.image
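Examples
A sketch with a hypothetical path (see rhsave for the matching write):
rhload("/tmp/lookup.RData")   # loads the saved objects into the calling frame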
rhmap
Macro to Wrap Boilerplate Around RHIPE Map Code
Usage
rhmap(expr = NULL, before = NULL, after = NULL)
Arguments
expr
Any R expression that operates on the current map.keys, map.values, and current index (given by k, r, and .index respectively).
before
An R expression to run before the loop across map.values, map.keys, and .index. If map.values is shortened, make map.keys the same length!
after
An R expression to run after the loop. The results of the loop are contained in result.
Description
Returns an expression corresponding to the given input.
See also
rhwatch
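Examples
A hedged sketch: k and r refer to the current key and value inside the wrapped loop; this emits via rhcollect directly rather than relying on the result mechanism:
m <- rhmap({
  rhcollect(k, r^2)   # emit the squared value for each record
})
# m can then be passed as the map argument to rhwatch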
rhmapfile
Creates a Handle to a Mapfile
Usage
rhmapfile(paths)
Arguments
paths
Absolute path to a map file on the HDFS, or the output from rhwatch.
Description
Creates a Handle to a Mapfile
rhmkdir
Creates Directories on the HDFS
Usage
rhmkdir(path, permissions)
Arguments
path
The location of the directory to be created. It is expanded using rhabsolute.hdfs.path.
permissions
Either of the integer form, e.g. 777, or a string, e.g. 'rwx'. If missing, the default is used.
Value
Logical; TRUE if successful.
Description
Equivalent of mkdir -p in Unix.
Author
Saptarshi Guha
See also
rhcp, rhmv
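Examples
A sketch with a hypothetical path; intermediate directories are created as needed:
rhmkdir("/tmp/a/b/c", "777")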
rhmv
Move files (or folders) on the HDFS (delete original)
Usage
rhmv(ifile, ofile)
Arguments
ifile
Absolute path on the HDFS to the file (or folder) to be moved, or the output from rhwatch(.., read=FALSE).
ofile
Absolute path on the HDFS at which to place it.
Description
Moves the file (or folder) ifile on the HDFS to the destination ofile, also on the HDFS; the original is deleted.
Author
Saptarshi Guha
See also
rhget, rhput, rhdel, rhread, rhwrite, rhsave
rhofolder
Returns the output folder from a previous RHIPE job
Usage
rhofolder(job)
Arguments
job
Can be a character string giving the path to a folder, or the result of a call to rhwatch. For the latter, this works only if read is FALSE (because if read is TRUE, the output itself is returned).
Description
Returns the output folder from a previous RHIPE job. Take a look at hdfs.getwd for more information.
Author
Saptarshi Guha
rhoptions
Get or Set Rhipe Options
Usage
rhoptions(li = NULL, ...)
Arguments
li
A list of options. Names of elements indicate the option being modified; values of elements indicate the value the option is to take on. If NULL, a list of current rhoptions is returned. See details.
...
Options may be assigned values as key=value. See examples.
Value
rhoptions() returns a list of current options.
Description
Used to set Rhipe options (called by rhinit, for example). Most often called by a user to set a runner that starts the embedded R Rhipe executable.
Details
One may set the following options specific to Rhipe, as well as many options specific to mapred (not listed here but present in the Hadoop documentation).
Note
Default values can be observed by simply typing rhoptions(). The runner option is the launcher for the embedded R Rhipe executable used in MapReduce jobs. Other options are often easiest to set in the mapred argument of rhwatch.
Author
Saptarshi Guha
See also
rhinit
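Examples
A sketch of getting and setting options, using the key=value form described above:
rhoptions()$readback          # inspect one current option
rhoptions(readback = FALSE)   # set it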
rhput
Put a File onto the HDFS
Usage
rhput(src, dest, deletedest = TRUE)
Arguments
src
Path to the local file to be copied to the HDFS.
dest
Path to the file on the HDFS. rhput creates the file at dest.
deletedest
If TRUE, this function attempts to delete the destination on the HDFS before trying to copy to that location.
Description
Copies the local file called src (not a folder) to the destination dest on the HDFS. Uses path.expand to expand the src parameter.
Note
The local filesystem copy remains after the operation is complete.
Author
Saptarshi Guha
See also
rhget, rhdel, rhread, rhwrite, rhsave
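Examples
A sketch with hypothetical paths:
rhput("~/data/mydata.RData", "/tmp/mydata.RData")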
rhread
Read Key/Value Pairs From The HDFS.
Usage
rhread(files, type = c("sequence"), max = -1L,
skip = rhoptions()$file.types.remove.regex, mc = lapply,
textual = FALSE, verbose = TRUE, ...)
Arguments
files
Path to the file or directory containing the map, sequence, or text file to be read on the HDFS. This can also be the output from rhwatch provided readback=FALSE.
type
Type of file on HDFS. Must be sequence, map, or text.
max
Maximum number of key/value pairs to read for map and sequence files; maximum number of lines to read for text files.
mc
Set to lapply by default. The user can change this to mclapply for a parallel lapply.
textual
Set to TRUE if the keys and values are Hadoop Text objects.
skip
Files to skip while reading from the HDFS. Various installs of Hadoop add additional log info to HDFS output from MapReduce, and attempting to read these files is not what we want in rhread. To get around this, we specify pieces of filenames to grep for and remove from the read. skip is a vector argument so as to have sensible defaults for a number of different systems. You can learn which files, if any, need to be skipped by using rhls on the target directory.
verbose
Logical; print messages about what is being done.
...
Additional arguments passed to the function.
Value
For map and sequence files, a list of key/value pairs of up to length max. For text files, a matrix of lines, each row a line from the text files.
Description
Reads all or a limited number of key/value pairs from HDFS files.
Details
Reads the key/value pairs from the files pointed to by files. The source files can end in a wildcard *, e.g. /path/input/p* will read all the key/value pairs contained in files starting with p in the folder /path/input/. The parameter type specifies the format of files. This can be one of text, map, or sequence, which imply a Text file, MapFile, or SequenceFile respectively. The parameter max specifies the maximum number of entries to read; by default all the key/value pairs will be read. Specifying max for text files limits the number of lines read. mc is lapply by default; the user can change this to mclapply for faster throughput. Data written by rhwrite can be read using rhread.
Author
Saptarshi Guha
See also
rhget, rhput, rhdel, rhwrite, rhsave
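Examples
A sketch with a hypothetical path:
res <- rhread("/tmp/results", type = "sequence", max = 10)
res[[1]][[1]]   # key of the first pair
res[[1]][[2]]   # value of the first pair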
rhsave
Save .RData to HDFS.
Usage
rhsave(..., file, envir = parent.frame())
Arguments
...
Additional parameters for save.
file
Absolute path to the file on the HDFS. Creates the file or overwrites it.
envir
Environment in which to search for objects to be saved.
Description
Puts the result of a save call onto the HDFS. It is useful if you have variables in the current environment that you want to work with in a MapReduce job as a shared object.
Author
Saptarshi Guha
See also
rhsave.image, rhload
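Examples
A sketch with a hypothetical path:
lookup <- c(a = 1, b = 2)
rhsave(lookup, file = "/tmp/lookup.RData")   # read back with rhload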
rhsave.image
save.image to HDFS
Usage
rhsave.image(..., file)
Arguments
...
Additional parameters for save.image.
file
Path to the file on the HDFS. Creates the file or overwrites it.
Description
Puts the result of a save.image call onto the HDFS. Useful if you have variables in the current environment that you want to work with in a MapReduce job as a shared object.
Author
Saptarshi Guha
See also
rhsave, rhload
rhstatus
Report Status Of A MapReduce Job
Usage
rhstatus(job, mon.sec = 5, autokill = TRUE, showErrors = TRUE,
verbose = FALSE, handler = rhoptions()$statusHandler)
Arguments
job
The parameter job can either be a string with the format job_datetime_id (e.g. job_201007281701_0274) or the value returned from rhex with the async option set to TRUE.
mon.sec
If mon.sec is greater than 0, a small data frame indicating the progress will be returned every mon.sec seconds. If 0, it will return immediately. If Inf, it will wait until the job is over.
autokill
If autokill is TRUE, then any R errors caused by the map/reduce code will cause the job to be killed.
verbose
If verbose is TRUE, also provided is the state of the job, the duration in seconds, and a data frame with columns for the Map and Reduce phases. This data frame summarizes the number of tasks, the percent complete, and the number of tasks that are pending, running, complete, or failed. In addition, the list has an element that consists of both user-defined and Hadoop MapReduce built-in counters (counters can be user defined with a call to rhcounter).
showErrors
If TRUE, display errors from R in MapReduce tasks.
handler
A function that receives the counters and job-related information; it can be used to kill the job. If it returns FALSE, monitoring stops.
Value
A list of the current state.
Description
Returns the status of a running MapReduce job.
Note
This function does something different depending on whether it is used in a MapReduce expression during a MapReduce task. In a MapReduce task, use this function to REPORT the status of your job to Hadoop.
From the user side, this displays the status of a running MapReduce job and reports information accumulated about the Hadoop job.
The parameter job can either be a string with the format job_datetime_id (e.g. job_201007281701_0274) or the value returned from rhex with the async option set to TRUE.
Author
Saptarshi Guha
See also
rhex, rhkill
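Examples
A sketch (hypothetical job id; jt is a jobtoken from rhex(..., async = TRUE)):
rhstatus("job_201007281701_0274", mon.sec = 0)   # return immediately
rhstatus(jt, mon.sec = 10)                       # poll every 10 seconds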
rhuz
Serialize and Unserialize (see Details)
Usage
rhuz(r)
Arguments
r
An object to serialize or unserialize.
Value
A serialized or unserialized object.
Description
The function rhsz serializes an object using RHIPE's binary serialization, returning the raw bytes corresponding to the serialized object. If the object cannot be serialized, it will be converted to NULL and serialized. rhuz takes the bytes and un-serializes them, throwing an error if it cannot. These two functions are also available at the R console. RHIPE uses the internals of these functions in rhcollect and rhread. The maximum size of a serialized object that can be read is 256MB. Larger objects will be written successfully, but RHIPE will throw an error when they are read. These functions are useful to get an approximate idea of how large an object will be.
Details
The R serialization is verbose. Serialized objects have 22 bytes of header and booleans are serialized to integers. Best performance is achieved in Hadoop when the size of the data exchanged is as small as possible. RHIPE implements its own serialization using Google's Protocol Buffers. A benefit of using this is that the data produced by RHIPE can be read in languages such as Python, C and Java using the wrappers provided on the Google website.
However, a drawback of RHIPE's serialization is that not all R objects can be seamlessly serialized. RHIPE can serialize the following:
Scalar vectors: integers, characters (including UTF-8 strings), numerics, logicals, complex, and raw. NA values are accepted.
Lists of the above.
Attributes of objects.
RHIPE can serialize data frames, factors, and matrices (including others like time series objects), since these are the above data structures with attributes.
Closures, environments, and promises cannot be serialized. You may often get around this limitation by using the built-in serialization present in R first, and then serializing that string via rhsz (or indirectly via rhcollect). For example, to serialize the output of xyplot, wrap it in a call to serialize.
Author
Saptarshi Guha
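Examples
A sketch of a round trip through RHIPE's serialization:
raw.bytes <- rhsz(list(a = 1:3, b = "x"))
length(raw.bytes)   # approximate serialized size in bytes
rhuz(raw.bytes)     # recover the original object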
rhwatch
Prepares, Submits, and Monitors MapReduce Jobs
Usage
rhwatch(map = NULL, reduce = NULL, combiner = FALSE, setup = NULL,
cleanup = NULL, input = NULL, output = NULL, orderby = "bytes",
mapred = NULL, shared = c(), jarfiles = c(), zips = c(),
partitioner = NULL, copyFiles = FALSE, jobname = "",
parameters = NULL, job = NULL, mon.sec = 5,
readback = rhoptions()$readback, debug = NULL, noeval = FALSE, ...)
Arguments
map
map is an R expression (created using the R command expression) that is evaluated by RHIPE during the map stage. For each task, RHIPE will call this expression multiple times (see details). It can also be a function of two arguments, the key and the value. Using an expression provides one more opportunity for vector operations.
reduce
reduce is an R expression (created using the R command expression) that is evaluated by RHIPE during the reduce stage, or it is a vector of expressions with names pre, reduce, and post, e.g. reduce = expression(pre={...}, reduce={...}, post={...}). reduce is optional; if it is not specified, the map output keys will be sorted and shuffled, saved to disk, and a default identity reduce is performed. Setting it to 0 or another integer is equivalent to mapred.reduce.tasks=reduce.
combiner
If set to TRUE, RHIPE will run the reduce expression on the output of the map expression locally, i.e. on the same computer that is running the associated map, after io.sort.mb megabytes of key/value pairs. See details. WARNING: setup/cleanup may not run when you expect when used with a combiner. We recommend only advanced users try to use both a combiner and setup/cleanup expressions.
input
Specifies the type of input. If a character vector, then Sequence file input. If a numeric N, then lapply input, i.e. the keys will be 1 to N. If a pair of numbers, then the key ranges over 1..N[1] and mapred.map.tasks is set to N[2]. See rhfmt for more examples and rhoptions()$ioformats. To get text input, specify rhfmt(path, 'text').
output
Similar to input. To get a map output format (after which the user can call rhmapfile to query using rhgetkey), specify rhfmt(path, 'map').
setup
An expression of R code to be run before map and reduce. Alternatively, an expression with elements map and reduce, e.g. setup=expression(map=,reduce=), each of which is run, respectively, before the map and reduce phases. See details.
cleanup
As in setup, except cleanup runs after the map and reduce phases.
orderby
This is one of bytes, integer, numeric, or character. The intermediate keys will be ordered assuming the output key in the map is of that type; if it is not, an exception will be thrown. Tuples can be sorted too; see Tuple Sorting in the online documentation.
mapred
Specify Hadoop and RHIPE options in this parameter (a list). See details; for Hadoop options, consult the Hadoop documentation.
shared
This is a character vector of files located on the HDFS. At the beginning of the MapReduce job, these files will be copied to the local hard disks of the Tasktrackers (cluster computers on which the compute nodes/cores are located). See details.
jarfiles
Optional JARs that need to be used during Hadoop MapReduce. This is used when a user provides a custom InputFormat. Specify the JAR file that handles this InputFormat using this argument, and specify the name of the InputFormat in the mapred argument.
zips
Distributed cache files on the HDFS to unzip and distribute to each MapReduce task. See Distributed Cache.
partitioner
A list of two named elements: lims and type. See details.
parameters
A named list with parameters to be passed to a MapReduce job. It can also be the string 'all', in which case all variables whose size is <= rhoptions()$copyObjects$maxsize (bytes) and whose name is not in rhoptions()$copyObjects$exclude will be automatically copied. If rhoptions()$copyObjects$auto is TRUE (the default), RHIPE will make an attempt (via codetools) to determine the variables/functions called and copy them automatically.
copyFiles
Will the files created in the R code, e.g. PDF output, be copied to the destination folder, ofolder?
jobname
The name of the job, which is visible on the Jobtracker website. If not provided, Hadoop MapReduce uses the default name job_date_time_number, e.g. job_201007281701_0274.
job
The parameter job can be a string with the format job_datetime_id (e.g. job_201007281701_0274).
mon.sec
If mon.sec is greater than 0, a small data frame indicating the progress will be returned every mon.sec seconds.
readback
If FALSE, results will not be read back; instead, the results from rhstatus are returned.
noeval
If TRUE, do not run the job; just return the job object.
debug
Can be one of 'count', which counts the number of errors but does not stop the job; 'stop', which stops the job (setting debug to NULL, the default, is the same and is much faster); or 'collect', which collects 20 errors per split, saves them in files (inside the output folder) prefixed with rhipe.debug, and does not kill the job. The maximum number of errors to collect can be set in the parameters argument to rhwatch, i.e., rh.max.errors=10 collects a maximum of 10 errors.
...
Extra parameters passed to rhstatus.
Value
If the state is SUCCEEDED and the total output size (in MB) is less than rhoptions()$max.read.in.size, the data is read back, with a warning if the number of records is more than rhoptions()$reduce.output.records.warn. If rhoptions()$rhmr.max.records.to.read.in is not NA, that many records are read. This only works for Sequence output. Otherwise, an object of length two is returned: the first element is the data returned from rhstatus and the second is the data returned by the internal rhwatch() function.
Description
Creates the R object that contains all the information required by RHIPE to run a MapReduce job via a call to rhex (see details).
Details
Buffer Size
If a task consists of W key/value pairs, the expression map will be called ceiling(W / rhipe_map_buffsize) times. The default value of rhipe_map_buffsize is 10,000 and is user configurable. Each time map is called, the vectors map.keys and map.values contain rhipe_map_buffsize keys and values respectively. If the objects are large, it is advisable to reduce the size of rhipe_map_buffsize so that the total amount of memory used by a task is well controlled. For particularly large map.values, the authors have used rhipe_map_buffsize as low as 10.
Setup
In RHIPE, each task is a sequence of many thousands of key/value pairs. Before running the map and reduce expressions (and before any key/value pairs have been read), RHIPE will evaluate the expressions in setup and cleanup. Each of these may contain the names map and reduce, e.g. setup=list(map=,reduce=), specific to the map and reduce expressions. If just an expression is provided, it will be evaluated before both the Map phase and the Reduce phase. The same is true for cleanup. Variables created and packages loaded in the setup expression will be visible in the map and in the reduce expression, but not shared between them, since the two are evaluated in different R sessions (except when using a combiner).
Sorting and Shuffling
To turn off sorting and shuffling and instead write the map output to disk directly, set mapred.reduce.tasks to zero in mapred. In this case, the output keys are not sorted and the output format should not be map (since a map file expects sorted keys).
Using a Combiner
If combiner is TRUE, the reduce expression will be invoked during the local combine, in which case the output is intermediate and not saved as final output. The reduce expression will also be invoked during the final reduce phase, in which case it will receive all the values associated with the key (note, these are the values output when reduce was invoked as a combiner) and the output will be committed to the destination folder. To determine in which state reduce is running, read the environment variable rhipe_iscombining, which is '1' for the former and '0' for the latter (correspondingly, the R symbol rhipe_iscombining is TRUE or FALSE). WARNING: setup and cleanup may not run when you expect when used with a combiner. We recommend only advanced users try to use both a combiner and setup/cleanup expressions.
Using Shared Files
shared is a character vector of files located on the HDFS. At the beginning of the MapReduce job, these files will be copied to the local hard disks of the Tasktrackers (cluster computers on which the compute nodes/cores are located). User-provided R code can read these files from the current directory (which is located on the local hard disk). For example, if /path/to/file.Rdata is located on the HDFS and shared, it is possible to read it in the R expressions as load('file.Rdata'). Note, there is no need for the full path; the file is copied to the current directory of the R process.
sequence
The keys and values can be arbitrary R objects. All the information of the object will be preserved. To extract a single key/value pair from a sequence file, the user either has to read the entire file or compose a MapReduce job to subset that key/value pair.
text
The keys and values are stored as lines of text. If the input is of text format, the keys will be byte offsets from the beginning of the file and the value is a line of text without the trailing newline. R objects written to a text output format are written as one line. Characters are quoted and vectors are separated by mapred.field.separator (default is space). The character used to separate the key from the value is specified in the mapred argument by setting mapred.textoutputformat.separator (default is tab). To not output the key, set mapred.textoutputformat.usekey to FALSE.
map
A map file is actually a folder consisting of a sequence file and an index file. A small percentage of the keys in the sequence file are stored in the index file. Using the index file, Hadoop can very quickly return a value corresponding to a key (using rhgetkey). To create such an output format, use map. Note, the keys have to be saved in sorted order. The keys are sent to the reduce expression in sorted order, hence if the user does not modify reduce.key, a query-able map file will be created. If reduce.key is modified, the sorted guarantee does not hold, and RHIPE will either throw an error, or querying the output for a key might return empty results. MapFiles cannot be created if orderby is not bytes.
Custom Partitioning
The partitioner argument is a list of two named elements: lims and type. A partitioner forces all keys sharing the same property to be processed by one reducer, so for these keys the output of the reduce phase will be saved in one file. For example, if the keys were IP addresses, e.g. c(A,B,C,D) where the components are integers, then with the default partitioner the space of keys will be uniformly distributed across the number of reduce tasks. If it is desired to store all IP addresses with the same first three ordinates in one file (and processed by one R process), use a partitioner such as list(lims=c(1:3), type='integer'). RHIPE implements partitioners when the key is an atomic vector of one of the following types: integer, string, or real. The value of lims specifies the ordinates (beginning and end) of the key to partition on. The numbers must be positive. lims can be a single number.
Avoid Time Outs
To avoid time outs during long map or reduce expressions, your MapReduce expressions should report status messages via calls to rhstatus. In the absence of rhstatus calls, if mapred.task.timeout is non-zero (by default it is 10 minutes), Hadoop will eventually kill a lengthy R process.
List of Important Options for the mapred argument
These are all set with mapred = list( name=value, name=value,...)
.
rhipe_map_buffsize: Number of elements in the map buffer (not size in bytes!). Control the amount of memory your map task employs using this.
rhipe_reduce_buffsize: Number of elements in the reduce.values buffer (not size in bytes!). Control the amount of memory your reduce task employs using this.
rhipe_stream_buffer: .
mapred.task.timeout: If non-zero, the number of milliseconds before a task times out.
mapred.reduce.tasks: If zero, then no reducer is run and map output is placed directly on disk without shuffling or sorting. If non-zero, the number of simultaneous reduce tasks to launch.
mapred.map.tasks: The number of simultaneous map tasks to launch.
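Examples
A hedged end-to-end sketch: count even and odd integers. It assumes lapply input (input = 100) supplies the integers 1..100 as map.values, and that reduce.key and reduce.values carry the grouped data, as in standard RHIPE usage; the output path is hypothetical.
map <- expression({
  lapply(map.values, function(v) {
    parity <- if (v %% 2 == 0) "even" else "odd"
    rhcollect(parity, 1)   # emit (parity, 1) for each input value
  })
})
reduce <- expression(
  pre    = { total <- 0 },
  reduce = { total <- total + sum(unlist(reduce.values)) },
  post   = { rhcollect(reduce.key, total) }
)
z <- rhwatch(map = map, reduce = reduce, input = 100,
             output = "/tmp/parity", readback = TRUE)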
rhwrite
Write R data to the HDFS.
Usage
rhwrite(object, file, numfiles = 1, chunk = 1, passByte = 1024 * 1024 *
20, kvpairs = TRUE, verbose = TRUE)
Arguments
object
A list of objects whose elements are written.
file
Where to write (the destination is overwritten).
numfiles
Number of files to write to.
chunk
An integer specifying how to chunk data frames into rows or lists into sublists.
passByte
Buffer size for writing (see details).
kvpairs
If TRUE, object should be a list of key-value pairs. Otherwise, it should be a data frame or matrix (in which case NULL keys will be written with arbitrary chunking of the data).
verbose
Logical; print messages about what is being done.
Description
Takes a list of objects, found in object, and writes them to the folder pointed to by file, which will be located on the HDFS. The result will be in a format interpretable by RHIPE, i.e. it can be used as input to a MapReduce job. The values of the list are written as key-value pairs in a SequenceFileFormat format. numfiles specifies the number of files to write the values to. For example, if numfiles is 1, the entire list will be written to one file in the folder file.
Computations across small files do not parallelize well on Hadoop. If the file is small, it will be treated as one split and the user does not gain any (hoped for) parallelization. Distinct files are treated as distinct splits, so it is better to split objects across a number of files. If the list consists of a million objects, it is prudent to split them across a few files. Thus if numfiles is 10 and the list contains 1,000,000 values, each of the 10 files (located in the directory file) will contain 100,000 values.
Details
This function will chunk a data frame (or matrix) or list into sub-objects as defined by chunk, and then write them to the HDFS across numfiles files. Thus if chunk is 10 and numfiles is 20, a data frame is divided into sub data frames of 10 rows each and written across 20 files. To reduce the cost of the R-Java handoff, writing is buffered, with the buffer size defined by passByte (bytes).
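Examples
A sketch with a hypothetical path: write 1,000 key-value pairs across 10 files.
kv <- lapply(1:1000, function(i) list(i, rnorm(1)))
rhwrite(kv, file = "/tmp/kv", numfiles = 10)
rhread("/tmp/kv", max = 5)   # read a few back to check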