Dunwu Blog

大道至简,知易行难

Spring AOP

AOP 概念

什么是 AOP

AOP(Aspect-Oriented Programming,即 面向切面编程)与 OOP( Object-Oriented Programming,面向对象编程) 相辅相成,提供了与 OOP 不同的抽象软件结构的视角。

在 OOP 中,我们以类(class)作为我们的基本单元,而 AOP 中的基本单元是 Aspect(切面)

术语

Aspect(切面)

aspectpointcountadvice 组成, 它既包含了横切逻辑的定义, 也包括了连接点的定义. Spring AOP 就是负责实施切面的框架, 它将切面所定义的横切逻辑织入到切面所指定的连接点中.
AOP 的工作重心在于如何将增强织入目标对象的连接点上, 这里包含两个工作:

  1. 如何通过 pointcut 和 advice 定位到特定的 joinpoint 上
  2. 如何在 advice 中编写切面代码.

可以简单地认为, 使用 @Aspect 注解的类就是切面.

advice(增强)

由 aspect 添加到特定的 join point(即满足 point cut 规则的 join point) 的一段代码.
许多 AOP 框架, 包括 Spring AOP, 会将 advice 模拟为一个拦截器(interceptor), 并且在 join point 上维护多个 advice, 进行层层拦截.
例如 HTTP 鉴权的实现, 我们可以为每个使用 RequestMapping 标注的方法织入 advice, 当 HTTP 请求到来时, 首先进入到 advice 代码中, 在这里我们可以分析这个 HTTP 请求是否有相应的权限, 如果有, 则执行 Controller, 如果没有, 则抛出异常. 这里的 advice 就扮演着鉴权拦截器的角色了.

连接点(join point)

a point during the execution of a program, such as the execution of a method or the handling of an exception. In Spring AOP, a join point always represents a method execution.

程序运行中的一些时间点, 例如一个方法的执行, 或者是一个异常的处理.
在 Spring AOP 中, join point 总是方法的执行点, 即只有方法连接点.

切点(point cut)

匹配 join point 的谓词(a predicate that matches join points).
Advice 是和特定的 point cut 关联的, 并且在 point cut 相匹配的 join point 中执行.
在 Spring 中, 所有的方法都可以认为是 joinpoint, 但是我们并不希望在所有的方法上都添加 Advice, 而 pointcut 的作用就是提供一组规则(使用 AspectJ pointcut expression language 来描述) 来匹配joinpoint, 给满足规则的 joinpoint 添加 Advice.

关于 join point 和 point cut 的区别

在 Spring AOP 中, 所有的方法执行都是 join point. 而 point cut 是一个描述信息, 它修饰的是 join point, 通过 point cut, 我们就可以确定哪些 join point 可以被织入 Advice. 因此 join point 和 point cut 本质上就是两个不同纬度上的东西.
advice 是在 join point 上执行的, 而 point cut 规定了哪些 join point 可以执行哪些 advice

introduction

为一个类型添加额外的方法或字段. Spring AOP 允许我们为 目标对象 引入新的接口(和对应的实现). 例如我们可以使用 introduction 来为一个 bean 实现 IsModified 接口, 并以此来简化 caching 的实现.

目标对象(Target)

织入 advice 的目标对象. 目标对象也被称为 advised object.
因为 Spring AOP 使用运行时代理的方式来实现 aspect, 因此 adviced object 总是一个代理对象(proxied object)
注意, adviced object 指的不是原来的类, 而是织入 advice 后所产生的代理类.

AOP proxy

一个类被 AOP 织入 advice, 就会产生一个结果类, 它是融合了原类和增强逻辑的代理类.
在 Spring AOP 中, 一个 AOP 代理是一个 JDK 动态代理对象或 CGLIB 代理对象.

织入(Weaving)

将 aspect 和其他对象连接起来, 并创建 adviced object 的过程.
根据不同的实现技术, AOP 织入有三种方式:

  • 编译器织入, 这要求有特殊的 Java 编译器.
  • 类装载期织入, 这需要有特殊的类装载器.
  • 动态代理织入, 在运行期为目标类添加增强(Advice)生成子类的方式.
    Spring 采用动态代理织入, 而 AspectJ 采用编译器织入和类装载期织入.

advice 的类型

  • before advice, 在 join point 前被执行的 advice. 虽然 before advice 是在 join point 前被执行, 但是它并不能够阻止 join point 的执行, 除非发生了异常(即我们在 before advice 代码中, 不能人为地决定是否继续执行 join point 中的代码)
  • after return advice, 在一个 join point 正常返回后执行的 advice
  • after throwing advice, 当一个 join point 抛出异常后执行的 advice
  • after(final) advice, 无论一个 join point 是正常退出还是发生了异常, 都会被执行的 advice.
  • around advice, 在 join point 前和 joint point 退出后都执行的 advice. 这个是最常用的 advice.

关于 AOP Proxy

Spring AOP 默认使用标准的 JDK 动态代理(dynamic proxy)技术来实现 AOP 代理, 通过它, 我们可以为任意的接口实现代理.
如果需要为一个类实现代理, 那么可以使用 CGLIB 代理. 当一个业务逻辑对象没有实现接口时, 那么 Spring AOP 就默认使用 CGLIB 来作为 AOP 代理了. 即如果我们需要为一个方法织入 advice, 但是这个方法不是一个接口所提供的方法, 则此时 Spring AOP 会使用 CGLIB 来实现动态代理. 鉴于此, Spring AOP 建议基于接口编程, 对接口进行 AOP 而不是类.

彻底理解 aspect, join point, point cut, advice

看完了上面的理论部分知识, 我相信还是会有不少朋友感觉到 AOP 的概念还是很模糊, 对 AOP 中的各种概念理解的还不是很透彻. 其实这很正常, 因为 AOP 中的概念是在是太多了, 我当时也是花了老大劲才梳理清楚的.
下面我以一个简单的例子来比喻一下 AOP 中 aspect, jointpoint, pointcut 与 advice 之间的关系.

让我们来假设一下, 从前有一个叫爪哇的小县城, 在一个月黑风高的晚上, 这个县城中发生了命案. 作案的凶手十分狡猾, 现场没有留下什么有价值的线索. 不过万幸的是, 刚从隔壁回来的老王恰好在这时候无意中发现了凶手行凶的过程, 但是由于天色已晚, 加上凶手蒙着面, 老王并没有看清凶手的面目, 只知道凶手是个男性, 身高约七尺五寸. 爪哇县的县令根据老王的描述, 对守门的士兵下命令说: 凡是发现有身高七尺五寸的男性, 都要抓过来审问. 士兵当然不敢违背县令的命令, 只好把进出城的所有符合条件的人都抓了起来.

来让我们看一下上面的一个小故事和 AOP 到底有什么对应关系.
首先我们知道, 在 Spring AOP 中 join point 指代的是所有方法的执行点, 而 point cut 是一个描述信息, 它修饰的是 join point, 通过 point cut, 我们就可以确定哪些 join point 可以被织入 Advice. 对应到我们在上面举的例子, 我们可以做一个简单的类比, join point 就相当于 爪哇的小县城里的百姓, point cut 就相当于 老王所做的指控, 即凶手是个男性, 身高约七尺五寸, 而 advice 则是施加在符合老王所描述的嫌疑人的动作: 抓过来审问.
为什么可以这样类比呢?

  • join point –> 爪哇的小县城里的百姓: 因为根据定义, join point 是所有可能被织入 advice 的候选的点, 在 Spring AOP 中, 则可以认为所有方法执行点都是 join point. 而在我们上面的例子中, 命案发生在小县城中, 按理说在此县城中的所有人都有可能是嫌疑人.
  • point cut –> 男性, 身高约七尺五寸: 我们知道, 所有的方法(joint point) 都可以织入 advice, 但是我们并不希望在所有方法上都织入 advice, 而 pointcut 的作用就是提供一组规则来匹配 joinpoint, 给满足规则的 joinpoint 添加 advice. 同理, 对于县令来说, 他再昏庸, 也知道不能把县城中的所有百姓都抓起来审问, 而是根据凶手是个男性, 身高约七尺五寸, 把符合条件的人抓起来. 在这里凶手是个男性, 身高约七尺五寸 就是一个修饰谓语, 它限定了凶手的范围, 满足此修饰规则的百姓都是嫌疑人, 都需要抓起来审问.
  • advice –> 抓过来审问, advice 是一个动作, 即一段 Java 代码, 这段 Java 代码是作用于 point cut 所限定的那些 join point 上的. 同理, 对比到我们的例子中, 抓过来审问 这个动作就是对作用于那些满足 男性, 身高约七尺五寸爪哇的小县城里的百姓.
  • aspect: aspect 是 point cut 与 advice 的组合, 因此在这里我们就可以类比: “根据老王的线索, 凡是发现有身高七尺五寸的男性, 都要抓过来审问” 这一整个动作可以被认为是一个 aspect.

或则我们也可以从语法的角度来简单类比一下. 我们在学英语时, 经常会接触什么 定语, 被动句 之类的概念, 那么可以做一个不严谨的类比, 即 joinpoint 可以认为是一个 宾语, 而 pointcut 则可以类比为修饰 joinpoint 的定语, 那么整个 aspect 就可以描述为: 满足 pointcut 规则的 joinpoint 会被添加相应的 advice 操作.

@AspectJ 支持

@AspectJ 是一种使用 Java 注解来实现 AOP 的编码风格。

@AspectJ 风格的 AOP 是 AspectJ Project 在 AspectJ 5 中引入的, 并且 Spring 也支持 @AspectJ 的 AOP 风格.

使能 @AspectJ 支持

@AspectJ 可以以 XML 的方式或以注解的方式来使能, 并且不论以哪种方式使能@ASpectJ, 我们都必须保证 aspectjweaver.jar 在 classpath 中.

使用 Java Configuration 方式使能@AspectJ

1
2
3
4
@Configuration
@EnableAspectJAutoProxy
public class AppConfig {
}

使用 XML 方式使能@AspectJ

1
<aop:aspectj-autoproxy/>

定义 aspect(切面)

当使用注解 @Aspect 标注一个 Bean 后, 那么 Spring 框架会自动收集这些 Bean, 并添加到 Spring AOP 中, 例如:

1
2
3
4
@Component
@Aspect
public class MyTest {
}

注意, 仅仅使用@Aspect 注解, 并不能将一个 Java 对象转换为 Bean, 因此我们还需要使用类似 @Component 之类的注解.
注意, 如果一个 类被@Aspect 标注, 则这个类就不能是其他 aspect 的 **advised object** 了, 因为使用 @Aspect 后, 这个类就会被排除在 auto-proxying 机制之外.

声明 pointcut

一个 pointcut 的声明由两部分组成:

  • 一个方法签名, 包括方法名和相关参数
  • 一个 pointcut 表达式, 用来指定哪些方法执行是我们感兴趣的(即因此可以织入 advice).

在@AspectJ 风格的 AOP 中, 我们使用一个方法来描述 pointcut, 即:

1
2
@Pointcut("execution(* com.xys.service.UserService.*(..))") // 切点表达式
private void dataAccessOperation() {} // 切点前面

这个方法必须无返回值.
这个方法本身就是 pointcut signature, pointcut 表达式使用@Pointcut 注解指定.
上面我们简单地定义了一个 pointcut, 这个 pointcut 所描述的是: 匹配所有在包 com.xys.service.UserService 下的所有方法的执行.

切点标志符(designator)

AspectJ5 的切点表达式由标志符(designator)和操作参数组成. 如 “execution(* greetTo(..))” 的切点表达式, **execution** 就是 标志符, 而圆括号里的 *****greetTo(..) 就是操作参数

execution

匹配 join point 的执行, 例如 “execution(* hello(..))” 表示匹配所有目标类中的 hello() 方法. 这个是最基本的 pointcut 标志符.

within

匹配特定包下的所有 join point, 例如 within(com.xys.*) 表示 com.xys 包中的所有连接点, 即包中的所有类的所有方法. 而within(com.xys.service.*Service) 表示在 com.xys.service 包中所有以 Service 结尾的类的所有的连接点.

this 与 target

this 的作用是匹配一个 bean, 这个 bean(Spring AOP proxy) 是一个给定类型的实例(instance of). 而 target 匹配的是一个目标对象(target object, 即需要织入 advice 的原始的类), 此对象是一个给定类型的实例(instance of).

bean

匹配 bean 名字为指定值的 bean 下的所有方法, 例如:

1
2
bean(*Service) // 匹配名字后缀为 Service 的 bean 下的所有方法
bean(myService) // 匹配名字为 myService 的 bean 下的所有方法
args

匹配参数满足要求的的方法.
例如:

1
2
3
4
5
6
7
8
@Pointcut("within(com.xys.demo2.*)")
public void pointcut2() {
}

@Before(value = "pointcut2() && args(name)")
public void doSomething(String name) {
logger.info("---page: {}---", name);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class NormalService {
private Logger logger = LoggerFactory.getLogger(getClass());

public void someMethod() {
logger.info("---NormalService: someMethod invoked---");
}

public String test(String name) {
logger.info("---NormalService: test invoked---");
return "服务一切正常";
}
}

当 NormalService.test 执行时, 则 advice doSomething 就会执行, test 方法的参数 name 就会传递到 doSomething 中.

常用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 匹配只有一个参数 name 的方法
@Before(value = "aspectMethod() && args(name)")
public void doSomething(String name) {
}

// 匹配第一个参数为 name 的方法
@Before(value = "aspectMethod() && args(name, ..)")
public void doSomething(String name) {
}

// 匹配第二个参数为 name 的方法
Before(value = "aspectMethod() && args(*, name, ..)")
public void doSomething(String name) {
}
@annotation

匹配由指定注解所标注的方法, 例如:

1
2
3
@Pointcut("@annotation(com.xys.demo1.AuthChecker)")
public void pointcut() {
}

则匹配由注解 AuthChecker 所标注的方法.

常见的切点表达式

匹配方法签名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 匹配指定包中的所有的方法
execution(* com.xys.service.*(..))

// 匹配当前包中的指定类的所有方法
execution(* UserService.*(..))

// 匹配指定包中的所有 public 方法
execution(public * com.xys.service.*(..))

// 匹配指定包中的所有 public 方法, 并且返回值是 int 类型的方法
execution(public int com.xys.service.*(..))

// 匹配指定包中的所有 public 方法, 并且第一个参数是 String, 返回值是 int 类型的方法
execution(public int com.xys.service.*(String name, ..))
匹配类型签名
1
2
3
4
5
6
7
8
9
10
11
12
// 匹配指定包中的所有的方法, 但不包括子包
within(com.xys.service.*)

// 匹配指定包中的所有的方法, 包括子包
within(com.xys.service..*)

// 匹配当前包中的指定类中的方法
within(UserService)


// 匹配一个接口的所有实现类中的实现的方法
within(UserDao+)
匹配 Bean 名字
1
2
// 匹配以指定名字结尾的 Bean 中的所有方法
bean(*Service)
切点表达式组合
1
2
3
4
5
// 匹配以 Service 或 ServiceImpl 结尾的 bean
bean(*Service || *ServiceImpl)

// 匹配名字以 Service 结尾, 并且在包 com.xys.service 中的 bean
bean(*Service) && within(com.xys.service.*)

声明 advice

advice 是和一个 pointcut 表达式关联在一起的, 并且会在匹配的 join point 的方法执行的前/后/周围 运行. pointcut 表达式可以是简单的一个 pointcut 名字的引用, 或者是完整的 pointcut 表达式.
下面我们以几个简单的 advice 为例子, 来看一下一个 advice 是如何声明的.

Before advice

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @author xiongyongshun
* @version 1.0
* @created 16/9/9 13:13
*/
@Component
@Aspect
public class BeforeAspectTest {
// 定义一个 Pointcut, 使用 切点表达式函数 来描述对哪些 Join point 使用 advise.
@Pointcut("execution(* com.xys.service.UserService.*(..))")
public void dataAccessOperation() {
}
}
1
2
3
4
5
6
7
8
9
@Component
@Aspect
public class AdviseDefine {
// 定义 advise
@Before("com.xys.aspect.PointcutDefine.dataAccessOperation()")
public void doBeforeAccessCheck(JoinPoint joinPoint) {
System.out.println("*****Before advise, method: " + joinPoint.getSignature().toShortString() + " *****");
}
}

这里, @Before 引用了一个 pointcut, 即 “com.xys.aspect.PointcutDefine.dataAccessOperation()” 是一个 pointcut 的名字.
如果我们在 advice 在内置 pointcut, 则可以:

1
2
3
4
5
6
7
8
9
@Component
@Aspect
public class AdviseDefine {
// 将 pointcut 和 advice 同时定义
@Before("within(com.xys.service..*)")
public void doAccessCheck(JoinPoint joinPoint) {
System.out.println("*****doAccessCheck, Before advise, method: " + joinPoint.getSignature().toShortString() + " *****");
}
}

around advice

around advice 比较特别, 它可以在一个方法的之前之前和之后添加不同的操作, 并且甚至可以决定何时, 如何, 是否调用匹配到的方法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@Aspect
public class AdviseDefine {
// 定义 advise
@Around("com.xys.aspect.PointcutDefine.dataAccessOperation()")
public Object doAroundAccessCheck(ProceedingJoinPoint pjp) throws Throwable {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 开始
Object retVal = pjp.proceed();
stopWatch.stop();
// 结束
System.out.println("invoke method: " + pjp.getSignature().getName() + ", elapsed time: " + stopWatch.getTotalTimeMillis());
return retVal;
}
}

around advice 和前面的 before advice 差不多, 只是我们把注解 @Before 改为了 @Around 了.

参考资料

HDFS 应用

HDFSHadoop Distributed File System 的缩写,即 Hadoop 的分布式文件系统。

HDFS 是一种用于存储具有流数据访问模式的超大文件的文件系统,它运行在廉价的机器集群上。

HDFS 的设计目标是管理数以千计的服务器、数以万计的磁盘,将这么大规模的服务器计算资源当作一个单一的存储系统进行管理,对应用程序提供 PB 级的存储容量,让应用程序像使用普通文件系统一样存储大规模的文件数据。

HDFS 是在一个大规模分布式服务器集群上,对数据分片后进行并行读写及冗余存储。因为 HDFS 可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可供 HDFS 使用,所以整个 HDFS 的存储空间可以达到 PB 级容量。

HDFS 命令

显示当前目录结构

1
2
3
4
5
6
# 显示当前目录结构
hdfs dfs -ls <path>
# 递归显示当前目录结构
hdfs dfs -ls -R <path>
# 显示根目录下内容
hdfs dfs -ls /

创建目录

1
2
3
4
# 创建目录
hdfs dfs -mkdir <path>
# 递归创建目录
hdfs dfs -mkdir -p <path>

删除操作

1
2
3
4
# 删除文件
hdfs dfs -rm <path>
# 递归删除目录和文件
hdfs dfs -rm -R <path>

导入文件到 HDFS

1
2
3
# 二选一执行即可
hdfs dfs -put [localsrc] [dst]
hdfs dfs -copyFromLocal [localsrc] [dst]

从 HDFS 导出文件

1
2
3
# 二选一执行即可
hdfs dfs -get [dst] [localsrc]
hdfs dfs -copyToLocal [dst] [localsrc]

查看文件内容

1
2
3
# 二选一执行即可
hdfs dfs -text <path>
hdfs dfs -cat <path>

显示文件的最后一千字节

1
2
3
hdfs dfs -tail <path>
# 和 Linux 下一样,会持续监听文件内容变化 并显示文件的最后一千字节
hdfs dfs -tail -f <path>

拷贝文件

1
hdfs dfs -cp [src] [dst]

移动文件

1
hdfs dfs -mv [src] [dst]

统计当前目录下各文件大小

  • 默认单位字节
  • -s : 显示所有文件大小总和,
  • -h : 将以更友好的方式显示文件大小(例如 64.0m 而不是 67108864)
1
hdfs dfs -du <path>

合并下载多个文件

  • -nl 在每个文件的末尾添加换行符(LF)
  • -skip-empty-file 跳过空文件
1
2
3
hdfs dfs -getmerge
# 示例 将 HDFS 上的 hbase-policy.xml 和 hbase-site.xml 文件合并后下载到本地的/usr/test.xml
hdfs dfs -getmerge -nl /test/hbase-policy.xml /test/hbase-site.xml /usr/test.xml

统计文件系统的可用空间信息

1
hdfs dfs -df -h /

更改文件复制因子

1
hdfs dfs -setrep [-R] [-w] <numReplicas> <path>
  • 更改文件的复制因子。如果 path 是目录,则更改其下所有文件的复制因子
  • -w : 请求命令是否等待复制完成
1
2
# 示例
hdfs dfs -setrep -w 3 /user/hadoop/dir1

权限控制

1
2
3
4
5
6
7
# 权限控制和 Linux 上使用方式一致
# 变更文件或目录的所属群组。 用户必须是文件的所有者或超级用户。
hdfs dfs -chgrp [-R] GROUP URI [URI ...]
# 修改文件或目录的访问权限 用户必须是文件的所有者或超级用户。
hdfs dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
# 修改文件的拥有者 用户必须是超级用户。
hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

文件检测

1
hdfs dfs -test - [defsz]  URI

可选选项:

  • -d:如果路径是目录,返回 0。
  • -e:如果路径存在,则返回 0。
  • -f:如果路径是文件,则返回 0。
  • -s:如果路径不为空,则返回 0。
  • -r:如果路径存在且授予读权限,则返回 0。
  • -w:如果路径存在且授予写入权限,则返回 0。
  • -z:如果文件长度为零,则返回 0。
1
2
# 示例
hdfs dfs -test -e filename

HDFS API

简介

想要使用 HDFS API,需要导入依赖 hadoop-client。如果是 CDH 版本的 Hadoop,还需要额外指明其仓库地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.heibaiying</groupId>
<artifactId>hdfs-java-api</artifactId>
<version>1.0</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.6.0-cdh5.15.2</hadoop.version>
</properties>

<!---配置 CDH 仓库地址-->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>

<dependencies>
<!--Hadoop-client-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>

FileSystem

FileSystem 是所有 HDFS 操作的主入口。由于之后的每个单元测试都需要用到它,这里使用 @Before 注解进行标注。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;

@Before
public void prepare() {
try {
Configuration configuration = new Configuration();
// 这里我启动的是单节点的 Hadoop, 所以副本系数设置为 1, 默认值为 3
configuration.set("dfs.replication", "1");
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}

@After
public void destroy() {
fileSystem = null;
}

FileSystem 官方 Java API 文档

创建目录

支持递归创建目录:

1
2
3
4
@Test
public void mkDir() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
}

创建指定权限的目录

FsPermission(FsAction u, FsAction g, FsAction o) 的三个参数分别对应:创建者权限,同组其他用户权限,其他用户权限,权限值定义在 FsAction 枚举类中。

1
2
3
4
5
@Test
public void mkDirWithPermission() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
}

创建文件,并写入内容

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void create() throws Exception {
// 如果文件存在,默认会覆盖,可以通过第二个参数进行控制。第三个参数可以控制使用缓冲区的大小
FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
true, 4096);
out.write("hello hadoop!".getBytes());
out.write("hello spark!".getBytes());
out.write("hello flink!".getBytes());
// 强制将缓冲区中内容刷出
out.flush();
out.close();
}

判断文件是否存在

1
2
3
4
5
@Test
public void exist() throws Exception {
boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
System.out.println(exists);
}

查看文件内容

查看小文本文件的内容,直接转换成字符串后输出:

1
2
3
4
5
6
@Test
public void readToString() throws Exception {
FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
String context = inputStreamToString(inputStream, "utf-8");
System.out.println(context);
}

inputStreamToString 是一个自定义方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 把输入流转换为指定编码的字符
*
* @param inputStream 输入流
* @param encode 指定编码类型
*/
private static String inputStreamToString(InputStream inputStream, String encode) {
try {
if (encode == null || ("".equals(encode))) {
encode = "utf-8";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
StringBuilder builder = new StringBuilder();
String str = "";
while ((str = reader.readLine()) != null) {
builder.append(str).append("\n");
}
return builder.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

文件重命名

1
2
3
4
5
6
7
@Test
public void rename() throws Exception {
Path oldPath = new Path("/hdfs-api/test/a.txt");
Path newPath = new Path("/hdfs-api/test/b.txt");
boolean result = fileSystem.rename(oldPath, newPath);
System.out.println(result);
}

删除目录或文件

1
2
3
4
5
6
7
8
9
public void delete() throws Exception {
/*
* 第二个参数代表是否递归删除
* + 如果 path 是一个目录且递归删除为 true, 则删除该目录及其中所有文件;
* + 如果 path 是一个目录但递归删除为 false, 则会则抛出异常。
*/
boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
System.out.println(result);
}

上传文件到 HDFS

1
2
3
4
5
6
7
@Test
public void copyFromLocalFile() throws Exception {
// 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
Path src = new Path("D:\\BigData-Notes\\notes\\installation");
Path dst = new Path("/hdfs-api/test/");
fileSystem.copyFromLocalFile(src, dst);
}

上传大文件并显示上传进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void copyFromLocalBigFile() throws Exception {

File file = new File("D:\\kafka.tgz");
final float fileSize = file.length();
InputStream in = new BufferedInputStream(new FileInputStream(file));

FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
new Progressable() {
long fileCount = 0;

public void progress() {
fileCount++;
// progress 方法每上传大约 64KB 的数据后就会被调用一次
System.out.println("上传进度:" + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
}
});

IOUtils.copyBytes(in, out, 4096);

}

从 HDFS 上下载文件

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void copyToLocalFile() throws Exception {
Path src = new Path("/hdfs-api/test/kafka.tgz");
Path dst = new Path("D:\\app\\");
/*
* 第一个参数控制下载完成后是否删除源文件,默认是 true, 即删除;
* 最后一个参数表示是否将 RawLocalFileSystem 用作本地文件系统;
* RawLocalFileSystem 默认为 false, 通常情况下可以不设置,
* 但如果你在执行时候抛出 NullPointerException 异常,则代表你的文件系统与程序可能存在不兼容的情况 (window 下常见),
* 此时可以将 RawLocalFileSystem 设置为 true
*/
fileSystem.copyToLocalFile(false, src, dst, true);
}

查看指定目录下所有文件的信息

1
2
3
4
5
6
7
public void listFiles() throws Exception {
FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
for (FileStatus fileStatus : statuses) {
//fileStatus 的 toString 方法被重写过,直接打印可以看到所有信息
System.out.println(fileStatus.toString());
}
}

FileStatus 中包含了文件的基本信息,比如文件路径,是否是文件夹,修改时间,访问时间,所有者,所属组,文件权限,是否是符号链接等,输出内容示例如下:

1
2
3
4
5
6
7
8
9
10
FileStatus{
path=hdfs://192.168.0.106:8020/hdfs-api/test;
isDirectory=true;
modification_time=1556680796191;
access_time=0;
owner=root;
group=supergroup;
permission=rwxr-xr-x;
isSymlink=false
}

递归查看指定目录下所有文件的信息

1
2
3
4
5
6
7
@Test
public void listFilesRecursive() throws Exception {
RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/hbase"), true);
while (files.hasNext()) {
System.out.println(files.next());
}
}

和上面输出类似,只是多了文本大小,副本系数,块大小信息。

1
2
3
4
5
6
7
8
9
10
11
LocatedFileStatus{
path=hdfs://192.168.0.106:8020/hbase/hbase.version;
isDirectory=false;
length=7;
replication=1;
blocksize=134217728;
modification_time=1554129052916;
access_time=1554902661455;
owner=root; group=supergroup;
permission=rw-r--r--;
isSymlink=false}

查看文件的块信息

1
2
3
4
5
6
7
8
9
@Test
public void getFileBlockLocations() throws Exception {

FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation block : blocks) {
System.out.println(block);
}
}

块输出信息有三个值,分别是文件的起始偏移量 (offset),文件大小 (length),块所在的主机名 (hosts)。

1
0,57028557,hadoop001

这里我上传的文件只有 57M(小于 128M),且程序中设置了副本系数为 1,所有只有一个块信息。

参考资料

Hadoop 面试

简介

【初级】简介一下大数据技术生态?

:::details 要点

  • 数据采集:Flume、Sqoop、Logstash、Filebeat
  • 分布式文件存储:Hadoop HDFS
  • NoSql
    • 文档数据库:Mongodb
    • 列式数据库:HBase
    • 搜索引擎:Solr、Elasticsearch
  • 分布式计算
    • 批处理:Hadoop MapReduce
    • 流处理:Storm、Kafka
    • 混合处理:Spark、Flink
  • 查询分析:Hive、Spark SQL、Flink SQL、Pig、Phoenix
  • 集群资源管理:Hadoop YARN
  • 分布式协调:Zookeeper
  • 任务调度:Azkaban、Oozie
  • 集群部署和监控:Ambari、Cloudera Manager

:::

【初级】什么是 HDFS?

:::details 要点

HDFSHadoop Distributed File System 的缩写,即 Hadoop 的分布式文件系统。

HDFS 是一种用于存储具有流数据访问模式的超大文件的文件系统,它运行在廉价的机器集群上。

HDFS 的设计目标是管理数以千计的服务器、数以万计的磁盘,将这么大规模的服务器计算资源当作一个单一的存储系统进行管理,对应用程序提供 PB 级的存储容量,让应用程序像使用普通文件系统一样存储大规模的文件数据。

HDFS 是在一个大规模分布式服务器集群上,对数据分片后进行并行读写及冗余存储。因为 HDFS 可以部署在一个比较大的服务器集群上,集群中所有服务器的磁盘都可供 HDFS 使用,所以整个 HDFS 的存储空间可以达到 PB 级容量。

HDFS 的常见使用场景:

  • 大数据存储 - HDFS 能够存储 PB 级甚至 EB 级的数据,适合存储日志数据、传感器数据、社交媒体数据等。
  • 批处理与分析 - HDFS 是 Hadoop MapReduce 的默认存储系统,MapReduce 作业直接从 HDFS 读取数据并进行分布式计算。
  • 数据仓库 - HDFS 可以作为数据仓库的底层存储,支持大规模数据的离线分析。
  • 数据冷备 - 由于 HDFS 的高可靠和低成本,适用于存储访问频率较低的冷数据(如历史数据、备份数据)。
  • 多媒体数据存储:HDFS 适合存储大规模的多媒体数据(如图像、视频、音频)。

:::

【初级】HDFS 有什么特性(优缺点)?

:::details 要点

HDFS 的优点

  • 高可用 - 冗余数据副本,支持自动故障恢复;支持 NameNode HA、安全模式
  • 易扩展 - 能够处理 10K 节点的规模;处理数据达到 GB、TB、甚至 PB 级别的数据;能够处理百万规模以上的文件数量,数量相当之大。
  • 批处理 - 流式数据访问;数据位置暴露给计算框架
  • 低成本 - HDFS 构建在廉价的商用机器上。

HDFS 的缺点

  • 不适合低延迟数据访问 - 适合高吞吐率的场景,就是在某一时间内写入大量的数据。但是它在低延时的情况下是不行的,比如毫秒级以内读取数据,它是很难做到的。
  • 不适合大量小文件存储
    • 存储大量小文件(这里的小文件是指小于 HDFS 系统的 Block 大小的文件(默认 64M)) 的话,它会占用 NameNode 大量的内存来存储文件、目录和块信息。这样是不可取的,因为 NameNode 的内存总是有限的。
    • 磁盘寻道时间超过读取时间
  • 不支持并发写入 - 一个文件同时只能有一个写入者
  • 不支持文件随机修改 - 仅支持追加写入

:::

【初级】什么是 YARN?

:::details 要点

YARN(Yet Another Resource Negotiator,即另一种资源调度器) 是 Hadoop 的集群资源管理系统。YARN 负责资源管理和调度。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。

在 Hadoop 1.x 版本,MapReduce 中的 jobTracker 担负了太多的责任,接收任务是它,资源调度是它,监控 TaskTracker 运行情况还是它。这样实现的好处是比较简单,但相对的,就容易出现一些问题,比如常见的单点故障问题。要解决这些问题,只能将 jobTracker 进行拆分,将其中部分功能拆解出来。沿着这个思路,于是有了 YARN。

:::

【初级】什么是 MapReduce?

:::details 要点

MapReduce 是 Hadoop 项目中的分布式计算框架。它降低了分布式计算的门槛,可以让用户轻松编写程序,让其以可靠、容错的方式运行在大型集群上并行处理海量数据(TB 级)。

MapReduce 的设计思路是:

  • 分而治之,并行计算
  • 移动计算,而非移动数据

MapReduce 作业通过将输入的数据集拆分为独立的块,这些块由 map 任务以并行的方式处理。框架对 map 的输出进行排序,然后将其输入到 reduce 任务中。作业的输入和输出都存储在文件系统中。该框架负责调度任务、监控任务并重新执行失败的任务。

通常,计算节点和存储节点是相同的,即 MapReduce 框架和 HDFS 在同一组节点上运行。此配置允许框架在已存在数据的节点上有效地调度任务,从而在整个集群中实现非常高的聚合带宽。

MapReduce 框架由一个主 ResourceManager、每个集群节点一个工作程序 NodeManager 和每个应用程序的 MRAppMaster (YARN 组件) 组成。

MapReduce 框架仅对 <key、value> 对进行作,也就是说,框架将作业的输入视为一组 <key、value> 对,并生成一组 <key、value> 对作为作业的输出,可以想象是不同的类型。类必须可由框架序列化,因此需要实现 Writable 接口。此外,关键类必须实现 WritableComparable 接口,以便于按框架进行排序。

MapReduce 作业的 Input 和 Output 类型:

1
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

MapReduce 适用场景:

  • 数据统计,如:网站的 PV、UV 统计
  • 搜索引擎构建索引
  • 海量数据查询

MapReduce 不适用场景:

  • OLAP - 要求毫秒或秒级返回结果
  • 流计算 - 流计算的输入数据集是动态的,而 MapReduce 是静态的
  • DAG 计算
    • 多个作业存在依赖关系,后一个的输入是前一个的输出,构成有向无环图 DAG
    • 每个 MapReduce 作业的输出结果都会落盘,造成大量磁盘 IO,导致性能非常低下

:::

【初级】MapReduce 有什么特性(优缺点)?

:::details 要点

MapReduce 有以下特性:

  • 移动计算,而非移动数据
  • 良好的扩展性:计算能力随着节点数增加,近似线性递增
  • 高可用
  • 适合海量数据的离线批处理
  • 降低了分布式编程的门槛

:::

架构

【高级】HDFS 的架构是怎样设计的?

:::details 要点

HDFS 架构有以下几个核心要点:

  • 主从架构
  • 按块分区
  • 数据副本
  • 命名空间

(1)HDFS 主从架构

HDFS 采用 master/slave 架构。一个 HDFS 集群是由一个 NameNode 和一定数目的 DataNode 组成。NameNode 是一个中心服务器,负责管理文件系统的命名空间 (namespace) 以及客户端对文件的访问。集群中的 DataNode 一般是一个节点一个,负责管理它所在节点上的存储。HDFS 暴露了文件系统的命名空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组 DataNode 上。NameNode 执行文件系统的命名空间操作,比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体 DataNode 节点的映射。DataNode 负责处理文件系统客户端的读写请求。在 NameNode 的统一调度下进行数据块的创建、删除和复制。

  • NameNode - 负责 HDFS 集群的管理、协调。具体来说,主要有以下职责:
    • 管理命名空间 - 执行有关命名空间的操作,例如打开,关闭、重命名文件和目录等。
    • 管理元数据 - 维护文件的位置、所有者、权限、数据块等。
    • 管理 Block 副本策略 - 默认 3 个副本
    • 客户端读写请求寻址
  • DataNode:负责提供来自文件系统客户端的读写请求,执行块的创建,删除等操作。具体来说,主要有以下职责:
    • 执行客户端发送的读写操作
    • 存储 Block 和数据校验和
    • 定期向 NameNode 发送心跳以续活
    • 定期向 NameNode 上报 Block 信息

(2)按块分区

HDFS 将文件数据分割成若干数据块(Block),每个 DataNode 存储一部分数据块,这样文件就分布存储在整个 HDFS 服务器集群中。

将大文件分割成 Block 的主要目的是为了优化网络传输和数据处理的效率。这种分割机制使得文件的不同部分可以并行处理,大大提高了数据处理的速度。

HDFS Block 有以下要点:

  • Block 是 HDFS 最小存储单元
  • 文件写入 HDFS 会被切分成若干个 Block
  • Block 大小固定,默认为 128MB,可通过 dfs.blocksize 参数修改
  • 若一个 Block 的大小小于设定值,不会占用整个块空间
  • 默认情况下每个 Block 有 3 个副本

这实际上是典型的分布式分区思想,使得 HDFS 具备了扩展能力。

(3)数据复制

HDFS 被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有副本。每个文件的数据块大小和副本系数都是可配置的。应用程序可以指定某个文件的副本数目。副本系数可以在文件创建的时候指定,也可以在之后改变。HDFS 中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告 (Blockreport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。

(4)命名空间

HDFS 支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统命名空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。HDFS 不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是 HDFS 架构并不妨碍实现这些特性。

NameNode 负责维护文件系统的命名空间,任何对文件系统命名空间或属性的修改都将被 NameNode 记录下来。应用程序可以设置 HDFS 保存的文件的副本数目。文件副本的数目称为文件的副本系数,这个信息也是由 NameNode 保存的。

:::

【中级】HDFS 使用 NameNode 的好处 ?

:::details 要点

HDFS 使用 NameNode 的好处主要体现在以下几个方面:

  • 中心化的元数据管理 - NameNode 在 HDFS 中负责存储整个文件系统的元数据,包括文件和目录的结构、每个文件的数据块信息及其在 DataNode 上的位置等。这种中心化的管理,使得文件系统的组织和管理变得更加简洁高效,并且可以确保整个文件系统的一致性。
  • 易扩展 - 由于实际的数据存储在 DataNode 上,而 NameNode 只存储元数据,这样的架构设计使得 HDFS 可以轻松扩展到处理 PB 级别甚至更大规模的数据集。
  • 快速的文件访问:用户或应用程序在访问文件时,首先与 NameNode 交互以获得数据块的位置信息,然后直接从 DataNode 读取数据。这种方式可以快速定位数据,提高文件访问的效率。
  • 容错和恢复机制:NameNode 可以监控 DataNode 的状态,实现系统的容错。在 DataNode 发生故障时,NameNode 可以指导其它 DataNode 复制丢失的数据块,保证数据的可靠性。
  • 简化数据管理:NameNode 的存在简化了数据的管理和维护。例如,在进行数据备份、系统升级或扩展时,管理员只需要关注 NameNode 上的元数据,而不是每个节点上存储的实际数据。

然而,由于 NameNode 是中心节点,它也成为了系统的一个潜在瓶颈和单点故障。因此,HDFS 后来引入了主备 NameNode 机制来保证 NameNode 自身的可用性。

:::

【中级】HDFS 使用 Block 的好处 ?

:::details 要点

HDFS 采用文件分块(Block)进行存储管理,主要是基于以下几个原因:

  • 提高可靠性和容错性 - 通过将文件分成多个块,并在不同的 DataNode 上存储这些块的副本,HDFS 可以提高数据的可靠性。即使某些 DataNode 出现故障,其他节点上的副本仍然可以用于数据恢复。
  • 提高数据处理效率:在处理大规模数据集时,将大文件分割成小块可以提高数据处理的效率。这样,可以并行地在多个节点上处理不同的块,从而加速数据处理和分析。
  • 提高网络传输效率:分块存储还有利于网络传输。当处理或传输一个大文件的部分数据时,只需处理或传输相关的几个块,而不是整个文件,这减少了网络传输负担。
  • 易于扩展:分块机制使得 HDFS 易于扩展。可以简单地通过增加更多的 DataNode 来扩大存储容量和处理能力,而不需要对现有的数据块进行任何修改。
  • 负载均衡:分块存储还有助于在集群中实现负载均衡。不同的数据块可以分布在不同的节点上,从而均衡各个节点的存储和处理负载。

:::

【中级】NameNode 与 SecondaryNameNode 的区别与联系 ?

:::details 要点

NameNode 和 SecondaryNameNode 的区别

  • NameNode 是 HDFS 的主要节点,负责管理文件系统的命名空间。它维护着整个文件系统的目录和文件结构,以及所有文件的元数据,包括文件的数据块(block)信息、数据块的位置等。
  • SecondaryNameNode 是 NameNode 的辅助节点。
    • SecondaryNameNode 不是 NameNode 的备份,不能在 NameNode 故障时接管其功能。
    • HDFS 在运行过程中,所有的事务(如文件创建、删除等)都会首先记录在 NameNode 的内存和 EditLog 中。SecondaryNameNode 定期从 NameNode 获取这些日志文件,与文件系统的命名空间镜像(FsImage)合并,然后把新的 FsImage 送回给 NameNode,以帮助减少 NameNode 的内存压力。

NameNode 和 SecondaryNameNode 的联系

  • 共同目标:二者共同目的是维护 HDFS 的稳定和高效运作。NameNode 作为核心,负责实时的元数据管理;而 SecondaryNameNode 辅助 NameNode,通过定期处理 FsImage 和 EditLog,减轻 NameNode 的负担。
  • 数据交互:SecondaryNameNode 的工作依赖于与 NameNode 的交互,从 NameNode 获取元数据的状态和编辑日志。

:::

【中级】什么是 FsImage 和 EditLog?

:::details 要点

HDFS 中,FsImageEditLog是两个关键的文件,用于存储和管理文件系统的元数据。它们的主要区别如下:

FsImage(文件系统镜像)

  • 内容FsImage包含 HDFS 元数据的完整快照,例如文件系统的目录树、文件和目录的属性等。
  • 静态性:它是在特定时间点上的静态快照。一旦创建,除非进行新的快照操作,否则内容不会改变。
  • 使用场景:在 NameNode 启动时使用,用于加载文件系统的最初状态。此外,在进行系统备份时也会生成新的FsImage
  • 更新频率:不是实时更新的。通常在系统进行 checkpoint 操作时才会更新。

EditLog(编辑日志)

  • 内容EditLog记录了自上一个FsImage快照以来所有对文件系统所做的增量更改。这些更改包括文件和目录的创建、删除、重命名等操作。
  • 动态性:它是一个动态更新的日志文件。每次对文件系统进行更改时,这个更改就会记录在EditLog中。
  • 使用场景:用于记录所有的文件系统更改操作。在 NameNode 重启时,FsImage将与EditLog结合使用,以重建文件系统的最新状态。
  • 更新频率:实时更新。每次对文件系统的更改都会迅速反映在EditLog中。

结合使用

在 HDFS 中,FsImageEditLog一起工作,以确保文件系统的元数据既能够被可靠地存储,又能够反映最新的更改。定期进行 checkpoint 操作(由 Secondary NameNode 或 Standby NameNode 执行)会将EditLog中的更改应用到FsImage中,创建一个新的、更新的快照。这样可以保证在系统重启或恢复时,可以快速加载最新的文件系统状态。

:::

【中级】YARN 有哪些核心组件?

:::details 要点

YARN Architecture

YARN 有以下核心组件:

  • ResourceManager - ResourceManager 是管理资源和安排在 YARN 上运行的中央调度器。整个系统有且只有一个 ResourceManager,因为号令发布都来自一处,因此不存在调度不一致的情况(很多分布式系统都是通过经典的一主多从模式来解决一致性问题的)。它也包含了两个主要的子组件:
    • 定时调度器(Scheduler) - 从本质上来说,定时调度器就是一种策略,或者说一种算法。当 Client 提交一个任务的时候,它会根据所需要的资源以及当前集群的资源状况进行分配。注意,它只负责向应用程序分配资源,并不做监控以及应用程序的状态跟踪。
    • 应用管理器(ApplicationManager) - 应用管理器就是负责管理 Client 提交的应用。上面不是说到定时调度器(Scheduler)不对用户提交的程序监控嘛,其实啊,监控应用的工作正是由应用管理器(ApplicationManager)完成的。
  • NodeManager - NodeManager 是 ResourceManager 在每台机器的上代理,负责容器的管理,并监控他们的资源使用情况(cpu、内存、磁盘及网络等),以及向 ResourceManager/Scheduler 提供这些资源使用报告。
  • ApplicationMaster - 每当 Client 提交一个 Application 时候,就会新建一个 ApplicationMaster 。由这个 ApplicationMaster 去与 ResourceManager 申请容器资源,获得资源后会将要运行的程序发送到容器上启动,然后进行分布式计算。这么设计的原因在于,数据量大的时候,移动数据成本太高,耗时太久,改为移动计算代价较小。
  • Container - Container 是 YARN 对资源的抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当 AM 向 RM 申请资源时,RM 为 AM 返回的资源是用 Container 表示的。
    • YARN 会为每个任务分配一个 Container,该任务只能使用该 Container 中描述的资源。
    • ApplicationMaster 可在 Container 内运行任何类型的任务。例如,MapReduce ApplicationMaster 请求一个容器来启动 map 或 reduce 任务,而 Giraph ApplicationMaster 请求一个容器来运行 Giraph 任务。
    • 容器由 NodeManager 启动和管理,并被它所监控。
    • 容器被 ResourceManager 所调度。

:::

【中级】MapReduce 有哪些核心组件?

:::details 要点

MapReduce 有以下核心组件:

  • Job - Job 表示 MapReduce 作业配置。Job 通常用于指定 Mapper、combiner(如果有)、PartitionerReducerInputFormatOutputFormat 实现。
  • Mapper - Mapper 负责将输入键值对映射到一组中间键值对。转换的中间记录不需要与输入记录具有相同的类型。一个给定的输入键值对可能映射到零个或多个输出键值对。
  • Combiner - combinermap 运算后的可选操作,它实际上是一个本地化的 reduce 操作。它执行中间输出的本地聚合,这有助于减少从 Mapper 传输到 Reducer 的数据量。
  • Reducer - Reducer 将共享一个 key 的一组中间值归并为一个小的数值集。Reducer 有 3 个主要子阶段:shuffle,sort 和 reduce。
    • shuffle - Reducer 的输入就是 mapper 的排序输出。在这个阶段,框架通过 HTTP 获取所有 mapper 输出的相关分区。
    • sort - 在这个阶段中,框架将按照 key (因为不同 mapper 的输出中可能会有相同的 key) 对 Reducer 的输入进行分组。shuffle 和 sort 两个阶段是同时发生的。
    • reduce - 对按键分组的数据进行聚合统计。
  • Partitioner - Partitioner 负责控制 map 中间输出结果的键的分区。
    • 键(或者键的子集)用于产生分区,通常通过一个散列函数。
    • 分区总数与作业的 reduce 任务数是一样的。因此,它控制中间输出结果(也就是这条记录)的键发送给 m 个 reduce 任务中的哪一个来进行 reduce 操作。
  • InputFormat - InputFormat 描述 MapReduce 作业的输入规范。MapReduce 框架依赖作业的 InputFormat 来完成以下工作:
    • 确认作业的输入规范。
    • 把输入文件分割成多个逻辑的 InputSplit 实例,然后将每个实例分配给一个单独的 Mapper。InputSplit 表示要由单个 Mapper 处理的数据。
    • 提供 RecordReader 的实现。RecordReaderInputSplit 中读取 <key, value> 对,并提供给 Mapper 实现进行处理。
  • OutputFormat - OutputFormat 描述 MapReduce 作业的输出规范。MapReduce 框架依赖作业的 OutputFormat 来完成以下工作:
    • 确认作业的输出规范,例如检查输出路径是否已经存在。
    • 提供 RecordWriter 实现。RecordWriter 将输出 <key, value> 对到文件系统。

:::

工作流

【中级】HDFS 的写数据流程是怎样的?

:::details 要点

HDFS 写数据流程大致为:

  1. 按 Block 大小分割数据
  2. 通过 NameNode 寻址 DataNode
  3. 向 DataNode 写数据
  4. 完成后通知 NameNode

扩展:下面的漫画生动的展示了 HDFS 的写入流程,图片引用自博客:翻译经典 HDFS 原理讲解漫画

HDFS 写数据的源码流程:

  1. 客户端通过对 DistributedFileSystem 对象调用 create() 函数来新建文件
  2. 分布式文件系统对 NameNode 创建一个 RPC 调用,在文件系统的命名空间中新建一个文件
  3. NameNode 对新建文件进行检查无误后,分布式文件系统返回给客户端一个 FSDataOutputStream 对象,FSDataOutputStream 对象封装一个 DFSoutPutstream 对象,负责处理 NameNode 和 DataNode 之间的通信,客户端开始写入数据
  4. FSDataOutputStream数据分成一个一个的数据包,写入内部数据队列,DataStreamer 负责将数据包依次流式传输到由一组 NameNode 构成的管道中。
  5. DFSOutputStream 维护着确认队列来等待 DataNode 收到确认回执,收到管道中所有 DataNode 确认后,数据包从确认队列删除
  6. 客户端完成数据的写入,调用 close() 方法关闭传输通道。
  7. NameNode 确认完成

:::

【中级】HDFS 的读数据流程是怎样的?

:::details 要点

HDFS 读数据流程大致为:

  1. 客户端向 NameNode 查询文件信息
  2. NameNode 返回相关信息
    • 该文件的所有数据块
    • 每个数据块对应的 DataNode(按距离客户端的远近排序)
  3. 客户端向 DataNode 读数据

HDFS 读数据的源码流程:

  1. 客户端调用 FileSystem 对象的 open() 方法在 HDFS 中打开要读取的文件
  2. HDFS 通过使用 RPC(远程过程调用)来调用 NameNode,确定文件起始块(Block)的位置
  3. DistributedFileSystem 类返回一个支持文件定位的输入流 FSDataInputStream 对象,FSDataInputStream 对象接着封装 DFSInputStream 对象(存储着文件起始几个块的 DataNode 地址),客户端对这个输入流调用 read() 方法。
  4. DFSInputStream 连接距离最近的 DataNode,通过反复调用 read 方法,将数据从 DataNode 传输到客户端
  5. 到达块的末端时,DFSInputStream 关闭与该 DataNode 的连接,寻找下一个块的最佳 DataNode
  6. 客户端完成读取,对 FSDataInputStream 调用 close() 方法关闭连接

:::

【中级】MapReduce 是如何工作的?

:::details 要点

MapReduce 任务过程分为两个处理阶段:map 极端和 reduce 阶段。每阶段都以键值对作为输入和输出,其类型由程序员来选择。程序员还需要写两个函数:map 函数和 reduce 函数。

以词频统计为例,其工作流再细分一下,可以划分为以下阶段:

  1. input - 读取文本文件;
  2. splitting - 将文件按照行进行拆分,此时得到的 K1 行数,V1 表示对应行的文本内容;
  3. mapping - 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  4. shuffling - 由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2 就是 Mapping 中的 V2;
  5. reducing - 这里的案例是统计单词出现的总次数,所以 ReducingList(V2) 进行归约求和操作,最终输出。

MapReduce 编程模型中 splittingshuffing 操作都是由框架实现的,需要我们自己编程实现的只有 mappingreducing,这也就是 MapReduce 这个称呼的来源。

MapReduce 工作流

:::

【中级】YARN 是如何工作的?

:::details 要点

这张图简单地标明了提交一个程序所经历的流程,接下来我们来具体说说每一步的过程。

  1. Client 向 ResourceManager 申请运行一个 Application 进程,这里我们假设是一个 MapReduce 作业。
  2. ResourceManager 向 NodeManager 通信,为该 Application 进程分配第一个容器。并在这个容器中运行这个应用程序对应的 ApplicationMaster。
  3. ApplicationMaster 启动以后,对作业(也就是 Application) 进行拆分,拆分 task 出来,这些 task 可以运行在一个或多个容器中。然后向 ResourceManager 申请要运行程序的容器,并定时向 ResourceManager 发送心跳。
  4. 申请到容器后,ApplicationMaster 会去和容器对应的 NodeManager 通信,而后将作业分发到对应的 NodeManager 中的容器去运行,这里会将拆分后的 MapReduce 进行分发,对应容器中运行的可能是 Map 任务,也可能是 Reduce 任务。
  5. 容器中运行的任务会向 ApplicationMaster 发送心跳,汇报自身情况。当程序运行完成后, ApplicationMaster 再向 ResourceManager 注销并释放容器资源。

:::

复制

复制主要指通过网络在多台机器上保存相同数据的副本

复制数据,可能出于各种各样的原因:

  • 提高可用性 - 当部分组件出现位障,系统依然可以继续工作,系统依然可以继续工作。
  • 降低访问延迟 - 使数据在地理位置上更接近用户。
  • 提高读吞吐量 - 扩展至多台机器以同时提供数据访问服务。

所有分布式系统都需要支持复制。

【中级】HDFS 的副本机制是怎样的?

:::details 要点

基于块的副本

由于 Hadoop 被设计运行在廉价的机器上,这意味着硬件是不可靠的,为了保证容错性,HDFS 提供了副本机制。HDFS 将文件分解为若干 Block,Block 是 HDFS 最小存储单元,每个 Block 有多个副本。

HDFS 的默认副本数为 3,更多的副本意味着更高的数据安全性,但同时也会带来更高的额外开销(存储成本和带宽成本)。3 个副本是在保障数据可靠性和系统成本之间的一个较好的平衡点。

副本数可以通过以下方式修改:

  • 在 HDFS 的配置文件 hdfs-site.xml 中,有一个名为 dfs.replication 的属性,可以设置全局的默认副本数。修改这个值后,需要重启 HDFS 使配置生效。
  • 针对单个文件或目录修改副本数:如果只想改变某个特定文件或目录的副本数,而不影响整个系统的默认设置,可以使用 HDFS 的命令行工具。例如,使用命令hdfs dfs -setrep -w <副本数> <文件/目录路径> 来修改特定文件或目录的副本数。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告 (BlockReport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。

副本分布策略

副本分布策略是 HDFS 可靠性和性能的关键。优化的副本存放策略是 HDFS 区分于其他大部分分布式文件系统的重要特性。HDFS 采用一种称为机架感知 (rack-aware) 的策略来改进数据的可靠性、可用性和网络带宽的利用率。大型 HDFS 实例一般运行在跨越多个机架的计算机组成的集群上,不同机架上的两台机器之间的通信需要经过交换机。在大多数情况下,同一个机架内的两台机器间的带宽会比不同机架的两台机器间的带宽大。

通过一个机架感知的过程,NameNode 可以确定每个 DataNode 所属的机架 id。一个简单但没有优化的策略就是将副本存放在不同的机架上。这样可以有效防止当整个机架失效时数据的丢失,并且允许读数据的时候充分利用多个机架的带宽。这种策略设置可以将副本均匀分布在集群中,有利于当组件失效情况下的负载均衡。但是,因为这种策略的一个写操作需要传输数据块到多个机架,这增加了写的代价。

HDFS 默认的副本数为 3,此时 HDFS 的副本分布策略是:

  • 副本 1 - 放在 Client 所在节点;对于远程 Client,系统会随机选择节点
  • 副本 2 - 放在不同机架的节点上
  • 副本 3 - 放在与第二个副本同一机架的不同节点上
  • 副本 N - 在满足以下条件的节点中随机选择
    • 每个节点只存储一份副本
    • 每个机架最多存储两份副本
  • 优选 - 同等条件下优先选择空闲节点。
    • 如果某个 DataNode 节点上的空闲空间低于特定的临界点,按照均衡策略系统就会自动地将数据从这个 DataNode 移动到其他空闲的 DataNode。

副本选择

为了降低整体的带宽消耗和读取延时,HDFS 会尽量让客户端程序读取离它最近的副本。如果在客户端程序的同一个机架上有一个副本,那么就读取该副本。如果一个 HDFS 集群跨越多个数据中心,那么客户端也将首先读本地数据中心的副本。

为了最大限度地减少带宽消耗和读取延迟,HDFS 在执行读取请求时,优先读取距离读取器最近的副本。如果在与读取器节点相同的机架上存在副本,则优先选择该副本。如果 HDFS 群集跨越多个数据中心,则优先选择本地数据中心上的副本。

:::

【中级】HDFS 如何保证数据一致性?

:::details 要点

HDFS 的数据一致性主要依赖以下机制来保证:

  • NameNode 的中心化管理 - NameNode 在 HDFS 中负责存储整个文件系统的元数据,包括文件和目录的结构、每个文件的数据块信息及其在 DataNode 上的位置等。这种中心化的管理,使得文件系统的组织和管理变得更加简洁高效,并且可以确保整个文件系统的一致性。
  • 数据块的复制(Replication) - HDFS 采用副本来保证数据的可靠性。一旦数据写入完成,副本就会分散存储在不同的 DataNodes 上。尽管这种方法不是强一致性模型,但通过足够数量的副本和及时的副本替换策略,HDFS 能够提供较高水平的数据一致性和可靠性。
  • 写入和复制的原子性保证 - 在 HDFS 中,文件一旦创建,其内容就不能被更新,只能被追加或重写。这种方式简化了并发控制,因为写操作在文件级别上是原子的。在复制数据块时,HDFS 保证原子性复制,即一个数据块的所有副本在任何时间点上都是相同的。如果复制过程中出现错误,那么不完整的副本会被删除,系统会重新尝试复制直到成功。
  • 客户端的一致性协议 - 客户端在与 HDFS 交互时,遵循特定的协议。例如,客户端在完成文件写入之后,需要向 NameNode 通知,以确保 NameNode 更新文件的元数据。这样可以保证 NameNode 的元数据与实际存储的数据保持一致。
  • 定期检查和错误恢复
    • 心跳和健康检查 - DataNodes 定期向 NameNode 发送心跳和 Block 健康状况报告。NameNode 利用这些信息来检查和维护系统的整体一致性。例如,如果某个 DataNode 失败,NameNode 会重新组织数据块的副本。
    • 校验 - HDFS 在存储和传输数据时,会计算数据的校验和。在读取数据时,会验证这些校验和,确保数据的完整性。

通过这些机制,HDFS 确保了系统中的数据在正常操作和故障情况下的一致性和可靠性。虽然 HDFS 不提供像传统数据库那样的强一致性保证,但它的设计和实现确保了在大规模数据处理场景中的有效性和健壮性。

:::

容错

【中级】HDFS 有哪些故障类型?如何检测故障?

:::details 要点

HDFS 常见故障及检测方法:

  • 节点故障
    • DataNode 每 3 秒向 NameNode 发送心跳
    • 超时未收到心跳,NameNode 判定 DataNode 宕机
  • 通信故障
    • 客户端请求 DataNode 会收到 ACK
  • 数据损坏
    • 磁盘介质在存储过程中受环境或者老化影响,其存储的数据可能会出现错乱。HDFS 的应对措施是,对于存储在 DataNode 上的数据块,计算并存储校验和(CheckSum)。在读取数据的时候,重新计算读取出来的数据的校验和,如果校验不正确就抛出异常,应用程序捕获异常后就到其他 DataNode 上读取备份数据。
    • 如果 DataNode 监测到本机的某块磁盘损坏,就将该块磁盘上存储的所有 BlockID 报告给 NameNode,NameNode 检查这些数据块还在哪些 DataNode 上有备份,通知相应的 DataNode 服务器将对应的数据块复制到其他服务器上,以保证数据块的备份数满足要求。

:::

【中级】HDFS 读写故障如何处理?

:::details 要点

写入故障处理

  • 写入数据通过数据包传输
  • DataNode 接收数据后,返回 ACK
  • 如果客户端没有收到 ACK,就判定 DataNode 宕机,跳过节点
  • 没有充分备份的数据块信息通知到 NameNode

读取故障处理

  • 读数据先要通过 NameNode 寻址该数据块的所有 DataNode
  • 如果某 DataNode 宕机,则读取其他节点

:::

【中级】DataNode 故障如何处理?

:::details 要点

DataNode 每 3 秒会向 NameNode 发送心跳消息,以证明自身正常工作。如果 DataNode 超时未发送心跳,NameNode 就会认为该 DataNode 已经宕机。

NameNode 会立即查找该 DataNode 上存储的数据块有哪些,以及这些数据块还存储在哪些其他 DataNode 上。

随后,NameNode 通知这些 DataNode 再复制一份数据块到其他 DataNode 上,保证 HDFS 存储的数据块副本数符合配置数。即使再出现服务器宕机,也不会丢失数据。

:::

【中级】NameNode 故障如何处理?

:::details 要点

NameNode 是整个 HDFS 的核心,记录着 HDFS 文件分配表信息,所有的文件路径和数据块存储信息都保存在 NameNode,如果 NameNode 故障,整个 HDFS 系统集群都无法使用。如果 NameNode 上记录的数据丢失,整个集群所有 DataNode 存储的数据也就没用了。

NameNode 通过主备架构实现故障转移。

  • Active NameNode - 是正在工作的 NameNode;
  • Standby NameNode - 是备份的 NameNode。

Active NameNode 宕机后,Standby NameNode 快速升级为新的 Active NameNode。Standby NameNode 周期性同步 edits 编辑日志,定期合并 FsImageedits 到本地磁盘。

注:Hadoop 3.0 允许配置多个 Standby NameNode。

元数据文件

  • edits(编辑日志文件) - 保存了自最新检查点(Checkpoint)之后的所有文件更新操作。
  • FsImage(元数据检查点镜像文件) - 保存了文件系统中所有的目录和文件信息,如:某个目录下有哪些子目录和文件,以及文件名、文件副本数、文件由哪些 Block 组成等。

Active NameNode 内存中有一份最新的元数据(= FsImage + edits)。

Standby NameNode 在检查点定期将内存中的元数据保存到 FsImage 文件中。

利用 QJM 实现元数据高可用

基于 Paxos 算法

QJM 机制(Quorum Journal Manager)

只要保证 Quorum(法定人数)数量的操作成功,就认为这是一次最终成功的操作

QJM 共享存储系统

  • 部署奇数(2N+1)个 JournalNode
  • JournalNode 负责存储 edits 编辑日志
  • 写 edits 的时候,只要超过半数(N+1)的 JournalNode 返回成功,就代表本次写入成功
  • 最多可容忍 N 个 JournalNode 宕机

利用 ZooKeeper 实现 Active 节点选举。

:::

【中级】HDFS 安全模式有什么作用?

:::details 要点

在启动过程中,NameNode 进入安全模式。在这个模式下,它会检查数据块的健康状况和副本数量。只有在足够数量的数据块可用时,NameNode 才会退出安全模式,开始正常的操作。

:::

HA

【高级】HDFS 如何实现高可用?

:::details 要点

HDFS 高可用架构如下:

HDFS 高可用架构主要由以下组件所构成:

  • Active NameNode 和 Standby NameNode:两台 NameNode 形成互备,一台处于 Active 状态,为主 NameNode,另外一台处于 Standby 状态,为备 NameNode,只有主 NameNode 才能对外提供读写服务。
  • 主备切换控制器 ZKFailoverController:ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换,当然 NameNode 目前也支持不依赖于 Zookeeper 的手动主备切换。
  • Zookeeper 集群:为主备切换控制器提供主备选举支持。
  • 共享存储系统:共享存储系统是实现 NameNode 的高可用最为关键的部分,共享存储系统保存了 NameNode 在运行过程中所产生的 HDFS 的元数据。主 NameNode 和 NameNode 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在确认元数据完全同步之后才能继续对外提供服务。
  • DataNode 节点:除了通过共享存储系统共享 HDFS 的元数据信息之外,主 NameNode 和备 NameNode 还需要共享 HDFS 的数据块和 DataNode 之间的映射关系。DataNode 会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。

目前 Hadoop 支持使用 Quorum Journal Manager (QJM) 或 Network File System (NFS) 作为共享的存储系统,这里以 QJM 集群为例进行说明:Active NameNode 首先把 EditLog 提交到 JournalNode 集群,然后 Standby NameNode 再从 JournalNode 集群定时同步 EditLog,当 Active NameNode 宕机后, Standby NameNode 在确认元数据完全同步之后就可以对外提供服务。

需要说明的是向 JournalNode 集群写入 EditLog 是遵循 “过半写入则成功” 的策略,所以你至少要有 3 个 JournalNode 节点,当然你也可以继续增加节点数量,但是应该保证节点总数是奇数。同时如果有 2N+1 台 JournalNode,那么根据过半写的原则,最多可以容忍有 N 台 JournalNode 节点挂掉。

:::

【高级】NameNode 如何实现主备切换?

:::details 要点

NameNode 实现主备切换的流程下图所示:

工作流程说明:

  1. HealthMonitor 初始化完成之后会启动内部的线程来定时调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法,对 NameNode 的健康状态进行检测。
  2. HealthMonitor 如果检测到 NameNode 的健康状态发生变化,会回调 ZKFailoverController 注册的相应方法进行处理。
  3. 如果 ZKFailoverController 判断需要进行主备切换,会首先使用 ActiveStandbyElector 来进行自动的主备选举。
  4. ActiveStandbyElector 与 Zookeeper 进行交互完成自动的主备选举。
  5. ActiveStandbyElector 在主备选举完成后,会回调 ZKFailoverController 的相应方法来通知当前的 NameNode 成为主 NameNode 或备 NameNode。
  6. ZKFailoverController 调用对应 NameNode 的 HAServiceProtocol RPC 接口的方法将 NameNode 转换为 Active 状态或 Standby 状态。

主备选举过程:

NameNode 在选举成功后,会在 zk 上创建了一个 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点,而没有选举成功的备 NameNode 会监控这个节点,通过 Watcher 来监听这个节点的状态变化事件,ZKFC 的 ActiveStandbyElector 主要关注这个节点的 NodeDeleted 事件(这部分实现跟 Kafka 中 Controller 的选举一样)。

如果 Active NameNode 对应的 HealthMonitor 检测到 NameNode 的状态异常时, ZKFailoverController 会主动删除当前在 Zookeeper 上建立的临时节点 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock,这样处于 Standby 状态的 NameNode 的 ActiveStandbyElector 注册的监听器就会收到这个节点的 NodeDeleted 事件。收到这个事件之后,会马上再次进入到创建 /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点的流程,如果创建成功,这个本来处于 Standby 状态的 NameNode 就选举为主 NameNode 并随后开始切换为 Active 状态。

当然,如果是 Active 状态的 NameNode 所在的机器整个宕掉的话,那么根据 Zookeeper 的临时节点特性,/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 节点会自动被删除,从而也会自动进行一次主备切换。

:::

【高级】如何应对 HDFS 脑裂问题?

:::details 要点

在实际中,NameNode 可能会出现这种情况,NameNode 在垃圾回收(GC)时,可能会在长时间内整个系统无响应,因此,也就无法向 zk 写入心跳信息,这样的话可能会导致临时节点掉线,备 NameNode 会切换到 Active 状态,这种情况,可能会导致整个集群会有同时有两个 NameNode,这就是脑裂问题。

脑裂问题的解决方案是隔离(Fencing),主要是在以下三处采用隔离措施:

  • 第三方共享存储:任一时刻,只有一个 NN 可以写入;
  • DataNode:需要保证只有一个 NN 发出与管理数据副本有关的删除命令;
  • Client:需要保证同一时刻只有一个 NN 能够对 Client 的请求发出正确的响应。

关于这个问题目前解决方案的实现如下:

  • ActiveStandbyElector 为了实现隔离,会在成功创建 Zookeeper 节点 hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock 从而成为 Active NameNode 之后,创建另外一个路径为 /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 的持久节点,这个节点里面保存了这个 Active NameNode 的地址信息;
  • Active NameNode 的 ActiveStandbyElector 在正常的状态下关闭 Zookeeper Session 的时候,会一起删除这个持久节点;
  • 但如果 ActiveStandbyElector 在异常的状态下 Zookeeper Session 关闭 (比如前述的 Zookeeper 假死),那么由于 /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb 是持久节点,会一直保留下来,后面当另一个 NameNode 选主成功之后,会注意到上一个 Active NameNode 遗留下来的这个节点,从而会回调 ZKFailoverController 的方法对旧的 Active NameNode 进行 fencing。

在进行隔离的时候,会执行以下的操作:

首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的 transitionToStandby 方法,看能不能把它转换为 Standby 状态; 如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施。

Hadoop 目前主要提供两种隔离措施,通常会选择第一种:sshfence:通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死; shellfence:执行一个用户自定义的 shell 脚本来将对应的进程隔离。 只有在成功地执行完成 fencing 之后,选主成功的 ActiveStandbyElector 才会回调 ZKFailoverController 的 becomeActive 方法将对应的 NameNode 转换为 Active 状态,开始对外提供服务。

NameNode 选举的实现机制与 Kafka 的 Controller 类似,那么 Kafka 是如何避免脑裂问题的呢?

Controller 给 Broker 发送的请求中,都会携带 controller epoch 信息,如果 broker 发现当前请求的 epoch 小于缓存中的值,那么就证明这是来自旧 Controller 的请求,就会决绝这个请求,正常情况下是没什么问题的; 但是异常情况下呢?如果 Broker 先收到异常 Controller 的请求进行处理呢?现在看 Kafka 在这一部分并没有适合的方案; 正常情况下,Kafka 新的 Controller 选举出来之后,Controller 会向全局所有 broker 发送一个 metadata 请求,这样全局所有 Broker 都可以知道当前最新的 controller epoch,但是并不能保证可以完全避免上面这个问题,还是有出现这个问题的几率的,只不过非常小,而且即使出现了由于 Kafka 的高可靠架构,影响也非常有限,至少从目前看,这个问题并不是严重的问题。

通过标识每次选举的版本号,并以最新版本选举结果为准,是分布式选举避免脑裂的常见做法。在其他分布式系统中,epoch 可能会被称为 term、version 等。

:::

【高级】YARN 如何实现高可用?

:::details 要点

YARN ResourceManager 的高可用与 HDFS NameNode 的高可用类似,但是 ResourceManager 不像 NameNode ,没有那么多的元数据信息需要维护,所以它的状态信息可以直接写到 Zookeeper 上,并依赖 Zookeeper 来进行主备选举。

:::

参考资料

Hive 常用 DDL 操作

Database

查看数据列表

1
show databases;

使用数据库

1
USE database_name;

新建数据库

语法:

1
2
3
4
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name   --DATABASE|SCHEMA 是等价的
[COMMENT database_comment] --数据库注释
[LOCATION hdfs_path] --存储在 HDFS 上的位置
[WITH DBPROPERTIES (property_name=property_value, ...)]; --指定额外属性

示例:

1
2
3
CREATE DATABASE IF NOT EXISTS hive_test
COMMENT 'hive database for test'
WITH DBPROPERTIES ('create'='heibaiying');

查看数据库信息

语法:

1
DESC DATABASE [EXTENDED] db_name; --EXTENDED 表示是否显示额外属性

示例:

1
DESC DATABASE  EXTENDED hive_test;

删除数据库

语法:

1
DROP (DATABASE|SCHEMA) [IF EXISTS] database_name [RESTRICT|CASCADE];
  • 默认行为是 RESTRICT,如果数据库中存在表则删除失败。
  • 要想删除库及其中的表,可以使用 CASCADE 级联删除。

示例:

1
DROP DATABASE IF EXISTS hive_test CASCADE;

创建表

建表语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name     --表名
[(col_name data_type [COMMENT col_comment],
... [constraint_specification])] --列名 列数据类型
[COMMENT table_comment] --表描述
[PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] --分区表分区规则
[
CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS
] --分桶表分桶规则
[SKEWED BY (col_name, col_name, ...) ON ((col_value, col_value, ...), (col_value, col_value, ...), ...)
[STORED AS DIRECTORIES]
] --指定倾斜列和值
[
[ROW FORMAT row_format]
[STORED AS file_format]
| STORED BY 'storage.handler.class.name' [WITH SERDEPROPERTIES (...)]
] -- 指定行分隔符、存储文件格式或采用自定义存储格式
[LOCATION hdfs_path] -- 指定表的存储位置
[TBLPROPERTIES (property_name=property_value, ...)] --指定表的属性
[AS select_statement]; --从查询结果创建表

内部表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE emp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

外部表

1
2
3
4
5
6
7
8
9
10
11
CREATE EXTERNAL TABLE emp_external(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_external';

使用 desc format emp_external 命令可以查看表的详细信息如下:

分区表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';

分桶表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';

倾斜表

通过指定一个或者多个列经常出现的值(严重偏斜),Hive 会自动将涉及到这些值的数据拆分为单独的文件。在查询时,如果涉及到倾斜值,它就直接从独立文件中获取数据,而不是扫描所有文件,这使得性能得到提升。

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_skewed(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
SKEWED BY (empno) ON (66,88,100) --指定 empno 的倾斜值 66,88,100
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_skewed';

临时表

临时表仅对当前 session 可见,临时表的数据将存储在用户的暂存目录中,并在会话结束后删除。如果临时表与永久表表名相同,则对该表名的任何引用都将解析为临时表,而不是永久表。临时表还具有以下两个限制:

  • 不支持分区列;
  • 不支持创建索引。
1
2
3
4
5
6
7
8
9
10
CREATE TEMPORARY TABLE emp_temp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

CTAS 创建表

支持从查询语句的结果创建表:

1
CREATE TABLE emp_copy AS SELECT * FROM emp WHERE deptno='20';

复制表结构

语法:

1
2
3
CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name  --创建表表名
LIKE existing_table_or_view_name --被复制表的表名
[LOCATION hdfs_path]; --存储位置

示例:

1
CREATE TEMPORARY EXTERNAL TABLE  IF NOT EXISTS  emp_co  LIKE emp

加载数据到表

加载数据到表中属于 DML 操作,这里为了方便大家测试,先简单介绍一下加载本地数据到表中:

1
2
-- 加载数据到 emp 表中
load data local inpath "/usr/file/emp.txt" into table emp;

其中 emp.txt 的内容如下,你可以直接复制使用,也可以到本仓库的resources 目录下载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
7369	SMITH	CLERK	7902	1980-12-17 00:00:00	800.00		20
7499 ALLEN SALESMAN 7698 1981-02-20 00:00:00 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-02-22 00:00:00 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-04-02 00:00:00 2975.00 20
7654 MARTIN SALESMAN 7698 1981-09-28 00:00:00 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-05-01 00:00:00 2850.00 30
7782 CLARK MANAGER 7839 1981-06-09 00:00:00 2450.00 10
7788 SCOTT ANALYST 7566 1987-04-19 00:00:00 1500.00 20
7839 KING PRESIDENT 1981-11-17 00:00:00 5000.00 10
7844 TURNER SALESMAN 7698 1981-09-08 00:00:00 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-05-23 00:00:00 1100.00 20
7900 JAMES CLERK 7698 1981-12-03 00:00:00 950.00 30
7902 FORD ANALYST 7566 1981-12-03 00:00:00 3000.00 20
7934 MILLER CLERK 7782 1982-01-23 00:00:00 1300.00 10

加载后可查询表中数据

修改表

重命名表

语法:

1
ALTER TABLE table_name RENAME TO new_table_name;

示例:

1
ALTER TABLE emp_temp RENAME TO new_emp; --把 emp_temp 表重命名为 new_emp

修改列

语法:

1
2
ALTER TABLE table_name [PARTITION partition_spec] CHANGE [COLUMN] col_old_name col_new_name column_type
[COMMENT col_comment] [FIRST|AFTER column_name] [CASCADE|RESTRICT];

示例:

1
2
3
4
5
6
7
8
-- 修改字段名和类型
ALTER TABLE emp_temp CHANGE empno empno_new INT;

-- 修改字段 sal 的名称 并将其放置到 empno 字段后
ALTER TABLE emp_temp CHANGE sal sal_new decimal(7,2) AFTER ename;

-- 为字段增加注释
ALTER TABLE emp_temp CHANGE mgr mgr_new INT COMMENT 'this is column mgr';

新增列

示例:

1
ALTER TABLE emp_temp ADD COLUMNS (address STRING COMMENT 'home address');

清空表/删除表

清空表 hive-ddl.md

语法:

1
2
-- 清空整个表或表指定分区中的数据
TRUNCATE TABLE table_name [PARTITION (partition_column = partition_col_value, ...)];
  • 目前只有内部表才能执行 TRUNCATE 操作,外部表执行时会抛出异常 Cannot truncate non-managed table XXXX

示例:

1
TRUNCATE TABLE emp_mgt_ptn PARTITION (deptno=20);

删除表

语法:

1
DROP TABLE [IF EXISTS] table_name [PURGE];
  • 内部表:不仅会删除表的元数据,同时会删除 HDFS 上的数据;
  • 外部表:只会删除表的元数据,不会删除 HDFS 上的数据;
  • 删除视图引用的表时,不会给出警告(但视图已经无效了,必须由用户删除或重新创建)。

其他命令

Describe

查看数据库:

1
DESCRIBE|Desc DATABASE [EXTENDED] db_name;  --EXTENDED 是否显示额外属性

查看表:

1
DESCRIBE|Desc [EXTENDED|FORMATTED] table_name --FORMATTED 以友好的展现方式查看表详情

Show

1. 查看数据库列表

1
2
3
4
5
-- 语法
SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards'];

-- 示例:
SHOW DATABASES like 'hive*';

LIKE 子句允许使用正则表达式进行过滤,但是 SHOW 语句当中的 LIKE 子句只支持 *(通配符)和 |(条件或)两个符号。例如 employeesemp *emp * | * ees,所有这些都将匹配名为 employees 的数据库。

2. 查看表的列表

1
2
3
4
5
-- 语法
SHOW TABLES [IN database_name] ['identifier_with_wildcards'];

-- 示例
SHOW TABLES IN default;

3. 查看视图列表

1
SHOW VIEWS [IN/FROM database_name] [LIKE 'pattern_with_wildcards'];   --仅支持 Hive 2.2.0

4. 查看表的分区列表

1
SHOW PARTITIONS table_name;

5. 查看表/视图的创建语句

1
SHOW CREATE TABLE ([db_name.]table_name|view_name);

参考资料

Hive 常用 DML 操作

加载文件数据到表

语法

1
2
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE]
INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  • LOCAL 关键字代表从本地文件系统加载文件,省略则代表从 HDFS 上加载文件:
  • 从本地文件系统加载文件时, filepath 可以是绝对路径也可以是相对路径 (建议使用绝对路径);
  • 从 HDFS 加载文件时候,filepath 为文件完整的 URL 地址:如 hdfs://namenode:port/user/hive/project/ data1
  • filepath 可以是文件路径 (在这种情况下 Hive 会将文件移动到表中),也可以目录路径 (在这种情况下,Hive 会将该目录中的所有文件移动到表中);
  • 如果使用 OVERWRITE 关键字,则将删除目标表(或分区)的内容,使用新的数据填充;不使用此关键字,则数据以追加的方式加入;
  • 加载的目标可以是表或分区。如果是分区表,则必须指定加载数据的分区;
  • 加载文件的格式必须与建表时使用 STORED AS 指定的存储格式相同。

使用建议:

不论是本地路径还是 URL 都建议使用完整的。虽然可以使用不完整的 URL 地址,此时 Hive 将使用 hadoop 中的 fs.default.name 配置来推断地址,但是为避免不必要的错误,建议使用完整的本地路径或 URL 地址;

加载对象是分区表时建议显示指定分区。在 Hive 3.0 之后,内部将加载 (LOAD) 重写为 INSERT AS SELECT,此时如果不指定分区,INSERT AS SELECT 将假设最后一组列是分区列,如果该列不是表定义的分区,它将抛出错误。为避免错误,还是建议显示指定分区。

示例

新建分区表:

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE emp_ptn(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

从 HDFS 上加载数据到分区表:

1
LOAD DATA  INPATH "hdfs://hadoop001:8020/mydir/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=20);

emp.txt 文件可在本仓库的 resources 目录中下载

加载后表中数据如下,分区列 deptno 全部赋值成 20:

查询结果插入到表

语法

1
2
3
4
5
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]]
select_statement1 FROM from_statement;

INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)]
select_statement1 FROM from_statement;
  • Hive 0.13.0 开始,建表时可以通过使用 TBLPROPERTIES(“immutable”=“true”)来创建不可变表 (immutable table) ,如果不可以变表中存在数据,则 INSERT INTO 失败。(注:INSERT OVERWRITE 的语句不受 immutable 属性的影响);

  • 可以对表或分区执行插入操作。如果表已分区,则必须通过指定所有分区列的值来指定表的特定分区;

  • 从 Hive 1.1.0 开始,TABLE 关键字是可选的;

  • 从 Hive 1.2.0 开始 ,可以采用 INSERT INTO tablename(z,x,c1) 指明插入列;

  • 可以将 SELECT 语句的查询结果插入多个表(或分区),称为多表插入。语法如下:

    1
    2
    3
    4
    5
    FROM from_statement
    INSERT OVERWRITE TABLE tablename1
    [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
    [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
    [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

动态插入分区

1
2
3
4
5
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement;

INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...)
select_statement FROM from_statement;

在向分区表插入数据时候,分区列名是必须的,但是列值是可选的。如果给出了分区列值,我们将其称为静态分区,否则它是动态分区。动态分区列必须在 SELECT 语句的列中最后指定,并且与它们在 PARTITION() 子句中出现的顺序相同。

注意:Hive 0.9.0 之前的版本动态分区插入是默认禁用的,而 0.9.0 之后的版本则默认启用。以下是动态分区的相关配置:

配置 默认值 说明
hive.exec.dynamic.partition true 需要设置为 true 才能启用动态分区插入
hive.exec.dynamic.partition.mode strict 在严格模式 (strict) 下,用户必须至少指定一个静态分区,以防用户意外覆盖所有分区,在非严格模式下,允许所有分区都是动态的
hive.exec.max.dynamic.partitions.pernode 100 允许在每个 mapper/reducer 节点中创建的最大动态分区数
hive.exec.max.dynamic.partitions 1000 允许总共创建的最大动态分区数
hive.exec.max.created.files 100000 作业中所有 mapper/reducer 创建的 HDFS 文件的最大数量
hive.error.on.empty.partition false 如果动态分区插入生成空结果,是否抛出异常

示例

(1)新建 emp 表,作为查询对象表

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE emp(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

-- 加载数据到 emp 表中 这里直接从本地加载
load data local inpath "/usr/file/emp.txt" into table emp;

完成后 emp 表中数据如下:

(2)为清晰演示,先清空 emp_ptn 表中加载的数据:

1
TRUNCATE TABLE emp_ptn;

(3)静态分区演示:从 emp 表中查询部门编号为 20 的员工数据,并插入 emp_ptn 表中,语句如下:

1
2
INSERT OVERWRITE TABLE emp_ptn PARTITION (deptno=20)
SELECT empno,ename,job,mgr,hiredate,sal,comm FROM emp WHERE deptno=20;

完成后 emp_ptn 表中数据如下:

(4)接着演示动态分区:

1
2
3
4
5
6
-- 由于我们只有一个分区,且还是动态分区,所以需要关闭严格默认。因为在严格模式下,用户必须至少指定一个静态分区
set hive.exec.dynamic.partition.mode=nonstrict;

-- 动态分区 此时查询语句的最后一列为动态分区列,即 deptno
INSERT OVERWRITE TABLE emp_ptn PARTITION (deptno)
SELECT empno,ename,job,mgr,hiredate,sal,comm,deptno FROM emp WHERE deptno=30;

完成后 emp_ptn 表中数据如下:

使用 SQL 语句插入值

1
2
INSERT INTO TABLE tablename [PARTITION (partcol1[=val1], partcol2[=val2] ...)]
VALUES ( value [, value ...] )
  • 使用时必须为表中的每个列都提供值。不支持只向部分列插入值(可以为缺省值的列提供空值来消除这个弊端);
  • 如果目标表表支持 ACID 及其事务管理器,则插入后自动提交;
  • 不支持支持复杂类型 (array, map, struct, union) 的插入。

更新和删除数据

语法

更新和删除的语法比较简单,和关系型数据库一致。需要注意的是这两个操作都只能在支持 ACID 的表,也就是事务表上才能执行。

1
2
3
4
5
-- 更新
UPDATE tablename SET column = value [, column = value ...] [WHERE expression]

--删除
DELETE FROM tablename [WHERE expression]

示例

1. 修改配置

首先需要更改 hive-site.xml,添加如下配置,开启事务支持,配置完成后需要重启 Hive 服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.enforce.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.in.test</name>
<value>true</value>
</property>

2. 创建测试表

创建用于测试的事务表,建表时候指定属性 transactional = true 则代表该表是事务表。需要注意的是,按照官方文档 的说明,目前 Hive 中的事务表有以下限制:

  • 必须是 buckets Table;
  • 仅支持 ORC 文件格式;
  • 不支持 LOAD DATA …语句。
1
2
3
4
5
6
CREATE TABLE emp_ts(
empno int,
ename String
)
CLUSTERED BY (empno) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true");

3. 插入测试数据

1
INSERT INTO TABLE emp_ts  VALUES (1,"ming"),(2,"hong");

插入数据依靠的是 MapReduce 作业,执行成功后数据如下:

4. 测试更新和删除

1
2
3
4
5
--更新数据
UPDATE emp_ts SET ename = "lan" WHERE empno=1;

--删除数据
DELETE FROM emp_ts WHERE empno=2;

更新和删除数据依靠的也是 MapReduce 作业,执行成功后数据如下:

查询结果写出到文件系统

语法

1
2
3
INSERT OVERWRITE [LOCAL] DIRECTORY directory1
[ROW FORMAT row_format] [STORED AS file_format]
SELECT ... FROM ...
  • OVERWRITE 关键字表示输出文件存在时,先删除后再重新写入;

  • 和 Load 语句一样,建议无论是本地路径还是 URL 地址都使用完整的;

  • 写入文件系统的数据被序列化为文本,其中列默认由^A 分隔,行由换行符分隔。如果列不是基本类型,则将其序列化为 JSON 格式。其中行分隔符不允许自定义,但列分隔符可以自定义,如下:

    1
    2
    3
    4
    5
    6
    7
    -- 定义列分隔符为'\t'
    insert overwrite local directory './test-04'
    row format delimited
    FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
    MAP KEYS TERMINATED BY ':'
    select * from src;

示例

这里我们将上面创建的 emp_ptn 表导出到本地文件系统,语句如下:

1
2
3
4
INSERT OVERWRITE LOCAL DIRECTORY '/usr/file/ouput'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
SELECT * FROM emp_ptn;

导出结果如下:

参考资料

Hive 数据查询详解

数据准备

为了演示查询操作,这里需要预先创建三张表,并加载测试数据。

数据文件 emp.txt 和 dept.txt 可以从本仓库的resources 目录下载。

员工表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 -- 建表语句
CREATE TABLE emp(
empno INT, -- 员工表编号
ename STRING, -- 员工姓名
job STRING, -- 职位类型
mgr INT,
hiredate TIMESTAMP, --雇佣日期
sal DECIMAL(7,2), --工资
comm DECIMAL(7,2),
deptno INT) --部门编号
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

--加载数据
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp;

部门表

1
2
3
4
5
6
7
8
9
10
-- 建表语句
CREATE TABLE dept(
deptno INT, --部门编号
dname STRING, --部门名称
loc STRING --部门所在的城市
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

--加载数据
LOAD DATA LOCAL INPATH "/usr/file/dept.txt" OVERWRITE INTO TABLE dept;

分区表

这里需要额外创建一张分区表,主要是为了演示分区查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE EXTERNAL TABLE emp_ptn(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";


--加载数据
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=20)
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=30)
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=40)
LOAD DATA LOCAL INPATH "/usr/file/emp.txt" OVERWRITE INTO TABLE emp_ptn PARTITION (deptno=50)

单表查询

SELECT

1
2
-- 查询表中全部数据
SELECT * FROM emp;

WHERE

1
2
-- 查询 10 号部门中员工编号大于 7782 的员工信息
SELECT * FROM emp WHERE empno > 7782 AND deptno = 10;

DISTINCT

Hive 支持使用 DISTINCT 关键字去重。

1
2
-- 查询所有工作类型
SELECT DISTINCT job FROM emp;

分区查询

分区查询 (Partition Based Queries),可以指定某个分区或者分区范围。

1
2
3
-- 查询分区表中部门编号在[20,40]之间的员工
SELECT emp_ptn.* FROM emp_ptn
WHERE emp_ptn.deptno >= 20 AND emp_ptn.deptno <= 40;

LIMIT

1
2
-- 查询薪资最高的 5 名员工
SELECT * FROM emp ORDER BY sal DESC LIMIT 5;

GROUP BY

Hive 支持使用 GROUP BY 进行分组聚合操作。

1
2
3
4
set hive.map.aggr=true;

-- 查询各个部门薪酬综合
SELECT deptno,SUM(sal) FROM emp GROUP BY deptno;

hive.map.aggr 控制程序如何进行聚合。默认值为 false。如果设置为 true,Hive 会在 map 阶段就执行一次聚合。这可以提高聚合效率,但需要消耗更多内存。

ORDER AND SORT

可以使用 ORDER BY 或者 Sort BY 对查询结果进行排序,排序字段可以是整型也可以是字符串:如果是整型,则按照大小排序;如果是字符串,则按照字典序排序。ORDER BY 和 SORT BY 的区别如下:

  • 使用 ORDER BY 时会有一个 Reducer 对全部查询结果进行排序,可以保证数据的全局有序性;
  • 使用 SORT BY 时只会在每个 Reducer 中进行排序,这可以保证每个 Reducer 的输出数据是有序的,但不能保证全局有序。

由于 ORDER BY 的时间可能很长,如果你设置了严格模式 (hive.mapred.mode = strict),则其后面必须再跟一个 limit 子句。

注 :hive.mapred.mode 默认值是 nonstrict ,也就是非严格模式。

1
2
-- 查询员工工资,结果按照部门升序,按照工资降序排列
SELECT empno, deptno, sal FROM emp ORDER BY deptno ASC, sal DESC;

HAVING

可以使用 HAVING 对分组数据进行过滤。

1
2
-- 查询工资总和大于 9000 的所有部门
SELECT deptno,SUM(sal) FROM emp GROUP BY deptno HAVING SUM(sal)>9000;

DISTRIBUTE BY

默认情况下,MapReduce 程序会对 Map 输出结果的 Key 值进行散列,并均匀分发到所有 Reducer 上。如果想要把具有相同 Key 值的数据分发到同一个 Reducer 进行处理,这就需要使用 DISTRIBUTE BY 字句。

需要注意的是,DISTRIBUTE BY 虽然能保证具有相同 Key 值的数据分发到同一个 Reducer,但是不能保证数据在 Reducer 上是有序的。情况如下:

把以下 5 个数据发送到两个 Reducer 上进行处理:

1
2
3
4
5
k1
k2
k4
k3
k1

Reducer1 得到如下乱序数据:

1
2
3
k1
k2
k1

Reducer2 得到数据如下:

1
2
k4
k3

如果想让 Reducer 上的数据时有序的,可以结合 SORT BY 使用 (示例如下),或者使用下面我们将要介绍的 CLUSTER BY。

1
2
-- 将数据按照部门分发到对应的 Reducer 上处理
SELECT empno, deptno, sal FROM emp DISTRIBUTE BY deptno SORT BY deptno ASC;

CLUSTER BY

如果 SORT BYDISTRIBUTE BY 指定的是相同字段,且 SORT BY 排序规则是 ASC,此时可以使用 CLUSTER BY 进行替换,同时 CLUSTER BY 可以保证数据在全局是有序的。

1
SELECT empno, deptno, sal FROM emp CLUSTER  BY deptno ;

多表联结查询

Hive 支持内连接,外连接,左外连接,右外连接,笛卡尔连接,这和传统数据库中的概念是一致的,可以参见下图。

需要特别强调:JOIN 语句的关联条件必须用 ON 指定,不能用 WHERE 指定,否则就会先做笛卡尔积,再过滤,这会导致你得不到预期的结果 (下面的演示会有说明)。

img

INNER JOIN

1
2
3
4
5
6
7
8
-- 查询员工编号为 7369 的员工的详细信息
SELECT e.*,d.* FROM
emp e JOIN dept d
ON e.deptno = d.deptno
WHERE empno=7369;

--如果是三表或者更多表连接,语法如下
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)

LEFT OUTER JOIN

LEFT OUTER JOIN 和 LEFT JOIN 是等价的。

1
2
3
4
-- 左连接
SELECT e.*,d.*
FROM emp e LEFT OUTER JOIN dept d
ON e.deptno = d.deptno;

RIGHT OUTER JOIN

1
2
3
4
--右连接
SELECT e.*,d.*
FROM emp e RIGHT OUTER JOIN dept d
ON e.deptno = d.deptno;

执行右连接后,由于 40 号部门下没有任何员工,所以此时员工信息为 NULL。这个查询可以很好的复述上面提到的——JOIN 语句的关联条件必须用 ON 指定,不能用 WHERE 指定。你可以把 ON 改成 WHERE,你会发现无论如何都查不出 40 号部门这条数据,因为笛卡尔运算不会有 (NULL, 40) 这种情况。

img

FULL OUTER JOIN

1
2
3
SELECT e.*,d.*
FROM emp e FULL OUTER JOIN dept d
ON e.deptno = d.deptno;

LEFT SEMI JOIN

LEFT SEMI JOIN (左半连接)是 IN/EXISTS 子查询的一种更高效的实现。

  • JOIN 子句中右边的表只能在 ON 子句中设置过滤条件;
  • 查询结果只包含左边表的数据,所以只能 SELECT 左表中的列。
1
2
3
4
5
6
7
8
-- 查询在纽约办公的所有员工信息
SELECT emp.*
FROM emp LEFT SEMI JOIN dept
ON emp.deptno = dept.deptno AND dept.loc="NEW YORK";

--上面的语句就等价于
SELECT emp.* FROM emp
WHERE emp.deptno IN (SELECT deptno FROM dept WHERE loc="NEW YORK");

JOIN

笛卡尔积连接,这个连接日常的开发中可能很少遇到,且性能消耗比较大,基于这个原因,如果在严格模式下 (hive.mapred.mode = strict),Hive 会阻止用户执行此操作。

1
SELECT * FROM emp JOIN dept;

JOIN 优化

STREAMTABLE

在多表进行联结的时候,如果每个 ON 字句都使用到共同的列(如下面的 b.key),此时 Hive 会进行优化,将多表 JOIN 在同一个 map / reduce 作业上进行。同时假定查询的最后一个表(如下面的 c 表)是最大的一个表,在对每行记录进行 JOIN 操作时,它将尝试将其他的表缓存起来,然后扫描最后那个表进行计算。因此用户需要保证查询的表的大小从左到右是依次增加的。

1
`SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key) JOIN c ON (c.key = b.key)`

然后,用户并非需要总是把最大的表放在查询语句的最后面,Hive 提供了 /*+ STREAMTABLE() */ 标志,用于标识最大的表,示例如下:

1
2
3
4
SELECT /*+ STREAMTABLE(d) */  e.*,d.*
FROM emp e JOIN dept d
ON e.deptno = d.deptno
WHERE job='CLERK';

MAPJOIN

如果所有表中只有一张表是小表,那么 Hive 把这张小表加载到内存中。这时候程序会在 map 阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在 map 就进行了 JOIN 操作,从而可以省略 reduce 过程,这样效率可以提升很多。Hive 中提供了 /*+ MAPJOIN() */ 来标记小表,示例如下:

1
2
3
4
SELECT /*+ MAPJOIN(d) */ e.*,d.*
FROM emp e JOIN dept d
ON e.deptno = d.deptno
WHERE job='CLERK';

SELECT 的其他用途

查看当前数据库:

1
SELECT current_database()

本地模式

在上面演示的语句中,大多数都会触发 MapReduce, 少部分不会触发,比如 select * from emp limit 5 就不会触发 MR,此时 Hive 只是简单的读取数据文件中的内容,然后格式化后进行输出。在需要执行 MapReduce 的查询中,你会发现执行时间可能会很长,这时候你可以选择开启本地模式。

1
2
--本地模式默认关闭,需要手动开启此功能
SET hive.exec.mode.local.auto=true;

启用后,Hive 将分析查询中每个 map-reduce 作业的大小,如果满足以下条件,则可以在本地运行它:

  • 作业的总输入大小低于:hive.exec.mode.local.auto.inputbytes.max(默认为 128MB);
  • map-tasks 的总数小于:hive.exec.mode.local.auto.tasks.max(默认为 4);
  • 所需的 reduce 任务总数为 1 或 0。

因为我们测试的数据集很小,所以你再次去执行上面涉及 MR 操作的查询,你会发现速度会有显著的提升。

参考资料

Hive 简介

简介

Hive 是一个构建在 Hadoop 之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类 SQL 查询功能,用于查询的 SQL 语句会被转化为 MapReduce 作业,然后提交到 Hadoop 上运行。

特点

  1. 简单、容易上手 (提供了类似 sql 的查询语言 hql),使得精通 sql 但是不了解 Java 编程的人也能很好地进行大数据分析;
  2. 灵活性高,可以自定义用户函数 (UDF) 和存储格式;
  3. 为超大的数据集设计的计算和存储能力,集群扩展容易;
  4. 统一的元数据管理,可与 presto/impala/sparksql 等共享数据;
  5. 执行延迟高,不适合做数据的实时处理,但适合做海量数据的离线处理。

Hive 的体系架构

img

command-line shell & thrift/jdbc

可以用 command-line shell 和 thrift/jdbc 两种方式来操作数据:

  • command-line shell:通过 hive 命令行的的方式来操作数据;
  • thrift/jdbc:通过 thrift 协议按照标准的 JDBC 的方式操作数据。

Metastore

在 Hive 中,表名、表结构、字段名、字段类型、表的分隔符等统一被称为元数据。所有的元数据默认存储在 Hive 内置的 derby 数据库中,但由于 derby 只能有一个实例,也就是说不能有多个命令行客户端同时访问,所以在实际生产环境中,通常使用 MySQL 代替 derby。

Hive 进行的是统一的元数据管理,就是说你在 Hive 上创建了一张表,然后在 presto/impala/sparksql 中都是可以直接使用的,它们会从 Metastore 中获取统一的元数据信息,同样的你在 presto/impala/sparksql 中创建一张表,在 Hive 中也可以直接使用。

HQL 的执行流程

Hive 在执行一条 HQL 的时候,会经过以下步骤:

  1. 语法解析:Antlr 定义 SQL 的语法规则,完成 SQL 词法,语法解析,将 SQL 转化为抽象 语法树 AST Tree;
  2. 语义解析:遍历 AST Tree,抽象出查询的基本组成单元 QueryBlock;
  3. 生成逻辑执行计划:遍历 QueryBlock,翻译为执行操作树 OperatorTree;
  4. 优化逻辑执行计划:逻辑层优化器进行 OperatorTree 变换,合并不必要的 ReduceSinkOperator,减少 shuffle 数据量;
  5. 生成物理执行计划:遍历 OperatorTree,翻译为 MapReduce 任务;
  6. 优化物理执行计划:物理层优化器进行 MapReduce 任务的变换,生成最终的执行计划。

关于 Hive SQL 的详细执行流程可以参考美团技术团队的文章:Hive SQL 的编译过程

数据类型

基本数据类型

Hive 表中的列支持以下基本数据类型:

大类 类型
Integers(整型) TINYINT—1 字节的有符号整数
SMALLINT—2 字节的有符号整数
INT—4 字节的有符号整数
BIGINT—8 字节的有符号整数
Boolean(布尔型) BOOLEAN—TRUE/FALSE
Floating point numbers(浮点型) FLOAT— 单精度浮点型
DOUBLE—双精度浮点型
Fixed point numbers(定点数) DECIMAL—用户自定义精度定点数,比如 DECIMAL(7,2)
String types(字符串) STRING—指定字符集的字符序列
VARCHAR—具有最大长度限制的字符序列
CHAR—固定长度的字符序列
Date and time types(日期时间类型) TIMESTAMP — 时间戳
TIMESTAMP WITH LOCAL TIME ZONE — 时间戳,纳秒精度
DATE—日期类型
Binary types(二进制类型) BINARY—字节序列

TIMESTAMP 和 TIMESTAMP WITH LOCAL TIME ZONE 的区别如下:

  • TIMESTAMP WITH LOCAL TIME ZONE:用户提交时间给数据库时,会被转换成数据库所在的时区来保存。查询时则按照查询客户端的不同,转换为查询客户端所在时区的时间。
  • TIMESTAMP :提交什么时间就保存什么时间,查询时也不做任何转换。

隐式转换

Hive 中基本数据类型遵循以下的层次结构,按照这个层次结构,子类型到祖先类型允许隐式转换。例如 INT 类型的数据允许隐式转换为 BIGINT 类型。额外注意的是:按照类型层次结构允许将 STRING 类型隐式转换为 DOUBLE 类型。

img

复杂类型

类型 描述 示例
STRUCT 类似于对象,是字段的集合,字段的类型可以不同,可以使用 名称。字段名 方式进行访问 STRUCT (‘xiaoming’, 12 , ‘2018-12-12’)
MAP 键值对的集合,可以使用 名称 [key] 的方式访问对应的值 map(‘a’, 1, ‘b’, 2)
ARRAY 数组是一组具有相同类型和名称的变量的集合,可以使用 名称 [index] 访问对应的值 ARRAY(‘a’, ‘b’, ‘c’, ‘d’)

示例

如下给出一个基本数据类型和复杂数据类型的使用示例:

1
2
3
4
5
6
7
CREATE TABLE students(
name STRING, -- 姓名
age INT, -- 年龄
subject ARRAY<STRING>, --学科
score MAP<STRING,FLOAT>, --各个学科考试成绩
address STRUCT<houseNumber:int, street:STRING, city:STRING, province:STRING> --家庭居住地址
) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t";

内容格式

当数据存储在文本文件中,必须按照一定格式区别行和列,如使用逗号作为分隔符的 CSV 文件 (Comma-Separated Values) 或者使用制表符作为分隔值的 TSV 文件 (Tab-Separated Values)。但此时也存在一个缺点,就是正常的文件内容中也可能出现逗号或者制表符。

所以 Hive 默认使用了几个平时很少出现的字符,这些字符一般不会作为内容出现在文件中。Hive 默认的行和列分隔符如下表所示。

分隔符 描述
\n 对于文本文件来说,每行是一条记录,所以可以使用换行符来分割记录
^A (Ctrl+A) 分割字段 (列),在 CREATE TABLE 语句中也可以使用八进制编码 \001 来表示
^B 用于分割 ARRAY 或者 STRUCT 中的元素,或者用于 MAP 中键值对之间的分割,
在 CREATE TABLE 语句中也可以使用八进制编码 \002 表示
^C 用于 MAP 中键和值之间的分割,在 CREATE TABLE 语句中也可以使用八进制编码 \003 表示

使用示例如下:

1
2
3
4
5
6
CREATE TABLE page_view(viewTime INT, userid BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

存储格式

支持的存储格式

Hive 会在 HDFS 为每个数据库上创建一个目录,数据库中的表是该目录的子目录,表中的数据会以文件的形式存储在对应的表目录下。Hive 支持以下几种文件存储格式:

格式 说明
TextFile 存储为纯文本文件。 这是 Hive 默认的文件存储格式。这种存储方式数据不做压缩,磁盘开销大,数据解析开销大。
SequenceFile SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值,这样是为了避免 MR 在运行 map 阶段进行额外的排序操作。
RCFile RCFile 文件格式是 FaceBook 开源的一种 Hive 的文件存储格式,首先将表分为几个行组,对每个行组内的数据按列存储,每一列的数据都是分开存储。
ORC Files ORC 是在一定程度上扩展了 RCFile,是对 RCFile 的优化。
Avro Files Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。
Parquet Parquet 是基于 Dremel 的数据模型和算法实现的,面向分析型业务的列式存储格式。它通过按列进行高效压缩和特殊的编码技术,从而在降低存储空间的同时提高了 IO 效率。

以上压缩格式中 ORC 和 Parquet 的综合性能突出,使用较为广泛,推荐使用这两种格式。

指定存储格式

通常在创建表的时候使用 STORED AS 参数指定:

1
2
3
4
5
6
CREATE TABLE page_view(viewTime INT, userid BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

各个存储文件类型指定方式如下:

  • STORED AS TEXTFILE
  • STORED AS SEQUENCEFILE
  • STORED AS ORC
  • STORED AS PARQUET
  • STORED AS AVRO
  • STORED AS RCFILE

内部表和外部表

内部表又叫做管理表 (Managed/Internal Table),创建表时不做任何指定,默认创建的就是内部表。想要创建外部表 (External Table),则需要使用 External 进行修饰。 内部表和外部表主要区别如下:

内部表 外部表
数据存储位置 内部表数据存储的位置由 hive.metastore.warehouse.dir 参数指定,默认情况下表的数据存储在 HDFS 的 /user/hive/warehouse/数据库名。db/表名/ 目录下 外部表数据的存储位置创建表时由 Location 参数指定;
导入数据 在导入数据到内部表,内部表将数据移动到自己的数据仓库目录下,数据的生命周期由 Hive 来进行管理 外部表不会将数据移动到自己的数据仓库目录下,只是在元数据中存储了数据的位置
删除表 删除元数据(metadata)和文件 只删除元数据(metadata)

参考资料

Hive 表

分区表

概念

Hive 中的表对应为 HDFS 上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大。

分区为 HDFS 上表目录的子目录,数据按照分区存储在子目录中。如果查询的 where 子句中包含分区条件,则直接从该分区去查找,而不是扫描整个表目录,合理的分区设计可以极大提高查询速度和性能。

分区表并非 Hive 独有的概念,实际上这个概念非常常见。通常,在管理大规模数据集的时候都需要进行分区,比如将日志文件按天进行分区,从而保证数据细粒度的划分,使得查询性能得到提升。比如,在我们常用的 Oracle 数据库中,当表中的数据量不断增大,查询数据的速度就会下降,这时也可以对表进行分区。表进行分区后,逻辑上表仍然是一张完整的表,只是将表中的数据存放到多个表空间(物理文件上),这样查询数据时,就不必要每次都扫描整张表,从而提升查询性能。

创建分区表

在 Hive 中可以使用 PARTITIONED BY 子句创建分区表。表可以包含一个或多个分区列,程序会为分区列中的每个不同值组合创建单独的数据目录。下面的我们创建一张雇员表作为测试:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_partition(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2)
)
PARTITIONED BY (deptno INT) -- 按照部门编号进行分区
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_partition';

加载数据到分区表

加载数据到分区表时候必须要指定数据所处的分区:

1
2
3
4
# 加载部门编号为 20 的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp20.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=20)
# 加载部门编号为 30 的数据到表中
LOAD DATA LOCAL INPATH "/usr/file/emp30.txt" OVERWRITE INTO TABLE emp_partition PARTITION (deptno=30)

查看分区目录

这时候我们直接查看表目录,可以看到表目录下存在两个子目录,分别是 deptno=20deptno=30, 这就是分区目录,分区目录下才是我们加载的数据文件。

1
# hadoop fs -ls  hdfs://hadoop001:8020/hive/emp_partition/

这时候当你的查询语句的 where 包含 deptno=20,则就去对应的分区目录下进行查找,而不用扫描全表。

img

分桶表

简介

分区提供了一个隔离数据和优化查询的可行方案,但是并非所有的数据集都可以形成合理的分区,分区的数量也不是越多越好,过多的分区条件可能会导致很多分区上没有数据。同时 Hive 会限制动态分区可以创建的最大分区数,用来避免过多分区文件对文件系统产生负担。鉴于以上原因,Hive 还提供了一种更加细粒度的数据拆分方案:分桶表 (bucket Table)。

分桶表会将指定列的值进行哈希散列,并对 bucket(桶数量)取余,然后存储到对应的 bucket(桶)中。

理解分桶表

单从概念上理解分桶表可能会比较晦涩,其实和分区一样,分桶这个概念同样不是 Hive 独有的,对于 Java 开发人员而言,这可能是一个每天都会用到的概念,因为 Hive 中的分桶概念和 Java 数据结构中的 HashMap 的分桶概念是一致的。

当调用 HashMap 的 put() 方法存储数据时,程序会先对 key 值调用 hashCode() 方法计算出 hashcode,然后对数组长度取模计算出 index,最后将数据存储在数组 index 位置的链表上,链表达到一定阈值后会转换为红黑树 (JDK1.8+)。下图为 HashMap 的数据结构图:

img

图片引用自:HashMap vs. Hashtable

创建分桶表

在 Hive 中,我们可以通过 CLUSTERED BY 指定分桶列,并通过 SORTED BY 指定桶中数据的排序参考列。下面为分桶表建表语句示例:

1
2
3
4
5
6
7
8
9
10
11
12
CREATE EXTERNAL TABLE emp_bucket(
empno INT,
ename STRING,
job STRING,
mgr INT,
hiredate TIMESTAMP,
sal DECIMAL(7,2),
comm DECIMAL(7,2),
deptno INT)
CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS --按照员工编号散列到四个 bucket 中
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
LOCATION '/hive/emp_bucket';

加载数据到分桶表

这里直接使用 Load 语句向分桶表加载数据,数据时可以加载成功的,但是数据并不会分桶。

这是由于分桶的实质是对指定字段做了 hash 散列然后存放到对应文件中,这意味着向分桶表中插入数据是必然要通过 MapReduce,且 Reducer 的数量必须等于分桶的数量。由于以上原因,分桶表的数据通常只能使用 CTAS(CREATE TABLE AS SELECT) 方式插入,因为 CTAS 操作会触发 MapReduce。加载数据步骤如下:

设置强制分桶

1
set hive.enforce.bucketing = true; --Hive 2.x 不需要这一步

在 Hive 0.x and 1.x 版本,必须使用设置 hive.enforce.bucketing = true,表示强制分桶,允许程序根据表结构自动选择正确数量的 Reducer 和 cluster by column 来进行分桶。

CTAS 导入数据

1
INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  --这里的 emp 表就是一张普通的雇员表

可以从执行日志看到 CTAS 触发 MapReduce 操作,且 Reducer 数量和建表时候指定 bucket 数量一致:

img

查看分桶文件

bucket(桶) 本质上就是表目录下的具体文件:

img

分区表和分桶表结合使用

分区表和分桶表的本质都是将数据按照不同粒度进行拆分,从而使得在查询时候不必扫描全表,只需要扫描对应的分区或分桶,从而提升查询效率。两者可以结合起来使用,从而保证表数据在不同粒度上都能得到合理的拆分。下面是 Hive 官方给出的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE page_view_bucketed(
viewTime INT,
userid BIGINT,
page_url STRING,
referrer_url STRING,
ip STRING )
PARTITIONED BY(dt STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
STORED AS SEQUENCEFILE;

此时导入数据时需要指定分区:

1
2
3
INSERT OVERWRITE page_view_bucketed
PARTITION (dt='2009-02-25')
SELECT * FROM page_view WHERE dt='2009-02-25';

参考资料

Hive 视图和索引

视图

简介

Hive 中的视图和 RDBMS 中视图的概念一致,都是一组数据的逻辑表示,本质上就是一条 SELECT 语句的结果集。视图是纯粹的逻辑对象,没有关联的存储 (Hive 3.0.0 引入的物化视图除外),当查询引用视图时,Hive 可以将视图的定义与查询结合起来,例如将查询中的过滤器推送到视图中。

创建视图

1
2
3
4
5
CREATE VIEW [IF NOT EXISTS] [db_name.]view_name   -- 视图名称
[(column_name [COMMENT column_comment], ...) ] --列名
[COMMENT view_comment] --视图注释
[TBLPROPERTIES (property_name = property_value, ...)] --额外信息
AS SELECT ...;

在 Hive 中可以使用 CREATE VIEW 创建视图,如果已存在具有相同名称的表或视图,则会抛出异常,建议使用 IF NOT EXISTS 预做判断。在使用视图时候需要注意以下事项:

  • 视图是只读的,不能用作 LOAD / INSERT / ALTER 的目标;

  • 在创建视图时候视图就已经固定,对基表的后续更改(如添加列)将不会反映在视图;

  • 删除基表并不会删除视图,需要手动删除视图;

  • 视图可能包含 ORDER BYLIMIT 子句。如果引用视图的查询语句也包含这类子句,其执行优先级低于视图对应字句。例如,视图 custom_view 指定 LIMIT 5,查询语句为 select * from custom_view LIMIT 10,此时结果最多返回 5 行。

  • 创建视图时,如果未提供列名,则将从 SELECT 语句中自动派生列名;

  • 创建视图时,如果 SELECT 语句中包含其他表达式,例如 x + y,则列名称将以_C0,_C1 等形式生成;

    1
    CREATE VIEW  IF NOT EXISTS custom_view AS SELECT empno, empno+deptno , 1+2 FROM emp;

img

查看视图

1
2
3
4
5
6
-- 查看所有视图: 没有单独查看视图列表的语句,只能使用 show tables
show tables;
-- 查看某个视图
desc view_name;
-- 查看某个视图详细信息
desc formatted view_name;

删除视图

1
DROP VIEW [IF EXISTS] [db_name.]view_name;

删除视图时,如果被删除的视图被其他视图所引用,这时候程序不会发出警告,但是引用该视图其他视图已经失效,需要进行重建或者删除。

修改视图

1
ALTER VIEW [db_name.]view_name AS select_statement;

被更改的视图必须存在,且视图不能具有分区,如果视图具有分区,则修改失败。

修改视图属性

语法:

1
2
3
4
ALTER VIEW [db_name.]view_name SET TBLPROPERTIES table_properties;

table_properties:
: (property_name = property_value, property_name = property_value, ...)

示例:

1
ALTER VIEW custom_view SET TBLPROPERTIES ('create'='heibaiying','date'='2019-05-05');

img

索引

简介

Hive 在 0.7.0 引入了索引的功能,索引的设计目标是提高表某些列的查询速度。如果没有索引,带有谓词的查询(如’WHERE table1.column = 10’)会加载整个表或分区并处理所有行。但是如果 column 存在索引,则只需要加载和处理文件的一部分。

索引原理

在指定列上建立索引,会产生一张索引表(表结构如下),里面的字段包括:索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量。在查询涉及到索引字段时,首先到索引表查找索引列值对应的 HDFS 文件路径及偏移量,这样就避免了全表扫描。

1
2
3
4
5
6
7
+--------------+----------------+----------+--+
| col_name | data_type | comment |
+--------------+----------------+----------+--+
| empno | int | 建立索引的列 |
| _bucketname | string | HDFS 文件路径 |
| _offsets | array<bigint> | 偏移量 |
+--------------+----------------+----------+--+

创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE INDEX index_name     --索引名称
ON TABLE base_table_name (col_name, ...) --建立索引的列
AS index_type --索引类型
[WITH DEFERRED REBUILD] --重建索引
[IDXPROPERTIES (property_name=property_value, ...)] --索引额外属性
[IN TABLE index_table_name] --索引表的名字
[
[ ROW FORMAT ...] STORED AS ...
| STORED BY ...
] --索引表行分隔符 、 存储格式
[LOCATION hdfs_path] --索引表存储位置
[TBLPROPERTIES (...)] --索引表表属性
[COMMENT "index comment"]; --索引注释

查看索引

1
2
--显示表上所有列的索引
SHOW FORMATTED INDEX ON table_name;

删除索引

删除索引会删除对应的索引表。

1
DROP INDEX [IF EXISTS] index_name ON table_name;

如果存在索引的表被删除了,其对应的索引和索引表都会被删除。如果被索引表的某个分区被删除了,那么分区对应的分区索引也会被删除。

重建索引

1
ALTER INDEX index_name ON table_name [PARTITION partition_spec] REBUILD;

重建索引。如果指定了 PARTITION,则仅重建该分区的索引。

索引案例

创建索引

在 emp 表上针对 empno 字段创建名为 emp_index,索引数据存储在 emp_index_table 索引表中

1
2
3
4
create index emp_index on table emp(empno) as
'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
with deferred rebuild
in table emp_index_table ;

此时索引表中是没有数据的,需要重建索引才会有索引的数据。

重建索引

1
alter index emp_index on emp rebuild;

Hive 会启动 MapReduce 作业去建立索引,建立好后查看索引表数据如下。三个表字段分别代表:索引列的值、该值对应的 HDFS 文件路径、该值在文件中的偏移量。

img

自动使用索引

默认情况下,虽然建立了索引,但是 Hive 在查询时候是不会自动去使用索引的,需要开启相关配置。开启配置后,涉及到索引列的查询就会使用索引功能去优化查询。

1
2
3
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
SET hive.optimize.index.filter=true;
SET hive.optimize.index.filter.compact.minsize=0;

查看索引

1
SHOW INDEX ON emp;

img

索引的缺陷

索引表最主要的一个缺陷在于:索引表无法自动 rebuild,这也就意味着如果表中有数据新增或删除,则必须手动 rebuild,重新执行 MapReduce 作业,生成索引表数据。

同时按照官方文档 的说明,Hive 会从 3.0 开始移除索引功能,主要基于以下两个原因:

  • 具有自动重写的物化视图 (Materialized View) 可以产生与索引相似的效果(Hive 2.3.0 增加了对物化视图的支持,在 3.0 之后正式引入)。
  • 使用列式存储文件格式(Parquet,ORC)进行存储时,这些格式支持选择性扫描,可以跳过不需要的文件或块。

ORC 内置的索引功能可以参阅这篇文章:Hive 性能优化之 ORC 索引–Row Group Index vs Bloom Filter Index

参考资料

Hive 运维

Hive 安装

下载并解压

下载所需版本的 Hive,这里我下载版本为 cdh5.15.2。下载地址:http://archive.cloudera.com/cdh5/cdh/5/

1
2
# 下载后进行解压
tar -zxvf hive-1.1.0-cdh5.15.2.tar.gz

配置环境变量

1
# vim /etc/profile

添加环境变量:

1
2
export HIVE_HOME=/usr/app/hive-1.1.0-cdh5.15.2
export PATH=$HIVE_HOME/bin:$PATH

使得配置的环境变量立即生效:

1
# source /etc/profile

修改配置

1. hive-env.sh

进入安装目录下的 conf/ 目录,拷贝 Hive 的环境配置模板 flume-env.sh.template

1
cp hive-env.sh.template hive-env.sh

修改 hive-env.sh,指定 Hadoop 的安装路径:

1
HADOOP_HOME=/usr/app/hadoop-2.6.0-cdh5.15.2

2. hive-site.xml

新建 hive-site.xml 文件,内容如下,主要是配置存放元数据的 MySQL 的地址、驱动、用户名和密码等信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://hadoop001:3306/hadoop_hive?createDatabaseIfNotExist=true</value>
</property>

<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>

<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>

<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
</property>

</configuration>

拷贝数据库驱动

将 MySQL 驱动包拷贝到 Hive 安装目录的 lib 目录下, MySQL 驱动的下载地址为:https://dev.mysql.com/downloads/connector/j/

初始化元数据库

  • 当使用的 hive 是 1.x 版本时,可以不进行初始化操作,Hive 会在第一次启动的时候会自动进行初始化,但不会生成所有的元数据信息表,只会初始化必要的一部分,在之后的使用中用到其余表时会自动创建;

  • 当使用的 hive 是 2.x 版本时,必须手动初始化元数据库。初始化命令:

    1
    2
    # schematool 命令在安装目录的 bin 目录下,由于上面已经配置过环境变量,在任意位置执行即可
    schematool -dbType mysql -initSchema

这里我使用的是 CDH 的 hive-1.1.0-cdh5.15.2.tar.gz,对应 Hive 1.1.0 版本,可以跳过这一步。

启动

由于已经将 Hive 的 bin 目录配置到环境变量,直接使用以下命令启动,成功进入交互式命令行后执行 show databases 命令,无异常则代表搭建成功。

1
# hive

img

在 Mysql 中也能看到 Hive 创建的库和存放元数据信息的表

img

HiveServer2/beeline

Hive 内置了 HiveServer 和 HiveServer2 服务,两者都允许客户端使用多种编程语言进行连接,但是 HiveServer 不能处理多个客户端的并发请求,因此产生了 HiveServer2。

HiveServer2(HS2)允许远程客户端可以使用各种编程语言向 Hive 提交请求并检索结果,支持多客户端并发访问和身份验证。HS2 是由多个服务组成的单个进程,其包括基于 Thrift 的 Hive 服务(TCP 或 HTTP)和用于 Web UI 的 Jetty Web 服务。

HiveServer2 拥有自己的 CLI 工具——Beeline。Beeline 是一个基于 SQLLine 的 JDBC 客户端。由于目前 HiveServer2 是 Hive 开发维护的重点,所以官方更加推荐使用 Beeline 而不是 Hive CLI。以下主要讲解 Beeline 的配置方式。

修改 Hadoop 配置

修改 hadoop 集群的 core-site.xml 配置文件,增加如下配置,指定 hadoop 的 root 用户可以代理本机上所有的用户。

1
2
3
4
5
6
7
8
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>

之所以要配置这一步,是因为 hadoop 2.0 以后引入了安全伪装机制,使得 hadoop 不允许上层系统(如 hive)直接将实际用户传递到 hadoop 层,而应该将实际用户传递给一个超级代理,由该代理在 hadoop 上执行操作,以避免任意客户端随意操作 hadoop。如果不配置这一步,在之后的连接中可能会抛出 AuthorizationException 异常。

关于 Hadoop 的用户代理机制,可以参考:hadoop 的用户代理机制Superusers Acting On Behalf Of Other Users

启动 hiveserver2

由于上面已经配置过环境变量,这里直接启动即可:

1
# nohup hiveserver2 &

使用 beeline

可以使用以下命令进入 beeline 交互式命令行,出现 Connected 则代表连接成功。

1
beeline -u jdbc:hive2://hadoop001:10000 -n root

Beeline 选项

Beeline 拥有更多可使用参数,可以使用 beeline --help 查看,完整参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Usage: java org.apache.hive.cli.beeline.BeeLine
-u <database url> the JDBC URL to connect to
-r reconnect to last saved connect url (in conjunction with !save)
-n <username> the username to connect as
-p <password> the password to connect as
-d <driver class> the driver class to use
-i <init file> script file for initialization
-e <query> query that should be executed
-f <exec file> script file that should be executed
-w (or) --password-file <password file> the password file to read password from
--hiveconf property=value Use value for given property
--hivevar name=value hive variable name and value
This is Hive specific settings in which variables
can be set at session level and referenced in Hive
commands or queries.
--property-file=<property-file> the file to read connection properties (url, driver, user, password) from
--color=[true/false] control whether color is used for display
--showHeader=[true/false] show column names in query results
--headerInterval=ROWS; the interval between which heades are displayed
--fastConnect=[true/false] skip building table/column list for tab-completion
--autoCommit=[true/false] enable/disable automatic transaction commit
--verbose=[true/false] show verbose error messages and debug info
--showWarnings=[true/false] display connection warnings
--showNestedErrs=[true/false] display nested errors
--numberFormat=[pattern] format numbers using DecimalFormat pattern
--force=[true/false] continue running script even after errors
--maxWidth=MAXWIDTH the maximum width of the terminal
--maxColumnWidth=MAXCOLWIDTH the maximum width to use when displaying columns
--silent=[true/false] be more silent
--autosave=[true/false] automatically save preferences
--outputformat=[table/vertical/csv2/tsv2/dsv/csv/tsv] format mode for result display
--incrementalBufferRows=NUMROWS the number of rows to buffer when printing rows on stdout,
defaults to 1000; only applicable if --incremental=true
and --outputformat=table
--truncateTable=[true/false] truncate table column when it exceeds length
--delimiterForDSV=DELIMITER specify the delimiter for delimiter-separated values output format (default: |)
--isolation=LEVEL set the transaction isolation level
--nullemptystring=[true/false] set to true to get historic behavior of printing null as empty string
--maxHistoryRows=MAXHISTORYROWS The maximum number of rows to store beeline history.
--convertBinaryArrayToString=[true/false] display binary column data as string or as byte array
--help display this message

常用参数

在 Hive CLI 中支持的参数,Beeline 都支持,常用的参数如下。更多参数说明可以参见官方文档 Beeline Command Options

参数 说明
-u 数据库地址
-n 用户名
-p 密码
-d 驱动 (可选)
-e* 执行 SQL 命令
-f* 执行 SQL 脚本
-i (or)--init 在进入交互模式之前运行初始化脚本
--property-file 指定配置文件
--hiveconf property=value 指定配置属性
--hivevar name=value 用户自定义属性,在会话级别有效

示例: 使用用户名和密码连接 Hive

1
beeline -u jdbc:hive2://localhost:10000  -n username -p password

Hive 命令

Help

使用 hive -H 或者 hive --help 命令可以查看所有命令的帮助,显示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
usage: hive
-d,--define <key=value> Variable subsitution to apply to hive
commands. e.g. -d A=B or --define A=B --定义用户自定义变量
--database <databasename> Specify the database to use -- 指定使用的数据库
-e <quoted-query-string> SQL from command line -- 执行指定的 SQL
-f <filename> SQL from files --执行 SQL 脚本
-H,--help Print help information -- 打印帮助信息
--hiveconf <property=value> Use value for given property --自定义配置
--hivevar <key=value> Variable subsitution to apply to hive --自定义变量
commands. e.g. --hivevar A=B
-i <filename> Initialization SQL file --在进入交互模式之前运行初始化脚本
-S,--silent Silent mode in interactive shell --静默模式
-v,--verbose Verbose mode (echo executed SQL to the console) --详细模式

交互式命令行

直接使用 Hive 命令,不加任何参数,即可进入交互式命令行。

执行 SQL 命令

在不进入交互式命令行的情况下,可以使用 hive -e执行 SQL 命令。

1
hive -e 'select * from emp';

img

执行 SQL 脚本

用于执行的 sql 脚本可以在本地文件系统,也可以在 HDFS 上。

1
2
3
4
5
# 本地文件系统
hive -f /usr/file/simple.sql;

# HDFS文件系统
hive -f hdfs://hadoop001:8020/tmp/simple.sql;

其中 simple.sql 内容如下:

1
select * from emp;

配置 Hive 变量

可以使用 --hiveconf 设置 Hive 运行时的变量。

1
2
3
hive -e 'select * from emp' \
--hiveconf hive.exec.scratchdir=/tmp/hive_scratch \
--hiveconf mapred.reduce.tasks=4;

hive.exec.scratchdir:指定 HDFS 上目录位置,用于存储不同 map/reduce 阶段的执行计划和这些阶段的中间输出结果。

配置文件启动

使用 -i 可以在进入交互模式之前运行初始化脚本,相当于指定配置文件启动。

1
hive -i /usr/file/hive-init.conf;

其中 hive-init.conf 的内容如下:

1
set hive.exec.mode.local.auto = true;

hive.exec.mode.local.auto 默认值为 false,这里设置为 true ,代表开启本地模式。

用户自定义变量

--define--hivevar在功能上是等价的,都是用来实现自定义变量,这里给出一个示例:

定义变量:

1
hive  --define  n=ename --hiveconf  --hivevar j=job;

在查询中引用自定义变量:

1
2
3
4
5
6
7
# 以下两条语句等价
hive > select ${n} from emp;
hive > select ${hivevar:n} from emp;

# 以下两条语句等价
hive > select ${j} from emp;
hive > select ${hivevar:j} from emp;

结果如下:

img

Hive 配置

可以通过三种方式对 Hive 的相关属性进行配置,分别介绍如下:

配置文件

方式一为使用配置文件,使用配置文件指定的配置是永久有效的。Hive 有以下三个可选的配置文件:

  • hive-site.xml - Hive 的主要配置文件;
  • hivemetastore-site.xml - 关于元数据的配置;
  • hiveserver2-site.xml - 关于 HiveServer2 的配置。

示例如下,在 hive-site.xml 配置 hive.exec.scratchdir

1
2
3
4
5
<property>
<name>hive.exec.scratchdir</name>
<value>/tmp/mydir</value>
<description>Scratch space for Hive jobs</description>
</property>

hiveconf

方式二为在启动命令行 (Hive CLI / Beeline) 的时候使用 --hiveconf 指定配置,这种方式指定的配置作用于整个 Session。

1
hive --hiveconf hive.exec.scratchdir=/tmp/mydir

set

方式三为在交互式环境下 (Hive CLI / Beeline),使用 set 命令指定。这种设置的作用范围也是 Session 级别的,配置对于执行该命令后的所有命令生效。set 兼具设置参数和查看参数的功能。如下:

1
2
3
4
5
6
7
8
0: jdbc:hive2://hadoop001:10000> set hive.exec.scratchdir=/tmp/mydir;
No rows affected (0.025 seconds)
0: jdbc:hive2://hadoop001:10000> set hive.exec.scratchdir;
+----------------------------------+--+
| set |
+----------------------------------+--+
| hive.exec.scratchdir=/tmp/mydir |
+----------------------------------+--+

配置优先级

配置的优先顺序如下 (由低到高):
hive-site.xml - >hivemetastore-site.xml- > hiveserver2-site.xml - >-- hiveconf- > set

配置参数

Hive 可选的配置参数非常多,在用到时查阅官方文档即可AdminManual Configuration

参考资料