1、《hadoop 实战》书上的例子并不能运行成功。以下我的代码,可以运行成功:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class DataJoin extends Configured implements Tool{ public static class DataJoinMapper extends DataJoinMapperBase { @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { return new Text(aRecord.getData().toString().split(",")[0]); } @Override protected Text generateInputTag(String inputFiles) { return new Text(inputFiles); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedMapOutput ret = new TaggedWritable((Text)value); ret.setTag(this.inputTag); return ret; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; public TaggedWritable() { this.tag = new Text(""); this.data = new Text(""); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public void write(DataOutput data) throws IOException { this.tag.write(data); this.data.write(data); } @Override public void readFields(DataInput in) throws IOException { this.data.readFields(in); this.tag.readFields(in); } @Override public Writable getData() { return data; } } public static class DataJoinReducer extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { if (tags.length < 2) { return null; } StringBuilder joinedStr = new StringBuilder(); for (int i = 0; i < values.length; i++) { if (i > 0) { joinedStr.append(","); } TaggedWritable tw = (TaggedWritable)values[i]; String line = ((Text)tw.getData()).toString(); String[] tokens = line.split(",",2); joinedStr.append(tokens[1]); } TaggedWritable ret = new TaggedWritable(new Text(joinedStr.toString())); ret.setTag((Text)tags[0]); return ret; } } @Override public int run(String[] args) throws Exception { JobConf job = new JobConf(getConf()); job.setJarByClass(getClass()); job.setJobName("datajoin"); Path in1 = new Path("/join/customers/"); Path in2 = new Path("/join/orders/"); FileInputFormat.addInputPath(job, in1); FileInputFormat.addInputPath(job, in2); Path out = new Path("/join/output/"); FileOutputFormat.setOutputPath(job, out); job.setMapperClass(DataJoinMapper.class); job.setReducerClass(DataJoinReducer.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator",","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new DataJoin(),args); System.exit(res); } }
可以参考:
http://www.cnblogs.com/aprilrain/archive/2013/01/28/2880460.html
相关推荐
实例226 携子之手 与子偕老(join) 415 实例227 线程让步(Yield) 417 实例228 会走动的钟(多线程) 419 实例229 变形金刚中的守护神(守护线程) 424 实例230 查看JVM中所有的线程的活动状况 426 实例231 模仿...
实例154 使用静态成员变量计算内存中实例化的对象数目 239 实例155 实现加减乘除的方法 240 8.3 面向对象的设计模式 241 实例156 Singleton单例模式 242 实例157 招聘(简单工厂模式) 243 实例158...
实例226 携子之手 与子偕老(join) 415 实例227 线程让步(Yield) 417 实例228 会走动的钟(多线程) 419 实例229 变形金刚中的守护神(守护线程) 424 实例230 查看JVM中所有的线程的活动状况 426 实例231 模仿...
实例154 使用静态成员变量计算内存中实例化的对象数目 239 实例155 实现加减乘除的方法 240 8.3 面向对象的设计模式 241 实例156 Singleton单例模式 242 实例157 招聘(简单工厂模式) 243 ...
实例226 携子之手 与子偕老(join) 415 实例227 线程让步(Yield) 417 实例228 会走动的钟(多线程) 419 实例229 变形金刚中的守护神(守护线程) 424 实例230 查看JVM中所有的线程的活动状况 426 实例231 ...
4.4.5 join函数 4.4.6 map函数 4.4.7 pack和unpack函数 4.4.8 pop函数 4.4.9 push函数 4.4.10 shift函数 4.4.11 splice函数 4.4.12 split函数 4. 4.13 sort函数 4.4.14 reverse函数 ...
需要对统计报表进行处理,将手机号所在的归属地加入到统计报表中,使用pandas提供的join功能来实现,代码如下: #coding=utf-8 from pandas import Series,DataFrame import pandas as pd #reader1 = pd.read_csv('...
实例很丰富详细,初学必备 \EXAMPLES ├─ADO │ ├─BRIEFCASE │ └─SHAPE ├─APPEVENTS ├─APPS │ ├─AUTOCON │ ├─AUTOSRV │ ├─CANVAS │ ├─COLORDLG │ ├─CURSORS │ ├─DOODLE │ ├─FINDREP ...
7.2.9. MySQL如何优化LEFT JOIN和RIGHT JOIN 7.2.10. MySQL如何优化嵌套Join 7.2.11. MySQL如何简化外部联合 7.2.12. MySQL如何优化ORDER BY 7.2.13. MySQL如何优化GROUP BY 7.2.14. MySQL如何优化LIMIT 7.2.15. ...
实例如下所示: >>> import pandas as pd >>> df = pd.DataFrame({x:['a','b','c','d'],y:['aa','bb','cc','dd'],z:['aaa','bbb','ccc','ddd']}) >>> df x y z 0 a aa aaa 1 b bb bbb 2 c cc ccc 3 d dd ddd 【1】...
通过一个实例给大家分享了MySQL Sending data表查询慢问题解决办法。 最近在代码优化中,发现了一条sql语句非常的慢,于是就用各种方法进行排查,最后终于找到了原因。 一、事故现场 SELECT og.goods_barcode, og....
7.2.9. MySQL如何优化LEFT JOIN和RIGHT JOIN 7.2.10. MySQL如何优化嵌套Join 7.2.11. MySQL如何简化外部联合 7.2.12. MySQL如何优化ORDER BY 7.2.13. MySQL如何优化GROUP BY 7.2.14. MySQL如何优化LIMIT ...
print '\n'.join(['%s:%s' % item for item in obj.__dict__.items()]) Python logger对象属性(由上述函数获取的) name:get_data parent: handlers:[] level:10 disabled:1 #当前的logger是否有效,默认为0 m
功能:扫描当前目录下所有CSV文件并对其中文件进行统计,...all_files=glob.glob(os.path.join(input_path,'sales_*')) all_data_frames=[] for file in all_files: data_frame=pd.read_csv(file,index_col=None) t
本文实例讲述了Django框架文件上传与自定义图片上传路径、上传文件名操作。分享给大家供大家参考,具体如下: 文件上传 1、创建上传文件夹 在static文件夹下创建uploads用于存储接收上传的文件 在settings中配置, ...
{{ message.split('').reverse().join('') }} 实例 1 中模板变的很复杂起来,也不容易看懂理解。 接下来我们看看使用了计算属性的实例: 实例 2 原始字符串: {{ message }} 计算后反转字符串: {{ ...
本文实例讲述了cakephp2.X多表联合查询join及使用分页查询的方法。分享给大家供大家参考,具体如下: 格式化参数: public function getconditions($data){ $this->loadModel("Cm.LoginHistory"); $conditions = ...
1. 数据准备 ...base_dir = 'E:/python learn/dog_and_cat/data/' # 训练、验证数据集的目录 train_dir = os.path.join(base_dir, 'train') validation_dir = os.path.join(base_dir, 'validation') test_dir