一、理论知识
(一)加载数据到HBase的三种方法:
通过MR job,使用TableOutputFormat加载到表中。(效率较低)
核心的原理还是使用htable的put方法,不过由于使用了mapreduce分布式提交到hbase,速度比单线程效率高出许多。
通过客户端API,写入表中。(效率较低)
通过Bulk load 运行MR job将数据输出为hbase内部格式,再加载数据到集群。(使用更少的CPU和网络资源)
(二)Bulk load两步走
通过MR job,使用HFileOutputFormat2,生成StoreFiles。
每一个输出的HFile 文件,都在一个单独的region内,所以需要使用TotalOrderPartitioner 进行分区。
保证map任务的输出为相互不交叉的主键空间范围,也就是对应hbase中region里的主键范围。
完成文件加载,将数据导入hbase集群中。
完成数据加载有两种方式:
#命令行--- completebulkload
#代码---ImportTsv.java HFileOutputFormat 或 LoadIncrementalHFiles
(三)生成HFile的方法
命令方式:
使用 importtsv 工具将 TSV 格式数据转换为 HFile。(自动生成MR任务)
代码方式:
2.1 通过HFileOutputFormat2 类编写 MapReduce 程序来生成 HFile 。
案例1:https://github.com/jrkinley/hbase-bulk-import-example
案例2:https://www.cnblogs.com/smartloli/p/9501887.html
2.2 Spark bulk load:Using thin record bulk load.
案例1:(Hbase官网文档)Chapter 110 Bulk Load
(四)加载HFile的方法
命令方式:
1.1 使用 completebulkload 将 HFile 导入到 HBase中。
1.2
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles HDFS_Dir_Path HBase_table_name
代码方式:
调用LoadIncrementalHFiles 的doBulkLoad 方法来导入。
(五)常用命令
创建HBase表格,并做预分区:
hbase org.apache.hadoop.hbase.util.RegionSplitter pre_split_table HexStringSplit -c pre_split_nums -f column_family
pre_split_table:需要创建和预分区的表格名称。
pre_split_nums:分区的个数。
column_family:列族名称。
HBase中表的行数统计:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter table_name
table_name:Hbase表的名称。
将HFile文件导入HBase集群中,生成表数据:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hdfs_dir_path table_name
hdfs_dir_path:HDFS中存储HFile文件的目录(该目录下一级应该还有一个以列族名命名的子目录)
table_name:HBase表的名称
二、代码实操
需求:下载纽约出租车数据(https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page),将CSV文件导入到HBase中。
方法一:命令行方式
#将纽约出租车数据文件导入hbase表格(taxi_nyc_2013_01)中
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=","
-Dimporttsv.columns=HBASE_ROW_KEY,taxi:medallion,taxi:hack_license,taxi:vendor_id,
taxi:rate_code,taxi:store_and_fwd_flag,taxi:pickup_datetime,taxi:dropoff_datetime,
taxi:passenger_count,taxi:trip_time_in_secs,taxi:trip_distance,taxi:pickup_longitude,
taxi:pickup_latitude,taxi:dropoff_longitude,taxi:dropoff_latitude
taxi_nyc_2013_01 /hdfs_nyc/2013_trip_data_1.csv
【问题:】 #.纽约出租车数据中没有合适的可以作为ROW_KEY的字段,使用以上命令写入,没办法自定义ROW_KEY,因为主键重复,数据会覆盖。 #.导入CSV时,字段顺序会打乱,命令行中的columns顺序需要与导入的字段顺序一致。
命令行的其他方式,或者使用:
(生成hfile)hadoop jar hbase-version.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,c1,c2 -Dimporttsv.bulk.output=tmp
hbase_table hdfs_file
(导入hbase)hadoop jar hbase-version.jar completebulkload /user/hadoop/tmp/cf hbase_table
命令行的其他方式,或者使用:
(直接生成并导入)hadoop jar hbase-version.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,c1,c2 hbase_table hdfs_file
方法二:Spark Bulk Load
import java.io.IOException
import java.util.UUID
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, FamilyHFileWriteOptions, HBaseContext}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author: gh
* @Description:
*【目的】将CSV文件导入到Hbase table中。
* 【实现思路】bulk load:
* step1:读数据,通过MR Job生成storefiles
* step2:将hfile加载到hbase集群中。
*/
object SparkBulkLoad2 {
System.setProperty("hadoop.home.dir", "C:/hadoop-2.6.0")
val sc = new SparkContext(new SparkConf()
.setAppName("write_csv_to_hbase_table")
.setMaster("local[*]")
.set("spark.shuffle.file.buffer","2048k")
.set("spark.executor.cores","2")
)
//HBase配置
val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", "IP:2181")
config.setInt("zookeeper.recovery.retry", 0)
config.setInt("hbase.client.retries.number", 0)
val hbaseContext = new HBaseContext(sc, config)
//hbase 表格
val columnFamily = "taxi"
//TODO:need to change
val table_name = "taxi_nyc_2013_03"
val COLUMNS = Array(Bytes.toBytes("medallion"), Bytes.toBytes("hack_license"),
Bytes.toBytes("vendor_id"), Bytes.toBytes("rate_code"),
Bytes.toBytes("store_and_fwd_flag"), Bytes.toBytes("pickup_datetime"),
Bytes.toBytes("dropoff_datetime"), Bytes.toBytes("passenger_count"),
Bytes.toBytes("trip_time_in_secs"), Bytes.toBytes("trip_distance"),
Bytes.toBytes("pickup_longitude"), Bytes.toBytes("pickup_latitude"),
Bytes.toBytes("dropoff_longitude"), Bytes.toBytes("dropoff_latitude"))
val tableName = createTable(ConnectionFactory.createConnection(config),table_name,columnFamily)
//HFile的生成路径
//TODO:need to change
val path = new Path("D:\\hbase_mr_nyc3")
//需要读取的文件
val files = "D:\\mydownload\\NYC_taxi_data\\2013_trip_data_3.csv"
def main(args: Array[String]): Unit = {
val start = System.currentTimeMillis()
generateHFiles(files)
val end = System.currentTimeMillis()
println("耗时: "+(end - start)/(1000*60).toDouble+" min...")
//loadHFiles
}
/**
* 生成hbase table的rowkey
* @param baseInfo rowkey中的组成部分
* @return
*/
def generateRowKey(baseInfo: String): String = {
val uuid = UUID.randomUUID.toString
val idx = uuid.lastIndexOf("-")
new StringBuffer(baseInfo).append("_").append(uuid.substring(idx + 1)).toString
}
/**
* 读取文件并生成hfile
* @param tmp
* @param fileName
*/
def generateHFiles(fileName:String): Unit ={
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions.GenericHBaseRDDFunctions
val column_count = COLUMNS.length
//获取数据
val lines = sc.textFile(fileName,10)
lines.filter(line=>line.split(",").length==column_count).map(line=>{
var arr = line.split(",")
val familyQualifiersValues = new FamiliesQualifiersValues
for(i <- 0 until column_count){
familyQualifiersValues +=(Bytes.toBytes(columnFamily), COLUMNS(i), Bytes.toBytes(arr(i)))
}
val rowkey = generateRowKey(arr(0))
(new ByteArrayWrapper(Bytes.toBytes(rowkey)), familyQualifiersValues)
}).hbaseBulkLoadThinRows(hbaseContext,tableName,
t => {t},
path.toUri.getPath,
new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude = false,HConstants.DEFAULT_MAX_FILE_SIZE)
}
/**
* 将上一步生成的hfile导入hbase。
* 【注意】
* 1.HDFS文件夹中需要有一个子目录,名称为hbase的列族名称。这和生成hfile时得到的目录是一致的。
* 2.运行该方法(loadHFiles)和运行命令是等价的。命令如下:
* hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hbase_hfile table_name
* 3.如果报错:Bulk load aborted with some files not yet loaded。原因可能是权限问题,需要以hdfs用户运行。
* 4.将hfile导入hbase之后,原来HDFS路径下的文件会被自动移除。
*/
def loadHFiles: Unit ={
val path1 = new Path("hdfs://IP:8020/hbase_mr_nyc/")
//批量导入
val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(tableName)
val load = new LoadIncrementalHFiles(config)
//将文件移入到hbase
load.doBulkLoad(path1,conn.getAdmin, table, conn.getRegionLocator(tableName))
}
def createTable(conn: Connection, tableName: String, cf: String): TableName = {
val tn = TableName.valueOf(tableName)
var admin:Admin = null
try {
admin = conn.getAdmin
val flag = admin.tableExists(tn)
if (!flag) { //表不存在,则创建
val builder = TableDescriptorBuilder.newBuilder(tn)
val cfd = ColumnFamilyDescriptorBuilder.of(cf)
builder.addColumnFamily(cfd)
admin.createTable(builder.build)
}
} catch {
case e: IOException =>
e.printStackTrace()
}
tn
}
}
- 方法三:HFileOutputFormat2
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.UUID;
/**
* @Author: gh
* @Description:
* 【目的】将CSV文件导入到Hbase table中。
* 【实现思路】bulk load:
* step1:读数据,通过MR Job生成storefiles
* step2:将hfile加载到hbase集群中。
* 【问题!!!】
* 1.在本地运行失败:java.io.IOException: (null) entry in command string: null chmod 0700
* 2.在集群上运行失败:java.lang.ClassNotFoundException: org.apache.hadoop.hbase.TableName
*/
public class FileToHbaseJob {
//输出的表名
static final String OUTPUT_TABLE="taxi_nyc_2013_04";
//输入的文件名:hdfs路径
static final Path INPUT_FILE_PATH = new Path("hdfs://IP:8020/testdata/2013_trip_data_test.csv");
//输出hfile的路径
static final Path OUTPUT_HFILE_PATH = new Path("hdfs://IP:8020/tmp/hfile");
//表的字段名
static final byte[][]COLUMNS = {Bytes.toBytes("medallion"),Bytes.toBytes("hack_license"),
Bytes.toBytes("vendor_id"),Bytes.toBytes("rate_code"),
Bytes.toBytes("store_and_fwd_flag"),Bytes.toBytes("pickup_datetime"),
Bytes.toBytes("dropoff_datetime"),Bytes.toBytes("passenger_count"),
Bytes.toBytes("trip_time_in_secs"),Bytes.toBytes("trip_distance"),
Bytes.toBytes("pickup_longitude"),Bytes.toBytes("pickup_latitude"),
Bytes.toBytes("dropoff_longitude"),Bytes.toBytes("dropoff_latitude")};
//表的列族
static final String CF = "taxi";
public static void main(String[] args) {
//AccessControlException: Permission denied
System.setProperty("HADOOP_USER_NAME", "hdfs");
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", "IP:2181");
config.set("fs.defaultFS", "hdfs://IP:8020");
config.set(TableOutputFormat.OUTPUT_TABLE, OUTPUT_TABLE);
Job job = null;
Connection conn = null;
try {
job = Job.getInstance(config, "write_csv_to_hbase_table");
job.setJarByClass(FileToHbaseJob.class);
//取出Configuration conf = job.getConfiguration();设置了conf的输入路径input_dir
FileInputFormat.setInputPaths(job, INPUT_FILE_PATH);
FileOutputFormat.setOutputPath(job, OUTPUT_HFILE_PATH);
//设置inputformat,outputformat
job.setInputFormatClass(TextInputFormat.class);
//设置mapper及输出格式
job.setMapperClass(ImportFileMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
//设置reducer
//job.setNumReduceTasks(0);
//HFileOutputFormat2
conn = ConnectionFactory.createConnection(config);
//表格已经创建,或通过代码创建
TableName tn = createTable(conn,OUTPUT_TABLE,CF);
Table table = conn.getTable(tn);
RegionLocator regionLocator = conn.getRegionLocator(tn);
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
} catch (IOException e) {
e.printStackTrace();
}
boolean b = false;
try {
b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
if(conn != null){
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static TableName createTable(Connection conn,String tableName,String cf){
TableName tn = TableName.valueOf(tableName);
Admin admin = null;
try {
admin = conn.getAdmin();
boolean flag = admin.tableExists(tn);
if(!flag){
//表不存在,则创建
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tn);
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(cf);
builder.addColumnFamily(cfd);
admin.createTable(builder.build());
}
} catch (IOException e) {
e.printStackTrace();
}
return tn;
}
/**
* 读取(csv)文件,拼凑成单元格形式,由Mapper任务写出。emit
*/
public static class ImportFileMapper extends Mapper<LongWritable, Text,ImmutableBytesWritable, KeyValue> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fieldValues = value.toString().split(",");
int len = fieldValues.length;
//rowkey = uuid_medallion
String rowKey = generateRowKey(fieldValues[0]);
System.out.println("rowkey: "+rowKey);
for(int i=0;i<len;i++){
KeyValue kv = new KeyValue(rowKey.getBytes(),
CF.getBytes(),
COLUMNS[i],
fieldValues[i].getBytes());
context.write(new ImmutableBytesWritable(rowKey.getBytes()),kv);
}
}
public String generateRowKey(String baseInfo){
String uuid = UUID.randomUUID().toString();
int idx = uuid.lastIndexOf("-");
return new StringBuffer(uuid.substring(idx+1)).append("_").append(baseInfo).toString();
}
}
}
注意:本文归作者所有,未经作者允许,不得转载