package cn.luxh.app; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; /** * @author Luxh * */ public class WordStat { /** * TableMapper<Text,IntWritable> Text:输出的key类型,IntWritable:输出的value类型 */ public static class MyMapper extends TableMapper<Text,IntWritable>{ private static IntWritable one = new IntWritable(1); private static Text word = new Text(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //表里面只有一个列族,所以我就直接获取每一行的值 String words = Bytes.toString(value.list().get(0).getValue()); StringTokenizer st = new StringTokenizer(words); while (st.hasMoreTokens()) { String s = st.nextToken(); word.set(s); context.write(word, one); } } } /** * TableReducer<Text,IntWritable> Text:输入的key类型,IntWritable:输入的value类型,ImmutableBytesWritable:输出类型 */ public static class MyReducer extends TableReducer<Text,IntWritable,ImmutableBytesWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val:values) { sum+=val.get(); } //添加一行记录,每一个单词作为行键 Put put = new Put(Bytes.toBytes(key.toString())); //在列族result中添加一个标识符num,赋值为每个单词出现的次数 //String.valueOf(sum)先将数字转化为字符串,否则存到数据库后会变成\x00\x00\x00\x这种形式 //然后再转二进制存到hbase。 put.add(Bytes.toBytes("result"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(sum))); context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf,"wordstat"); job.setJarByClass(Blog.class); Scan scan = new Scan(); //指定要查询的列族 scan.addColumn(Bytes.toBytes("content"),null); //指定Mapper读取的表为word TableMapReduceUtil.initTableMapperJob("word", scan, MyMapper.class, Text.class, IntWritable.class, job); //指定Reducer写入的表为stat TableMapReduceUtil.initTableReducerJob("stat", MyReducer.class, job); System.exit(job.waitForCompletion(true)?0:1); } }
相关推荐
MapReduce goole MapReduce编程模型
Mapreduce编程模型是Google采用的云计算编程模式,本论文阐述了Mapreduce编程模型
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
Hadoop-0.20.0-HDFS+MapReduce+Hive+HBase十分钟快速入门
基于MapReduce编程模型的分布式并行计算系统的设计和实现,何皓星,李昕,大数据处理技术对互联网应用本身和企业都具有非常重大的意义。随着互联网业务数量的快速增长,系统中积累的数据也越来越多。如何
大数据及MapReduce编程模型94.pptx
HDFS+MapReduce+Hive+HBase十分钟快速入门
HBase – Hadoop Database,是一...Google运行MapReduce来处理Bigtable中的海量数据,HBase同样利用Hadoop MapReduce来处理HBase中的海量数据;Google Bigtable利用 Chubby作为协同服务,HBase利用Zookeeper作为对应。
3. 查看 Hadoop 自带的 MR-App 单词计数源代码 WordCount.java,在 Eclipse 项目 MapReduceExample 下建立新包 com.xijing.mapreduce,模仿内置的 WordCount 示例,自己编写一个 WordCount 程序,最后打包成 JAR ...
HDFS+MapReduce+Hive+HBase十分钟快速入门.pdf
主要为大家详细介绍了通用MapReduce程序复制HBase表数据,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
MapReduce编程模型下的上下文离群数据挖掘算法.pdf
Hadoop技术-MapReduce编程模型.pptx
HDFS+MapReduce+Hive+HBase十分钟快速入门,包括这几个部分的简单使用
Hadoop技术MapReduce编程模型共8页.pdf.zip