Dunwu Blog

大道至简,知易行难

Hive 常用 DDL 操作

Database

查看数据列表

1
show databases;

使用数据库

1
USE database_name;

新建数据库

语法:

1
2
3
4
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   --DATABASE|SCHEMA 是等价的
[COMMENT database_comment] --数据库注释
[LOCATION hdfs_path] --存储在 HDFS 上的位置
[WITH DBPROPERTIES (property_name=property_value, ...)]; --指定额外属性

示例:

1
2
3
CREATE DATABASE IF NOT EXISTS hive_test
COMMENT 'hive database for test'
WITH DBPROPERTIES ('create'='heibaiying');

查看数据库信息

语法:

1
DESC DATABASE [EXTENDED] db_name; --EXTENDED 表示是否显示额外属性

示例:

1
DESC DATABASE  EXTENDED hive_test;

删除数据库

语法:

1
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];
  • 默认行为是 RESTRICT,如果数据库中存在表则删除失败。
  • 要想删除库及其中的表,可以使用 CASCADE 级联删除。

示例:

1
DROP DATABASE IF EXISTS hive_test CASCADE;

创建表

建表语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     --表名
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] --列名 列数据类型
[COMMENT table_comment] --表描述
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --分区表分区规则
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] --分桶表分桶规则
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] --指定倾斜列和值
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- 指定行分隔符、存储文件格式或采用自定义存储格式
[LOCATION hdfs_path] -- 指定表的存储位置
[TBLPROPERTIES (property_name=property_value, ...)] --指定表的属性
[AS select_statement]; --从查询结果创建表

内部表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE emp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

外部表

1
2
3
4
5
6
7
8
9
10
11
CREATE EXTERNAL TABLE emp_external(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_external';

使用 desc format emp_external 命令可以查看表的详细信息如下:

分区表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';

分桶表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';

倾斜表

通过指定一个或者多个列经常出现的值(严重偏斜),Hive 会自动将涉及到这些值的数据拆分为单独的文件。在查询时,如果涉及到倾斜值,它就直接从独立文件中获取数据,而不是扫描所有文件,这使得性能得到提升。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_skewed(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
SKEWED BY (empno) ON (66,88,100) --指定 empno 的倾斜值 66,88,100
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_skewed';

临时表

临时表仅对当前 session 可见,临时表的数据将存储在用户的暂存目录中,并在会话结束后删除。如果临时表与永久表表名相同,则对该表名的任何引用都将解析为临时表,而不是永久表。临时表还具有以下两个限制:

  • 不支持分区列;
  • 不支持创建索引。
1
2
3
4
5
6
7
8
9
10
CREATE TEMPORARY TABLE emp_temp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

CTAS 创建表

支持从查询语句的结果创建表:

1
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

复制表结构

语法:

1
2
3
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name  --创建表表名
LIKE existing_table_or_view_name --被复制表的表名
[LOCATION hdfs_path]; --存储位置

示例:

1
CREATE TEMPORARY EXTERNAL TABLE  IF NOT EXISTS  emp_co  LIKE emp

加载数据到表

加载数据到表中属于 DML 操作,这里为了方便大家测试,先简单介绍一下加载本地数据到表中:

1
2
-- 加载数据到 emp 表中
load data local inpath "/usr/file/emp.txt" into table emp;

其中 emp.txt 的内容如下,你可以直接复制使用,也可以到本仓库的resources 目录下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
7369	SMITH	CLERK	7902	1980-12-17 00:00:00	800.00		20
7499 ALLEN SALESMAN 7698 1981-02-20 00:00:00 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-02-22 00:00:00 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-04-02 00:00:00 2975.00 20
7654 MARTIN SALESMAN 7698 1981-09-28 00:00:00 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850.00 30
7782 CLARK MANAGER 7839 1981-06-09 00:00:00 2450.00 10
7788 SCOTT ANALYST 7566 1987-04-19 00:00:00 1500.00 20
7839 KING PRESIDENT 1981-11-17 00:00:00 5000.00 10
7844 TURNER SALESMAN 7698 1981-09-08 00:00:00 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-05-23 00:00:00 1100.00 20
7900 JAMES CLERK 7698 1981-12-03 00:00:00 950.00 30
7902 FORD ANALYST 7566 1981-12-03 00:00:00 3000.00 20
7934 MILLER CLERK 7782 1982-01-23 00:00:00 1300.00 10

加载后可查询表中数据

修改表

重命名表

语法:

1
ALTER TABLE table_name RENAME TO new_table_name;

示例:

1
ALTER TABLE emp_temp RENAME TO new_emp; --把 emp_temp 表重命名为 new_emp

修改列

语法:

1
2
ALTER TABLE table_name [PARTITION partition_spec] CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];

示例:

1
2
3
4
5
6
7
8
-- 修改字段名和类型
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- 修改字段 sal 的名称 并将其放置到 empno 字段后
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;

-- 为字段增加注释
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';

新增列

示例:

1
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');

清空表/删除表

清空表 hive-ddl.md

语法:

1
2
-- 清空整个表或表指定分区中的数据
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
  • 目前只有内部表才能执行 TRUNCATE 操作,外部表执行时会抛出异常 Cannot truncate non-managed table XXXX

示例:

1
TRUNCATE TABLE emp_mgt_ptn PARTITION (deptno=20);

删除表

语法:

1
DROP TABLE [IF EXISTS] table_name [PURGE];
  • 内部表:不仅会删除表的元数据,同时会删除 HDFS 上的数据;
  • 外部表:只会删除表的元数据,不会删除 HDFS 上的数据;
  • 删除视图引用的表时,不会给出警告(但视图已经无效了,必须由用户删除或重新创建)。

其他命令

Describe

查看数据库:

1
DESCRIBE|Desc DATABASE [EXTENDED] db_name;  --EXTENDED 是否显示额外属性

查看表:

1
DESCRIBE|Desc [EXTENDED|FORMATTED] table_name --FORMATTED 以友好的展现方式查看表详情

Show

1. 查看数据库列表

1
2
3
4
5
-- 语法
SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];

-- 示例:
SHOW DATABASES like 'hive*';

LIKE 子句允许使用正则表达式进行过滤,但是 SHOW 语句当中的 LIKE 子句只支持 *(通配符)和 |(条件或)两个符号。例如 employeesemp *emp * | * ees,所有这些都将匹配名为 employees 的数据库。

2. 查看表的列表

1
2
3
4
5
-- 语法
SHOW TABLES [IN database_name] ['identifier_with_wildcards'];

-- 示例
SHOW TABLES IN default;

3. 查看视图列表

1
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   --仅支持 Hive 2.2.0

4. 查看表的分区列表

1
SHOW PARTITIONS table_name;

5. 查看表/视图的创建语句

1
SHOW CREATE TABLE ([db_name.]table_name|view_name);

参考资料

Hive 常用 DML 操作

加载文件数据到表

语法

1
2
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE]
INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  • LOCAL 关键字代表从本地文件系统加载文件,省略则代表从 HDFS 上加载文件:
  • 从本地文件系统加载文件时, filepath 可以是绝对路径也可以是相对路径 (建议使用绝对路径);
  • 从 HDFS 加载文件时候,filepath 为文件完整的 URL 地址:如 hdfs://namenode:port/user/hive/project/ data1
  • filepath 可以是文件路径 (在这种情况下 Hive 会将文件移动到表中),也可以目录路径 (在这种情况下,Hive 会将该目录中的所有文件移动到表中);
  • 如果使用 OVERWRITE 关键字,则将删除目标表(或分区)的内容,使用新的数据填充;不使用此关键字,则数据以追加的方式加入;
  • 加载的目标可以是表或分区。如果是分区表,则必须指定加载数据的分区;
  • 加载文件的格式必须与建表时使用 STORED AS 指定的存储格式相同。

使用建议:

不论是本地路径还是 URL 都建议使用完整的。虽然可以使用不完整的 URL 地址,此时 Hive 将使用 hadoop 中的 fs.default.name 配置来推断地址,但是为避免不必要的错误,建议使用完整的本地路径或 URL 地址;

加载对象是分区表时建议显示指定分区。在 Hive 3.0 之后,内部将加载 (LOAD) 重写为 INSERT AS SELECT,此时如果不指定分区,INSERT AS SELECT 将假设最后一组列是分区列,如果该列不是表定义的分区,它将抛出错误。为避免错误,还是建议显示指定分区。

示例

新建分区表:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE emp_ptn(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

从 HDFS 上加载数据到分区表:

1
LOAD DATA  INPATH "hdfs://hadoop001:8020/mydir/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=20);

emp.txt 文件可在本仓库的 resources 目录中下载

加载后表中数据如下,分区列 deptno 全部赋值成 20:

查询结果插入到表

语法

1
2
3
4
5
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]]
select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)]
select_statement1 FROM from_statement;
  • Hive 0.13.0 开始,建表时可以通过使用 TBLPROPERTIES(“immutable”=“true”)来创建不可变表 (immutable table) ,如果不可以变表中存在数据,则 INSERT INTO 失败。(注:INSERT OVERWRITE 的语句不受 immutable 属性的影响);

  • 可以对表或分区执行插入操作。如果表已分区,则必须通过指定所有分区列的值来指定表的特定分区;

  • 从 Hive 1.1.0 开始,TABLE 关键字是可选的;

  • 从 Hive 1.2.0 开始 ,可以采用 INSERT INTO tablename(z,x,c1) 指明插入列;

  • 可以将 SELECT 语句的查询结果插入多个表(或分区),称为多表插入。语法如下:

    1
    2
    3
    4
    5
    FROM from_statement
    INSERT OVERWRITE TABLE tablename1
    [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
    [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
    [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

动态插入分区

1
2
3
4
5
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement;

INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement;

在向分区表插入数据时候,分区列名是必须的,但是列值是可选的。如果给出了分区列值,我们将其称为静态分区,否则它是动态分区。动态分区列必须在 SELECT 语句的列中最后指定,并且与它们在 PARTITION() 子句中出现的顺序相同。

注意:Hive 0.9.0 之前的版本动态分区插入是默认禁用的,而 0.9.0 之后的版本则默认启用。以下是动态分区的相关配置:

配置 默认值 说明
hive.exec.dynamic.partition true 需要设置为 true 才能启用动态分区插入
hive.exec.dynamic.partition.mode strict 在严格模式 (strict) 下,用户必须至少指定一个静态分区,以防用户意外覆盖所有分区,在非严格模式下,允许所有分区都是动态的
hive.exec.max.dynamic.partitions.pernode 100 允许在每个 mapper/reducer 节点中创建的最大动态分区数
hive.exec.max.dynamic.partitions 1000 允许总共创建的最大动态分区数
hive.exec.max.created.files 100000 作业中所有 mapper/reducer 创建的 HDFS 文件的最大数量
hive.error.on.empty.partition false 如果动态分区插入生成空结果,是否抛出异常

示例

(1)新建 emp 表,作为查询对象表

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE emp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

-- 加载数据到 emp 表中 这里直接从本地加载
load data local inpath "/usr/file/emp.txt" into table emp;

完成后 emp 表中数据如下:

(2)为清晰演示,先清空 emp_ptn 表中加载的数据:

1
TRUNCATE TABLE emp_ptn;

(3)静态分区演示:从 emp 表中查询部门编号为 20 的员工数据,并插入 emp_ptn 表中,语句如下:

1
2
INSERT OVERWRITE TABLE emp_ptn PARTITION (deptno=20)
SELECT empno,ename,job,mgr,hiredate,sal,comm FROM emp WHERE deptno=20;

完成后 emp_ptn 表中数据如下:

(4)接着演示动态分区:

1
2
3
4
5
6
-- 由于我们只有一个分区,且还是动态分区,所以需要关闭严格默认。因为在严格模式下,用户必须至少指定一个静态分区
set hive.exec.dynamic.partition.mode=nonstrict;

-- 动态分区 此时查询语句的最后一列为动态分区列,即 deptno
INSERT OVERWRITE TABLE emp_ptn PARTITION (deptno)
SELECT empno,ename,job,mgr,hiredate,sal,comm,deptno FROM emp WHERE deptno=30;

完成后 emp_ptn 表中数据如下:

使用 SQL 语句插入值

1
2
INSERT INTO TABLE tablename [PARTITION (partcol1[=val1], partcol2[=val2] ...)]
VALUES ( value [, value ...] )
  • 使用时必须为表中的每个列都提供值。不支持只向部分列插入值(可以为缺省值的列提供空值来消除这个弊端);
  • 如果目标表表支持 ACID 及其事务管理器,则插入后自动提交;
  • 不支持支持复杂类型 (array, map, struct, union) 的插入。

更新和删除数据

语法

更新和删除的语法比较简单,和关系型数据库一致。需要注意的是这两个操作都只能在支持 ACID 的表,也就是事务表上才能执行。

1
2
3
4
5
-- 更新
UPDATE tablename SET column = value [, column = value ...] [WHERE expression]

--删除
DELETE FROM tablename [WHERE expression]

示例

1. 修改配置

首先需要更改 hive-site.xml,添加如下配置,开启事务支持,配置完成后需要重启 Hive 服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>

2. 创建测试表

创建用于测试的事务表,建表时候指定属性 transactional = true 则代表该表是事务表。需要注意的是,按照官方文档 的说明,目前 Hive 中的事务表有以下限制:

  • 必须是 buckets Table;
  • 仅支持 ORC 文件格式;
  • 不支持 LOAD DATA …语句。
1
2
3
4
5
6
CREATE TABLE emp_ts(
empno int,
ename String
)
CLUSTERED BY (empno) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true");

3. 插入测试数据

1
INSERT INTO TABLE emp_ts  VALUES (1,"ming"),(2,"hong");

插入数据依靠的是 MapReduce 作业,执行成功后数据如下:

4. 测试更新和删除

1
2
3
4
5
--更新数据
UPDATE emp_ts SET ename = "lan" WHERE empno=1;

--删除数据
DELETE FROM emp_ts WHERE empno=2;

更新和删除数据依靠的也是 MapReduce 作业,执行成功后数据如下:

查询结果写出到文件系统

语法

1
2
3
INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...
  • OVERWRITE 关键字表示输出文件存在时,先删除后再重新写入;

  • 和 Load 语句一样,建议无论是本地路径还是 URL 地址都使用完整的;

  • 写入文件系统的数据被序列化为文本,其中列默认由^A 分隔,行由换行符分隔。如果列不是基本类型,则将其序列化为 JSON 格式。其中行分隔符不允许自定义,但列分隔符可以自定义,如下:

    1
    2
    3
    4
    5
    6
    7
    -- 定义列分隔符为'\t'
    insert overwrite local directory './test-04'
    row format delimited
    FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
    MAP KEYS TERMINATED BY ':'
    select * from src;

示例

这里我们将上面创建的 emp_ptn 表导出到本地文件系统,语句如下:

1
2
3
4
INSERT OVERWRITE LOCAL DIRECTORY '/usr/file/ouput'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
SELECT * FROM emp_ptn;

导出结果如下:

参考资料

Hive 运维

Hive 安装

下载并解压

下载所需版本的 Hive,这里我下载版本为 cdh5.15.2。下载地址:http://archive.cloudera.com/cdh5/cdh/5/

1
2
# 下载后进行解压
tar -zxvf hive-1.1.0-cdh5.15.2.tar.gz

配置环境变量

1
# vim /etc/profile

添加环境变量:

1
2
export HIVE_HOME=/usr/app/hive-1.1.0-cdh5.15.2
export PATH=$HIVE_HOME/bin:$PATH

使得配置的环境变量立即生效:

1
# source /etc/profile

修改配置

1. hive-env.sh

进入安装目录下的 conf/ 目录,拷贝 Hive 的环境配置模板 flume-env.sh.template

1
cp hive-env.sh.template hive-env.sh

修改 hive-env.sh,指定 Hadoop 的安装路径:

1
HADOOP_HOME=/usr/app/hadoop-2.6.0-cdh5.15.2

2. hive-site.xml

新建 hive-site.xml 文件,内容如下,主要是配置存放元数据的 MySQL 的地址、驱动、用户名和密码等信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop001:3306/hadoop_hive?createDatabaseIfNotExist=true</value>
</property>

<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>

</configuration>

拷贝数据库驱动

将 MySQL 驱动包拷贝到 Hive 安装目录的 lib 目录下, MySQL 驱动的下载地址为:https://dev.mysql.com/downloads/connector/j/

初始化元数据库

  • 当使用的 hive 是 1.x 版本时,可以不进行初始化操作,Hive 会在第一次启动的时候会自动进行初始化,但不会生成所有的元数据信息表,只会初始化必要的一部分,在之后的使用中用到其余表时会自动创建;

  • 当使用的 hive 是 2.x 版本时,必须手动初始化元数据库。初始化命令:

    1
    2
    # schematool 命令在安装目录的 bin 目录下,由于上面已经配置过环境变量,在任意位置执行即可
    schematool -dbType mysql -initSchema

这里我使用的是 CDH 的 hive-1.1.0-cdh5.15.2.tar.gz,对应 Hive 1.1.0 版本,可以跳过这一步。

启动

由于已经将 Hive 的 bin 目录配置到环境变量,直接使用以下命令启动,成功进入交互式命令行后执行 show databases 命令,无异常则代表搭建成功。

1
# hive

img

在 Mysql 中也能看到 Hive 创建的库和存放元数据信息的表

img

HiveServer2/beeline

Hive 内置了 HiveServer 和 HiveServer2 服务,两者都允许客户端使用多种编程语言进行连接,但是 HiveServer 不能处理多个客户端的并发请求,因此产生了 HiveServer2。

HiveServer2(HS2)允许远程客户端可以使用各种编程语言向 Hive 提交请求并检索结果,支持多客户端并发访问和身份验证。HS2 是由多个服务组成的单个进程,其包括基于 Thrift 的 Hive 服务(TCP 或 HTTP)和用于 Web UI 的 Jetty Web 服务。

HiveServer2 拥有自己的 CLI 工具——Beeline。Beeline 是一个基于 SQLLine 的 JDBC 客户端。由于目前 HiveServer2 是 Hive 开发维护的重点,所以官方更加推荐使用 Beeline 而不是 Hive CLI。以下主要讲解 Beeline 的配置方式。

修改 Hadoop 配置

修改 hadoop 集群的 core-site.xml 配置文件,增加如下配置,指定 hadoop 的 root 用户可以代理本机上所有的用户。

1
2
3
4
5
6
7
8
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>

之所以要配置这一步,是因为 hadoop 2.0 以后引入了安全伪装机制,使得 hadoop 不允许上层系统(如 hive)直接将实际用户传递到 hadoop 层,而应该将实际用户传递给一个超级代理,由该代理在 hadoop 上执行操作,以避免任意客户端随意操作 hadoop。如果不配置这一步,在之后的连接中可能会抛出 AuthorizationException 异常。

关于 Hadoop 的用户代理机制,可以参考:hadoop 的用户代理机制Superusers Acting On Behalf Of Other Users

启动 hiveserver2

由于上面已经配置过环境变量,这里直接启动即可:

1
# nohup hiveserver2 &

使用 beeline

可以使用以下命令进入 beeline 交互式命令行,出现 Connected 则代表连接成功。

1
beeline -u jdbc:hive2://hadoop001:10000 -n root

Beeline 选项

Beeline 拥有更多可使用参数,可以使用 beeline --help 查看,完整参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Usage: java org.apache.hive.cli.beeline.BeeLine
-u <database url> the JDBC URL to connect to
-r reconnect to last saved connect url (in conjunction with !save)
-n <username> the username to connect as
-p <password> the password to connect as
-d <driver class> the driver class to use
-i <init file> script file for initialization
-e <query> query that should be executed
-f <exec file> script file that should be executed
-w (or) --password-file <password file> the password file to read password from
--hiveconf property=value Use value for given property
--hivevar name=value hive variable name and value
This is Hive specific settings in which variables
can be set at session level and referenced in Hive
commands or queries.
--property-file=<property-file> the file to read connection properties (url, driver, user, password) from
--color=[true/false] control whether color is used for display
--showHeader=[true/false] show column names in query results
--headerInterval=ROWS; the interval between which heades are displayed
--fastConnect=[true/false] skip building table/column list for tab-completion
--autoCommit=[true/false] enable/disable automatic transaction commit
--verbose=[true/false] show verbose error messages and debug info
--showWarnings=[true/false] display connection warnings
--showNestedErrs=[true/false] display nested errors
--numberFormat=[pattern] format numbers using DecimalFormat pattern
--force=[true/false] continue running script even after errors
--maxWidth=MAXWIDTH the maximum width of the terminal
--maxColumnWidth=MAXCOLWIDTH the maximum width to use when displaying columns
--silent=[true/false] be more silent
--autosave=[true/false] automatically save preferences
--outputformat=[table/vertical/csv2/tsv2/dsv/csv/tsv] format mode for result display
--incrementalBufferRows=NUMROWS the number of rows to buffer when printing rows on stdout,
defaults to 1000; only applicable if --incremental=true
and --outputformat=table
--truncateTable=[true/false] truncate table column when it exceeds length
--delimiterForDSV=DELIMITER specify the delimiter for delimiter-separated values output format (default: |)
--isolation=LEVEL set the transaction isolation level
--nullemptystring=[true/false] set to true to get historic behavior of printing null as empty string
--maxHistoryRows=MAXHISTORYROWS The maximum number of rows to store beeline history.
--convertBinaryArrayToString=[true/false] display binary column data as string or as byte array
--help display this message

常用参数

在 Hive CLI 中支持的参数,Beeline 都支持,常用的参数如下。更多参数说明可以参见官方文档 Beeline Command Options

参数 说明
-u 数据库地址
-n 用户名
-p 密码
-d 驱动 (可选)
-e* 执行 SQL 命令
-f* 执行 SQL 脚本
-i (or)--init 在进入交互模式之前运行初始化脚本
--property-file 指定配置文件
--hiveconf property=value 指定配置属性
--hivevar name=value 用户自定义属性,在会话级别有效

示例: 使用用户名和密码连接 Hive

1
beeline -u jdbc:hive2://localhost:10000  -n username -p password

Hive 命令

Help

使用 hive -H 或者 hive --help 命令可以查看所有命令的帮助,显示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
usage: hive
-d,--define <key=value> Variable subsitution to apply to hive
commands. e.g. -d A=B or --define A=B --定义用户自定义变量
--database <databasename> Specify the database to use -- 指定使用的数据库
-e <quoted-query-string> SQL from command line -- 执行指定的 SQL
-f <filename> SQL from files --执行 SQL 脚本
-H,--help Print help information -- 打印帮助信息
--hiveconf <property=value> Use value for given property --自定义配置
--hivevar <key=value> Variable subsitution to apply to hive --自定义变量
commands. e.g. --hivevar A=B
-i <filename> Initialization SQL file --在进入交互模式之前运行初始化脚本
-S,--silent Silent mode in interactive shell --静默模式
-v,--verbose Verbose mode (echo executed SQL to the console) --详细模式

交互式命令行

直接使用 Hive 命令,不加任何参数,即可进入交互式命令行。

执行 SQL 命令

在不进入交互式命令行的情况下,可以使用 hive -e执行 SQL 命令。

1
hive -e 'select * from emp';

img

执行 SQL 脚本

用于执行的 sql 脚本可以在本地文件系统,也可以在 HDFS 上。

1
2
3
4
5
# 本地文件系统
hive -f /usr/file/simple.sql;

# HDFS文件系统
hive -f hdfs://hadoop001:8020/tmp/simple.sql;

其中 simple.sql 内容如下:

1
select * from emp;

配置 Hive 变量

可以使用 --hiveconf 设置 Hive 运行时的变量。

1
2
3
hive -e 'select * from emp' \
--hiveconf hive.exec.scratchdir=/tmp/hive_scratch \
--hiveconf mapred.reduce.tasks=4;

hive.exec.scratchdir:指定 HDFS 上目录位置,用于存储不同 map/reduce 阶段的执行计划和这些阶段的中间输出结果。

配置文件启动

使用 -i 可以在进入交互模式之前运行初始化脚本,相当于指定配置文件启动。

1
hive -i /usr/file/hive-init.conf;

其中 hive-init.conf 的内容如下:

1
set hive.exec.mode.local.auto = true;

hive.exec.mode.local.auto 默认值为 false,这里设置为 true ,代表开启本地模式。

用户自定义变量

--define--hivevar在功能上是等价的,都是用来实现自定义变量,这里给出一个示例:

定义变量:

1
hive  --define  n=ename --hiveconf  --hivevar j=job;

在查询中引用自定义变量:

1
2
3
4
5
6
7
# 以下两条语句等价
hive > select ${n} from emp;
hive > select ${hivevar:n} from emp;

# 以下两条语句等价
hive > select ${j} from emp;
hive > select ${hivevar:j} from emp;

结果如下:

img

Hive 配置

可以通过三种方式对 Hive 的相关属性进行配置,分别介绍如下:

配置文件

方式一为使用配置文件,使用配置文件指定的配置是永久有效的。Hive 有以下三个可选的配置文件:

  • hive-site.xml - Hive 的主要配置文件;
  • hivemetastore-site.xml - 关于元数据的配置;
  • hiveserver2-site.xml - 关于 HiveServer2 的配置。

示例如下,在 hive-site.xml 配置 hive.exec.scratchdir

1
2
3
4
5
<property>
<name>hive.exec.scratchdir</name>
<value>/tmp/mydir</value>
<description>Scratch space for Hive jobs</description>
</property>

hiveconf

方式二为在启动命令行 (Hive CLI / Beeline) 的时候使用 --hiveconf 指定配置,这种方式指定的配置作用于整个 Session。

1
hive --hiveconf hive.exec.scratchdir=/tmp/mydir

set

方式三为在交互式环境下 (Hive CLI / Beeline),使用 set 命令指定。这种设置的作用范围也是 Session 级别的,配置对于执行该命令后的所有命令生效。set 兼具设置参数和查看参数的功能。如下:

1
2
3
4
5
6
7
8
0: jdbc:hive2://hadoop001:10000> set hive.exec.scratchdir=/tmp/mydir;
No rows affected (0.025 seconds)
0: jdbc:hive2://hadoop001:10000> set hive.exec.scratchdir;
+----------------------------------+--+
| set |
+----------------------------------+--+
| hive.exec.scratchdir=/tmp/mydir |
+----------------------------------+--+

配置优先级

配置的优先顺序如下 (由低到高):
hive-site.xml - >hivemetastore-site.xml- > hiveserver2-site.xml - >-- hiveconf- > set

配置参数

Hive 可选的配置参数非常多,在用到时查阅官方文档即可AdminManual Configuration

参考资料

HDFS 入门

HDFS 是 Hadoop 分布式文件系统。

关键词:分布式、文件系统

HDFS 简介

HDFSHadoop Distributed File System 的缩写,即 Hadoop 的分布式文件系统。

HDFS 是一种用于存储具有流数据访问模式的超大文件的文件系统,它运行在廉价的机器集群上。

HDFS 的设计目标是管理数以千计的服务器、数以万计的磁盘,将这么大规模的服务器计算资源当作一个单一的存储系统进行管理,对应用程序提供数以 PB 计的存储容量,让应用程序像使用普通文件系统一样存储大规模的文件数据。

HDFS 是在一个大规模分布式服务器集群上,对数据分片后进行并行读写及冗余存储。因为 HDFS 可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可供 HDFS 使用,所以整个 HDFS 的存储空间可以达到 PB 级容量。

HDFS 的优点

  • 高容错 - 数据冗余多副本,副本丢失后自动恢复
  • 高可用 - NameNode HA、安全模式
  • 高扩展 - 能够处理 10K 节点的规模;处理数据达到 GB、TB、甚至 PB 级别的数据;能够处理百万规模以上的文件数量,数量相当之大。
  • 批处理 - 流式数据访问;数据位置暴露给计算框架
  • 构建在廉价商用机器上 - 提供了容错和恢复机制

HDFS 的缺点

  • 不适合低延迟数据访问 - 适合高吞吐率的场景,就是在某一时间内写入大量的数据。但是它在低延时的情况下是不行的,比如毫秒级以内读取数据,它是很难做到的。
  • 不适合大量小文件存储
    • 存储大量小文件(这里的小文件是指小于 HDFS 系统的 Block 大小的文件(默认 64M))的话,它会占用 NameNode 大量的内存来存储文件、目录和块信息。这样是不可取的,因为 NameNode 的内存总是有限的。
    • 磁盘寻道时间超过读取时间
  • 不支持并发写入 - 一个文件同时只能有一个写入者
  • 不支持文件随机修改 - 仅支持追加写入

HDFS 架构

HDFS 采用主从架构,由单个 NameNode(NN) 和多个 DataNode(DN) 组成。

集群中的 Datanode 一般是一个节点一个,负责管理它所在节点上的存储。HDFS 暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组 Datanode 上。Namenode 执行文件系统的名字空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体 Datanode 节点的映射。

img

NameNode

NameNode 负责管理文件系统的命名空间以及客户端对文件的访问。NameNode 的职责:

  • 管理命名空间
  • 管理元数据:文件的位置、所有者、权限、数据块等
  • 管理 Block 副本策略:默认 3 个副本
  • 处理客户端读写请求,为 DataNode 分配任务

DataNode

DataNode 负责文件数据的存储和读写操作,HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中

  • 存储 Block 和数据校验和
  • 执行客户端发送的读写操作
  • 通过心跳机制定期(默认 3 秒)向 NameNode 汇报运行状态和 Block 列表信息
  • 集群启动时,DataNode 向 NameNode 提供 Block 列表信息

命名空间

HDFS 的 文件系统命名空间 的层次结构与大多数文件系统类似 (如 Linux), 支持目录和文件的创建、移动、删除和重命名等操作,支持配置用户和访问权限,但不支持硬链接和软连接。NameNode 负责维护文件系统名称空间,记录对名称空间或其属性的任何更改。

Block 数据块

  • HDFS 最小存储单元
  • 文件写入 HDFS 会被切分成若干个 Block
  • Block 大小固定,默认为 128MB,可自定义
  • 若一个 Block 的大小小于设定值,不会占用整个块空间
  • 默认情况下每个 Block 有 3 个副本

Client

  • 将文件切分为 Block 数据块
  • 与 NameNode 交互,获取文件元数据
  • 与 DataNode 交互,读取或写入数据
  • 管理 HDFS

HDFS 数据流

HDFS 读文件

img

  1. 客户端调用 FileSyste 对象的 open() 方法在分布式文件系统中打开要读取的文件
  2. 分布式文件系统通过使用 RPC(远程过程调用)来调用 namenode,确定文件起始块的位置
  3. 分布式文件系统的 DistributedFileSystem 类返回一个支持文件定位的输入流 FSDataInputStream 对象,FSDataInputStream 对象接着封装 DFSInputStream 对象(存储着文件起始几个块的 datanode 地址),客户端对这个输入流调用 read()方法。
  4. DFSInputStream 连接距离最近的 datanode,通过反复调用 read 方法,将数据从 datanode 传输到客户端
  5. 到达块的末端时,DFSInputStream 关闭与该 datanode 的连接,寻找下一个块的最佳 datanode
  6. 客户端完成读取,对 FSDataInputStream 调用 close()方法关闭连接

HDFS 写文件

img

  1. 客户端通过对 DistributedFileSystem 对象调用 create() 函数来新建文件
  2. 分布式文件系统对 namenod 创建一个 RPC 调用,在文件系统的命名空间中新建一个文件
  3. Namenode 对新建文件进行检查无误后,分布式文件系统返回给客户端一个 FSDataOutputStream 对象,FSDataOutputStream 对象封装一个 DFSoutPutstream 对象,负责处理 namenode 和 datanode 之间的通信,客户端开始写入数据
  4. FSDataOutputStream 将数据分成一个一个的数据包,写入内部队列“数据队列”,DataStreamer 负责将数据包依次流式传输到由一组 namenode 构成的管线中。
  5. DFSOutputStream 维护着确认队列来等待 datanode 收到确认回执,收到管道中所有 datanode 确认后,数据包从确认队列删除。
  6. 客户端完成数据的写入,对数据流调用 close() 方法。
  7. namenode 确认完成

HDFS 数据复制

由于 Hadoop 被设计运行在廉价的机器上,这意味着硬件是不可靠的,为了保证容错性,HDFS 提供了数据复制机制。HDFS 将每一个文件存储为一系列,每个块由多个副本来保证容错,块的大小和复制因子可以自行配置(默认情况下,块大小是 128M,默认复制因子是 3)。

img

Namenode 全权管理数据块的复制,它周期性地从集群中的每个 Datanode 接收心跳信号和块状态报告(Blockreport)。接收到心跳信号意味着该 Datanode 节点工作正常。块状态报告包含了一个该 Datanode 上所有数据块的列表。

img

大型的 HDFS 实例在通常分布在多个机架的多台服务器上,不同机架上的两台服务器之间通过交换机进行通讯。在大多数情况下,同一机架中的服务器间的网络带宽大于不同机架中的服务器之间的带宽。因此 HDFS 采用机架感知副本放置策略,对于常见情况,当复制因子为 3 时,HDFS 的放置策略是:

  • 副本 1:放在 Client 所在节点
    • 对于远程 Client,系统会随机选择节点
  • 副本 2:放在不同的机架节点上
  • 副本 3:放在与第二个副本同一机架的不同节点上
  • 副本 N:随机选择
  • 节点选择:同等条件下优先选择空闲节点

为了最大限度地减少带宽消耗和读取延迟,HDFS 在执行读取请求时,优先读取距离读取器最近的副本。如果在与读取器节点相同的机架上存在副本,则优先选择该副本。如果 HDFS 群集跨越多个数据中心,则优先选择本地数据中心上的副本。

HDFS 高可用

数据存储故障容错

磁盘介质在存储过程中受环境或者老化影响,其存储的数据可能会出现错乱。HDFS 的应对措施是,对于存储在 DataNode 上的数据块,计算并存储校验和(CheckSum)。在读取数据的时候,重新计算读取出来的数据的校验和,如果校验不正确就抛出异常,应用程序捕获异常后就到其他 DataNode 上读取备份数据。

磁盘故障容错

如果 DataNode 监测到本机的某块磁盘损坏,就将该块磁盘上存储的所有 BlockID 报告给 NameNode,NameNode 检查这些数据块还在哪些 DataNode 上有备份,通知相应的 DataNode 服务器将对应的数据块复制到其他服务器上,以保证数据块的备份数满足要求。

DataNode 故障容错

DataNode 会通过心跳和 NameNode 保持通信,如果 DataNode 超时未发送心跳,NameNode 就会认为这个 DataNode 已经宕机失效,立即查找这个 DataNode 上存储的数据块有哪些,以及这些数据块还存储在哪些服务器上,随后通知这些服务器再复制一份数据块到其他服务器上,保证 HDFS 存储的数据块备份数符合用户设置的数目,即使再出现服务器宕机,也不会丢失数据。

NameNode 故障容错

NameNode 是整个 HDFS 的核心,记录着 HDFS 文件分配表信息,所有的文件路径和数据块存储信息都保存在 NameNode,如果 NameNode 故障,整个 HDFS 系统集群都无法使用;如果 NameNode 上记录的数据丢失,整个集群所有 DataNode 存储的数据也就没用了。

NameNode 的 HA 机制

NameNode 通过 Active NameNode 和 Standby NameNode 实现主备。

  • Active NameNode - 是正在工作的 NameNode;
  • Standby NameNode - 是备份的 NameNode。

Active NameNode 宕机后,Standby NameNode 快速升级为新的 Active NameNode。

Standby NameNode 周期性同步 edits 编辑日志,定期合并 fsimage 与 edits 到本地磁盘。

Hadoop 3.0 允许配置多个 Standby NameNode。

元数据文件

  • edits(编辑日志文件) - 保存了自最新检查点(Checkpoint)之后的所有文件更新操作。
  • fsimage(元数据检查点镜像文件) - 保存了文件系统中所有的目录和文件信息,如:某个目录下有哪些子目录和文件,以及文件名、文件副本数、文件由哪些 Block 组成等。

Active NameNode 内存中有一份最新的元数据(= fsimage + edits)。

Standby NameNode 在检查点定期将内存中的元数据保存到 fsimage 文件中。

利用 QJM 实现元数据高可用

基于 Paxos 算法

QJM 机制(Quorum Journal Manager)

只要保证 Quorum(法定人数)数量的操作成功,就认为这是一次最终成功的操作

QJM 共享存储系统

  • 部署奇数(2N+1)个 JournalNode
  • JournalNode 负责存储 edits 编辑日志
  • 写 edits 的时候,只要超过半数(N+1)的 JournalNode 返回成功,就代表本次写入成功
  • 最多可容忍 N 个 JournalNode 宕机

利用 ZooKeeper 实现 Active 节点选举。

附:图解 HDFS 存储原理

说明:以下图片引用自博客:翻译经典 HDFS 原理讲解漫画

HDFS 写数据原理

img

img

img

HDFS 读数据原理

img

HDFS 故障类型和其检测方法

img

img

第二部分:读写故障的处理

img

第三部分:DataNode 故障处理

img

副本布局策略

img

参考资料

HDFS 运维

HDFS 命令

显示当前目录结构

1
2
3
4
5
6
# 显示当前目录结构
hdfs dfs -ls <path>
# 递归显示当前目录结构
hdfs dfs -ls -R <path>
# 显示根目录下内容
hdfs dfs -ls /

创建目录

1
2
3
4
# 创建目录
hdfs dfs -mkdir <path>
# 递归创建目录
hdfs dfs -mkdir -p <path>

删除操作

1
2
3
4
# 删除文件
hdfs dfs -rm <path>
# 递归删除目录和文件
hdfs dfs -rm -R <path>

导入文件到 HDFS

1
2
3
# 二选一执行即可
hdfs dfs -put [localsrc] [dst]
hdfs dfs -copyFromLocal [localsrc] [dst]

从 HDFS 导出文件

1
2
3
# 二选一执行即可
hdfs dfs -get [dst] [localsrc]
hdfs dfs -copyToLocal [dst] [localsrc]

查看文件内容

1
2
3
# 二选一执行即可
hdfs dfs -text <path>
hdfs dfs -cat <path>

显示文件的最后一千字节

1
2
3
hdfs dfs -tail <path>
# 和Linux下一样,会持续监听文件内容变化 并显示文件的最后一千字节
hdfs dfs -tail -f <path>

拷贝文件

1
hdfs dfs -cp [src] [dst]

移动文件

1
hdfs dfs -mv [src] [dst]

统计当前目录下各文件大小

  • 默认单位字节
  • -s : 显示所有文件大小总和,
  • -h : 将以更友好的方式显示文件大小(例如 64.0m 而不是 67108864)
1
hdfs dfs -du <path>

合并下载多个文件

  • -nl 在每个文件的末尾添加换行符(LF)
  • -skip-empty-file 跳过空文件
1
2
3
hdfs dfs -getmerge
# 示例 将HDFS上的hbase-policy.xml和hbase-site.xml文件合并后下载到本地的/usr/test.xml
hdfs dfs -getmerge -nl /test/hbase-policy.xml /test/hbase-site.xml /usr/test.xml

统计文件系统的可用空间信息

1
hdfs dfs -df -h /

更改文件复制因子

1
hdfs dfs -setrep [-R] [-w] <numReplicas> <path>
  • 更改文件的复制因子。如果 path 是目录,则更改其下所有文件的复制因子
  • -w : 请求命令是否等待复制完成
1
2
# 示例
hdfs dfs -setrep -w 3 /user/hadoop/dir1

权限控制

1
2
3
4
5
6
7
# 权限控制和Linux上使用方式一致
# 变更文件或目录的所属群组。 用户必须是文件的所有者或超级用户。
hdfs dfs -chgrp [-R] GROUP URI [URI ...]
# 修改文件或目录的访问权限 用户必须是文件的所有者或超级用户。
hdfs dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
# 修改文件的拥有者 用户必须是超级用户。
hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

文件检测

1
hdfs dfs -test - [defsz]  URI

可选选项:

  • -d:如果路径是目录,返回 0。
  • -e:如果路径存在,则返回 0。
  • -f:如果路径是文件,则返回 0。
  • -s:如果路径不为空,则返回 0。
  • -r:如果路径存在且授予读权限,则返回 0。
  • -w:如果路径存在且授予写入权限,则返回 0。
  • -z:如果文件长度为零,则返回 0。
1
2
# 示例
hdfs dfs -test -e filename

HDFS 安全模式

什么是安全模式?

  • 安全模式是 HDFS 的一种特殊状态,在这种状态下,HDFS 只接收读数据请求,而不接收写入、删除、修改等变更请求。
  • 安全模式是 HDFS 确保 Block 数据安全的一种保护机制。
  • Active NameNode 启动时,HDFS 会进入安全模式,DataNode 主动向 NameNode 汇报可用 Block 列表等信息,在系统达到安全标准前,HDFS 一直处于“只读”状态。

何时正常离开安全模式

  • Block 上报率:DataNode 上报的可用 Block 个数 / NameNode 元数据记录的 Block 个数
  • 当 Block 上报率 >= 阈值时,HDFS 才能离开安全模式,默认阈值为 0.999
  • 不建议手动强制退出安全模式

触发安全模式的原因

  • NameNode 重启
  • NameNode 磁盘空间不足
  • Block 上报率低于阈值
  • DataNode 无法正常启动
  • 日志中出现严重异常
  • 用户操作不当,如:强制关机(特别注意!)

故障排查

  • 找到 DataNode 不能正常启动的原因,重启 DataNode
  • 清理 NameNode 磁盘
  • 谨慎操作,有问题找星环,以免丢失数据

参考资料

HDFS Java API

想要使用 HDFS API,需要导入依赖 hadoop-client。如果是 CDH 版本的 Hadoop,还需要额外指明其仓库地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.heibaiying</groupId>
<artifactId>hdfs-java-api</artifactId>
<version>1.0</version>


<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
</properties>


<!---配置 CDH 仓库地址-->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>


<dependencies>
<!--Hadoop-client-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>

FileSystem

FileSystem 是所有 HDFS 操作的主入口。由于之后的每个单元测试都需要用到它,这里使用 @Before 注解进行标注。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;

@Before
public void prepare() {
try {
Configuration configuration = new Configuration();
// 这里我启动的是单节点的 Hadoop,所以副本系数设置为 1,默认值为 3
configuration.set("dfs.replication", "1");
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}


@After
public void destroy() {
fileSystem = null;
}

创建目录

支持递归创建目录:

1
2
3
4
@Test
public void mkDir() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
}

创建指定权限的目录

FsPermission(FsAction u, FsAction g, FsAction o) 的三个参数分别对应:创建者权限,同组其他用户权限,其他用户权限,权限值定义在 FsAction 枚举类中。

1
2
3
4
5
@Test
public void mkDirWithPermission() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
}

创建文件,并写入内容

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void create() throws Exception {
// 如果文件存在,默认会覆盖, 可以通过第二个参数进行控制。第三个参数可以控制使用缓冲区的大小
FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
true, 4096);
out.write("hello hadoop!".getBytes());
out.write("hello spark!".getBytes());
out.write("hello flink!".getBytes());
// 强制将缓冲区中内容刷出
out.flush();
out.close();
}

判断文件是否存在

1
2
3
4
5
@Test
public void exist() throws Exception {
boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
System.out.println(exists);
}

查看文件内容

查看小文本文件的内容,直接转换成字符串后输出:

1
2
3
4
5
6
@Test
public void readToString() throws Exception {
FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
String context = inputStreamToString(inputStream, "utf-8");
System.out.println(context);
}

inputStreamToString 是一个自定义方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 把输入流转换为指定编码的字符
*
* @param inputStream 输入流
* @param encode 指定编码类型
*/
private static String inputStreamToString(InputStream inputStream, String encode) {
try {
if (encode == null || ("".equals(encode))) {
encode = "utf-8";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
StringBuilder builder = new StringBuilder();
String str = "";
while ((str = reader.readLine()) != null) {
builder.append(str).append("\n");
}
return builder.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

文件重命名

1
2
3
4
5
6
7
@Test
public void rename() throws Exception {
Path oldPath = new Path("/hdfs-api/test/a.txt");
Path newPath = new Path("/hdfs-api/test/b.txt");
boolean result = fileSystem.rename(oldPath, newPath);
System.out.println(result);
}

删除目录或文件

1
2
3
4
5
6
7
8
9
public void delete() throws Exception {
/*
* 第二个参数代表是否递归删除
* + 如果 path 是一个目录且递归删除为 true, 则删除该目录及其中所有文件;
* + 如果 path 是一个目录但递归删除为 false,则会则抛出异常。
*/
boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
System.out.println(result);
}

上传文件到 HDFS

1
2
3
4
5
6
7
@Test
public void copyFromLocalFile() throws Exception {
// 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
Path src = new Path("D:\\BigData-Notes\\notes\\installation");
Path dst = new Path("/hdfs-api/test/");
fileSystem.copyFromLocalFile(src, dst);
}

上传大文件并显示上传进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void copyFromLocalBigFile() throws Exception {

File file = new File("D:\\kafka.tgz");
final float fileSize = file.length();
InputStream in = new BufferedInputStream(new FileInputStream(file));

FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
new Progressable() {
long fileCount = 0;

public void progress() {
fileCount++;
// progress 方法每上传大约 64KB 的数据后就会被调用一次
System.out.println("上传进度:" + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
}
});

IOUtils.copyBytes(in, out, 4096);

}

从 HDFS 上下载文件

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void copyToLocalFile() throws Exception {
Path src = new Path("/hdfs-api/test/kafka.tgz");
Path dst = new Path("D:\\app\\");
/*
* 第一个参数控制下载完成后是否删除源文件,默认是 true,即删除;
* 最后一个参数表示是否将 RawLocalFileSystem 用作本地文件系统;
* RawLocalFileSystem 默认为 false,通常情况下可以不设置,
* 但如果你在执行时候抛出 NullPointerException 异常,则代表你的文件系统与程序可能存在不兼容的情况 (window 下常见),
* 此时可以将 RawLocalFileSystem 设置为 true
*/
fileSystem.copyToLocalFile(false, src, dst, true);
}

查看指定目录下所有文件的信息

1
2
3
4
5
6
7
public void listFiles() throws Exception {
FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
for (FileStatus fileStatus : statuses) {
//fileStatus 的 toString 方法被重写过,直接打印可以看到所有信息
System.out.println(fileStatus.toString());
}
}

FileStatus 中包含了文件的基本信息,比如文件路径,是否是文件夹,修改时间,访问时间,所有者,所属组,文件权限,是否是符号链接等,输出内容示例如下:

1
2
3
4
5
6
7
8
9
10
FileStatus{
path=hdfs://192.168.0.106:8020/hdfs-api/test;
isDirectory=true;
modification_time=1556680796191;
access_time=0;
owner=root;
group=supergroup;
permission=rwxr-xr-x;
isSymlink=false
}

递归查看指定目录下所有文件的信息

1
2
3
4
5
6
7
@Test
public void listFilesRecursive() throws Exception {
RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hbase"), true);
while (files.hasNext()) {
System.out.println(files.next());
}
}

和上面输出类似,只是多了文本大小,副本系数,块大小信息。

1
2
3
4
5
6
7
8
9
10
11
LocatedFileStatus{
path=hdfs://192.168.0.106:8020/hbase/hbase.version;
isDirectory=false;
length=7;
replication=1;
blocksize=134217728;
modification_time=1554129052916;
access_time=1554902661455;
owner=root; group=supergroup;
permission=rw-r--r--;
isSymlink=false}

查看文件的块信息

1
2
3
4
5
6
7
8
9
@Test
public void getFileBlockLocations() throws Exception {

FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation block : blocks) {
System.out.println(block);
}
}

块输出信息有三个值,分别是文件的起始偏移量 (offset),文件大小 (length),块所在的主机名 (hosts)。

1
0,57028557,hadoop001

这里我上传的文件只有 57M(小于 128M),且程序中设置了副本系数为 1,所有只有一个块信息。

Java 容器之 Queue

Queue 简介

Queue 接口

Queue 接口定义如下:

1
public interface Queue<E> extends Collection<E> {}

AbstractQueue 抽象类

AbstractQueue 类提供 Queue 接口的核心实现,以最大限度地减少实现 Queue 接口所需的工作。

AbstractQueue 抽象类定义如下:

1
2
3
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {}

Deque 接口

Deque 接口是 double ended queue 的缩写,即双端队列。Deque 继承 Queue 接口,并扩展支持在队列的两端插入和删除元素

所以提供了特定的方法,如:

大多数的实现对元素的数量没有限制,但这个接口既支持有容量限制的 deque,也支持没有固定大小限制的。

ArrayDeque

ArrayDequeDeque 的顺序表实现。

ArrayDeque 用一个动态数组实现了栈和队列所需的所有操作。

LinkedList

LinkedListDeque 的链表实现。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LinkedListQueueDemo {

public static void main(String[] args) {
//add()和remove()方法在失败的时候会抛出异常(不推荐)
Queue<String> queue = new LinkedList<>();

queue.offer("a"); // 入队
queue.offer("b"); // 入队
queue.offer("c"); // 入队
for (String q : queue) {
System.out.println(q);
}
System.out.println("===");
System.out.println("poll=" + queue.poll()); // 出队
for (String q : queue) {
System.out.println(q);
}
System.out.println("===");
System.out.println("element=" + queue.element()); //返回第一个元素
for (String q : queue) {
System.out.println(q);
}
System.out.println("===");
System.out.println("peek=" + queue.peek()); //返回第一个元素
for (String q : queue) {
System.out.println(q);
}
}

}

PriorityQueue

PriorityQueue 类定义如下:

1
2
public class PriorityQueue<E> extends AbstractQueue<E>
implements java.io.Serializable {}

PriorityQueue 要点:

  • PriorityQueue 实现了 Serializable,支持序列化。
  • PriorityQueue 类是无界优先级队列。
  • PriorityQueue 中的元素根据自然顺序或 Comparator 提供的顺序排序。
  • PriorityQueue 不接受 null 值元素。
  • PriorityQueue 不是线程安全的。

参考资料

Java NIO

关键词:ChannelBufferSelector非阻塞多路复用

NIO 简介

NIO 是一种同步非阻塞的 I/O 模型,在 Java 1.4 中引入了 NIO 框架,对应 java.nio 包,提供了 ChannelSelectorBuffer 等抽象。

NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 SocketServerSocket 相对应的 SocketChannelServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞 I/O 来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

NIO 和 BIO 的区别

Non-blocking IO(非阻塞)

BIO 是阻塞的,NIO 是非阻塞的

BIO 的各种流是阻塞的。这意味着,当一个线程调用 read()write() 时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。在此期间,该线程不能再干其他任何事。

NIO 使我们可以进行非阻塞 IO 操作。比如说,单线程中从通道读取数据到 buffer,同时可以继续做别的事情,当数据读取到 buffer 中后,线程再继续处理数据。写数据也是一样的。另外,非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

Buffer(缓冲区)

**BIO 面向流(Stream oriented),而 NIO 面向缓冲区(Buffer oriented)**。

Buffer 是一个对象,它包含一些要写入或者要读出的数据。在 NIO 类库中加入 Buffer 对象,体现了 NIO 与 BIO 的一个重要区别。在面向流的 BIO 中可以将数据直接写入或者将数据直接读到 Stream 对象中。虽然 Stream 中也有 Buffer 开头的扩展类,但只是流的包装类,还是从流读到缓冲区,而 NIO 却是直接读到 Buffer 中进行操作。

在 NIO 厍中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读缓冲区中的数据; 在写入数据时,写入到缓冲区中。任何时候访问 NIO 中的数据,都是通过缓冲区进行操作。

最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能用于操作 byte 数组。除了 ByteBuffer,还有其他的一些缓冲区,事实上,每一种 Java 基本类型(除了 Boolean 类型)都对应有一种缓冲区。

Channel (通道)

NIO 通过 Channel(通道) 进行读写。

通道是双向的,可读也可写,而流的读写是单向的。无论读写,通道只能和 Buffer 交互。因为 Buffer,通道可以异步地读写。

Selector (选择器)

NIO 有选择器,而 IO 没有。

选择器用于使用单个线程处理多个通道。因此,它需要较少的线程来处理这些通道。线程之间的切换对于操作系统来说是昂贵的。 因此,为了提高系统效率选择器是有用的。

NIO 的基本流程

通常来说 NIO 中的所有 IO 都是从 Channel(通道) 开始的。

  • 从通道进行数据读取 :创建一个缓冲区,然后请求通道读取数据。
  • 从通道进行数据写入 :创建一个缓冲区,填充数据,并要求通道写入数据。

NIO 核心组件

NIO 包含下面几个核心的组件:

  • Channel(通道)
  • Buffer(缓冲区)
  • Selector(选择器)

Channel(通道)

通道(Channel)是对 BIO 中的流的模拟,可以通过它读写数据。

Channel,类似在 Linux 之类操作系统上看到的文件描述符,是 NIO 中被用来支持批量式 IO 操作的一种抽象。

File 或者 Socket,通常被认为是比较高层次的抽象,而 Channel 则是更加操作系统底层的一种抽象,这也使得 NIO 得以充分利用现代操作系统底层机制,获得特定场景的性能优化,例如,DMA(Direct Memory Access)等。不同层次的抽象是相互关联的,我们可以通过 Socket 获取 Channel,反之亦然。

通道与流的不同之处在于:

  • 流是单向的 - 一个流只能单纯的负责读或写。
  • 通道是双向的 - 一个通道可以同时用于读写。

通道包括以下类型:

  • FileChannel:从文件中读写数据;
  • DatagramChannel:通过 UDP 读写网络中数据;
  • SocketChannel:通过 TCP 读写网络中数据;
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

Buffer(缓冲区)

NIO 与传统 I/O 不同,它是基于块(Block)的,它以块为基本单位处理数据。Buffer 是一块连续的内存块,是 NIO 读写数据的缓冲。Buffer 可以将文件一次性读入内存再做后续处理,而传统的方式是边读文件边处理数据。

Channel 读写的数据都必须先置于缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

BIO 和 NIO 已经很好地集成了,java.io.* 已经以 NIO 为基础重新实现了,所以现在它可以利用 NIO 的一些特性。例如,java.io.* 包中的一些类包含以块的形式读写数据的方法,这使得即使在面向流的系统中,处理速度也会更快。

缓冲区包括以下类型:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

缓冲区状态变量

  • capacity:最大容量;
  • position:当前已经读写的字节数;
  • limit:还可以读写的字节数。
  • mark:记录上一次 postion 的位置,默认是 0,算是一个便利性的考虑,往往不是必须
    的。

缓冲区状态变量的改变过程举例:

  1. 新建一个大小为 8 个字节的缓冲区,此时 position 为 0,而 limit = capacity = 8。capacity 变量不会改变,下面的讨论会忽略它。
  2. 从输入通道中读取 5 个字节数据写入缓冲区中,此时 position 移动设置为 5,limit 保持不变。
  3. 在将缓冲区的数据写到输出通道之前,需要先调用 flip() 方法,这个方法将 limit 设置为当前 position,并将 position 设置为 0。
  4. 从缓冲区中取 4 个字节到输出缓冲中,此时 position 设为 4。
  5. 最后需要调用 clear() 方法来清空缓冲区,此时 position 和 limit 都被设置为最初位置。

文件 NIO 示例

以下展示了使用 NIO 快速复制文件的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void fastCopy(String src, String dist) throws IOException {

/* 获得源文件的输入字节流 */
FileInputStream fin = new FileInputStream(src);

/* 获取输入字节流的文件通道 */
FileChannel fcin = fin.getChannel();

/* 获取目标文件的输出字节流 */
FileOutputStream fout = new FileOutputStream(dist);

/* 获取输出字节流的通道 */
FileChannel fcout = fout.getChannel();

/* 为缓冲区分配 1024 个字节 */
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

while (true) {

/* 从输入通道中读取数据到缓冲区中 */
int r = fcin.read(buffer);

/* read() 返回 -1 表示 EOF */
if (r == -1) {
break;
}

/* 切换读写 */
buffer.flip();

/* 把缓冲区的内容写入输出文件中 */
fcout.write(buffer);

/* 清空缓冲区 */
buffer.clear();
}
}

DirectBuffer

NIO 还提供了一个可以直接访问物理内存的类 DirectBuffer。普通的 Buffer 分配的是 JVM 堆内存,而 DirectBuffer 是直接分配物理内存。

数据要输出到外部设备,必须先从用户空间复制到内核空间,再复制到输出设备,而 DirectBuffer 则是直接将步骤简化为从内核空间复制到外部设备,减少了数据拷贝。

这里拓展一点,由于 DirectBuffer 申请的是非 JVM 的物理内存,所以创建和销毁的代价很高。DirectBuffer 申请的内存并不是直接由 JVM 负责垃圾回收,但在 DirectBuffer 包装类被回收时,会通过 Java 引用机制来释放该内存块。

Selector(选择器)

NIO 常常被叫做非阻塞 IO,主要是因为 NIO 在网络通信中的非阻塞特性被广泛使用。

Selector 是 Java NIO 编程的基础。用于检查一个或多个 NIO Channel 的状态是否处于可读、可写。

NIO 实现了 IO 多路复用中的 Reactor 模型

  • 一个线程(Thread)使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件(accpetread,如果某个 Channel 上面发生监听事件,这个 Channel 就处于就绪状态,然后进行 I/O 操作。

  • 通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行。

  • 因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理一个事件具有更好的性能。

需要注意的是,只有 SocketChannel 才能配置为非阻塞,而 FileChannel 不能,因为 FileChannel 配置非阻塞也没有意义。

目前操作系统的 I/O 多路复用机制都使用了 epoll,相比传统的 select 机制,epoll 没有最大连接句柄 1024 的限制。所以 Selector 在理论上可以轮询成千上万的客户端。

创建选择器

1
Selector selector = Selector.open();

将通道注册到选择器上

1
2
3
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

通道必须配置为非阻塞模式,否则使用选择器就没有任何意义了,因为如果通道在某个事件上被阻塞,那么服务器就不能响应其它事件,必须等待这个事件处理完毕才能去处理其它事件,显然这和选择器的作用背道而驰。

在将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类:

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

它们在 SelectionKey 的定义如下:

1
2
3
4
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

可以看出每个事件可以被当成一个位域,从而组成事件集整数。例如:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

监听事件

1
int num = selector.select();

使用 select() 来监听到达的事件,它会一直阻塞直到有至少一个事件到达。

获取到达的事件

1
2
3
4
5
6
7
8
9
10
11
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// ...
} else if (key.isReadable()) {
// ...
}
keyIterator.remove();
}

事件循环

因为一次 select() 调用不能处理完所有的事件,并且服务器端有可能需要一直监听事件,因此服务器端处理事件的代码一般会放在一个死循环内。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
while (true) {
int num = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
// ...
} else if (key.isReadable()) {
// ...
}
keyIterator.remove();
}
}

套接字 NIO 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class NIOServer {

public static void main(String[] args) throws IOException {

Selector selector = Selector.open();

ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

ServerSocket serverSocket = ssChannel.socket();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
serverSocket.bind(address);

while (true) {

selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();

while (keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if (key.isAcceptable()) {

ServerSocketChannel ssChannel1 = (ServerSocketChannel) key.channel();

// 服务器会为每个新连接创建一个 SocketChannel
SocketChannel sChannel = ssChannel1.accept();
sChannel.configureBlocking(false);

// 这个新连接主要用于从客户端读取数据
sChannel.register(selector, SelectionKey.OP_READ);

} else if (key.isReadable()) {

SocketChannel sChannel = (SocketChannel) key.channel();
System.out.println(readDataFromSocketChannel(sChannel));
sChannel.close();
}

keyIterator.remove();
}
}
}

private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {

ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder data = new StringBuilder();

while (true) {

buffer.clear();
int n = sChannel.read(buffer);
if (n == -1) {
break;
}
buffer.flip();
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i = 0; i < limit; i++) {
dst[i] = (char) buffer.get(i);
}
data.append(dst);
buffer.clear();
}
return data.toString();
}
}
1
2
3
4
5
6
7
8
9
10
public class NIOClient {

public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream out = socket.getOutputStream();
String s = "hello world";
out.write(s.getBytes());
out.close();
}
}

内存映射文件

内存映射文件 I/O 是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的 I/O 快得多。

向内存映射文件写入可能是危险的,只是改变数组的单个元素这样的简单操作,就可能会直接修改磁盘上的文件。修改数据与将数据保存到磁盘是没有分开的。

下面代码行将文件的前 1024 个字节映射到内存中,map() 方法返回一个 MappedByteBuffer,它是 ByteBuffer 的子类。因此,可以像使用其他任何 ByteBuffer 一样使用新映射的缓冲区,操作系统会在需要时负责执行映射。

1
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, 1024);

NIO vs. BIO

BIO 与 NIO 最重要的区别是数据打包和传输的方式:BIO 以流的方式处理数据,而 NIO 以块的方式处理数据

  • 面向流的 BIO 一次处理一个字节数据:一个输入流产生一个字节数据,一个输出流消费一个字节数据。为流式数据创建过滤器非常容易,链接几个过滤器,以便每个过滤器只负责复杂处理机制的一部分。不利的一面是,面向流的 I/O 通常相当慢。
  • 面向块的 NIO 一次处理一个数据块,按块处理数据比按流处理数据要快得多。但是面向块的 NIO 缺少一些面向流的 BIO 所具有的优雅性和简单性。

BIO 模式:

img

NIO 模式:

img

参考资料

Java 网络编程

关键词:SocketServerSocketDatagramPacketDatagramSocket

网络编程是指编写运行在多个设备(计算机)的程序,这些设备都通过网络连接起来。

java.net 包中提供了低层次的网络通信细节。你可以直接使用这些类和接口,来专注于解决问题,而不用关注通信细节。

java.net 包中提供了两种常见的网络协议的支持:

  • TCP - TCP 是传输控制协议的缩写,它保障了两个应用程序之间的可靠通信。通常用于互联网协议,被称 TCP/ IP。
  • UDP - UDP 是用户数据报协议的缩写,一个无连接的协议。提供了应用程序之间要发送的数据的数据包。

Socket 和 ServerSocket

套接字(Socket)使用 TCP 提供了两台计算机之间的通信机制。 客户端程序创建一个套接字,并尝试连接服务器的套接字。

Java 通过 Socket 和 ServerSocket 实现对 TCP 的支持。Java 中的 Socket 通信可以简单理解为:**java.net.Socket 代表客户端,java.net.ServerSocket 代表服务端**,二者可以建立连接,然后通信。

以下为 Socket 通信中建立建立的基本流程:

  • 服务器实例化一个 ServerSocket 对象,表示服务器绑定一个端口。
  • 服务器调用 ServerSocketaccept() 方法,该方法将一直等待,直到客户端连接到服务器的绑定端口(即监听端口)。
  • 服务器监听端口时,客户端实例化一个 Socket 对象,指定服务器名称和端口号来请求连接。
  • Socket 类的构造函数试图将客户端连接到指定的服务器和端口号。如果通信被建立,则在客户端创建一个 Socket 对象能够与服务器进行通信。
  • 在服务器端,accept() 方法返回服务器上一个新的 Socket 引用,该引用连接到客户端的 Socket

连接建立后,可以通过使用 IO 流进行通信。每一个 Socket 都有一个输出流和一个输入流。客户端的输出流连接到服务器端的输入流,而客户端的输入流连接到服务器端的输出流。

TCP 是一个双向的通信协议,因此数据可以通过两个数据流在同一时间发送,以下是一些类提供的一套完整的有用的方法来实现 sockets。

ServerSocket

服务器程序通过使用 java.net.ServerSocket 类以获取一个端口,并且监听客户端请求连接此端口的请求。

ServerSocket 构造方法

ServerSocket 有多个构造方法:

方法 描述
ServerSocket() 创建非绑定服务器套接字。
ServerSocket(int port) 创建绑定到特定端口的服务器套接字。
ServerSocket(int port, int backlog) 利用指定的 backlog 创建服务器套接字并将其绑定到指定的本地端口号。
ServerSocket(int port, int backlog, InetAddress address) 使用指定的端口、监听 backlog 和要绑定到的本地 IP 地址创建服务器。

ServerSocket 常用方法

创建非绑定服务器套接字。 如果 ServerSocket 构造方法没有抛出异常,就意味着你的应用程序已经成功绑定到指定的端口,并且侦听客户端请求。

这里有一些 ServerSocket 类的常用方法:

方法 描述
int getLocalPort() 返回此套接字在其上侦听的端口。
Socket accept() 监听并接受到此套接字的连接。
void setSoTimeout(int timeout) 通过指定超时值启用/禁用 SO_TIMEOUT,以毫秒为单位。
void bind(SocketAddress host, int backlog) ServerSocket 绑定到特定地址(IP 地址和端口号)。

Socket

java.net.Socket 类代表客户端和服务器都用来互相沟通的套接字。客户端要获取一个 Socket 对象通过实例化 ,而 服务器获得一个 Socket 对象则通过 accept() 方法 a 的返回值。

Socket 构造方法

Socket 类有 5 个构造方法:

方法 描述
Socket() 通过系统默认类型的 SocketImpl 创建未连接套接字
Socket(String host, int port) 创建一个流套接字并将其连接到指定主机上的指定端口号。
Socket(InetAddress host, int port) 创建一个流套接字并将其连接到指定 IP 地址的指定端口号。
Socket(String host, int port, InetAddress localAddress, int localPort) 创建一个套接字并将其连接到指定远程主机上的指定远程端口。
Socket(InetAddress host, int port, InetAddress localAddress, int localPort) 创建一个套接字并将其连接到指定远程地址上的指定远程端口。

当 Socket 构造方法返回,并没有简单的实例化了一个 Socket 对象,它实际上会尝试连接到指定的服务器和端口。

Socket 常用方法

下面列出了一些感兴趣的方法,注意客户端和服务器端都有一个 Socket 对象,所以无论客户端还是服务端都能够调用这些方法。

方法 描述
void connect(SocketAddress host, int timeout) 将此套接字连接到服务器,并指定一个超时值。
InetAddress getInetAddress() 返回套接字连接的地址。
int getPort() 返回此套接字连接到的远程端口。
int getLocalPort() 返回此套接字绑定到的本地端口。
SocketAddress getRemoteSocketAddress() 返回此套接字连接的端点的地址,如果未连接则返回 null。
InputStream getInputStream() 返回此套接字的输入流。
OutputStream getOutputStream() 返回此套接字的输出流。
void close() 关闭此套接字。

Socket 通信示例

服务端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class HelloServer {

public static void main(String[] args) throws Exception {
// Socket 服务端
// 服务器在8888端口上监听
ServerSocket server = new ServerSocket(8888);
System.out.println("服务器运行中,等待客户端连接。");
// 得到连接,程序进入到阻塞状态
Socket client = server.accept();
// 打印流输出最方便
PrintStream out = new PrintStream(client.getOutputStream());
// 向客户端输出信息
out.println("hello world");
client.close();
server.close();
System.out.println("服务器已向客户端发送消息,退出。");
}

}

客户端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HelloClient {

public static void main(String[] args) throws Exception {
// Socket 客户端
Socket client = new Socket("localhost", 8888);
InputStreamReader inputStreamReader = new InputStreamReader(client.getInputStream());
// 一次性接收完成
BufferedReader buf = new BufferedReader(inputStreamReader);
String str = buf.readLine();
buf.close();
client.close();
System.out.println("客户端接收到服务器消息:" + str + ",退出");
}

}

DatagramSocket 和 DatagramPacket

Java 通过 DatagramSocketDatagramPacket 实现对 UDP 协议的支持。

  • DatagramPacket:数据包类
  • DatagramSocket:通信类

UDP 服务端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UDPServer {

public static void main(String[] args) throws Exception { // 所有异常抛出
String str = "hello World!!!";
DatagramSocket ds = new DatagramSocket(3000); // 服务端在3000端口上等待服务器发送信息
DatagramPacket dp =
new DatagramPacket(str.getBytes(), str.length(), InetAddress.getByName("localhost"), 9000); // 所有的信息使用buf保存
System.out.println("发送信息。");
ds.send(dp); // 发送信息出去
ds.close();
}

}

UDP 客户端示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class UDPClient {

public static void main(String[] args) throws Exception { // 所有异常抛出
byte[] buf = new byte[1024]; // 开辟空间,以接收数据
DatagramSocket ds = new DatagramSocket(9000); // 客户端在9000端口上等待服务器发送信息
DatagramPacket dp = new DatagramPacket(buf, 1024); // 所有的信息使用buf保存
ds.receive(dp); // 接收数据
String str = new String(dp.getData(), 0, dp.getLength()) + "from " + dp.getAddress().getHostAddress() + ":"
+ dp.getPort();
System.out.println(str); // 输出内容
}

}

InetAddress

InetAddress 类表示互联网协议(IP)地址。

没有公有的构造函数,只能通过静态方法来创建实例。

1
2
InetAddress.getByName(String host);
InetAddress.getByAddress(byte[] address);

URL

可以直接从 URL 中读取字节流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {

URL url = new URL("http://www.baidu.com");

/* 字节流 */
InputStream is = url.openStream();

/* 字符流 */
InputStreamReader isr = new InputStreamReader(is, "utf-8");

/* 提供缓存功能 */
BufferedReader br = new BufferedReader(isr);

String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}

br.close();
}

参考资料

监控工具对比

监控工具发展史

img

监控工具比对

特性对比

img

生态对比

img

技术选型

  • Zipkin 欠缺 APM 报表能力,不推荐。
  • 企业级,推荐 CAT
  • 关注和试点 SkyWalking。

用好调用链监控,需要订制化、自研能力。

参考资料

CAT、Zipkin 和 SkyWalking 该如何选型?