最近在做一些基于mapreduce 操作hbase 表的工作,碰到了几个问题。
一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即hbase提供的HFileOutputFormat类。
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
1.reduce 在写的时候由于词排序问题导致程序运行异常。
java.io.IOException: Added a key not lexically larger than previous key=\x00\x04r100\x02f1c100\x00\x00\x01?'c \x1E\x04, lastkey=\x00\x03r99\x02f1c99\x00\x00\x01?'c \x1E\x04
at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:207)
at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:324)
at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:289)
at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:1197)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:168)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:124)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at com.gump.test.HBaseHFileReducer.reduce(HBaseHFileReducer.java:21)
at com.gump.test.HBaseHFileReducer.reduce(HBaseHFileReducer.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
这个问题,是由于hbase的row key 是基于词典排序,比如说reduce 写入 hbase 的row key 顺序是,r10,r11,r00,则会报上述异常。考虑到map有排序功能,于是将其rowkey作为map的输出key.
2.无论是map还是reduce作为最终的输出结果,输出的key和value的类型应该是:<ImmutableBytesWritable, KeyValue> 或者< ImmutableBytesWritable, Put>。否则报这样的错误:
java.lang.IllegalArgumentException: Can’t read partitions file
…
Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable
3.reduce 的任务槽数为1.
a. 进入hbase shell > create 't1','f1'
b.运行类
package com.gump.test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class HbaseHFileDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = new Job(conf, "ganliang"); job.setJarByClass(HbaseHFileDriver.class); job.setMapperClass(HBaseHFileMapper.class); job.setReducerClass(HBaseHFileReducer.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); HFileOutputFormat.setOutputPath(job, new Path(args[1])); Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.set("hbase.zookeeper.quorum","bfdbjc2:2181,bfdbjc3:2181,bfdbjc4:2181"); HBASE_CONFIG.set("hbase.rootdir", "hdfs://bfdbjc1:12000/hbase"); HBASE_CONFIG.set("zookeeper.znode.parent","/hbase"); String tableName = "t1"; HTable htable = new HTable(HBASE_CONFIG, tableName); HFileOutputFormat.configureIncrementalLoad(job, htable); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
c.Mapper
package com.gump.test; import java.io.IOException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> { private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(); @Override protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { String rowkey = value.toString().split(":")[0]; immutableBytesWritable.set(Bytes.toBytes(rowkey)); context.write(immutableBytesWritable, value); System.out.println(rowkey+" "+value); } }
d.reducer
package com.gump.test; import java.io.IOException; import java.util.TreeSet; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> { protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String value = ""; while (values.iterator().hasNext()) { value = values.iterator().next().toString(); if (value != null && !"".equals(value)) { KeyValue kv = createKeyValue(value.toString()); if (kv != null){ context.write(key, kv); } } } } private KeyValue createKeyValue(String str) { String[] strs = str.split(":"); if (strs.length < 4) return null; String row = strs[0]; String family = strs[1]; String qualifier = strs[2]; String value = strs[3]; System.out.println(strs[0]+" "+strs[1]+" "+strs[2]+" "+strs[3]); return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier), System.currentTimeMillis(), Bytes.toBytes(value)); } }
d:导入
hadoop jar /usr/local/hbase-0.94.6.1/hbase-0.94.6.1.jar completebulkload /user/work/b2_output/ t1
e:数据
r10:f1:c10:value10
r99:f1:c99:value99
r100:f1:c100:value100
r101:f1:c101:value101
相关推荐
#资源达人分享计划#
基于Hadoop的mapreduce 在hbase上的使用,基于Hadoop的mapreduce 在hbase上的使用
提出一种基于MapReduce技术的贝叶斯垃圾邮件过滤机制,一方面对传统贝叶斯过滤技术进行改进,另一方面利用MapReduce模型的海量数据处理优势优化邮件集训练与学习。实验,较之目前流行的传统贝叶斯算法、K最近邻(NN算法...
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
该案例中主要使用MapReduce作为处理组件进行数据处理,实现的案例有如通过javaapi实现hbase数据写入hdfs、hbase表数据复制到另一个表中等操作 对应(《HBase分布式存储系统应用》胡鑫喆 张志刚著)教材中案例
本文详细描述了基于MapReduce的交互可视化平台一系列方法
NULL 博文链接:https://jsh0401.iteye.com/blog/2096103
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
基于MapReduce+Pandas的电影排名与推荐以及数据分析与可视化展示
基于mapreduce的小型电影推荐系统,使用javaweb的方式实现,
基于MapReduce的图算法
基于MapReduce的简单倒排索引的建立
基于MapReduce的Apriori算法,关联规则并行化思路与解决方案。该方法具有一定的借鉴意义,可以用来进行学术研究。
基于Mapreduce的朴素贝叶斯分类
HBase MapReduce完整实例.rar
基于MapReduce的分布式智能搜索引擎框架研究.pdf
基于mapreduce的聚类算法研究,云计算环境下基于hadoop mapreduce编程模型下,聚类算法实现论文。
主要为大家详细介绍了基于MapReduce实现决策树算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
基于MapReduce设计的社会网络分析系统
该方法依托Hadoop框架组织计算资源,基于MapReduce模式从大规模空间数据集中高效创建出空间权重:大空间数据首先被分为多个数据块,然后将映射器分布给计算集群中的不同节点,以便在数据中寻找出空间对象的相邻对象...