dsg

    科技2024-01-20  113

    Hbase

    HBase的介绍HBase的特征HBase的存储机制HBase的搭建Hbase在第三方软件中的体现Hbase数据在Hadoop中存储的位置Hbase在zookeeper中的存储位置Hbase在Hbase中的体现Hbase在webui中的体现Hbase在shell中的体现 Hbase命令javaAPI操作Hbaseclient交互hbase的过程Hbase切割文件手动分区手动移动分区 预切割Hbase数据版本控制创表参数TTLKEEP_DELETED_CELLS 过滤器过滤器的分类比较过滤器专用过滤器 过滤器列表 比较运算符集合类比较器过滤器的用法 Hbase计数器计数器的命令javaAPI操作计数器 rowKey的设计布隆过滤器sql可视化界面

    HBase的介绍

    HBase是基于hadoop的数据库,分布式可伸缩(可任意增加或减少节点)大型数据存储,随机、实时读写数据,可以存储十亿行,百万列,版本化、非关系型,面向列(列不固定)的数据库,非关系型数据库还有Redis和mongodb,存储形式都是kv对。

    HBase的特征

    线性模块化扩展方式。严格一致性读写自动可配置表切割区域服务器之间自动容灾HBase支持Hadoop MR作业易于使用的Java API块缓存和布隆过滤器用于实时查询通过服务器端过滤器实现查询预测支持XML, Protobuf, and binary data等串行化数据技术可扩展的shell可视化

    HBase的存储机制

    面向列存储 ,table是按行排序,HBase的存储结构如下: 因此定位一个数据需要该数据的行号,列族和列,版本号

    HBase的搭建

    HBase有两种节点,master(管理节点)和RegionServer(区域服务器) 这里我选择Centos100作为master,Centos101和Centos102作为RS(一般来说应该选择奇数个服务器作为RS,但是本人电脑资源有限只能选择两台)

    安装jdk

    安装hadoop

    安装HBase

    #解压 tar -zvxf hbase-2.0.0-bin.tar.gz -C /soft/ #分发 xsync.sh hbase-2.0.0 #配置环境变量 export HBASE_HOME=/soft/hbase-2.0.0 export PATH=$PATH:$HBASE_HOME/bin source /etc/profile #验证环境变量是否配置成功 hbase version

    修改hbase-2.0.0/conf/hbase-env.sh,然后分发

    #修改JAVA_HOME路径 export JAVA_HOME=/soft/jdk1.8.0_65/ #修改zk管理,hbase自己已经集成了zk,如果我们想用自己搭建的zk去管理Hbase,就要把下面的属性设置成false export HBASE_MANAGES_ZK=false

    HBase本地模式 hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性

    <property> <name>hbase.rootdir</name> <value>file:/home/hadoop/HBase/HFiles</value> </property>

    HBase伪分布式 hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性

    <!--启用分布式--> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.rootdir</name> <value>hdfs://localhost:8030/hbase</value> </property>

    HBase完全分布式 hbase-2.0.0/conf/hbase-site.xml,添加或修改以下属性,配置完后分发

    <!--启用分布式--> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <!-- 指定hbase数据在hdfs上的存放路径,注意这里如果配置的是hadoopHA高可用的dfs.nameservices属性的值时需要将hadoop中的hdfs-sit.xml和core-site.xml复制到hbase-2.0.0/conf/路径下,否则不识别--> <property> <name>hbase.rootdir</name> <value>hdfs://mycluster/hbase</value> </property> <!-- 配置zk地址 --> <property> <name>hbase.zookeeper.quorum</name> <value>Centos100:2181,Centos101:2181,Centos102:2181</value> </property> <!-- zk的本地目录 --> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/home/zk/zookeeper</value> </property>

    配置hbase-2.0.0/conf/regionservers,指定区域服务器主机名(配置完分发)

    Centos101 Centos102

    启动集群(在master节点上启动,注意:完全分布式启动之前需要先启动hadoop和zk)

    #启动zk zkServer.sh start #启动hadoop start-all.sh #启动hbase start-hbase.sh

    启动成功后master节点主机上会有一个进程HMaster,RS节点上会有一个进程HRegionServer

    访问hbase的webui,端口是16010

    查看zk

    zkCli.sh ls / #查看RS节点 ls /hbase/rs #查看master get /hbase/master

    启用备份master

    #启用备份master直接去其他主机上执行以下命令即可在webui上的Backup Masters中看到(一般启动两个master就可以了) hbase-daemon.sh start master

    校时

    #hbase是以时间戳来作为数据的版本,所以说hbase集群的主机时间必须同步,否则容易发生故障

    Hbase在第三方软件中的体现

    Hbase数据在Hadoop中存储的位置

    名称空间

    表数据的分区

    列族

    列族中的数据

    Hbase在zookeeper中的存储位置

    名称空间 表 meta表所在的服务器 查询数据一般都是先通过zk查到meta所在的服务器,然后查询meta分区表,根据提供的rowKey找到对应分区所在的服务器,查到数据

    Hbase在Hbase中的体现

    Hbase在webui中的体现
    表格 表格分区信息
    Hbase在shell中的体现

    查询名称空间表和分区

    Hbase命令

    #启动master的命令 hbase-daemon.sh start master #启动RS的命令(这个命令也是在master主机上执行) hbase-daemons.sh start regionserver #全部启动的命令 start-hbase.sh #关闭hbase集群 stop-hbase.sh #进入hbase的shell命令状态 hbase shell #查看命令帮助 help

    命令分组:

    general(常规组)

    #查看当前系统登录用户 whoami #查看版本 version

    ddl(数据定义语言)

    #在名字空间mydb1下创建表t1,拥有三个列族f1,f2,f3 create 'mydb1:t1','f1','f2','f3' #查看表结构 describe 'mydb1:t1' desc 'mydb1:t1' #删除表,需要先禁用表然后才能删除表 disable 'mydb1:t1' drop 'mydb1:t1' #解禁表,表格禁用后查询插入都不行 enable 'mydb1:t1'

    namespace(名字空间)

    #列出名字空间,最初会有两个空间,一个是default,一个是hbase,hbase属于系统库 list_namespace #列出指定名字空间下的表,注意:hbase中所有的参数都要加上单引号 list_namespace_tables 'default' #创建名字空间mydb1 create_namespace 'mydb1'

    dml

    #插入数据id=100,name='tom',age=12,三级坐标对应一个值,名称空间和表,行号,列族和列 put 'mydb1:t1','row1','f1:id',100 put 'mydb1:t1','row1','f1:name','tom' put 'mydb1:t1','row1','f1:age',12 #查询指定行的数据,查询必须指定row,因为hbase是按照rowKey切割表的,只能指定了rowkey,hbase才能找到对应的区域 get 'mydb1:t1','row1' #扫描表t1(上亿数据谨慎使用) scan 'mydb1:t1' #统计表的行数 count 'mydb1:t1'

    tools

    #清理内存缓冲区到磁盘文件 flush 'mydb1:t1' #切割表,不带参数,默认是从中间切割 split 'mydb1:t1' #切割表,不带参数,从指定的rowKey切割 split 'mydb1:t1','row008888' #移动分区,格式:move 分区id,需要分配到哪台服务器上(服务器名称,端口,启动码) move 'e15b2d06eb033f19934c26f5af3eab3d','centos101,16020,1602661324341' #合并分区,参数为两个分区id,只能合并两个分区 merge_region 'e15b2d06eb033f19934c26f5af3eab3d','8bf5e09451b8b5f516dff93f834927dd'

    javaAPI操作Hbase

    将服务器上的hbase-site.xml配置文件拷到项目中的src根目录下,注意如果服务器上的配置文件写的是主机名,那你这边不能把主机名变成ip,需要去修改hosts文件ip映射,如果服务器上配置的ip,那就写ip,总之不要去更改从服务器上考下来的配置文件maven环境搭建 <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.0.0</version> </dependency> 代码实现(注意:当前代码都是在2.0.0版本的基础上写的,不同版本的hbase代码会有差异)import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import java.text.DecimalFormat; /** * 插入数据 */ public static void put() throws Exception { //创建配置对象 Configuration conf = HBaseConfiguration.create(); //创建连接 Connection conn = ConnectionFactory.createConnection(conf); //创建表名对象 TableName tName = TableName.valueOf("mydb1:t1"); //获得表 Table table = conn.getTable(tName); //将字符串转化成字节数组 byte[] rowId = Bytes.toBytes("row2"); Put put = new Put(rowId); byte[] f1 = Bytes.toBytes("f1"); byte[] id = Bytes.toBytes("id"); byte[] valueId = Bytes.toBytes(101); put.addColumn(f1,id,valueId); byte[] name = Bytes.toBytes("name"); byte[] valueName = Bytes.toBytes("zhangsan"); put.addColumn(f1,name,valueName); byte[] hobby = Bytes.toBytes("hobby"); byte[] valueHobby = Bytes.toBytes("打篮球"); put.addColumn(f1,hobby,valueHobby); //执行插入 table.put(put); conn.close(); } /** * 查询数据 * @throws Exception */ public static void get() throws Exception { //创建配置对象 Configuration conf = HBaseConfiguration.create(); //创建连接 Connection conn = ConnectionFactory.createConnection(conf); //创建表名对象 TableName tName = TableName.valueOf("mydb1:t1"); //获得表 Table table = conn.getTable(tName); //将字符串转化成字节数组 byte[] rowId = Bytes.toBytes("row1"); Get get = new Get(rowId); Result r = table.get(get); byte[] value = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")); System.out.println(Bytes.toString(value)); conn.close(); } /** * 批处理 * @throws Exception */ public static void batch() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb1:t5")); //该集合里可以放get,put,delete操作 List<Row> rows = new ArrayList<Row>(); Put put = new Put(Bytes.toBytes("row003")); put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name"),Bytes.toBytes("zhangsan")); rows.add(put); Get get = new Get(Bytes.toBytes("row001")); get.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name")); rows.add(get); Delete delete = new Delete(Bytes.toBytes("row002")); delete.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name")); rows.add(delete); //接收处理结果 Object[] results = new Object[rows.size()]; //处理 table.batch(rows,results); //get返回有结果,put和delete都是无返回结果的 Result r = (Result)results[1]; byte[] value = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")); System.out.println(Bytes.toString(value)); conn.close(); } /** * HBase批处理存储 * @throws Exception */ public static void putAll() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tName = TableName.valueOf("mydb1:t1"); Table table = conn.getTable(tName); //设置不要自动提交缓冲区的内容 List<Put> puts = new ArrayList<Put>(); //设置rowKey的格式,因为rowKey是按位排序的 DecimalFormat df = new DecimalFormat(); df.applyPattern("0000000"); for(int i = 1; i < 100000; i++){ Put put = new Put(Bytes.toBytes("row" + df.format(i))); put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("id"),Bytes.toBytes(i)); put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("name"),Bytes.toBytes("Tom" + i)); put.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("age"),Bytes.toBytes(i % 100)); puts.add(put); } table.put(puts); conn.close(); } /** * 创建名称空间 * @throws Exception */ public static void createNameSpace() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //创建名称空间 NamespaceDescriptor nd = NamespaceDescriptor.create("mydb2").build(); admin.createNamespace(nd); conn.close(); } /** * 查询名称空间 * @throws Exception */ public static void listNameSpaces() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); NamespaceDescriptor[] nds = admin.listNamespaceDescriptors(); for(NamespaceDescriptor nd:nds){ System.out.println(nd.getName()); } conn.close(); } /** * 创建表 * @throws Exception */ public static void createTable() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); TableName tn = TableName.valueOf("mydb2:t2"); TableDescriptorBuilder.ModifyableTableDescriptor table = new TableDescriptorBuilder.ModifyableTableDescriptor(tn); //添加列族f1 ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family1 = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f1")); table.setColumnFamily(family1); //添加列族f2 ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family2 = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f2")); table.setColumnFamily(family2); //创建表 admin.createTable(table); conn.close(); } /** * 删除数据 * @throws Exception */ public static void deleteData() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb2:t2")); Delete delete = new Delete(Bytes.toBytes("row000001")); delete.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("id")); delete.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("name")); //删除第一行的f1列族的id和name table.delete(delete); conn.close(); } /** * 删除表 * @throws Exception */ public static void deleteTable() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //先禁用表 admin.disableTable(TableName.valueOf("mydb2:t2")); //再删除表 admin.deleteTable(TableName.valueOf("mydb2:t2")); conn.close(); } /** * 查询 * @throws Exception */ public static void scan() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); //设置查询范围 Scan scan = new Scan(); //设置每行返回的列数,如果超过这个列数会自动截断,在下一个迭代会返回(调优手段,对结果没有任何影响) scan.setBatch(3); //开启扫描器缓存,默认是-1(不开启),如果不开启,下面迭代数据时每一次调用next()都会向服务器发送一个RPC请求,影响性能(调优手段,对结果没有任何影响) scan.setCaching(1500); //默认包含首不包含尾,如果特别指定第二个参数为true就包含 scan.withStartRow(Bytes.toBytes("row004748"),true); scan.withStopRow(Bytes.toBytes("row004758"),true); ResultScanner rs = table.getScanner(scan); Iterator<Result> iterator = rs.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); Map<byte[], byte[]> map = result.getFamilyMap(Bytes.toBytes("f1")); for(byte[] key:map.keySet()){ String k = Bytes.toString(key); //注意:所有的值最好在存储时都转为字符串,否则在查询的时候动态查询无法准确定位它的数据类型,都转为字符串会乱码 String v= Bytes.toString(map.get(key)); System.out.print(k + "===" + v + "......"); } System.out.println(); } rs.close(); conn.close(); } /** * 更加细致的动态查询 * @throws Exception */ public static void scan2() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); //设置查询范围 Scan scan = new Scan(); //默认包含首不包含尾,如果特别指定第二个参数为true就包含 scan.withStartRow(Bytes.toBytes("row004748"),true); scan.withStopRow(Bytes.toBytes("row004758"),true); ResultScanner rs = table.getScanner(scan); Iterator<Result> iterator = rs.iterator(); while (iterator.hasNext()){ Result result = iterator.next(); //最外层的map的key是列族,下面一层的map的key是列,最里面一层的map的key是数据版本(时间戳) Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = result.getMap(); for(byte[] fBytes : map.keySet()){ //得到列族 String f = Bytes.toString(fBytes); NavigableMap<byte[], NavigableMap<Long, byte[]>> map2 = map.get(fBytes); for(byte[] cBytes : map2.keySet()){ //得到列 String c = Bytes.toString(cBytes); NavigableMap<Long, byte[]> map3 = map2.get(cBytes); for(Long e : map3.keySet()){ String val = Bytes.toString(map3.get(e)); System.out.print(f + ":" + c + ":" + e + ":" + val); } } } System.out.println(); } rs.close(); conn.close(); } /** * 获取多版本数据 * @throws Exception */ public static void getVersions() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tName = TableName.valueOf("mydb1:t2"); Table table = conn.getTable(tName); Get get = new Get(Bytes.toBytes("row001")); //获取所有版本的数据 get.readAllVersions(); //获取指定版本个数的数据 //get.readVersions(2); Result r = table.get(get); List<Cell> cells = r.getColumnCells(Bytes.toBytes("f1"), Bytes.toBytes("name")); for(Cell cell:cells){ String f = Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()); String q = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()); long timestamp = cell.getTimestamp(); String val = Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()); System.out.println("列族:" + f + ";列:" + q + ";版本:" + timestamp + ";值:" + val); } conn.close(); }

    client交互hbase的过程

    hbase集群启动时,master负责分配区域到指定区域服务器。联系zk,找出meta表所在rs(regionserver) /hbase/meta-region-server定位row key,找到对应region server缓存信息在本地。联系RegionServerHRegionServer负责open HRegion对象,为每个列族创建Store对象,Store包含多个StoreFile实例, 他们是对HFile的轻量级封装。每个Store还对应了一个MemStore,用于内存存储数据。

    Hbase切割文件

    Hbase的分区默认是当分区文件的大小超过hbase.hregion.max.filesize属性值时,就会新开辟一个分区存储,默认是10G

    <property> <name>hbase.hregion.max.filesize</name> <value>10737418240</value> <source>hbase-default.xml</source> </property>

    手动分区

    #切割表,不带参数,默认是从中间切割 split 'mydb1:t1' #切割分区,在指定的rowKey切割 split 'mydb1:t1,row004927,1602595549576.ea72ecf8920a3525e226c786a3538847.','row008000'

    注意: 原数据信息是存在缓冲区里面一段时间,不会长时间保留,只有切割后的分区信息数据会永久保留在磁盘上

    #使用手动清理缓冲区可以清理掉原数据的信息,并将新的分区信息存储到磁盘上 flush 'hbase:meta'

    手动移动分区

    #格式:move 分区id,需要分配到哪台服务器上(服务器名称,端口,启动码) move 'e15b2d06eb033f19934c26f5af3eab3d','centos101,16020,1602661324341'

    分区id:

    需要分配到的服务器的名称,端口,启动码: meta表中显示:

    预切割

    预切割指的是在创建表的时候就对表划分好分区,这样的话就可以省的我们去不断的给表进行切割

    #创建表时指定切割rowKey create 'mydb1:t2','f1',SPLITS=>['row3000','row6000']

    Hbase数据版本控制

    #创建t2表,指定列族f1,数据允许保存3个版本 create 'mydb1:t2',{NAME=>'f1',VERSIONS=>'3'} #查询指定需要看的版本个数 get 'mydb1:t2','row001',{COLUMN=>'f1',VERSIONS=>4} #查询指定版本的数据 get 'mydb1:t2','row001',{COLUMN=>'f1',TIMESTAMP=>1603076828293} #查询指定区域版本的数据,左闭右开区间 get 'mydb1:t2','row001',{COLUMN=>'f1',TIMERANGE=>[1603075388967,1603076828293],VERSIONS=>10} #删除指定版本的数据(不指定默认删除最新版本的数据) delete 'mydb1:t2','row001','f1:name',1603076828293 #原生扫描(可以把表格指定以外的版本都扫描出来,并且包含已删除的数据) scan 'mydb1:t2',{COLUMN=>'f1',RAW=>true,VERSIONS=>10}

    创表参数

    TTL

    TTL是创建表的参数,用于指定数据的存活时间,单位秒,不指定默认是永久存活,如果超过这个时间使用get命令是无法查询到数据,但是可以使用原生扫描查询到(低版本的Hbase原生扫描同样查询不到)

    #创建一个表,并指定表里的数据可以存活的时间为10秒 create 'mydb1:t3' , {NAME=>'f1',TTL=>10}

    KEEP_DELETED_CELLS

    KEEP_DELETED_CELLS是创建表的参数,用于指定删除数据的所有版本是否全部删除,布尔值,默认值false,不全部删除,指定true,表示删除之后通过get操作将无法获得表里的数据,但是原生扫描还是可以查询到的

    #创建一个表,指定删除操作时删除全部版本的数据 create 'mydb1:t4' , {NAME=>'f1',KEEP_DELETED_CELLS=>true} #删除数据 delete 'mydb1:t4','row001','f1:name' #注意删除数据的时候需要清下缓存,否则还能查询到 flush 'mydb1:t4'

    过滤器

    过滤器的分类

    比较过滤器

    Hbase的所有比较过滤器都是继承org.apache.hadoop.hbase.filter.CompareFilter这个抽象类,Hbase分以下五种比较过滤器

    行过滤器(org.apache.hadoop.hbase.filter.RowFilter)列族过滤器(org.apache.hadoop.hbase.filter.FamilyFilter)列过滤器(org.apache.hadoop.hbase.filter.QualifierFilter)值过滤器(org.apache.hadoop.hbase.filter.ValueFilter)依赖过滤器(org.apache.hadoop.hbase.filter.DependentColumnFilter)
    专用过滤器

    Hbase提供的专用过滤器都是直接继承org.apache.hadoop.hbase.filter.FilterBase,Hbase有以下几种专用过滤器

    单列值过滤器(org.apache.hadoop.hbase.filter.SingleColumnValueFilter)单列排除过滤器(org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter)前缀过滤器(org.apache.commons.io.filefilter.PrefixFileFilter)分页过滤器(org.apache.hadoop.hbase.filter.PageFilter)行键过滤器(org.apache.hadoop.hbase.filter.KeyOnlyFilter)首次行键过滤器(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter)包含结束的过滤器(org.apache.hadoop.hbase.filter.InclusiveStopFilter)时间戳过滤器(org.apache.hadoop.hbase.filter.TimestampsFilter)列计数过滤器(org.apache.hadoop.hbase.filter.ColumnCountGetFilter)列分页过滤(org.apache.hadoop.hbase.filter.ColumnPaginationFilter)
    过滤器列表

    org.apache.hadoop.hbase.filter.FilterList用于组合过滤器,他有两个枚举值

    MUST_PASS_ALL:and衔接过滤器MUST_PASS_ONE:or衔接过滤器

    比较运算符集合类

    org.apache.hadoop.hbase.CompareOperator类是Hbase过滤器比较运算符的类,该类下面有七个常量:

    常量名称描述LESS小于LESS_OR_EQUAL小于等于EQUAL等于NOT_EQUAL不等于GREATER_OR_EQUAL大于等于GREATER大于NO_OP排除一切值

    比较器

    HBase的比较器都继承了org.apache.hadoop.hbase.filter.ByteArrayComparable这个抽象类,Hbase共有以下几种比较器

    org.apache.hadoop.hbase.filter.BinaryComparator(按照阈值比较)org.apache.hadoop.hbase.filter.BinaryPrefixComparator(匹配前缀)org.apache.hadoop.hbase.filter.BitComparator(按位比较字节数组,只能与EQUAL 和NOT_EQUAL 配合使用)org.apache.hadoop.hbase.filter.NullComparator(判断当前值是否为null)org.apache.hadoop.hbase.filter.RegexStringComparator(与正则表达式匹配,只能与EQUAL 和NOT_EQUAL 配合使用)(正则表达式中的有两个常用的符号:(tom$:表示以tom结尾,^tom:表示以tom开头,相当于sql语句里面的like))org.apache.hadoop.hbase.filter.SubstringComparator(匹配子字符串,只能与EQUAL 和NOT_EQUAL 配合使用)

    过滤器的用法

    /** * 行级过滤 * @throws Exception */ public static void rowFilter() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Scan scan = new Scan(); //获取小于row000100的数据 RowFilter rf = new RowFilter(CompareOperator.LESS ,new BinaryComparator(Bytes.toBytes("row000100"))); scan.setFilter(rf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); ResultScanner rs = table.getScanner(scan); for(Result r : rs){ //String val = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))); String row = Bytes.toString(r.getRow()); System.out.println(row); } rs.close(); conn.close(); } /** * 列族过滤 * @throws Exception */ public static void familyFilter() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Scan scan = new Scan(); //获取列族=f1的数据 FamilyFilter ff = new FamilyFilter(CompareOperator.EQUAL , new BinaryComparator(Bytes.toBytes("f1"))); scan.setFilter(ff); Table table = conn.getTable(TableName.valueOf("mydb1:t6")); ResultScanner rs = table.getScanner(scan); for(Result r:rs){ System.out.println(Bytes.toString(r.value())); } } /** * 列过滤 * @throws Exception */ public static void qualifierFilter() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Get get = new Get(Bytes.toBytes("row000001")); QualifierFilter qf = new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("name"))); get.setFilter(qf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); Result r = table.get(get); System.out.println(Bytes.toString(r.value())); conn.close(); } /** * 依赖过滤器,根据某个列的值过滤一整行记录,相当于sql语句的条件表达式(重要) * @param drop * @param co * @param bc * @throws Exception */ public static void dependentFilter(boolean drop,CompareOperator co,ByteArrayComparable bc) throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Scan scan = new Scan(); //参数:参考列族,参考列,返回结果是否包含参考列的值(boolean值),比较运算符,比较值 DependentColumnFilter df = new DependentColumnFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),drop,CompareOperator.EQUAL ,new BinaryPrefixComparator(Bytes.toBytes("Tom1"))); //后面两个参数不填表示返回所有 //DependentColumnFilter df = new DependentColumnFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),drop); scan.setFilter(df); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); ResultScanner rs = table.getScanner(scan); for(Result r : rs){ int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"))); String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))); int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"))); System.out.println("id:" + id + ";name:" + name + ";age:" + age); } rs.close(); conn.close(); } /** * 单列值过滤器,根据某一列的值过滤一行记录 * @throws Exception */ public static void singleValue() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Scan scan = new Scan(); // SingleColumnValueFilter sf = new SingleColumnValueFilter(Bytes.toBytes("f1"),Bytes.toBytes("name"),CompareOperator.EQUAL , new BinaryComparator(Bytes.toBytes("Tom10"))); scan.setFilter(sf); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); ResultScanner rs = table.getScanner(scan); for(Result r : rs){ int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"))); String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))); int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"))); System.out.println("id:" + id + ";name:" + name + ";age:" + age); } rs.close(); conn.close(); } //过滤器组合 public static void fileList() throws Exception { //代码逻辑用sql表示:select * from mydb1:t1 where name like '^Tom1' and age = 55 Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Scan scan = new Scan(); SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("name"), CompareOperator.EQUAL, new RegexStringComparator("^Tom1")); SingleColumnValueFilter ageFilter = new SingleColumnValueFilter(Bytes.toBytes("f1"), Bytes.toBytes("age"), CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(55))); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); filterList.addFilter(nameFilter); filterList.addFilter(ageFilter); scan.setFilter(filterList); Table table = conn.getTable(TableName.valueOf("mydb1:t1")); ResultScanner rs = table.getScanner(scan); for(Result r : rs){ int id = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("id"))); String name = Bytes.toString(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))); int age = Bytes.toInt(r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age"))); System.out.println("id:" + id + ";name:" + name + ";age:" + age); } rs.close(); conn.close(); }

    Hbase计数器

    Hbase计数器一般用于收集统计信息或点击量,他能做到实时统计,并且没有高并发的情况,相对于延时比较高的批处理操作更好

    计数器的命令

    #执行以下命令每执行一次会在hits上加1,对表格并没有什么要求,第一次执行以下命令,就会把该列的默认值设为0,然后再加一 incr 'mydb1:t7','row001','dayily:hits' #查看计数器列的值(get也可以,但是计数器的数据类型是int类型,在命令行显示的是16进制,可读性比较差,使用get_counter可以很清晰的显示) get_counter 'mydb1:t7','row001','dayily:hits' #指定增加的步长,这个步长可以为负数 incr 'mydb1:t7','row001','dayily:hits',10

    javaAPI操作计数器

    /** * 计数器 * @throws Exception */ public static void incr() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); //创建计数器对象指定行 Increment increment = new Increment(Bytes.toBytes("row001")); //指定列族,列和步长 increment.addColumn(Bytes.toBytes("dayily"),Bytes.toBytes("hits"),1); increment.addColumn(Bytes.toBytes("monthly"),Bytes.toBytes("hits"),1); increment.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"),1); Table table = conn.getTable(TableName.valueOf("mydb1:t7")); table.increment(increment); conn.close(); }

    rowKey的设计

    假设我们对全市的过车数据进行设计rowkey,总共有10台服务器

    过车数据的数据业务一般跟过车时间,车牌号码,卡口ID有关,我们用过车时间,车牌号码,卡口ID设计rowKey计算分区编号,共十台服务器,我们把过车时间,车牌号,卡口ID组合起来,求出hash值,然后用hash值与10求余,得到0-9的编号,根据编号可以把数据对应放在不同服务器上,防止热点问题rowkey组合:分区编号,过车时间,车牌号,卡口ID上面我们要查询过车时间,车牌号都是可以的,但是如果要查询卡口的过车数据就很麻烦,那么我们可以创建一个索引表,rowKey也按照上面的组合,但是要卡口ID方在分区编号的后面,然后把主表对应的rowKey作为值放在索引表里面

    布隆过滤器

    布隆过滤器是优化查询的一种手段,可以通过布隆过滤器判断一个分区中是否包含你查询的rowKey,如果这个文件不包含,它会给予你明确的答复,如果包含,它会给你一个可能性的答复,因此他可以提神get查询的性能,避免了全表扫描 创建一个带有布隆过滤器的表:

    public static void createBloomFilter() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); TableName tn = TableName.valueOf("mydb2:t3"); TableDescriptorBuilder.ModifyableTableDescriptor table = new TableDescriptorBuilder.ModifyableTableDescriptor(tn); //添加列族f1 ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor family = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes("f1")); //设置布隆过滤器,NONE:不启用布隆过滤器(默认值),ROW:行级别布隆过滤器(推荐使用),ROWCOL:行和列级别(消耗资源比较大,不推荐使用) family.setBloomFilterType(BloomType.ROW); table.setColumnFamily(family); admin.createTable(table); conn.close(); }

    创建完成后,用desc查看表格描述,可以看到BLOOMFILTER => 'ROW'这样一段代码

    sql可视化界面

    下载apache-phoenix-5.0.0-HBase-2.0-bin.tar.gz将其上传到服务器,并解压复制phoenix-5.0.0-HBase-2.0-server.jar到Hbase的lib的目录下,并且分发重启Hbase进入/apache-phoenix-5.0.0-HBase-2.0-bin/bin目录下输入./sqlline.py Centos100连接到zk服务器,进入phoenix客户端,输入以下命令 #查看表 !tables #查看帮助 !help #创建表,必须指定primary key,这里主键就是rowKey !sql create table mydb1.person(id varchar(10) primary key,name varchar(100),age Integer); #删除表 !sql drop table mydb1.person; #查看表结构 !describe TEST; #插入数据修改数据 upsert into TEST(id,name) values ('row005','lisi'); #删除数据 delete from TEST where id = 'row005'; #退出 !quit 查看表格在Hbase中的位置以及表的描述 下载squirrel-sql-4.1.0-standard.jar双击squirrel-sql-4.1.0-standard.jar安装squirrel(直接下一步到底)将phoenix-5.0.0-HBase-2.0-client.jar复制到squirrel的安装目录下打开 squirrel点击下方按钮安装驱动 添加数据库连接 结构
    Processed: 0.017, SQL: 8