博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hive执行流程(3)-Driver类分析1Driver类整体流程
阅读量:6304 次
发布时间:2019-06-22

本文共 10054 字,大约阅读时间需要 33 分钟。

Driver类是对

1
org.apache.hadoop.hive.ql.processors.CommandProcessor.java

接口的实现,重写了run方法,定义了常见sql的执行方式.

1
public 
class 
Driver 
implements 
CommandProcessor

具体的方法调用顺序:

1
2
run--->runInternal--->(createTxnManager+recordValidTxns)----->compileInternal--->
compile--analyzer(BaseSemanticAnalyzer)--->execute

其中compile和execute是两个比较重要的方法:

compile用来完成语法和语义的分析,生成执行计划

execute执行物理计划,即提交相应的mapredjob

通过打印perflog可以看到Driver类的简单地时序图:

下面来看下Driver类的几个常用的方法实现:

1)createTxnManager 用来获取目前设置的用于实现lock的类,比如:

1
org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager

2)checkConcurrency 用来判断当前hive设置是否支持并发控制:

1
boolean 
supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);

主要是通过判断hive.support.concurrency参数,默认是false

3)getClusterStatus 调用JobClient类的getClusterStatus方法来获取集群的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
public 
ClusterStatus getClusterStatus() 
throws 
Exception {
    
ClusterStatus cs;
    
try 
{
      
JobConf job = 
new 
JobConf(conf , ExecDriver.
class
);
      
JobClient jc = 
new 
JobClient(job);
      
cs = jc.getClusterStatus();
    
catch 
(Exception e) {
      
e.printStackTrace();
      
throw 
e;
    
}
    
LOG.info( 
"Returning cluster status: " 
+ cs.toString());
    
return 
cs;
  
}

4)getSchema   

1
doAuthorization/doAuthorizationV2/getHivePrivObjects

用来在开启权限验证情况下对sql的权限检测操作

1
getLockObjects/acquireReadWriteLocks/releaseLocks

都是和锁相关的方法 ,其中getLockObjects用来获取锁的对象(锁的路径,锁的模式等),最终返回一个包含所有锁的list,acquireReadWriteLocks用来控制获取锁,releaseLocks用来释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
getLockObjects:
  
private 
List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)
      
throws 
SemanticException {
    
List<HiveLockObj> locks = 
new 
LinkedList<HiveLockObj>();
    
HiveLockObjectData lockData =
      
new 
HiveLockObjectData( plan.getQueryId(),
                             
String. valueOf(System.currentTimeMillis ()),
                             
"IMPLICIT"
,
                             
plan.getQueryStr());
    
if 
(d != 
null
) {
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(d.getName(), lockData), mode));  
//数据库层面的锁
      
return 
locks;
    
}
    
if 
(t != 
null
) {  
// 表层面的锁
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(t.getDbName(), lockData), mode));
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(t, lockData), mode));
      
mode = HiveLockMode.SHARED;
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(t.getDbName(), lockData), mode));
      
return 
locks;
    
}
    
if 
(p != 
null
) { 
//分区层面的锁
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(p.getTable().getDbName(), lockData), mode));
      
if 
(!(p 
instanceof 
DummyPartition)) {
        
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(p, lockData), mode));
      
}
      
// All the parents are locked in shared mode
      
mode = HiveLockMode.SHARED;
      
// For dummy partitions, only partition name is needed
      
String name = p.getName();
      
if 
(p 
instanceof 
DummyPartition) {
        
name = p.getName().split( 
"@"
)[
2
];
      
}
      
String partialName = 
""
;
      
String[] partns = name.split( 
"/"
);
      
int 
len = p 
instanceof 
DummyPartition ? partns.length : partns.length - 
1
;
      
Map<String, String> partialSpec = 
new 
LinkedHashMap<String, String>();
      
for 
int 
idx = 
0
; idx < len; idx++) {
        
String partn = partns[idx];
        
partialName += partn;
        
String[] nameValue = partn.split( 
"="
);
        
assert
(nameValue.length == 
2
);
        
partialSpec.put(nameValue[
0
], nameValue[
1
]);
        
try 
{
          
locks.add( 
new 
HiveLockObj(
                      
new 
HiveLockObject(
new 
DummyPartition(p.getTable(), p.getTable().getDbName()
                                                            
"/" 
+ p.getTable().getTableName()
                                                            
"/" 
+ partialName,
                                                              
partialSpec), lockData), mode));
          
partialName += 
"/"
;
        
catch 
(HiveException e) {
          
throw 
new 
SemanticException(e.getMessage());
        
}
      
}
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(p.getTable(), lockData), mode));
      
locks.add( 
new 
HiveLockObj(
new 
HiveLockObject(p.getTable().getDbName(), lockData), mode));
    
}
    
return 
locks;
  
}

acquireReadWriteLocks调用了锁具体实现类的acquireLocks方法

releaseLocks调用了锁具体实现类的releaseLocks方法

run方法是Driver类的入口方法,调用了runInternal方法,我们主要来看runInternal的方法,大体步骤:

1
2
3
4
5
运行hive.exec.driver.run.hooks中设置的hook,
运行HiveDriverRunHook相关类的的preDriverRun方法---->检测是否支持并发,并获取并发实现的类
--->compileInternal---->运行锁相关的操作(判断是否只对mapred job进行锁,获取锁等)
---->调用execute---->释放锁--->运行HiveDriverRunHook相关类的的postDriverRun方法
---->返回CommandProcessorResponse对象

相关代码: 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
private 
CommandProcessorResponse runInternal(String command, 
boolean 
alreadyCompiled)
      
throws 
CommandNeedRetryException {
    
errorMessage = 
null
;
    
SQLState = 
null
;
    
downstreamError = 
null
;
    
if 
(!validateConfVariables()) {
      
return 
new 
CommandProcessorResponse(
12
, errorMessage , SQLState );
    
}
    
HiveDriverRunHookContext hookContext = 
new 
HiveDriverRunHookContextImpl(conf , command);
    
// Get all the driver run hooks and pre-execute them.
    
List<HiveDriverRunHook> driverRunHooks;
    
try 
{               
//运行hive.exec.driver.run.hooks中设置的hook
      
driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, 
          
HiveDriverRunHook. 
class
);         
      
for 
(HiveDriverRunHook driverRunHook : driverRunHooks) {
          
driverRunHook.preDriverRun(hookContext); 
//运行HiveDriverRunHook相关类的的preDriverRun方法
      
}
    
catch 
(Exception e) {
      
errorMessage = 
"FAILED: Hive Internal Error: " 
+ Utilities.getNameMessage(e);
      
SQLState = ErrorMsg. findSQLState(e.getMessage());
      
downstreamError = e;
      
console.printError( errorMessage + 
"\n"
          
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
      
return 
new 
CommandProcessorResponse(
12
, errorMessage , SQLState );
    
}
    
// Reset the perf logger
    
PerfLogger perfLogger = PerfLogger.getPerfLogger( 
true
);
    
perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.DRIVER_RUN);
    
perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
    
int 
ret;
    
boolean 
requireLock = 
false
;
    
boolean 
ckLock = 
false
;
    
try 
{
      
ckLock = checkConcurrency();  
//检测是否支持并发,并获取并发实现的类,比如常用的 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
      
createTxnManager();
    
catch 
(SemanticException e) {
      
errorMessage = 
"FAILED: Error in semantic analysis: " 
+ e.getMessage();
      
SQLState = ErrorMsg. findSQLState(e.getMessage());
      
downstreamError = e;
      
console.printError( errorMessage, 
"\n"
          
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
      
ret = 
10
;
      
return 
new 
CommandProcessorResponse(ret, errorMessage , SQLState );
    
}
    
ret = recordValidTxns();
    
if 
(ret != 
0
return 
new 
CommandProcessorResponse(ret, errorMessage, SQLState);
    
if 
(!alreadyCompiled) {
      
ret = compileInternal(command);  
//调用compileInternal方法
      
if 
(ret != 
0
) {
        
return 
new 
CommandProcessorResponse(ret, errorMessage, SQLState);
      
}
    
}
    
// the reason that we set the txn manager for the cxt here is because each
    
// query has its own ctx object. The txn mgr is shared across the
    
// same instance of Driver, which can run multiple queries.
    
ctx.setHiveTxnManager( txnMgr);
    
if 
(ckLock) {  
//断是否只对mapred job进行锁,参数hive.lock.mapred.only.operation,默认为false
      
boolean 
lockOnlyMapred = HiveConf.getBoolVar( conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
      
if
(lockOnlyMapred) {
        
Queue<Task<? 
extends 
Serializable>> taskQueue = 
new 
LinkedList<Task<? 
extends 
Serializable>>();
        
taskQueue.addAll( plan.getRootTasks());
        
while 
(taskQueue.peek() != 
null
) {
          
Task<? 
extends 
Serializable> tsk = taskQueue.remove();
          
requireLock = requireLock || tsk.requireLock();
          
if
(requireLock) {
            
break
;
          
}
          
if 
(tsk 
instanceof 
ConditionalTask) {
            
taskQueue.addAll(((ConditionalTask)tsk).getListTasks());
          
}
          
if
(tsk.getChildTasks()!= 
null
) {
            
taskQueue.addAll(tsk.getChildTasks());
          
}
          
// does not add back up task here, because back up task should be the same
          
// type of the original task.
        
}
      
else 
{
        
requireLock = 
true
;
      
}
    
}
    
if 
(requireLock) { 
//获取锁
      
ret = acquireReadWriteLocks();
      
if 
(ret != 
0
) {
        
try 
{
          
releaseLocks( ctx.getHiveLocks());
        
catch 
(LockException e) {
          
// Not much to do here
        
}
        
return 
new 
CommandProcessorResponse(ret, errorMessage, SQLState);
      
}
    
}
    
ret = execute(); 
//job运行
    
if 
(ret != 
0
) {
      
//if needRequireLock is false, the release here will do nothing because there is no lock
      
try 
{
        
releaseLocks( ctx.getHiveLocks());
      
catch 
(LockException e) {
        
// Nothing to do here
      
}
      
return 
new 
CommandProcessorResponse(ret, errorMessage , SQLState );
    
}
    
//if needRequireLock is false, the release here will do nothing because there is no lock
    
try 
{
      
releaseLocks( ctx.getHiveLocks());
    
catch 
(LockException e) {
      
errorMessage = 
"FAILED: Hive Internal Error: " 
+ Utilities.getNameMessage(e);
      
SQLState = ErrorMsg. findSQLState(e.getMessage());
      
downstreamError = e;
      
console.printError( errorMessage + 
"\n"
          
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
      
return 
new 
CommandProcessorResponse(
12
, errorMessage , SQLState );
    
}
    
perfLogger.PerfLogEnd( CLASS_NAME, PerfLogger.DRIVER_RUN);
    
perfLogger.close(LOG, plan);
    
// Take all the driver run hooks and post-execute them.
    
try 
{
      
for 
(HiveDriverRunHook driverRunHook : driverRunHooks) {  
//运行HiveDriverRunHook相关类的的postDriverRun方法
          
driverRunHook.postDriverRun(hookContext);
      
}
    
catch 
(Exception e) {
      
errorMessage = 
"FAILED: Hive Internal Error: " 
+ Utilities.getNameMessage(e);
      
SQLState = ErrorMsg. findSQLState(e.getMessage());
      
downstreamError = e;
      
console.printError( errorMessage + 
"\n"
          
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
      
return 
new 
CommandProcessorResponse(
12
, errorMessage , SQLState );
    
}
    
return 
new 
CommandProcessorResponse(ret);
  
}

8)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
再来看下compileInternal方法
  
private 
static 
final 
Object compileMonitor = 
new 
Object();
  
private 
int 
compileInternal(String command) {
    
int 
ret;
    
synchronized 
( compileMonitor) {
      
ret = compile(command);  
//调用compile方法
    
}
    
if 
(ret != 
0
) {
      
try 
{
        
releaseLocks( ctx.getHiveLocks());
      
catch 
(LockException e) {
        
LOG.warn(
"Exception in releasing locks. "
            
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
      
}
    
}
    
return 
ret;
  
}

 调用了compile方法,compile方法分析命令,生成Task,关于compile的具体实现后面详细讲解

9.execute方法,提交task并等待task运行完毕,并打印task运行的信息,比如消耗的时间等

(这里信息也比较多,后面单独讲解

本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1571890,如需转载请自行联系原作者

你可能感兴趣的文章
Docker 自定义SSH服务镜像
查看>>
JavaScript强化教程 —— Cocos2d-JS自动JSB绑定规则修改
查看>>
configure: error: in `/root/httpd-2.2.11/srclib/apr': c
查看>>
CentOS7搭建Kubernetes-dashboard管理服务
查看>>
buildroot下查找外部编译器通过ext-toolchain-wrapper调用的参数
查看>>
MySQL Replication 主主配置详细说明
查看>>
Linux的任务调度
查看>>
在Android studio中添加jar包方法如下
查看>>
iframe 在ie下面总是弹出新窗口解决方法
查看>>
分享10款漂亮实用的CSS3按钮
查看>>
安装nginx 常见错误及 解决方法
查看>>
Gorun8电子商城
查看>>
在之前链表的基础上改良的链表
查看>>
android编译系统makefile(Android.mk)写法
查看>>
MD5源代码C++
查看>>
Eclipse 添加 Ibator
查看>>
Linux中变量$#,$@,$0,$1,$2,$*,$$,$?的含义
查看>>
Python编程语言
查看>>
十四、转到 linux
查看>>
Got error 241 'Invalid schema
查看>>