通过bulkload方式加载数据优点:与put方式相比
1.导入过程不占用Region资源
2.能快速导入海量的数据
3.节省内存
应该是业界将数据载入hbase常用方式之一,因此有必要学习掌握

实现步骤

步骤一 读取数据生成rdd

读入数据是面向行的表,一行有多个字段,需要转换成面向列的数据,构造keyValue对象,一定要注意key们要排序,比如user:age列要在user:gender列之前
需要设计行键保证行键唯一和避免数据都涌入一个region,如我的是按时间设计的,好几个月的数据,因此将数据按月预分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  val rdd = sc.textFile("file:///"+filePath)
.flatMap(x=>getLineData(x,rowKeyBase,HBaseUtils.LOG_FIELD_NAMES))
.sortByKey()
//处理每一条记录生成keyvalue对象
def getLineData(line:String,rowkey:String,fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] ={
val length = fieldNames.size
val values:Array[String] = line.split("\\\t")
if (null == values || values.length!=length) return Nil
//println(rowkey+values(1)+Random.nextInt(100000).toString)
val rowKey = Bytes.toBytes(rowkey+values(1)+Random.nextInt(1000).toString)
val writable = new ImmutableBytesWritable(rowKey)
val columnFamily = Bytes.toBytes("detail")
fieldNames.toList.map{
case (fieldName, fieldIndex) =>
// KeyValue实例对象
val keyValue = new KeyValue(
rowKey, //
columnFamily, //
Bytes.toBytes(fieldName), //
Bytes.toBytes(values(fieldIndex)) //
)
// 返回
(writable, keyValue)
}
}

步骤二 配置输出HFile文件

输出前检查

检查HFile输出目录是否存在

1
2
3
4
5
6
7
8
9
10
11
12
// TODO:构建Job,设置相关配置信息,主要为输出格式
// a. 读取配置信息
val hbaseConfig: Configuration = HBaseUtils.getHBaseConfiguration("hbase","2181")
// Configuration parameter hbase.mapreduce.hfileoutputformat.table.name cannot be empty
hbaseConfig.set("hbase.mapreduce.hfileoutputformat.table.name", "log")
// b. 如果输出目录存在,删除
val dfs = FileSystem.get(hbaseConfig)
val outputPath: Path = new Path("hdfs://hbase:9000/hbase/log/"+rowKeyBase)
if (dfs.exists(outputPath)) {
dfs.delete(outputPath, true)
}
dfs.close()

配置HFileOutputFormat2

1
2
3
4
5
6
7
8
9
// TODO: 配置HFileOutputFormat2输出
val conn = ConnectionFactory.createConnection(hbaseConfig)
val htableName = TableName.valueOf("log")
val table: Table = conn.getTable(htableName)
HFileOutputFormat2.configureIncrementalLoad(
Job.getInstance(hbaseConfig), //
table, //
conn.getRegionLocator(htableName) //
)

输出HFile文件

1
2
3
4
5
6
7
8
// TODO: 3. 保存数据为HFile文件//先排序
rdd.sortBy(x=>(x._1, x._2.getKeyString), ascending = true)
.saveAsNewAPIHadoopFile(
"hdfs://hbase:9000/hbase/log/"+rowKeyBase,
classOf[ImmutableBytesWritable], //
classOf[KeyValue], //
classOf[HFileOutputFormat2], //
hbaseConfig)

将HFile文件bulkload到hbase表分区当中

1
2
3
4
// TODO:4. 将输出HFile加载到HBase表中
val load = new LoadIncrementalHFiles(hbaseConfig)
load.doBulkLoad(outputPath, conn.getAdmin, table,
conn.getRegionLocator(htableName))

出现的问题

写入权限
可以将HFile要输出的文件位置chmod 777 /outputDir