Hive Metastore Source Reading (Part 2)
Views: 6,340
Published: 2019-06-22

This article is about 12,099 characters long; estimated reading time is 40 minutes.

  Recently, as the project has gone deeper, I have run into a drawback of the Hive metastore: its metadata operations are coupled with the code that manipulates the physical cluster, which makes it hard to extend. For example, create_table validates and creates the table path in the middle of the metadata work, as in the following code:

if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
  if (tbl.getSd().getLocation() == null
      || tbl.getSd().getLocation().isEmpty()) {
    tblPath = wh.getTablePath(
        ms.getDatabase(tbl.getDbName()), tbl.getTableName());
  } else {
    if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
      LOG.warn("Location: " + tbl.getSd().getLocation()
          + " specified for non-external table:" + tbl.getTableName());
    }
    tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
  }
  tbl.getSd().setLocation(tblPath.toString());
}

if (tblPath != null) {
  if (!wh.isDir(tblPath)) {
    if (!wh.mkdirs(tblPath, true)) {
      throw new MetaException(tblPath
          + " is not a directory or unable to create one");
    }
    madeDir = true;
  }

   So is this why the metastore cannot unify all metadata? At a high level, the Hive metastore code amounts to create/read/update/delete on metadata. From the last walkthrough we saw that HiveMetaStore's init method creates three kinds of listeners (MetaStorePreEventListener, MetaStoreEventListener, and MetaStoreEndFunctionListener) to observe and record each step's events. It also news up a Warehouse to perform the physical filesystem operations.

  

public void init() throws MetaException {
  rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
  initListeners = MetaStoreUtils.getMetaStoreListeners(
      MetaStoreInitListener.class, hiveConf,
      hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
  for (MetaStoreInitListener singleInitListener : initListeners) {
      MetaStoreInitContext context = new MetaStoreInitContext();
      singleInitListener.onInit(context);
  }

  String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
      HiveAlterHandler.class.getName());
  alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
      alterHandlerName), hiveConf);
  wh = new Warehouse(hiveConf);
  // ...
}
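To make the listener mechanics concrete, here is a minimal plain-Java sketch of the pre-event fan-out that init() wires up. The interface and string "event" below are simplified stand-ins for Hive's MetaStorePreEventListener and PreEventContext, not the real API:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the listener fan-out the metastore sets up. The
// interface/event names are simplified stand-ins for Hive's
// MetaStorePreEventListener and PreEventContext, not the real API.
public class ListenerSketch {
    interface PreEventListener {
        // A listener can veto the operation by throwing.
        void onEvent(String eventName);
    }

    static final List<PreEventListener> LISTENERS = new ArrayList<>();

    // Mirrors firePreEvent: every registered listener sees every event,
    // and any single listener can abort the whole operation.
    static void firePreEvent(String eventName) {
        for (PreEventListener l : LISTENERS) {
            l.onEvent(eventName);
        }
    }

    public static void main(String[] args) {
        List<String> audit = new ArrayList<>();
        LISTENERS.add(audit::add);               // audit-style listener
        LISTENERS.add(e -> {                     // authorization-style listener
            if (e.startsWith("DROP_")) {
                throw new IllegalStateException("vetoed: " + e);
            }
        });
        firePreEvent("CREATE_TABLE");
        System.out.println(audit);
    }
}
```

The design point is that firePreEvent gives every registered listener a look at an event before the operation proceeds, which is exactly what lets the metastore bolt on auditing and authorization without touching the core code paths.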

 Next, starting from the metadata life cycle, let's study a Partition's lifecycle. In HiveMetaStoreClient, add_partition is the entry point to look for: it is the operation that runs when, say, an insert overwrite writes into a partition keyed by some table field such as dt=20170830. Its sibling add_partitions comes into play when loading data into a partitioned table creates several partition paths at once. We'll use add_partitions as the example:

public int add_partitions(List<Partition> new_parts)
    throws InvalidObjectException, AlreadyExistsException, MetaException,
    TException {
  return client.add_partitions(new_parts);
}

@Override
public List<Partition> add_partitions(
    List<Partition> parts, boolean ifNotExists, boolean needResults)
    throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
  if (parts.isEmpty()) {
    return needResults ? new ArrayList<Partition>() : null;
  }
  Partition part = parts.get(0);
  AddPartitionsRequest req = new AddPartitionsRequest(
      part.getDbName(), part.getTableName(), parts, ifNotExists);
  req.setNeedResult(needResults);
  AddPartitionsResult result = client.add_partitions_req(req);
  return needResults ? filterHook.filterPartitions(result.getPartitions()) : null;
}
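The batching idiom in add_partitions above (early return on an empty list, db/table taken from the first partition, and the server later checking that the whole batch agrees) can be sketched standalone. PartitionStub and buildRequest are hypothetical stand-ins for Hive's Partition and AddPartitionsRequest, not the real Thrift classes:

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the add_partitions batching idiom. PartitionStub
// stands in for Hive's Thrift Partition; buildRequest stands in for
// constructing an AddPartitionsRequest.
public class AddPartitionsSketch {
    static class PartitionStub {
        final String dbName, tableName, values;
        PartitionStub(String dbName, String tableName, String values) {
            this.dbName = dbName; this.tableName = tableName; this.values = values;
        }
    }

    /** Returns a summary of the "request"; null for an empty batch. */
    static String buildRequest(List<PartitionStub> parts, boolean ifNotExists) {
        if (parts.isEmpty()) {
            return null;                    // mirrors the early return in add_partitions
        }
        PartitionStub first = parts.get(0); // db/table captured once, from the first entry
        for (PartitionStub p : parts) {     // the server-side consistency check
            if (!p.dbName.equals(first.dbName) || !p.tableName.equals(first.tableName)) {
                throw new IllegalArgumentException(
                    "Partition does not belong to target table: " + p.values);
            }
        }
        return first.dbName + "." + first.tableName
            + " parts=" + parts.size() + " ifNotExists=" + ifNotExists;
    }

    public static void main(String[] args) {
        List<PartitionStub> parts = Arrays.asList(
            new PartitionStub("default", "logs", "dt=20170829"),
            new PartitionStub("default", "logs", "dt=20170830"));
        System.out.println(buildRequest(parts, true));
    }
}
```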

  The client here is a ThriftHiveMetastore.Iface object whose implementing subclass, HiveMetaStore, is created via the init method above. The partitions then get wrapped in an AddPartitionsRequest. The request still just carries partition attributes, but the payoff of the wrapper is that later code no longer has to re-derive each partition's DbName, TableName, and so on: everything is packaged once for direct reuse. Next we follow client.add_partitions_req into the server. Fair warning: the code below is long, so we'll take it apart piece by piece.

  

private List<Partition> add_partitions_core(
    RawStore ms, String dbName, String tblName, List<Partition> parts, boolean ifNotExists)
    throws MetaException, InvalidObjectException, AlreadyExistsException, TException {
  logInfo("add_partitions");
  boolean success = false;
  // Ensures that the list doesn't have dups, and keeps track of directories we have created.
  Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>();
  List<Partition> result = new ArrayList<Partition>();
  List<Partition> existingParts = null;
  Table tbl = null;
  try {
    ms.openTransaction();
    tbl = ms.getTable(dbName, tblName);
    if (tbl == null) {
      throw new InvalidObjectException("Unable to add partitions because "
          + "database or table " + dbName + "." + tblName + " does not exist");
    }

    if (!parts.isEmpty()) {
      firePreEvent(new PreAddPartitionEvent(tbl, parts, this));
    }

    for (Partition part : parts) {
      if (!part.getTableName().equals(tblName) || !part.getDbName().equals(dbName)) {
        throw new MetaException("Partition does not belong to target table "
            + dbName + "." + tblName + ": " + part);
      }
      boolean shouldAdd = startAddPartition(ms, part, ifNotExists);
      if (!shouldAdd) {
        if (existingParts == null) {
          existingParts = new ArrayList<Partition>();
        }
        existingParts.add(part);
        LOG.info("Not adding partition " + part + " as it already exists");
        continue;
      }
      boolean madeDir = createLocationForAddedPartition(tbl, part);
      if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
        // Technically, for ifNotExists case, we could insert one and discard the other
        // because the first one now "exists", but it seems better to report the problem
        // upstream as such a command doesn't make sense.
        throw new MetaException("Duplicate partitions in the list: " + part);
      }
      initializeAddedPartition(tbl, part, madeDir);
      result.add(part);
    }
    if (!result.isEmpty()) {
      success = ms.addPartitions(dbName, tblName, result);
    } else {
      success = true;
    }
    success = success && ms.commitTransaction();
  } finally {
    if (!success) {
      ms.rollbackTransaction();
      for (Entry<PartValEqWrapper, Boolean> e : addedPartitions.entrySet()) {
        if (e.getValue()) {
          wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
          // we just created this directory - it's not a case of pre-creation, so we nuke
        }
      }
      fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
    } else {
      fireMetaStoreAddPartitionEvent(tbl, result, null, true);
      if (existingParts != null) {
        // The request has succeeded but we failed to add these partitions.
        fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
      }
    }
  }
  return result;
}

  Step by step:

  1. ms.openTransaction(), covered last time, guarantees the operation is atomic.

  2. tbl = ms.getTable(dbName, tblName) then fetches the entire Table object by dbName and tableName.

  3. firePreEvent records the event.

  4. The loop over the partitions begins; startAddPartition checks whether each partition already exists in the metadata.

  5. createLocationForAddedPartition creates the partition's file path, and initializeAddedPartition then mainly copies the table's param info onto the partition; this ties into Hive's table schema, and the extended params ultimately land in an extension table along the lines of meta_partition_param.

  6. Once the physical operations are complete, ms.addPartitions(dbName, tblName, result) records the metadata in the metastore.

  7. If anything fails (for example a duplicate partition in the list, which throws an exception), the paths already created are deleted at the end. Look back at the code above: it first builds a map, Map<PartValEqWrapper, Boolean> addedPartitions = new HashMap<PartValEqWrapper, Boolean>(), keyed by the partition with the mkdir success flag as the value. On rollback, that flag decides which successfully created partition directories get removed.
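Point 7 deserves a standalone sketch: record, per partition location, whether this call created the directory, and on failure delete only those, never pre-existing ones. The java.nio.file version below is an illustration of the pattern under the assumption of local paths and a simulated commit flag; Hive itself goes through Warehouse.mkdirs/deleteDir and keys the map by PartValEqWrapper:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch of the cleanup bookkeeping in add_partitions_core:
// for every partition location, remember whether WE created it, and on
// failure delete only those directories (never pre-existing ones).
// failAfterCreate simulates a failed commitTransaction().
public class PartitionDirRollback {

    static boolean addPartitionDirs(Path warehouse, List<String> partNames,
                                    boolean failAfterCreate) throws IOException {
        Map<Path, Boolean> addedPartitions = new LinkedHashMap<>();
        boolean success = false;
        try {
            for (String name : partNames) {
                Path dir = warehouse.resolve(name);
                boolean madeDir = !Files.isDirectory(dir); // true only if we create it now
                if (madeDir) {
                    Files.createDirectories(dir);
                }
                addedPartitions.put(dir, madeDir);
            }
            success = !failAfterCreate;                    // stands in for commitTransaction()
            return success;
        } finally {
            if (!success) {
                // Roll back: nuke only the directories this call created.
                for (Map.Entry<Path, Boolean> e : addedPartitions.entrySet()) {
                    if (e.getValue()) {
                        Files.deleteIfExists(e.getKey());
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path wh = Files.createTempDirectory("wh");
        Files.createDirectory(wh.resolve("dt=20170829")); // pre-existing partition dir
        addPartitionDirs(wh, Arrays.asList("dt=20170829", "dt=20170830"), true);
        // The pre-existing dir survives; the one we created was rolled back.
        System.out.println(Files.isDirectory(wh.resolve("dt=20170829")) + " "
            + Files.isDirectory(wh.resolve("dt=20170830")));
    }
}
```

The LinkedHashMap keeps cleanup in creation order, but any map works; the essential piece is the per-directory boolean, which is exactly what PartValEqWrapper-to-Boolean carries in the real code.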

  I'll skip delete and query, which are trivial. On to alter_partition: the client side calls client.alter_partition(dbName, tblName, newPart), which I'll also skip, passing the dbName, tblName, and the new partition; HiveMetaStore then invokes rename_partition:

  

@Override
public void rename_partition(final String db_name, final String tbl_name,
    final List<String> part_vals, final Partition new_part)
    throws InvalidOperationException, MetaException, TException {
  // Call rename_partition without an environment context.
  rename_partition(db_name, tbl_name, part_vals, new_part, null);
}

private void rename_partition(final String db_name, final String tbl_name,
    final List<String> part_vals, final Partition new_part,
    final EnvironmentContext envContext)
    throws InvalidOperationException, MetaException, TException {
  startTableFunction("alter_partition", db_name, tbl_name);

  if (LOG.isInfoEnabled()) {
    LOG.info("New partition values:" + new_part.getValues());
    if (part_vals != null && part_vals.size() > 0) {
      LOG.info("Old Partition values:" + part_vals);
    }
  }

  Partition oldPart = null;
  Exception ex = null;
  try {
    firePreEvent(new PreAlterPartitionEvent(db_name, tbl_name, part_vals, new_part, this));

    if (part_vals != null && !part_vals.isEmpty()) {
      MetaStoreUtils.validatePartitionNameCharacters(new_part.getValues(),
          partitionValidationPattern);
    }

    oldPart = alterHandler.alterPartition(getMS(), wh, db_name, tbl_name, part_vals, new_part);

    // Only fetch the table if we actually have a listener
    Table table = null;
    for (MetaStoreEventListener listener : listeners) {
      if (table == null) {
        table = getMS().getTable(db_name, tbl_name);
      }
      AlterPartitionEvent alterPartitionEvent =
          new AlterPartitionEvent(oldPart, new_part, table, true, this);
      alterPartitionEvent.setEnvironmentContext(envContext);
      listener.onAlterPartition(alterPartitionEvent);
    }
  } catch (InvalidObjectException e) {
    ex = e;
    throw new InvalidOperationException(e.getMessage());
  } catch (AlreadyExistsException e) {
    ex = e;
    throw new InvalidOperationException(e.getMessage());
  } catch (Exception e) {
    ex = e;
    if (e instanceof MetaException) {
      throw (MetaException) e;
    } else if (e instanceof InvalidOperationException) {
      throw (InvalidOperationException) e;
    } else if (e instanceof TException) {
      throw (TException) e;
    } else {
      throw newMetaException(e);
    }
  } finally {
    endFunction("alter_partition", oldPart != null, ex, tbl_name);
  }
  return;
}

  Continuing:

  1. startTableFunction is mainly for bookkeeping/metrics.

  2. new_part.getValues() returns the partition's concrete column values; for dt=20170830, that is the "20170830" part.

  3. validatePartitionNameCharacters then checks that the partition name is legal.

  4. alterHandler.alterPartition performs the actual change. But why is the result named oldPart when the partition has already been modified? Because the handler returns the partition as it was before the change, which the AlterPartitionEvent above needs for its (oldPart, new_part) pair. Stepping into the handler, we find it calls updatePartColumnStats:

private void updatePartColumnStats(RawStore msdb, String dbName, String tableName,
    List<String> partVals, Partition newPart)
    throws MetaException, InvalidObjectException {
  dbName = HiveStringUtils.normalizeIdentifier(dbName);
  tableName = HiveStringUtils.normalizeIdentifier(tableName);
  String newDbName = HiveStringUtils.normalizeIdentifier(newPart.getDbName());
  String newTableName = HiveStringUtils.normalizeIdentifier(newPart.getTableName());

  Table oldTable = msdb.getTable(dbName, tableName);
  if (oldTable == null) {
    return;
  }

  try {
    String oldPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), partVals);
    String newPartName = Warehouse.makePartName(oldTable.getPartitionKeys(), newPart.getValues());
    if (!dbName.equals(newDbName) || !tableName.equals(newTableName)
        || !oldPartName.equals(newPartName)) {
      msdb.deletePartitionColumnStatistics(dbName, tableName, oldPartName, partVals, null);
    } else {
      Partition oldPartition = msdb.getPartition(dbName, tableName, partVals);
      if (oldPartition == null) {
        return;
      }
      if (oldPartition.getSd() != null && newPart.getSd() != null) {
        List<FieldSchema> oldCols = oldPartition.getSd().getCols();
        if (!MetaStoreUtils.areSameColumns(oldCols, newPart.getSd().getCols())) {
          updatePartColumnStatsForAlterColumns(msdb, oldPartition, oldPartName, partVals, oldCols, newPart);
        }
      }
    }
  } catch (NoSuchObjectException nsoe) {
    LOG.debug("Could not find db entry." + nsoe);
    // ignore
  } catch (InvalidInputException iie) {
    throw new InvalidObjectException("Invalid input to update partition column stats." + iie);
  }
}

  5. Warehouse.makePartName assembles the old and new partition name strings, e.g. an old dt=20180830 versus a new dataPart=20180830.

  6. Then comes the branch: if the new name (or db/table) differs from the old, the existing column statistics are deleted from the metastore; otherwise, if the columns changed, updatePartColumnStatsForAlterColumns updates them.
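The name comparison in points 5 and 6 can be sketched as follows. makePartName here is a simplified version of Warehouse.makePartName (the real one also escapes special characters), and decide collapses the branch in updatePartColumnStats into a label; the db/table names in main are illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Sketch of the decision in updatePartColumnStats. makePartName is a
// simplified Warehouse.makePartName (no escaping of special characters).
public class PartStatsDecision {

    static String makePartName(List<String> partKeys, List<String> partVals) {
        StringJoiner name = new StringJoiner("/");
        for (int i = 0; i < partKeys.size(); i++) {
            name.add(partKeys.get(i) + "=" + partVals.get(i));
        }
        return name.toString();
    }

    /** Returns "DELETE_STATS" when the partition identity changed, else "MAYBE_UPDATE". */
    static String decide(String dbName, String tblName, String oldPartName,
                         String newDbName, String newTblName, String newPartName) {
        if (!dbName.equals(newDbName) || !tblName.equals(newTblName)
                || !oldPartName.equals(newPartName)) {
            return "DELETE_STATS";   // old column stats no longer match any partition
        }
        return "MAYBE_UPDATE";       // same identity: update stats only if columns changed
    }

    public static void main(String[] args) {
        String oldName = makePartName(Arrays.asList("dt"), Arrays.asList("20170830"));
        String newName = makePartName(Arrays.asList("dt"), Arrays.asList("20170831"));
        System.out.println(oldName + " -> " + newName + ": "
            + decide("default", "logs", oldName, "default", "logs", newName));
    }
}
```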

  And that's it for today. It's late, off to bed; back to the grind tomorrow, hahaha~

Reposted from: http://iqhoa.baihongyu.com/
