MapReduce is a programming model for processing and generating large datasets in parallel, typically at terabyte scale. This article covers how DolphinScheduler is applied to MapReduce tasks: the differences between GenericOptionsParser and args, the full set of hadoop jar command parameters, MapReduce code examples, and how to configure and run MapReduce tasks in DolphinScheduler.
Differences between GenericOptionsParser and args
GenericOptionsParser is used as follows:
GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();
Tracing the GenericOptionsParser source code, the constructor first delegates to an overloaded constructor:
public GenericOptionsParser(Configuration conf, String[] args) throws IOException {
    this(conf, new Options(), args);
}
Parsing Options:
private boolean parseGeneralOptions(Options opts, String[] args) throws IOException {
    opts = buildGeneralOptions(opts);
    CommandLineParser parser = new GnuParser();
    boolean parsed = false;
    try {
        commandLine = parser.parse(opts, preProcessForWindows(args), true);
        processGeneralOptions(commandLine);
        parsed = true;
    } catch (ParseException e) {
        LOG.warn("options parsing failed: " + e.getMessage());
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("general options are: ", opts);
    }
    return parsed;
}
The GenericOptionsParser reads options such as fs, jt, D, libjars, files, archives, and tokenCacheFile and places them in Hadoop’s Configuration.
With args, however, parsing these options (fs, jt, D, etc.) must be handled manually.
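To give a sense of what handling these options by hand involves, here is a minimal, Hadoop-free sketch. The class ManualGenericOptions and its fields are hypothetical names made up for illustration. It covers only the spaced forms used in this article (for example -D key=value) and is not how GenericOptionsParser itself is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Hadoop: a hand-rolled stand-in that shows
// the bookkeeping a raw args array would otherwise require.
final class ManualGenericOptions {
    // Generic options that are followed by exactly one value.
    private static final Set<String> VALUE_OPTIONS = new HashSet<>(Arrays.asList(
            "-fs", "-jt", "-libjars", "-files", "-archives", "-tokenCacheFile"));

    final Map<String, String> options = new LinkedHashMap<>();    // e.g. "-fs" -> its value
    final Map<String, String> properties = new LinkedHashMap<>(); // collected from -D key=value
    final List<String> remainingArgs = new ArrayList<>();         // application arguments

    static ManualGenericOptions parse(String[] args) {
        ManualGenericOptions parsed = new ManualGenericOptions();
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            if (arg.equals("-D") && i + 1 < args.length) {
                // Split "key=value" once, on the first '='.
                String[] kv = args[i + 1].split("=", 2);
                parsed.properties.put(kv[0], kv.length > 1 ? kv[1] : "");
                i += 2;
            } else if (VALUE_OPTIONS.contains(arg) && i + 1 < args.length) {
                parsed.options.put(arg, args[i + 1]);
                i += 2;
            } else {
                // Anything unrecognized is left for the application itself.
                parsed.remainingArgs.add(arg);
                i++;
            }
        }
        return parsed;
    }
}
```

Calling ManualGenericOptions.parse(args) on the arguments that follow the main class would leave /input and /output in remainingArgs, which is the role getRemainingArgs() plays when GenericOptionsParser is used.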
Complete Hadoop jar Command Parameters
hadoop jar wordcount.jar org.myorg.WordCount \
-fs hdfs:// \
-jt \
-D mapreduce.job.queuename=default \
-libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
-files /path/to/file1.txt,/path/to/file2.txt \
-archives /path/to/,/path/to/archive2.tar.gz \
-tokenCacheFile /path/to/credential.file \
/input /output
This command:
- Submits the job to the specified HDFS.
- Uses the specified YARN ResourceManager.
- Queues the job under the default queue.
- Adds dependencies and files.
- Distributes archives and credentials.
MapReduce Examples
Classic WordCount Example
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        // Split the input line on whitespace and emit (word, 1) for each token
        String[] fields = value.toString().split("\\s+");
        for (String field : fields) {
            word.set(field);
            context.write(word, one);
        }
    }
}
// Other classes omitted for brevity
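Since the reducer and driver are omitted, the overall data flow may be easier to see in a small local simulation. The sketch below is an illustration only and does not use the Hadoop API. LocalWordCount is a hypothetical class that applies the same whitespace split as the mapper and then sums the ones per word, which is what a WordCount reducer would typically do.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free illustration of the WordCount data flow.
// A real job would run Mapper and Reducer classes on a cluster instead.
final class LocalWordCount {
    static Map<String, Integer> count(List<String> lines) {
        // TreeMap keeps the words sorted, similar to how reducer input is ordered by key.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // "Map" step: split on whitespace, conceptually emitting (word, 1).
            for (String field : line.trim().split("\\s+")) {
                if (field.isEmpty()) {
                    continue; // unlike the mapper above, blank tokens are skipped here
                }
                // "Reduce" step: add up the 1s emitted for the same word.
                counts.merge(field, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For example, LocalWordCount.count(List.of("hello world", "hello hadoop")) yields a map in which hello maps to 2 and both world and hadoop map to 1.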
File Distribution Example
public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final List<String> whiteList = new ArrayList<>();
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException {
        URI[] files = context.getCacheFiles();
        if (files != null && files.length > 0) {
            // white.txt is shipped with -files and read from the task's working directory
            try (BufferedReader reader = new BufferedReader(new FileReader("white.txt"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    whiteList.add(line);
                }
            }
        }
    }
    // Remaining methods omitted for brevity
}
Using MapReduce with DolphinScheduler
Setting up the Yarn test Queue
In capacity-scheduler.xml, add test to the list of queues:
<value>default, test</value>
Then refresh the queues with yarn rmadmin -refreshQueues.
Execution Results
Yarn Job
Source Code Analysis
String others = param.getOthers();
// If the queue is not specified with the -D mapreduce.job.queuename option,
// the queue name is taken directly from the "Yarn Queue" input field on the page.
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
    String yarnQueue = param.getYarnQueue();
    if (StringUtils.isNotEmpty(yarnQueue)) {
        args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
    }
}
// This part is the optional parameters input field on the page,
// where -conf, -archives, -files, -libjars, and -D can be specified.
if (StringUtils.isNotEmpty(others)) {
    args.add(others);
}
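The precedence between the two input fields can be tried out in isolation with a small standalone sketch. QueueArgsSketch is a hypothetical class, not DolphinScheduler code. The values chosen for D and MR_YARN_QUEUE are assumptions, as is appending the optional parameters unchanged at the end.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone version of the queue-selection logic discussed above.
final class QueueArgsSketch {
    static final String D = "-D";                                  // assumed value
    static final String MR_YARN_QUEUE = "mapreduce.job.queuename"; // assumed value

    static List<String> buildQueueArgs(String others, String yarnQueue) {
        List<String> args = new ArrayList<>();
        boolean othersEmpty = others == null || others.isEmpty();
        // Use the "Yarn Queue" field only when the optional parameters
        // do not already mention the queue property.
        if (othersEmpty || !others.contains(MR_YARN_QUEUE)) {
            if (yarnQueue != null && !yarnQueue.isEmpty()) {
                args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
            }
        }
        // Assumed: the optional parameters are appended as typed by the user.
        if (!othersEmpty) {
            args.add(others);
        }
        return args;
    }
}
```

With these assumptions, buildQueueArgs(null, "test") produces a single -Dmapreduce.job.queuename=test argument, while buildQueueArgs("-D mapreduce.job.queuename=prod", "test") keeps only the user-supplied option, so a queue set in the optional parameters wins over the page field.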