The Pain of Debugging Hadoop MapReduce Programs
In distributed computing, debugging a Hadoop MapReduce program is a notoriously painful process. When a job runs on a cluster, pinning down a code error or logic bug often takes round after round of editing and log printing, so even small problems carry a high debugging cost. The complexity of the MapReduce framework itself makes things worse: in particular, the arguments to the Map and Reduce tasks are supplied by the framework at runtime, which makes failures hard to reproduce by hand.
A good unit-testing framework, one that lets developers catch and fix problems early, is therefore especially valuable.
A Brief Introduction to MRUnit
MRUnit is a unit-testing framework that Cloudera developed specifically for Hadoop MapReduce, with a compact, practical API. Depending on what is under test, it provides a different Driver:
- MapDriver, for testing a Mapper on its own;
- ReduceDriver, for testing a Reducer on its own;
- MapReduceDriver, for testing a Mapper and a Reducer chained together.
The sections below use a Temperature program as the example to show how to write unit tests with MRUnit.
Preparing the Temperature Program
To ground the discussion of the testing framework, we use a MapReduce program, again Temperature, as the test case. The program reads weather-station records from a file, extracts the air-temperature values, and aggregates them. To keep unit testing simple, the weather-station ID is hard-coded as the constant weatherStationId. The code of the Temperature program follows:
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Temperature extends Configured implements Tool {

    public static class TemperatureMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // The temperature field occupies columns 14-18 of the fixed-width record.
            int temperature = Integer.parseInt(line.substring(14, 19).trim());
            if (temperature != -9999) { // -9999 marks a missing reading
                String weatherStationId = "weatherStationId";
                context.write(new Text(weatherStationId), new IntWritable(temperature));
            }
        }
    }

    public static class TemperatureReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            for (IntWritable val : values) {
                sum += val.get();
                count++;
            }
            result.set(sum / count); // integer average
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists.
        Path mypath = new Path(args[1]);
        FileSystem hdfs = mypath.getFileSystem(conf);
        if (hdfs.isDirectory(mypath)) {
            hdfs.delete(mypath, true);
        }
        Job job = new Job(conf, "temperature");
        job.setJarByClass(Temperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(TemperatureMapper.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] args0 = { "hdfs://djt002:9000/weather/",
                           "hdfs://djt002:9000/weather/out/" };
        int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
        System.exit(ec);
    }
}
```
Preparing the Unit Tests
With the Temperature class in place, we can start writing the unit tests. Example tests for the Mapper and the Reducer follow.
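Before wiring anything into MRUnit, it can help to sanity-check the record-parsing rule on its own. The sketch below is plain Java with no Hadoop on the classpath (the class name TemperatureFieldCheck is ours, not part of the program above); it exercises the same rule the Mapper uses, reading the temperature from columns 14-18 of the fixed-width record, trimming the padding, and comparing against the -9999 missing-value sentinel:

```java
public class TemperatureFieldCheck {
    // Missing readings are encoded as -9999 in the input data.
    static final int MISSING = -9999;

    // Extract the air-temperature field: characters 14..18 of the
    // fixed-width record, trimmed of their space padding.
    public static int parseTemperature(String record) {
        return Integer.parseInt(record.substring(14, 19).trim());
    }

    public static void main(String[] args) {
        // Fixed-width sample record; the run of spaces before "200" is significant.
        String line = "1985 07 31 02   200    94 10137 220 26 1 0 -9999";
        int t = parseTemperature(line);
        if (t != MISSING) {
            System.out.println(t); // prints 200
        }
    }
}
```

Note that the padding matters: the parsing is positional, so collapsing whitespace in a sample record silently shifts the field and breaks the parse.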
Mapper Unit Test
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

import com.dajiangtai.hadoop.advance.Temperature;

public class TemperatureMapperTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private MapDriver<LongWritable, Text, Text, IntWritable> driver;

    @Before
    public void init() {
        mapper = new Temperature.TemperatureMapper();
        driver = new MapDriver<LongWritable, Text, Text, IntWritable>(mapper);
    }

    @Test
    public void test() throws IOException {
        // Fixed-width sample record; the temperature "200" sits in columns 14-18.
        String line = "1985 07 31 02   200    94 10137 220 26 1 0 -9999";
        driver.withInput(new LongWritable(), new Text(line))
              .withOutput(new Text("weatherStationId"), new IntWritable(200))
              .runTest();
    }
}
```
Reducer Unit Test
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import com.dajiangtai.hadoop.advance.Temperature;

public class TemperatureReduceTest {
    private Reducer<Text, IntWritable, Text, IntWritable> reducer;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> driver;

    @Before
    public void init() {
        reducer = new Temperature.TemperatureReducer();
        driver = new ReduceDriver<Text, IntWritable, Text, IntWritable>(reducer);
    }

    @Test
    public void test() throws IOException {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(200));
        values.add(new IntWritable(100));
        // Integer average of 200 and 100 is 150.
        driver.withInput(new Text("weatherStationId"), values)
              .withOutput(new Text("weatherStationId"), new IntWritable(150))
              .runTest();
    }
}
```
MapReduce Unit Test
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

import com.dajiangtai.hadoop.advance.Temperature;

public class TemperatureTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private Reducer<Text, IntWritable, Text, IntWritable> reducer;
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;

    @Before
    public void init() {
        mapper = new Temperature.TemperatureMapper();
        reducer = new Temperature.TemperatureReducer();
        driver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>(mapper, reducer);
    }

    @Test
    public void test() throws IOException {
        // Two fixed-width records with temperatures 200 and 100; average is 150.
        String line = "1985 07 31 02   200    94 10137 220 26 1 0 -9999";
        String line2 = "1985 07 31 11   100    56 -9999 50 5 -9999 0 -9999";
        driver.withInput(new LongWritable(), new Text(line))
              .withInput(new LongWritable(), new Text(line2))
              .withOutput(new Text("weatherStationId"), new IntWritable(150))
              .runTest();
    }
}
```
Debugging
Debugging a Hadoop MapReduce program is expensive, especially on a cluster, so Eclipse's Debug facility is a quick way to localize problems. Right-click the Temperature project, choose "Debug As" -> "Java Application", then set breakpoints in the program to enter debug mode. From there the workflow is the same as ordinary Eclipse debugging, so we do not describe it further.
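A complementary tactic, cheaper than attaching the debugger, is to keep the framework-independent pieces of the job small enough to probe directly. As one illustration (plain Java; the class name AverageCheck is our own), the reducer's average uses integer division, which truncates, something to keep in mind when choosing expected values such as the 150 in the tests above:

```java
import java.util.Arrays;
import java.util.List;

public class AverageCheck {
    // Integer average, as the reducer computes it: sum / count truncates.
    public static int average(List<Integer> values) {
        int sum = 0;
        int count = 0;
        for (int v : values) {
            sum += v;
            count++;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(200, 100))); // prints 150
        // Truncation: (200 + 101) / 2 is 150, not 150.5
        System.out.println(average(Arrays.asList(200, 101)));
    }
}
```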