一、项目采用maven构建,如下为pom.xml中引入的jar包
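```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.slp</groupId>
    <artifactId>HadoopDevelop</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>HadoopDevelop</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoopVersion>2.8.0</hadoopVersion>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
```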
二、输入文件
(每行一条记录:前 8 位为日期 yyyyMMdd,其后为当天气温)

```
2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023
```
三、代码实现
package com.slp.temperature;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.slp.temperature.Temperature.TempMapper.TempReducer;

/**
 * MapReduce job that finds the maximum temperature recorded for each year.
 *
 * <p>Each input line is expected to look like {@code yyyyMMddT...}: the first
 * four characters are the year, characters 4-7 are month and day, and
 * everything from index 8 onward is an integer temperature.
 */
public class Temperature {

    /** Number of leading characters of a record that hold the year. */
    private static final int YEAR_LENGTH = 4;

    /** Index of the first temperature character (right after {@code yyyyMMdd}). */
    private static final int TEMPERATURE_OFFSET = 8;

    /** Input path used when no command-line argument is supplied. */
    private static final String DEFAULT_INPUT = "D:\\hadoopnode\\input\\temp.txt";

    /** Output path used when fewer than two command-line arguments are supplied. */
    private static final String DEFAULT_OUTPUT = "D:\\hadoopnode\\outtemp";

    /**
     * Emits one {@code (year, temperature)} pair per input line.
     *
     * <p>The four type arguments are:
     * <ul>
     *   <li>KeyIn: the input key, here the starting offset of the line (0, 12, ...)</li>
     *   <li>ValueIn: the input value, here the text of the line</li>
     *   <li>KeyOut: the output key, here the year taken from the line</li>
     *   <li>ValueOut: the output value, here the temperature taken from the line</li>
     * </ul>
     */
    static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Print the sample before mapping.
            System.out.println("Before Mapper : " + key + "," + value);

            // Trim so that stray surrounding whitespace does not break parseInt.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                // A blank line carries no record; skip it instead of failing in substring().
                return;
            }

            String year = line.substring(0, YEAR_LENGTH);
            int temperature = Integer.parseInt(line.substring(TEMPERATURE_OFFSET));
            context.write(new Text(year), new IntWritable(temperature));

            // Print the sample after mapping.
            System.out.println("After Mapper:" + year + "," + temperature);
        }

        /**
         * Reduces all temperatures of one year to their maximum.
         *
         * <p>The four type arguments are:
         * <ul>
         *   <li>KeyIn: the reducer input key, here the year</li>
         *   <li>ValueIn: the reducer input value, here a temperature of that year</li>
         *   <li>KeyOut: the reducer output key, here the (now unique) year</li>
         *   <li>ValueOut: the reducer output value, here the highest temperature of that year</li>
         * </ul>
         *
         * <p>This class is nested inside {@link TempMapper} because the file imports it
         * as {@code Temperature.TempMapper.TempReducer}; the nesting is kept so that
         * name stays valid.
         */
        static class TempReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int maxValue = Integer.MIN_VALUE;
                StringBuilder seen = new StringBuilder();

                // Track the maximum while recording every value for the debug line below.
                for (IntWritable value : values) {
                    maxValue = Math.max(maxValue, value.get());
                    seen.append(value).append(",");
                }

                // Print the sample before reducing.
                System.out.println("Before Reduce:" + key + "," + seen);
                context.write(key, new IntWritable(maxValue));
                // Print the sample after reducing.
                System.out.println("After Reduce : " + key + "," + maxValue);
            }
        }
    }

    /**
     * Configures and runs the job, blocking until it completes.
     *
     * @param args optional; {@code args[0]} overrides the input path and
     *             {@code args[1]} overrides the output path. Missing arguments
     *             fall back to the built-in default paths.
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Input path.
        String dst = args.length > 0 ? args[0] : DEFAULT_INPUT;
        // Output path.
        String desout = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        // Explicitly bind the hdfs:// and file:// schemes to their FileSystem classes.
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(conf);
        // Needed when the job is packaged and run as a jar.
        job.setJarByClass(Temperature.class);

        // Input and output locations for the job.
        FileInputFormat.addInputPath(job, new Path(dst));
        FileOutputFormat.setOutputPath(job, new Path(desout));

        // Custom Mapper and Reducer for the two phases.
        job.setMapperClass(TempMapper.class);
        job.setReducerClass(TempReducer.class);

        // Key and value types of the final output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Run the job to completion and report failure instead of ignoring it.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.err.println("Job failed");
            System.exit(1);
        }
        System.out.println("Finished");
    }
}
四、输出结果
```
Before Mapper : 0,2014010114
After Mapper:2014,14
Before Mapper : 12,2014010216
After Mapper:2014,16
Before Mapper : 24,2014010317
After Mapper:2014,17
Before Mapper : 36,2014010410
After Mapper:2014,10
Before Mapper : 48,2014010506
After Mapper:2014,6
Before Mapper : 60,2012010609
After Mapper:2012,9
Before Mapper : 72,2012010732
After Mapper:2012,32
Before Mapper : 84,2012010812
After Mapper:2012,12
Before Mapper : 96,2012010919
After Mapper:2012,19
Before Mapper : 108,2012011023
After Mapper:2012,23
Before Mapper : 120,2001010116
After Mapper:2001,16
Before Mapper : 132,2001010212
After Mapper:2001,12
Before Mapper : 144,2001010310
After Mapper:2001,10
Before Mapper : 156,2001010411
After Mapper:2001,11
Before Mapper : 168,2001010529
After Mapper:2001,29
Before Mapper : 180,2013010619
After Mapper:2013,19
Before Mapper : 192,2013010722
After Mapper:2013,22
Before Mapper : 204,2013010812
After Mapper:2013,12
Before Mapper : 216,2013010929
After Mapper:2013,29
Before Mapper : 228,2013011023
After Mapper:2013,23
Before Mapper : 240,2008010105
After Mapper:2008,5
Before Mapper : 252,2008010216
After Mapper:2008,16
Before Mapper : 264,2008010337
After Mapper:2008,37
Before Mapper : 276,2008010414
After Mapper:2008,14
Before Mapper : 288,2008010516
After Mapper:2008,16
Before Mapper : 300,2007010619
After Mapper:2007,19
Before Mapper : 312,2007010712
After Mapper:2007,12
Before Mapper : 324,2007010812
After Mapper:2007,12
Before Mapper : 336,2007010999
After Mapper:2007,99
Before Mapper : 348,2007011023
After Mapper:2007,23
Before Mapper : 360,2010010114
After Mapper:2010,14
Before Mapper : 372,2010010216
After Mapper:2010,16
Before Mapper : 384,2010010317
After Mapper:2010,17
Before Mapper : 396,2010010410
After Mapper:2010,10
Before Mapper : 408,2010010506
After Mapper:2010,6
Before Mapper : 420,2015010649
After Mapper:2015,49
Before Mapper : 432,2015010722
After Mapper:2015,22
Before Mapper : 444,2015010812
After Mapper:2015,12
Before Mapper : 456,2015010999
After Mapper:2015,99
Before Mapper : 468,2015011023
After Mapper:2015,23
Before Reduce:2001,12,10,11,29,16,
After Reduce : 2001,29
Before Reduce:2007,23,19,12,12,99,
After Reduce : 2007,99
Before Reduce:2008,16,14,37,16,5,
After Reduce : 2008,37
Before Reduce:2010,10,6,14,16,17,
After Reduce : 2010,17
Before Reduce:2012,19,12,32,9,23,
After Reduce : 2012,32
Before Reduce:2013,23,29,12,22,19,
After Reduce : 2013,29
Before Reduce:2014,14,6,10,17,16,
After Reduce : 2014,17
Before Reduce:2015,23,49,22,12,99,
After Reduce : 2015,99
Finished
```