`
bo_hai
  • 浏览: 554488 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
社区版块
存档分类
最新评论

DataJoin 实例

 
阅读更多

1、《hadoop 实战》书上的例子并不能运行成功。以下我的代码,可以运行成功:

 

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool{

	public static class DataJoinMapper extends DataJoinMapperBase {

		@Override
		protected Text generateGroupKey(TaggedMapOutput aRecord) {
			return new Text(aRecord.getData().toString().split(",")[0]);
		}

		@Override
		protected Text generateInputTag(String inputFiles) {
			return new Text(inputFiles);  
		}

		@Override
		protected TaggedMapOutput generateTaggedMapOutput(Object value) {
			TaggedMapOutput ret = new TaggedWritable((Text)value);  
			ret.setTag(this.inputTag);
			return ret;
		}
	}
	
	public static class TaggedWritable extends TaggedMapOutput {

		private Writable data;
		
		public TaggedWritable() {
			this.tag = new Text("");
			this.data = new Text("");
		}
		
		public TaggedWritable(Writable data) {
			this.tag = new Text("");
			this.data = data;
		}
		
		@Override
		public void write(DataOutput data) throws IOException {
			this.tag.write(data);
			this.data.write(data);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.data.readFields(in);
			this.tag.readFields(in);
		}

		@Override
		public Writable getData() {
			return data;
		}
		
	}
	
	public static class DataJoinReducer extends DataJoinReducerBase {

		@Override
		protected TaggedMapOutput combine(Object[] tags, Object[] values) {
			if (tags.length < 2) {
				return null;
			}
			StringBuilder joinedStr = new StringBuilder();
			for (int i = 0; i < values.length; i++) {
				if (i > 0) {
					joinedStr.append(",");
				}
				TaggedWritable tw = (TaggedWritable)values[i];
				String line = ((Text)tw.getData()).toString();
				String[] tokens = line.split(",",2);
				joinedStr.append(tokens[1]);
			}
			TaggedWritable ret = new TaggedWritable(new Text(joinedStr.toString()));
			ret.setTag((Text)tags[0]);
			return ret;
		}
		
	}
	
	@Override
	public int run(String[] args) throws Exception {
		JobConf job = new JobConf(getConf());
		job.setJarByClass(getClass());
		job.setJobName("datajoin");
		Path in1 = new Path("/join/customers/");
		Path in2 = new Path("/join/orders/");
		FileInputFormat.addInputPath(job, in1);
		FileInputFormat.addInputPath(job, in2);
		Path out = new Path("/join/output/");
		FileOutputFormat.setOutputPath(job, out);
		
		job.setMapperClass(DataJoinMapper.class);
		job.setReducerClass(DataJoinReducer.class);
		
		job.setInputFormat(TextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TaggedWritable.class);
		
		job.set("mapred.textoutputformat.separator",",");
		
		JobClient.runJob(job);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new DataJoin(),args);
		System.exit(res);
	}
}

 可以参考:

  http://www.cnblogs.com/aprilrain/archive/2013/01/28/2880460.html

  http://blog.csdn.net/jokes000/article/details/7080551

分享到:
评论
1 楼 bo_hai 2013-12-12  
结果如下:
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009

相关推荐

    java范例开发大全

    实例226 携子之手 与子偕老(join) 415 实例227 线程让步(Yield) 417 实例228 会走动的钟(多线程) 419 实例229 变形金刚中的守护神(守护线程) 424 实例230 查看JVM中所有的线程的活动状况 426 实例231 模仿...

    Java范例开发大全 (源程序)

     实例154 使用静态成员变量计算内存中实例化的对象数目 239  实例155 实现加减乘除的方法 240  8.3 面向对象的设计模式 241  实例156 Singleton单例模式 242  实例157 招聘(简单工厂模式) 243  实例158...

    java范例开发大全(pdf&源码)

    实例226 携子之手 与子偕老(join) 415 实例227 线程让步(Yield) 417 实例228 会走动的钟(多线程) 419 实例229 变形金刚中的守护神(守护线程) 424 实例230 查看JVM中所有的线程的活动状况 426 实例231 模仿...

    java范例开发大全源代码

     实例154 使用静态成员变量计算内存中实例化的对象数目 239  实例155 实现加减乘除的方法 240  8.3 面向对象的设计模式 241  实例156 Singleton单例模式 242  实例157 招聘(简单工厂模式) 243  ...

    Java范例开发大全(全书源程序)

    实例226 携子之手 与子偕老(join) 415 实例227 线程让步(Yield) 417 实例228 会走动的钟(多线程) 419 实例229 变形金刚中的守护神(守护线程) 424 实例230 查看JVM中所有的线程的活动状况 426 实例231 ...

    Perl 实例精解(第三版).pdf

    4.4.5 join函数 4.4.6 map函数 4.4.7 pack和unpack函数 4.4.8 pop函数 4.4.9 push函数 4.4.10 shift函数 4.4.11 splice函数 4.4.12 split函数 4. 4.13 sort函数 4.4.14 reverse函数 ...

    使用pandas对两个dataframe进行join的实例

    需要对统计报表进行处理,将手机号所在的归属地加入到统计报表中,使用pandas提供的join功能来实现,代码如下: #coding=utf-8 from pandas import Series,DataFrame import pandas as pd #reader1 = pd.read_csv('...

    C++Builder6.0自带VCL控件使用实例.rar

    实例很丰富详细,初学必备 \EXAMPLES ├─ADO │ ├─BRIEFCASE │ └─SHAPE ├─APPEVENTS ├─APPS │ ├─AUTOCON │ ├─AUTOSRV │ ├─CANVAS │ ├─COLORDLG │ ├─CURSORS │ ├─DOODLE │ ├─FINDREP ...

    MySQL 5.1中文手冊

    7.2.9. MySQL如何优化LEFT JOIN和RIGHT JOIN 7.2.10. MySQL如何优化嵌套Join 7.2.11. MySQL如何简化外部联合 7.2.12. MySQL如何优化ORDER BY 7.2.13. MySQL如何优化GROUP BY 7.2.14. MySQL如何优化LIMIT 7.2.15. ...

    对dataframe进行列相加,行相加的实例

    实例如下所示: &gt;&gt;&gt; import pandas as pd &gt;&gt;&gt; df = pd.DataFrame({x:['a','b','c','d'],y:['aa','bb','cc','dd'],z:['aaa','bbb','ccc','ddd']}) &gt;&gt;&gt; df x y z 0 a aa aaa 1 b bb bbb 2 c cc ccc 3 d dd ddd 【1】...

    深入分析MySQL Sending data查询慢问题

    通过一个实例给大家分享了MySQL Sending data表查询慢问题解决办法。 最近在代码优化中,发现了一条sql语句非常的慢,于是就用各种方法进行排查,最后终于找到了原因。 一、事故现场 SELECT og.goods_barcode, og....

    MYSQL中文手册

    7.2.9. MySQL如何优化LEFT JOIN和RIGHT JOIN 7.2.10. MySQL如何优化嵌套Join 7.2.11. MySQL如何简化外部联合 7.2.12. MySQL如何优化ORDER BY 7.2.13. MySQL如何优化GROUP BY 7.2.14. MySQL如何优化LIMIT ...

    python日志记录模块实例及改进

    print '\n'.join(['%s:%s' % item for item in obj.__dict__.items()]) Python logger对象属性(由上述函数获取的) name:get_data parent: handlers:[] level:10 disabled:1 #当前的logger是否有效,默认为0 m

    Python利用pandas计算多个CSV文件数据值的实例

    功能:扫描当前目录下所有CSV文件并对其中文件进行统计,...all_files=glob.glob(os.path.join(input_path,'sales_*')) all_data_frames=[] for file in all_files: data_frame=pd.read_csv(file,index_col=None) t

    Django框架文件上传与自定义图片上传路径、上传文件名操作分析

    本文实例讲述了Django框架文件上传与自定义图片上传路径、上传文件名操作。分享给大家供大家参考,具体如下: 文件上传 1、创建上传文件夹 在static文件夹下创建uploads用于存储接收上传的文件 在settings中配置, ...

    Vue.js 计算属性

    {{ message.split('').reverse().join('') }} 实例 1 中模板变的很复杂起来,也不容易看懂理解。 接下来我们看看使用了计算属性的实例: 实例 2 原始字符串: {{ message }} 计算后反转字符串: {{ ...

    cakephp2.X多表联合查询join及使用分页查询的方法

    本文实例讲述了cakephp2.X多表联合查询join及使用分页查询的方法。分享给大家供大家参考,具体如下: 格式化参数: public function getconditions($data){ $this-&gt;loadModel("Cm.LoginHistory"); $conditions = ...

    keras分类之二分类实例(Cat and dog)

    1. 数据准备 ...base_dir = 'E:/python learn/dog_and_cat/data/' # 训练、验证数据集的目录 train_dir = os.path.join(base_dir, 'train') validation_dir = os.path.join(base_dir, 'validation') test_dir

Global site tag (gtag.js) - Google Analytics