HBase批量入库/导入

求知探索 1年前 ⋅ 840 阅读

一、理论知识
(一)加载数据到HBase的三种方法:

通过MR job,使用TableOutputFormat加载到表中。(效率较低)
核心的原理还是使用htable的put方法,不过由于使用了mapreduce分布式提交到hbase,速度比单线程效率高出许多。
通过客户端API,写入表中。(效率较低)
通过Bulk load 运行MR job将数据输出为hbase内部格式,再加载数据到集群。(使用更少的CPU和网络资源)
(二)Bulk load两步走

通过MR job,使用HFileOutputFormat2,生成StoreFiles。
每一个输出的HFile 文件,都在一个单独的region内,所以需要使用TotalOrderPartitioner 进行分区。
保证map任务的输出为相互不交叉的主键空间范围,也就是对应hbase中region里的主键范围。
完成文件加载,将数据导入hbase集群中。
完成数据加载有两种方式:
#命令行--- completebulkload
#代码---ImportTsv.java HFileOutputFormat 或 LoadIncrementalHFiles
(三)生成HFile的方法

命令方式:

使用 importtsv 工具将 TSV 格式数据转换为 HFile。(自动生成MR任务)
代码方式:

2.1 通过HFileOutputFormat2 类编写 MapReduce 程序来生成 HFile 。
案例1:https://github.com/jrkinley/hbase-bulk-import-example
案例2:https://www.cnblogs.com/smartloli/p/9501887.html

2.2 Spark bulk load:Using thin record bulk load.
案例1:(Hbase官网文档)Chapter 110 Bulk Load
(四)加载HFile的方法

命令方式:

1.1 使用 completebulkload 将 HFile 导入到 HBase中。
1.2 

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles HDFS_Dir_Path HBase_table_name


代码方式:
调用LoadIncrementalHFiles 的doBulkLoad 方法来导入。
(五)常用命令

创建HBase表格,并做预分区:
hbase org.apache.hadoop.hbase.util.RegionSplitter pre_split_table HexStringSplit -c pre_split_nums -f column_family
pre_split_table:需要创建和预分区的表格名称。
pre_split_nums:分区的个数。
column_family:列族名称。
HBase中表的行数统计:
hbase org.apache.hadoop.hbase.mapreduce.RowCounter table_name
table_name:Hbase表的名称。
将HFile文件导入HBase集群中,生成表数据:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hdfs_dir_path table_name
hdfs_dir_path:HDFS中存储HFile文件的目录(该目录下一级应该还有一个以列族名命名的子目录)
table_name:HBase表的名称

二、代码实操
需求:下载纽约出租车数据(https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page),将CSV文件导入到HBase中。

方法一:命令行方式
#将纽约出租车数据文件导入hbase表格(taxi_nyc_2013_01)中

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=","
-Dimporttsv.columns=HBASE_ROW_KEY,taxi:medallion,taxi:hack_license,taxi:vendor_id,
taxi:rate_code,taxi:store_and_fwd_flag,taxi:pickup_datetime,taxi:dropoff_datetime,
taxi:passenger_count,taxi:trip_time_in_secs,taxi:trip_distance,taxi:pickup_longitude,
taxi:pickup_latitude,taxi:dropoff_longitude,taxi:dropoff_latitude 
taxi_nyc_2013_01 /hdfs_nyc/2013_trip_data_1.csv


【问题:】 #.纽约出租车数据中没有合适的可以作为ROW_KEY的字段,使用以上命令写入,没办法自定义ROW_KEY,因为主键重复,数据会覆盖。 #.导入CSV时,字段顺序会打乱,命令行中的columns顺序需要与导入的字段顺序一致。

命令行的其他方式,或者使用:

(生成hfile)hadoop jar hbase-version.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,c1,c2 -Dimporttsv.bulk.output=tmp
hbase_table hdfs_file
(导入hbase)hadoop jar hbase-version.jar completebulkload /user/hadoop/tmp/cf hbase_table


命令行的其他方式,或者使用:

(直接生成并导入)hadoop jar hbase-version.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,c1,c2 hbase_table hdfs_file

方法二:Spark Bulk Load

import java.io.IOException
import java.util.UUID
 
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, FamilyHFileWriteOptions, HBaseContext}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.spark.{SparkConf, SparkContext}
 
/**
  * @Author: gh
  * @Description:
  *【目的】将CSV文件导入到Hbase table中。
  * 【实现思路】bulk load:
  * step1:读数据,通过MR Job生成storefiles
  * step2:将hfile加载到hbase集群中。
  */
object SparkBulkLoad2 {
 
  System.setProperty("hadoop.home.dir", "C:/hadoop-2.6.0")
  val sc = new SparkContext(new SparkConf()
    .setAppName("write_csv_to_hbase_table")
    .setMaster("local[*]")
  .set("spark.shuffle.file.buffer","2048k")
  .set("spark.executor.cores","2")
 
  )
  //HBase配置
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "IP:2181")
  config.setInt("zookeeper.recovery.retry", 0)
  config.setInt("hbase.client.retries.number", 0)
  val hbaseContext = new HBaseContext(sc, config)
  //hbase 表格
  val columnFamily = "taxi"
  //TODO:need to change
  val table_name = "taxi_nyc_2013_03"
  val COLUMNS = Array(Bytes.toBytes("medallion"), Bytes.toBytes("hack_license"),
                      Bytes.toBytes("vendor_id"), Bytes.toBytes("rate_code"),
                      Bytes.toBytes("store_and_fwd_flag"), Bytes.toBytes("pickup_datetime"),
                      Bytes.toBytes("dropoff_datetime"), Bytes.toBytes("passenger_count"),
                      Bytes.toBytes("trip_time_in_secs"), Bytes.toBytes("trip_distance"),
                      Bytes.toBytes("pickup_longitude"), Bytes.toBytes("pickup_latitude"),
                      Bytes.toBytes("dropoff_longitude"), Bytes.toBytes("dropoff_latitude"))
  val tableName = createTable(ConnectionFactory.createConnection(config),table_name,columnFamily)
  //HFile的生成路径
  //TODO:need to change
  val path = new Path("D:\\hbase_mr_nyc3")
  //需要读取的文件
  val files = "D:\\mydownload\\NYC_taxi_data\\2013_trip_data_3.csv"
 
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    generateHFiles(files)
    val end = System.currentTimeMillis()
    println("耗时: "+(end - start)/(1000*60).toDouble+" min...")
    //loadHFiles
  }
 
  /**
    * 生成hbase table的rowkey
    * @param baseInfo  rowkey中的组成部分
    * @return
    */
  def generateRowKey(baseInfo: String): String = {
    val uuid = UUID.randomUUID.toString
    val idx = uuid.lastIndexOf("-")
    new StringBuffer(baseInfo).append("_").append(uuid.substring(idx + 1)).toString
  }
 
  /**
    * 读取文件并生成hfile
    * @param tmp
    * @param fileName
    */
  def generateHFiles(fileName:String): Unit ={
    import org.apache.hadoop.hbase.spark.HBaseRDDFunctions.GenericHBaseRDDFunctions
    val column_count = COLUMNS.length
    //获取数据
    val lines = sc.textFile(fileName,10)
    lines.filter(line=>line.split(",").length==column_count).map(line=>{
      var arr = line.split(",")
      val familyQualifiersValues = new FamiliesQualifiersValues
      for(i <- 0 until column_count){
        familyQualifiersValues +=(Bytes.toBytes(columnFamily), COLUMNS(i), Bytes.toBytes(arr(i)))
      }
      val rowkey = generateRowKey(arr(0))
      (new ByteArrayWrapper(Bytes.toBytes(rowkey)), familyQualifiersValues)
    }).hbaseBulkLoadThinRows(hbaseContext,tableName,
      t => {t},
      path.toUri.getPath,
      new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude = false,HConstants.DEFAULT_MAX_FILE_SIZE)
  }
 
  /**
    * 将上一步生成的hfile导入hbase。
    * 【注意】
    * 1.HDFS文件夹中需要有一个子目录,名称为hbase的列族名称。这和生成hfile时得到的目录是一致的。
    * 2.运行该方法(loadHFiles)和运行命令是等价的。命令如下:
    * hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hbase_hfile  table_name
    * 3.如果报错:Bulk load aborted with some files not yet loaded。原因可能是权限问题,需要以hdfs用户运行。
    * 4.将hfile导入hbase之后,原来HDFS路径下的文件会被自动移除。
    */
  def loadHFiles: Unit ={
    val path1 = new Path("hdfs://IP:8020/hbase_mr_nyc/")
    //批量导入
    val conn = ConnectionFactory.createConnection(config)
    val table = conn.getTable(tableName)
    val load = new LoadIncrementalHFiles(config)
    //将文件移入到hbase
    load.doBulkLoad(path1,conn.getAdmin, table, conn.getRegionLocator(tableName))
  }
 
  def createTable(conn: Connection, tableName: String, cf: String): TableName = {
    val tn = TableName.valueOf(tableName)
    var admin:Admin = null
    try {
      admin = conn.getAdmin
      val flag = admin.tableExists(tn)
      if (!flag) { //表不存在,则创建
        val builder = TableDescriptorBuilder.newBuilder(tn)
        val cfd = ColumnFamilyDescriptorBuilder.of(cf)
        builder.addColumnFamily(cfd)
        admin.createTable(builder.build)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
    tn
  }
 
 
}
  1. 方法三:HFileOutputFormat2
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import java.io.IOException;
import java.util.UUID;
 
/**
 * @Author: gh
 * @Description:
 * 【目的】将CSV文件导入到Hbase table中。
 * 【实现思路】bulk load:
 * step1:读数据,通过MR Job生成storefiles
 * step2:将hfile加载到hbase集群中。
 * 【问题!!!】
 * 1.在本地运行失败:java.io.IOException: (null) entry in command string: null chmod 0700
 * 2.在集群上运行失败:java.lang.ClassNotFoundException: org.apache.hadoop.hbase.TableName
 */
public class FileToHbaseJob {
    //输出的表名
    static final String OUTPUT_TABLE="taxi_nyc_2013_04";
    //输入的文件名:hdfs路径
    static final Path INPUT_FILE_PATH = new Path("hdfs://IP:8020/testdata/2013_trip_data_test.csv");
    //输出hfile的路径
    static final Path OUTPUT_HFILE_PATH = new Path("hdfs://IP:8020/tmp/hfile");
 
    //表的字段名
    static final byte[][]COLUMNS = {Bytes.toBytes("medallion"),Bytes.toBytes("hack_license"),
                                    Bytes.toBytes("vendor_id"),Bytes.toBytes("rate_code"),
                                    Bytes.toBytes("store_and_fwd_flag"),Bytes.toBytes("pickup_datetime"),
                                    Bytes.toBytes("dropoff_datetime"),Bytes.toBytes("passenger_count"),
                                    Bytes.toBytes("trip_time_in_secs"),Bytes.toBytes("trip_distance"),
                                    Bytes.toBytes("pickup_longitude"),Bytes.toBytes("pickup_latitude"),
                                    Bytes.toBytes("dropoff_longitude"),Bytes.toBytes("dropoff_latitude")};
    //表的列族
    static final String CF = "taxi";
 
    public static void main(String[] args) {
 
        //AccessControlException: Permission denied
        System.setProperty("HADOOP_USER_NAME", "hdfs");
 
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "IP:2181");
        config.set("fs.defaultFS", "hdfs://IP:8020");
        config.set(TableOutputFormat.OUTPUT_TABLE, OUTPUT_TABLE);
        Job job = null;
        Connection conn = null;
        try {
            job = Job.getInstance(config, "write_csv_to_hbase_table");
            job.setJarByClass(FileToHbaseJob.class);
 
            //取出Configuration conf = job.getConfiguration();设置了conf的输入路径input_dir
            FileInputFormat.setInputPaths(job, INPUT_FILE_PATH);
            FileOutputFormat.setOutputPath(job, OUTPUT_HFILE_PATH);
            //设置inputformat,outputformat
            job.setInputFormatClass(TextInputFormat.class);
 
            //设置mapper及输出格式
            job.setMapperClass(ImportFileMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
 
            //设置reducer
            //job.setNumReduceTasks(0);
 
            //HFileOutputFormat2
            conn = ConnectionFactory.createConnection(config);
            //表格已经创建,或通过代码创建
            TableName tn = createTable(conn,OUTPUT_TABLE,CF);
            Table table = conn.getTable(tn);
            RegionLocator regionLocator = conn.getRegionLocator(tn);
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        boolean b = false;
        try {
            b = job.waitForCompletion(true);
            if (!b) {
                throw new IOException("error with job!");
            }
            if(conn != null){
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    public static TableName createTable(Connection conn,String tableName,String cf){
        TableName tn = TableName.valueOf(tableName);
        Admin admin = null;
        try {
            admin = conn.getAdmin();
            boolean flag = admin.tableExists(tn);
            if(!flag){
                //表不存在,则创建
                TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tn);
                ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(cf);
                builder.addColumnFamily(cfd);
                admin.createTable(builder.build());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return tn;
    }
    /**
     * 读取(csv)文件,拼凑成单元格形式,由Mapper任务写出。emit
     */
    public static class ImportFileMapper extends Mapper<LongWritable, Text,ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fieldValues = value.toString().split(",");
            int len = fieldValues.length;
            //rowkey = uuid_medallion
            String rowKey = generateRowKey(fieldValues[0]);
            System.out.println("rowkey: "+rowKey);
            for(int i=0;i<len;i++){
                KeyValue kv = new KeyValue(rowKey.getBytes(),
                        CF.getBytes(),
                        COLUMNS[i],
                        fieldValues[i].getBytes());
                context.write(new ImmutableBytesWritable(rowKey.getBytes()),kv);
            }
        }
        public String generateRowKey(String baseInfo){
            String uuid = UUID.randomUUID().toString();
            int idx = uuid.lastIndexOf("-");
            return new StringBuffer(uuid.substring(idx+1)).append("_").append(baseInfo).toString();
        }
    }
}

 


全部评论: 0

    我有话说: