一.DataStorage
DataStorage继承Storage。
DataStorage对应Hdfs的dfs.data.dir目录。
DataNode启动时,检查恢复状态转换dfs.data.dir下所有的StorageDirectory。
二.DataStorage类图
三.详细描述
org.apache.hadoop.hdfs.server.protocol.NamespaceInfo:NamespaceInfo is returned by the name-node in reply to a data-node handshake.当我们配置完整个Hadoop系统时,做的第一件事就是
$bin/hadoop namenode -format
format后的NameNode产生一个namespaceId。当DataNode启动时,会向NameNode handshake,NameNode返回这个namespaceId。
NamespaceInfo nsInfo = handshake();
DataStorage:
参考代码中的步骤阅读。状态分析-----恢复操作-----状态转换
关于“一致性”的说明:参考http://zy19982004.iyunv.com/blog/1876706第三张图,/update /rollback /finalize指令后,StorageDirectory下会存在不同的文件夹,当整个文件夹是某个中间状态时,就是“不一致”。举例:/update开始后,“一致”状态指的是current和previous都存在,也就是/update的123步都完成了;如果123中的中间某一步发生了问题或者还没有进行,造成了previous.tmp存在current不存在的情况,就是“不一致”,造成了previous.tmp存在current存在也是不一致,都需要doRecover。
四.DataStorage源码时序图
五.DataStorage源码
package org.apache.hadoop.hdfs.server.datanode;
public class DataStorage extends Storage {
//DataStorag独特属性
private String storageID;
DataStorage() {
super(NodeType.DATA_NODE);
storageID = "";
}
// DataNode.start时触发此方法
void recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs,
StartupOption startOpt) throws IOException {
// 1.在doTransition之前首先对每个SorageDirectory进行分析,检查其是否处于“一致状态”
// 如果处于“不一致状态“,format or doRecover
this.storageID = "";
this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
dataDirs.size());
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
// 1.1 调用Storage.analyzeStorage进行状态分析
curState = sd.analyzeStorage(startOpt);
// 1.2 调用format或者Storage.doRecover进行恢复操作
switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
// ignore this storage
it.remove();
continue;
case NOT_FORMATTED: // format
format(sd, nsInfo);
break;
default: // recovery part is common
sd.doRecover(curState);
}
} catch (IOException ioe) {
sd.unlock();
throw ioe;
}
// 1.3 "一致状态"的StorageDirectory才会加入到Storage
addStorageDir(sd);
dataDirStates.add(curState);
}
// 2. doTransition 状态转换
// 根据DataNode启动参数做upgrade or rollback,其它正常启动
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(getStorageDir(idx), nsInfo, startOpt);
}
// 3. Update 每个 StorageDirectory的VERSION
this.writeAll();
}
void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
sd.clearDirectory(); // create directory
this.layoutVersion = FSConstants.LAYOUT_VERSION;
this.namespaceID = nsInfo.getNamespaceID();
this.cTime = 0;
// store storageID as it currently is
sd.write();
}
//重写,DataNode增加storageID到VERSION
protected void setFields(Properties props, StorageDirectory sd)
throws IOException {
super.setFields(props, sd);
props.setProperty("storageID", storageID);
}
//重写,DataNode增加storageID到VERSION
protected void getFields(Properties props, StorageDirectory sd)
throws IOException {
super.getFields(props, sd);
String ssid = props.getProperty("storageID");
storageID = ssid;
}
/**
* Analize which and whether a transition of the fs state is required and
* perform it if necessary. Rollback if previousLV >= LAYOUT_VERSION &&
* prevCTime <= namenode.cTime Upgrade if this.LV > LAYOUT_VERSION ||
* this.cTime < namenode.cTime Regular startup if this.LV = LAYOUT_VERSION
* && this.cTime = namenode.cTime
*/
// 2 doTransition 状态转换
// 根据DataNode启动参数做upgrade or rollback,其它正常启动
private void doTransition( StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt ) throws IOException {
if (startOpt == StartupOption.ROLLBACK)
//2.1 doRollback
doRollback(sd, nsInfo); // rollback if applicable
sd.read();
...
if (this.layoutVersion > FSConstants.LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
//2.2 doUpgrade
doUpgrade(sd, nsInfo); // upgrade
return;
}
}
// 2.1 doRollback
// 1.current-----> removed.tmp
// 2.previous---->current
// 3.删除removed.tmp
void doRollback(StorageDirectory sd, NamespaceInfo nsInfo)
throws IOException {
rename(curDir, tmpDir);
rename(prevDir, curDir);
deleteDir(tmpDir);
}
// 2.2 doUpgrade
// 1.current-----> previous.tmp
// 2.重建current:创建previous.tmp到current的硬链接,写入VERSION
// 3.previous.tmp----->previous
void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo)
throws IOException {
HardLink hardLink = new HardLink();
rename(curDir, tmpDir);
linkBlocks(tmpDir, curDir, this.getLayoutVersion(), hardLink);
sd.write();
rename(tmpDir, prevDir);
}
void finalizeUpgrade() throws IOException {
for (Iterator<StorageDirectory> it = storageDirs.iterator(); it
.hasNext();) {
doFinalize(it.next());
}
}
// 1.previous---->finalized.tmp
// 2.删除finalized.tmp
void doFinalize(StorageDirectory sd) throws IOException {
rename(prevDir, tmpDir);
new Daemon(new Runnable() {
public void run() {
deleteDir(tmpDir);
}
}).start();
}
}
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com