通过 MapReduce 服务来批量导入 HDFS 中数据到 HBase。

可通过 DistCp 命令来拷贝不同 HDFS 中的数据,关于 DistCp 更多的详细信息,可参考 DistCp

前提条件

已创建 QingMR 集群,并在 HBase 客户端 中在 /opt/hadoop 目录下完成 MapReduce 服务配置。

方案简介

使用 MapReduce 导入数据有三种方案:

  • 方案一:直接书写 MapReduce 使用 HBase 提供的 JAVA API 从 HDFS 导入到 HBase 表。

    特点:只需要一步操作,可自由规整数据,更为简单灵活,但直接写入 HBase 表会对线上服务有一定的性能影响。

  • 方案二:书写 MapReduce 将 HDFS 中数据转化为 HFile 格式,再使用 HBase 的 BulkLoad 工具导入到 HBase 表。若数据并未格式化仍需规整则采用该方式。

    特点:需两步操作,耗时工作提前做好,确保对线上服务影响做到最小。

  • 方案三:使用 HBase ImportTsv 工具将格式化的 HDFS 数据导入到 HBase 表。若要导入的数据已经是格式化的数据(有固定的分隔符),不需要自己实现 MapReduce 做进一步数据清洗,直接采用该方式。

    特点:需两步操作,耗时工作提前做好,确保对线上服务影响做到最小。

建表

以下示例通过 HBase Shell 预先建表,均使用 HBase 表 test_import,包含一个column family:content。

  cd /opt/hbase

  bin/hbase shell

  hbase(main):001:0> create 'test_import', 'content'
  0 row(s) in 1.2130 seconds

  => Hbase::Table - test_import

项目若使用 mvn 构建,pom.xml 中增加如下内容:

  <properties>
    <hbase.version>1.2.6</hbase.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
  </dependencies>

方案一

方案一 MapReduce 代码如下,先创建表,在 Map 中完成数据解析,在 Reduce 中完成入库。Reduce的个数相当于入库线程数。

说明

可自行修改 job.setNumReduceTasks() 中 Reduce 数目。

  package com.qingcloud.hbase

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.*;
  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  import org.apache.hadoop.hbase.mapreduce.TableReducer;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.util.GenericOptionsParser;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

  import java.io.File;
  import java.io.FileInputStream;
  import java.io.IOException;

  public class ImportByMR {

      private static String table = "test_import";

      private static class ImportByMRMapper extends Mapper<LongWritable, Text, Text, Text> {

          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String[] sp = value.toString().split(" ");
              if (sp.length < 2) {
                  return;
              }
              context.write(new Text(sp[0]), new Text(sp[1]));
          }
      }

      private static class ImportByMRReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

          @Override
          public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
              byte[] bRowKey = key.toString().getBytes();
              ImmutableBytesWritable rowKey = new ImmutableBytesWritable(bRowKey);

              for (Text t : value) {
                  Put p = new Put(bRowKey);
                  p.setDurability(Durability.SKIP_WAL);
                  p.addColumn("content".getBytes(), "a".getBytes(), t.toString().getBytes());
                  context.write(rowKey, p);
              }
          }
      }

      private static void createTable(Configuration conf) throws IOException {
          TableName tableName = TableName.valueOf(table);
          Connection connection = ConnectionFactory.createConnection(conf);
          Admin admin = connection.getAdmin();
          if (admin.tableExists(tableName)) {
              System.out.println("table exists!recreating.......");
              admin.disableTable(tableName);
              admin.deleteTable(tableName);
          }
          HTableDescriptor htd = new HTableDescriptor(tableName);
          HColumnDescriptor tcd = new HColumnDescriptor("content");
          htd.addFamily(tcd);
          admin.createTable(htd);
      }

      public static void main(String[] argv) throws IOException, ClassNotFoundException, InterruptedException {
          Configuration conf = HBaseConfiguration.create();
          File file = new File("/opt/hbase/conf/hbase-site.xml");
          FileInputStream in = new FileInputStream(file);
          conf.addResource(in);
          createTable(conf);
          GenericOptionsParser optionParser = new GenericOptionsParser(conf, argv);
          String[] remainingArgs = optionParser.getRemainingArgs();

          Job job = Job.getInstance(conf, ImportByMR.class.getSimpleName());
          job.setJarByClass(ImportByMR.class);
          job.setMapperClass(ImportByMRMapper.class);
          TableMapReduceUtil.initTableReducerJob(table, ImportByMRReducer.class, job);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(Text.class);
          job.setOutputKeyClass(ImmutableBytesWritable.class);
          job.setOutputValueClass(Mutation.class);
          job.setNumReduceTasks(1);
          FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }

hbase-tools-1.0.0.jar 是将上述代码打成的jar包,APP_HOME 是 jar 包所放置的目录,/user/inputPath 下是需要导入到HBase中的数据。 数据格式为 rowkey value,两列 tab 分隔。需自行准备后通过 bin/hdfs dfs -put 到 HDFS 的 /user/inputPath 目录。

依次执行下述命令,执行成功后可简单通过测试一中的 HBase Shell 来验证数据。

  cd /opt/hadoop

  bin/hadoop jar $APP_HOME/hbase-tools-1.0.0.jar com.qingcloud.hbase.ImportByMR /user/inputPath

方案二

方案二 MapReduce 代码如下,Map 对数据做进一步处理,Reduce 无需指定,会根据 Map 的 outputValue 自动选择实现。

  package com.qingcloud.hbase

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.*;
  import org.apache.hadoop.hbase.client.*;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.util.GenericOptionsParser;

  import java.io.File;
  import java.io.FileInputStream;
  import java.io.IOException;

  public class ImportByBulkLoad {

      private static String myTable = "test_import";

      private static class ImportByBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String[] sp = value.toString().split(" ");
              if (sp.length < 2) {
                  return;
              }
              byte[] bRowKey = sp[0].getBytes();
              ImmutableBytesWritable rowKey = new ImmutableBytesWritable(bRowKey);
              Put p = new Put(bRowKey);
              p.setDurability(Durability.SKIP_WAL);
              p.addColumn("content".getBytes(), "a".getBytes(), sp[1].getBytes());
              context.write(rowKey, p);
          }
      }

      public static void main(String[] argv) throws IOException, ClassNotFoundException, InterruptedException {
          Configuration conf = HBaseConfiguration.create();
          File file = new File("/opt/hbase/conf/hbase-site.xml");
          FileInputStream in = new FileInputStream(file);
          conf.addResource(in);
          GenericOptionsParser optionParser = new GenericOptionsParser(conf, argv);
          String[] remainingArgs = optionParser.getRemainingArgs();
          Job job = Job.getInstance(conf, ImportByBulkLoad.class.getSimpleName());
          job.setJarByClass(ImportByBulkLoad.class);
          job.setMapperClass(ImportByBulkLoadMapper.class);
          job.setMapOutputKeyClass(ImmutableBytesWritable.class);
          job.setMapOutputValueClass(Put.class);
          FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
          HFileOutputFormat2.setOutputPath(job, new Path(remainingArgs[1]));
          TableName tableName = TableName.valueOf(myTable);
          Connection connection = ConnectionFactory.createConnection(conf);
          Table table = connection.getTable(tableName);
          RegionLocator regionLocator = connection.getRegionLocator(tableName);
          HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }

hbase-tools-1.0.0.jar 是将上述代码打成的 jar 包,APP_HOME 是 jar 包所放置的目录,/user/inputPath 下是需要导入到HBase中的数据。数据格式为 rowkey value,两列 tab 分隔。

需自行准备后通过 bin/hdfs dfs -put 到 HDFS 的 /user/inputPath 目录。/user/outputPath 是 MapReduce 生成的 HFile 格式的结果。test_import 是 HBase 表名。

依次执行下述命令,执行成功后可简单通过测试一中的 HBase Shell 来验证数据。

  cd /opt/hadoop

  bin/hdfs dfs -rmr /user/outputPath

  export HADOOP_CLASSPATH=`/opt/hbase/bin/hbase classpath`

  bin/hadoop jar $APP_HOME/hbase-tools-1.0.0.jar com.qingcloud.hbase.ImportByBulkLoad /user/inputPath /user/outputPath

  bin/hadoop jar /opt/hbase/lib/hbase-server-<VERSION>.jar completebulkload /user/outputPath test_import

方案三

方案三无需书写代码,/user/inputPath 下是需要导入到 HBase 中的数据。数据格式为 rowkey value,两列 tab 分隔。需自行准备后通过 bin/hdfs dfs -put 到 HDFS 的 /user/inputPath 目录。 test_import 是 HBase 表名。

依次执行下述命令,执行成功后可简单通过测试一中的 HBase Shell 来验证数据。

  cd /opt/hadoop

  bin/hdfs dfs -rmr /user/outputPath

  export HADOOP_CLASSPATH=`/opt/hbase/bin/hbase classpath`

  bin/hadoop jar /opt/hbase/lib/hbase-server-<VERSION>.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,content:a test_import /user/inputPath

  cd /opt/hadoop

  bin/hdfs dfs -rmr /user/outputPath

  cd /opt/hbase

  bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,content:a test_import /user/inputPath