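Advantages of loading data with bulk load, compared with the put approach:
1. The import does not tie up Region resources (the data is written as HFiles outside the normal write path)
2. It can import massive amounts of data quickly
3. It saves memory
It is probably one of the most common ways in the industry to load data into HBase, so it is worth learning.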
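Implementation steps
Step 1: Read the data and build an RDD
The input is a row-oriented table in which each line holds several fields. It has to be converted into column-oriented data by constructing KeyValue objects. Make sure the keys are sorted; for example, the user:age column must come before the user:gender column.
The row key must be designed so that it is unique and so that the data does not all land in a single region. In my case the row key is time-based and the data covers several months, so I pre-split the table by month.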
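The row key and pre-split idea above can be sketched in plain Scala, without any HBase dependency. Everything here is an assumption made for illustration: the `RowKeySketch` object, the `yyyyMM` prefix layout, and the zero-padded three-digit salt are hypothetical and not the author's exact scheme.

```scala
import java.time.YearMonth
import java.time.format.DateTimeFormatter

object RowKeySketch {
  // Hypothetical key layout: yyyyMM prefix + timestamp string + zero-padded salt
  private val monthFmt = DateTimeFormatter.ofPattern("yyyyMM")

  /** One split point at the start of every month after the first one. */
  def monthlySplitKeys(first: YearMonth, last: YearMonth): List[String] =
    Iterator.iterate(first.plusMonths(1))(_.plusMonths(1))
      .takeWhile(m => !m.isAfter(last))
      .map(_.format(monthFmt))
      .toList

  /** Builds a row key; the salt is padded so every key has the same width. */
  def rowKey(month: YearMonth, timestamp: String, salt: Int): String =
    month.format(monthFmt) + timestamp + "%03d".format(salt)

  /** Index of the region a key falls into: the number of split points <= key. */
  def regionIndex(key: String, splits: Seq[String]): Int =
    splits.count(_ <= key)
}
```

With three months of data this yields two split points, so keys from each month land in their own region. The string comparison only matches HBase's byte ordering here because the keys are plain ASCII digits.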
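    import org.apache.hadoop.hbase.KeyValue
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.immutable.TreeMap
    import scala.util.Random

    // Parse each line into (rowKey, KeyValue) pairs, then sort by row key
    val rdd = sc.textFile("file:///" + filePath)
      .flatMap(x => getLineData(x, rowKeyBase, HBaseUtils.LOG_FIELD_NAMES))
      .sortByKey()

    // fieldNames maps column name -> field index; a TreeMap iterates in
    // sorted key order, so the qualifiers of one row come out in order
    def getLineData(line: String, rowkey: String,
                    fieldNames: TreeMap[String, Int]): List[(ImmutableBytesWritable, KeyValue)] = {
      val values: Array[String] = line.split("\t")
      // Skip malformed lines whose field count does not match
      if (values.length != fieldNames.size) return Nil
      // Row key = base prefix + second field + random suffix
      val rowKey = Bytes.toBytes(rowkey + values(1) + Random.nextInt(1000).toString)
      val writable = new ImmutableBytesWritable(rowKey)
      val columnFamily = Bytes.toBytes("detail")
      fieldNames.toList.map { case (fieldName, fieldIndex) =>
        val keyValue = new KeyValue(rowKey, columnFamily,
          Bytes.toBytes(fieldName), Bytes.toBytes(values(fieldIndex)))
        (writable, keyValue)
      }
    }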
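The sorting requirement in this step (row key first, then column) can be illustrated with a small dependency-free sketch. It is a simplification and not HBase's own comparator: the `Cell` case class and the `CellOrderSketch` object are hypothetical, and the timestamp and type parts of a real KeyValue key are ignored. It compares row, family, and qualifier as unsigned bytes, which is the ordering in which cells must arrive when HFiles are written.

```scala
object CellOrderSketch {
  /** Unsigned lexicographic comparison of two byte arrays. */
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      // Mask to 0..255 so bytes >= 0x80 sort after bytes < 0x80
      val diff = (a(i) & 0xff) - (b(i) & 0xff)
      if (diff != 0) return diff
      i += 1
    }
    a.length - b.length
  }

  /** Simplified cell: strings are used for readability only. */
  final case class Cell(row: String, family: String, qualifier: String)

  private def utf8(s: String): Array[Byte] = s.getBytes("UTF-8")

  /** Orders cells by row, then family, then qualifier. */
  val cellOrdering: Ordering[Cell] = new Ordering[Cell] {
    def compare(x: Cell, y: Cell): Int = {
      val byRow = compareBytes(utf8(x.row), utf8(y.row))
      if (byRow != 0) byRow
      else {
        val byFamily = compareBytes(utf8(x.family), utf8(y.family))
        if (byFamily != 0) byFamily
        else compareBytes(utf8(x.qualifier), utf8(y.qualifier))
      }
    }
  }

  /** True when every cell is strictly greater than the one before it. */
  def isStrictlySorted(cells: Seq[Cell]): Boolean =
    cells.zip(cells.drop(1)).forall { case (a, b) => cellOrdering.lt(a, b) }
}
```

A check like `isStrictlySorted` on a small sample can be a cheap way to catch ordering mistakes before a long job runs, since duplicate or out-of-order keys tend to surface only when the HFile writer rejects them.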
Step 2: Configure the HFile output
Checks before writing
Check whether the HFile output directory already exists
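    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val hbaseConfig: Configuration = HBaseUtils.getHBaseConfiguration("hbase", "2181")
    // Name of the target table, read by HFileOutputFormat2 when writing HFiles
    hbaseConfig.set("hbase.mapreduce.hfileoutputformat.table.name", "log")

    // Delete any existing output directory so the job can write to it
    val dfs = FileSystem.get(hbaseConfig)
    val outputPath: Path = new Path("hdfs://hbase:9000/hbase/log/" + rowKeyBase)
    if (dfs.exists(outputPath)) {
      dfs.delete(outputPath, true)
    }
    // Caution: FileSystem.get can return a cached, shared instance,
    // so closing it may affect later users of the same file system
    dfs.close()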
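    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Table}
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.mapreduce.Job

    val conn = ConnectionFactory.createConnection(hbaseConfig)
    val htableName = TableName.valueOf("log")
    val table: Table = conn.getTable(htableName)
    // Configure the HFile output for the target table and its region boundaries.
    // Job.getInstance copies hbaseConfig, so the settings applied here live in
    // the job's own configuration.
    HFileOutputFormat2.configureIncrementalLoad(
      Job.getInstance(hbaseConfig),
      table,
      conn.getRegionLocator(htableName)
    )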
Write the HFiles
    // Cells must be written in sorted order: sort by row key, then by the KeyValue key string
    rdd.sortBy(x => (x._1, x._2.getKeyString), ascending = true)
      .saveAsNewAPIHadoopFile(
        "hdfs://hbase:9000/hbase/log/" + rowKeyBase,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        hbaseConfig)
Bulk load the HFiles into the regions of the HBase table
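    // In HBase 2.x this class lives in org.apache.hadoop.hbase.tool instead
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

    // Move the generated HFiles into the regions of the target table
    val load = new LoadIncrementalHFiles(hbaseConfig)
    load.doBulkLoad(outputPath, conn.getAdmin, table, conn.getRegionLocator(htableName))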
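Problems encountered
Write permission
If writing the HFiles fails for lack of permission, you can open up the output directory with chmod 777 /outputDir (for a directory on HDFS, the equivalent is hdfs dfs -chmod -R 777 /outputDir).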