HA

Problem solved: the HDFS NameNode single point of failure.

Active/standby switchover process

Automatic Failover

Automatic failover: when the active NameNode crashes, the standby NameNode is automatically switched to become the active NameNode.

Ann: the active NN; Snn: the standby NN.

This section covers why each service is started or stopped during the switchover, and everything the transition touches.

Process summary:

zkfc

  1. The ZKFC process monitors the NameNode

    1. The health monitor's doHealthChecks() runs once per second; when it detects a problem it calls enterState() to change the health monitor's state
    2. When a failover is needed, gracefulFailover() runs
      1. the standby ZKFC performs the failover,
      2. it makes the old active leave the election, doCedeActive()
        1. elector.quitElection() talks to ZooKeeper and deletes the corresponding znode
      3. it waits for the election to make the local node active
        1. election logic (see the elector callback sketch at the end of this summary)
      4. it lets the old active rejoin the election
  2. The NameNode switches state

    1. Internal state transition: setStateInternal()

      1. prepareToExitState()

        1. the standby cancels the checkpoint that is currently being built and prevents new ones from starting
      2. exitState()

        1. stopActiveServices

          1. Stops the following threads:

            1. stopSecretManager

            2. interrupt the LeaseMonitor thread

              the LeaseMonitor periodically checks and updates leases based on their timestamps, by default every 2s

            3. interrupt the NameNodeEditLogRoller thread

              it rolls the edit log once the number of accumulated transactions passes a threshold

            4. interrupt the LazyPersistFileScrubber thread

              it periodically checks lazyPersist files and removes blocks that have been lost from the namespace

          2. close the edit log and update the fsimage's last applied txid

          3. clear the centralized cache information

          4. clear the pending commands held by the BlockManager; the standby must not track block replica state

            1. clear the BlockManager's record of which blocks DataNodes should cache or uncache

            2. stop telling DataNodes which blocks to cache

            3. clear the queues that hold decisions previously made by this NameNode

              /**
               * Clear all queues that hold decisions previously made by
               * this NameNode.
               */
              public void clearQueues() {
                neededReplications.clear();
                pendingReplications.clear();
                excessReplicateMap.clear();
                invalidateBlocks.clear();
                datanodeManager.clearPendingQueues();
                postponedMisreplicatedBlocks.clear();
              }


        2. stopStandbyServices

          1. stop the editLogTailer thread
          2. stop the checkpointer thread

      3. enterState() starts services according to the new state

        1. active: start the active services
          1. initialize the edit log and recover the logs on the JNs
             - generate a new epoch, createNewUniqueEpoch()
             - recover the in-progress logs
             - catch up on the edit log and synchronize the logs across the JNs
          2. initialize quotas and run the quota check
          3. start the NameNodeResourceMonitor thread
          4. start the NameNodeEditLogRoller thread to roll the edit log by transaction count
        2. standby
          1. disable quota checks
          2. start the checkpointer and editLogTailer threads

  3. The client selects the new Ann

  4. The Snn synchronizes data from the Ann
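
As a concrete anchor for the election steps in this summary, here is a minimal sketch of the callback shape a ZKFC-like process implements. The method names mirror Hadoop's ActiveStandbyElectorCallback, but the interface and the bodies below are illustrative only, not the real implementation.

// Hedged sketch: the elector invokes these callbacks as the znode lock is won or lost.
interface ElectorCallback {
  void becomeActive();                       // won the lock: make the local NN active
  void becomeStandby();                      // lost the lock: make the local NN standby
  void fenceOldActive(byte[] oldActiveData); // fence the previous active if needed
  void notifyFatalError(String msg);
}

class SketchZkfc implements ElectorCallback {
  @Override public void becomeActive() {
    // In the real ZKFC this ends up calling HAServiceProtocol.transitionToActive(),
    // which drives setStateInternal() on the NameNode (see section 3 below).
    System.out.println("acquired lock -> transitionToActive()");
  }
  @Override public void becomeStandby() {
    System.out.println("lost lock -> transitionToStandby()");
  }
  @Override public void fenceOldActive(byte[] oldActiveData) {
    System.out.println("fencing the previous active before taking over");
  }
  @Override public void notifyFatalError(String msg) {
    System.err.println("fatal elector error: " + msg);
  }
}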

Detailed process:

1. Monitoring the Ann's state
  • Call chain that starts the monitoring thread:

    ZKFailoverController.run()

    ZKFailoverController.doRun()

    ZKFailoverController.initHM()

    HealthMonitor.start()

    MonitorDaemon.start()

    MonitorDaemon.run()

    // MonitorDaemon
    public void run() {
      while (shouldRun) {
        try {
          // wait until we are connected to the service
          loopUntilConnected();
          // check the service's health and react accordingly; if the NN is
          // unavailable an exception is raised. By default the check runs once per second.
          doHealthChecks();
        } catch (InterruptedException ie) {
          Preconditions.checkState(!shouldRun,
              "Interrupted but still supposed to run");
        }
      }
    }

    doHealthChecks(), after a chain of calls, invokes NameNode.monitorHealth(), which reports on the NameNode's health; when the NN has no resources available it throws an exception.
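
The idea behind monitorHealth() can be illustrated with a minimal sketch: the NameNode verifies that its local storage volumes still have free space, and throws if they do not, which the ZKFC's HealthMonitor turns into SERVICE_UNHEALTHY. The names and threshold below are illustrative, not the exact HDFS implementation.

import java.io.File;

class HealthCheckSketch {
  static final long MIN_FREE_BYTES = 100L * 1024 * 1024;   // illustrative threshold

  // Roughly what a resource-based health check amounts to.
  static void monitorHealth(File[] nameDirs) throws Exception {
    for (File dir : nameDirs) {
      if (dir.getUsableSpace() < MIN_FREE_BYTES) {
        // In HDFS this surfaces as a HealthCheckFailedException over RPC.
        throw new Exception("NameNode has insufficient resources on " + dir);
      }
    }
  }
}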

2. The service becomes unavailable and the Ann releases the lock

The active NN releases the lock and the standby NN's ZKFC process acquires it. The ZKFC communicates with ZooKeeper over packets and deletes the znode at the lock path.

    private void doHealthChecks() throws InterruptedException {
      while (shouldRun) {
        HAServiceStatus status = null;
        boolean healthy = false;
        try {
          status = proxy.getServiceStatus();
          proxy.monitorHealth();
          healthy = true;
        } catch (Throwable t) {
          // a problem was detected
          if (isHealthCheckFailedException(t)) {
            // SERVICE_UNHEALTHY: the service is running but unhealthy
            enterState(State.SERVICE_UNHEALTHY);
          } else {
            RPC.stopProxy(proxy);
            proxy = null;
            // SERVICE_NOT_RESPONDING: the service is not responding to health check RPCs
            enterState(State.SERVICE_NOT_RESPONDING);
            Thread.sleep(sleepAfterDisconnectMillis);
            return;
          }
        }
        // ... on success the loop records the status, enters SERVICE_HEALTHY,
        // and sleeps for the check interval (1s by default) before the next check
      }
    }

    // registered callbacks are notified synchronously whenever the state changes
    private synchronized void enterState(State newState) {
      // the initial state is State.INITIALIZING
      if (newState != state) {
        state = newState;
        synchronized (callbacks) {
          for (Callback cb : callbacks) {
            cb.enteredState(newState);
          }
        }
      }
    }

    public void enteredState(HealthMonitor.State newState) {
      setLastHealthState(newState);
      // Check the current state of the service, and join the election if it should be in the election.
      recheckElectability();
    }
    -----------------------------------------
    /**
     * This class implements a simple library to perform leader election on top of
     * Apache Zookeeper. Using Zookeeper as a coordination service, leader election
     * can be performed by atomically creating an ephemeral lock file (znode) on
     * Zookeeper. The service instance that successfully creates the znode becomes
     * active and the rest become standbys. <br/>
     * This election mechanism is only efficient for small number of election
     * candidates (order of 10's) because contention on single znode by a large
     * number of candidates can result in Zookeeper overload. <br/>
     * The elector does not guarantee fencing (protection of shared resources) among
     * service instances. After it has notified an instance about becoming a leader,
     * then that instance must ensure that it meets the service consistency
     * requirements. If it cannot do so, then it is recommended to quit the
     * election. The application implements the {@link ActiveStandbyElectorCallback}
     * to interact with the elector
     */
    ActiveStandbyElector.java
    It automatically manages the lock znode; the mechanism is only suitable when the number of candidates is small.
    ----------------------------------------

    ActiveStandbyElector.quitElection(true);

    // If active is gracefully going back to standby mode, remove our permanent znode so no one fences us.
    /**
     * Try to delete the "ActiveBreadCrumb" node when gracefully giving up
     * active status.
     * If this fails, it will simply warn, since the graceful release behavior
     * is only an optimization.
     */
    ActiveStandbyElector.tryDeleteOwnBreadCrumbNode()
    // zkBreadCrumbPath

    // zkClient.delete(path, version) deletes the znode at the given path on ZooKeeper by sending a packet

After this, the ZKFC on the Active NN loses the Active NN lock in ZooKeeper. The ZKFC on the Standby NN has been trying to acquire that lock all along, so it now obtains it; once the Standby NN's ZKFC holds the Active NN lock, it transitions its NN to Active.
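
To make the lock handover concrete, here is a minimal sketch of an ephemeral-znode lock on ZooKeeper, roughly the mechanism ActiveStandbyElector uses: the ZKFC that creates the znode first holds the active lock, a graceful release deletes it, and the other ZKFC's watch fires so it can try again. Paths, retries, and error handling are simplified; only the ZooKeeper client calls are real API.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

class ZkLockSketch {
  // Try to become active by creating the ephemeral lock znode.
  static boolean tryAcquireActiveLock(ZooKeeper zk, String lockPath, byte[] myInfo)
      throws KeeperException, InterruptedException {
    try {
      zk.create(lockPath, myInfo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      return true;                       // we now hold the active lock
    } catch (KeeperException.NodeExistsException e) {
      zk.exists(lockPath, true);         // set a watch; retry once the node is deleted
      return false;
    }
  }

  // Graceful release: delete the znode so the peer's watch fires immediately.
  static void gracefulRelease(ZooKeeper zk, String lockPath)
      throws KeeperException, InterruptedException {
    zk.delete(lockPath, -1);             // version -1 deletes regardless of version
  }
}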

3. The NN switches state

Process

  1. State transition

    1. enterState() starts services according to the new state

      1. active

        • Obtain a new, strictly larger epoch so that the previous Ann can no longer write logs to the JNs

          Only the NameNode holding the newest epoch may write to the JournalNodes; this fences out the former Ann (see "Generating a new epoch" below).

        • Recover the in-progress logs, and synchronize the logs on all JNs

          If the switchover happened while the former Active NN was in the middle of writing the EditLog, the EditLog data on different JNs may be inconsistent; the EditLogs on the JournalNodes must be brought into agreement and finalized.

        • startLogSegment: give the newly Active NN the ability to write the log.

      2. standby

1. Source code of the relevant methods

setStateInternal()

/**
 * Internal method to move from the existing state to a new state.
 * @param context HA context
 * @param s new state
 * @throws ServiceFailedException on failure to transition to new state.
 */
protected final void setStateInternal(final HAContext context, final HAState s)
    throws ServiceFailedException {
  prepareToExitState(context);
  s.prepareToEnterState(context);
  context.writeLock();
  try {
    exitState(context);
    context.setState(s);
    s.enterState(context);
    s.updateLastHATransitionTime();
  } finally {
    context.writeUnlock();
  }
}

prepareToStopStandbyServices()

void prepareToStopStandbyServices() throws ServiceFailedException {
  if (standbyCheckpointer != null) {
    standbyCheckpointer.cancelAndPreventCheckpoints(
        "About to leave standby state");
  }
}

stopActiveServices()

void stopActiveServices() {
  LOG.info("Stopping services started for active state");
  writeLock();
  try {
    stopSecretManager();
    leaseManager.stopMonitor();
    if (nnrmthread != null) {
      ((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor();
      nnrmthread.interrupt();
    }
    if (edekCacheLoader != null) {
      edekCacheLoader.shutdownNow();
    }
    if (nnEditLogRoller != null) {
      ((NameNodeEditLogRoller) nnEditLogRoller.getRunnable()).stop();
      nnEditLogRoller.interrupt();
    }
    if (lazyPersistFileScrubber != null) {
      ((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
      lazyPersistFileScrubber.interrupt();
    }
    if (dir != null && getFSImage() != null) {
      if (getFSImage().editLog != null) {
        getFSImage().editLog.close();
      }
      // Update the fsimage with the last txid that we wrote
      // so that the tailer starts from the right spot.
      getFSImage().updateLastAppliedTxIdFromWritten();
    }
    if (cacheManager != null) {
      cacheManager.stopMonitorThread();
      cacheManager.clearDirectiveStats();
    }
    if (blockManager != null) {
      blockManager.getDatanodeManager().clearPendingCachingCommands();
      blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
      // Don't want to keep replication queues when not in Active.
      blockManager.clearQueues();
      blockManager.setInitializedReplQueues(false);
    }
  } finally {
    writeUnlock("stopActiveServices");
  }
}

stopStandbyServices()

/** Stop services required in standby state */
void stopStandbyServices() throws IOException {
  LOG.info("Stopping services started for standby state");
  if (standbyCheckpointer != null) {
    standbyCheckpointer.stop();
  }
  if (editLogTailer != null) {
    editLogTailer.stop();
  }
  if (dir != null && getFSImage() != null && getFSImage().editLog != null) {
    getFSImage().editLog.close();
  }
}

startActiveServices()

/**
 * Start services required in active state
 * @throws IOException
 */
void startActiveServices() throws IOException {
  startingActiveService = true;
  LOG.info("Starting services required for active state");
  writeLock();
  try {
    FSEditLog editLog = getFSImage().getEditLog();

    if (!editLog.isOpenForWrite()) {
      // During startup, we're already open for write during initialization.
      editLog.initJournalsForWrite();
      // May need to recover
      editLog.recoverUnclosedStreams();

      LOG.info("Catching up to latest edits from old active before " +
          "taking over writer role in edits logs");
      editLogTailer.catchupDuringFailover();

      blockManager.setPostponeBlocksFromFuture(false);
      blockManager.getDatanodeManager().markAllDatanodesStale();
      blockManager.clearQueues();
      blockManager.processAllPendingDNMessages();

      // Only need to re-process the queue, If not in SafeMode.
      if (!isInSafeMode()) {
        LOG.info("Reprocessing replication and invalidation queues");
        blockManager.initializeReplQueues();
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("NameNode metadata after re-processing " +
            "replication and invalidation queues during failover:\n" +
            metaSaveAsString());
      }

      long nextTxId = getFSImage().getLastAppliedTxId() + 1;
      LOG.info("Will take over writing edit logs at txnid " +
          nextTxId);
      editLog.setNextTxId(nextTxId);

      getFSImage().editLog.openForWrite(getEffectiveLayoutVersion());
    }

    // Initialize the quota.
    dir.updateCountForQuota();
    // Enable quota checks.
    dir.enableQuotaChecks();
    if (haEnabled) {
      // Renew all of the leases before becoming active.
      // This is because, while we were in standby mode,
      // the leases weren't getting renewed on this NN.
      // Give them all a fresh start here.
      leaseManager.renewAllLeases();
    }
    leaseManager.startMonitor();
    startSecretManagerIfNecessary();

    //ResourceMonitor required only at ActiveNN. See HDFS-2914
    this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
    nnrmthread.start();

    nnEditLogRoller = new Daemon(new NameNodeEditLogRoller(
        editLogRollerThreshold, editLogRollerInterval));
    nnEditLogRoller.start();

    if (lazyPersistFileScrubIntervalSec > 0) {
      lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
          lazyPersistFileScrubIntervalSec));
      lazyPersistFileScrubber.start();
    } else {
      LOG.warn("Lazy persist file scrubber is disabled,"
          + " configured scrub interval is zero.");
    }

    cacheManager.startMonitorThread();
    blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
    if (provider != null) {
      edekCacheLoader = Executors.newSingleThreadExecutor(
          new ThreadFactoryBuilder().setDaemon(true)
              .setNameFormat("Warm Up EDEK Cache Thread #%d")
              .build());
      FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
          edekCacheLoaderDelay, edekCacheLoaderInterval);
    }
  } finally {
    startingActiveService = false;
    checkSafeMode();
    writeUnlock("startActiveServices");
  }
}

startStandbyServices()

/**
 * Start services required in standby state
 *
 * @throws IOException
 */
void startStandbyServices(final Configuration conf) throws IOException {
  LOG.info("Starting services required for standby state");
  if (!getFSImage().editLog.isOpenForRead()) {
    // During startup, we're already open for read.
    getFSImage().editLog.initSharedJournalsForRead();
  }

  blockManager.setPostponeBlocksFromFuture(true);

  // Disable quota checks while in standby.
  dir.disableQuotaChecks();
  editLogTailer = new EditLogTailer(this, conf);
  editLogTailer.start();
  if (standbyShouldCheckpoint) {
    standbyCheckpointer = new StandbyCheckpointer(conf, this);
    standbyCheckpointer.start();
  }
}
2. Generating a new epoch

This is how HDFS guarantees that the former Active NN can no longer write to the qjournal.

public void recoverUnfinalizedSegments() throws IOException {
  // first fence previous writers by establishing a new epoch
  Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
  // ... recovery of the last segment follows (see the next section)
}

/**
 * Fence any previous writers, and obtain a unique epoch number
 * for write-access to the journal nodes.
 *
 * @return the new, unique epoch number
 */
Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch() {
  // ...
}

The epoch solves the problem described above. The Standby NN sends a getJournalState RPC to every JournalNode, and each JN returns its lastPromisedEpoch. Once the QuorumJournalManager has received lastPromisedEpoch from a majority of JNs, it picks the largest one, adds 1, and uses that as the current QJM's epoch, writing the new epoch to the qjournal with a newEpoch RPC. From then on, every write-side operation the QuorumJournalManager performs against the qjournal (startLogSegment(), logEdits(), finalizeLogSegment(), and so on) carries its epoch as a parameter; when the operation reaches a JournalNode, the JN rejects it if the epoch is smaller than the lastPromisedEpoch it has stored. If a majority of JournalNodes reject the operation, the operation fails. Back in the failover scenario: once the former Standby NN has incremented the epoch, the former Active NN's epoch is necessarily smaller, so any further attempt by it to write the log to the qjournal is rejected, because the qjournal does not accept writes from a QJM whose epoch is smaller than lastPromisedEpoch.

When a JN receives the newEpoch RPC, it compares the epoch from the QJM with its stored lastPromisedEpoch: if the writer's epoch is smaller than lastPromisedEpoch, that writer is no longer allowed to write to the JNs and an exception is thrown; the writer receives the exception response, fails to collect a majority of success responses, and therefore loses write access to the qjournal. (This is essentially the leader-election phase of the Paxos algorithm.)
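
The JournalNode-side rule can be summarized with a small sketch. This is not the JournalNode source; it only illustrates the comparison described above: newEpoch() promises never to accept a smaller epoch again, and any later write call carrying a stale epoch is rejected.

class JournalEpochSketch {
  private long lastPromisedEpoch = 0;

  // Handle a newEpoch-style request: only a strictly larger epoch is accepted.
  synchronized long newEpoch(long proposedEpoch) {
    if (proposedEpoch <= lastPromisedEpoch) {
      throw new IllegalStateException("Proposed epoch " + proposedEpoch
          + " <= last promised epoch " + lastPromisedEpoch);
    }
    lastPromisedEpoch = proposedEpoch;   // the promise: never accept a smaller writer
    return lastPromisedEpoch;
  }

  // Every write-side call (startLogSegment, journal, finalizeLogSegment, ...)
  // carries the caller's epoch and is rejected if it is stale.
  synchronized void checkWriteRequest(long callerEpoch) {
    if (callerEpoch < lastPromisedEpoch) {
      // The former Active NN lands here after the Standby has bumped the epoch.
      throw new IllegalStateException("Stale writer epoch " + callerEpoch);
    }
  }
}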

3. Recovering in-progress logs

Continuing from the code above: the Standby has already fenced the former Active via createNewUniqueEpoch(). Besides the epoch, that RPC also returns the txid of the last log segment, because only the last log segment may need recovery. The recovery algorithm is an instance of Paxos whose goal is to bring the copies of the log segment on the different JNs into agreement.

Recovery then proceeds with the recoverUnclosedSegment() algorithm.

/**
 * Run recovery/synchronization for a specific segment.
 * Postconditions:
 * <ul>
 * <li>This segment will be finalized on a majority
 * of nodes.</li>
 * <li>All nodes which contain the finalized segment will
 * agree on the length.</li>
 * </ul>
 *
 * @param segmentTxId the starting txid of the segment
 * @throws IOException
 */
private void recoverUnclosedSegment(long segmentTxId) throws IOException {
  Preconditions.checkArgument(segmentTxId > 0);
  LOG.info("Beginning recovery of unclosed segment starting at txid " +
      segmentTxId);

  // Step 1. Prepare recovery
  // The QJM asks the JNs for the length and finalized/in-progress status of the
  // segment starting at segmentTxId; the JNs return that information.
  // (This corresponds to Phase 1a and Phase 1b of Paxos.)
  QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
      loggers.prepareRecovery(segmentTxId);
  Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses =
      loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
          "prepareRecovery(" + segmentTxId + ")");
  LOG.info("Recovery prepare phase complete. Responses:\n" +
      QuorumCall.mapToString(prepareResponses));

  // Compare the JNs' responses with SegmentRecoveryComparator and pick the best
  // log segment as the source for the synchronization that follows. The selection
  // policy lives in SegmentRecoveryComparator.

  // Determine the logger who either:
  // a) Has already accepted a previous proposal that's higher than any
  //    other
  //
  // OR, if no such logger exists:
  //
  // b) Has the longest log starting at this transaction ID

  // TODO: we should collect any "ties" and pass the URL for all of them
  // when syncing, so we can tolerate failure during recovery better.
  Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
      prepareResponses.entrySet(), SegmentRecoveryComparator.INSTANCE);
  AsyncLogger bestLogger = bestEntry.getKey();
  PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();

  // Log the above decision, check invariants.
  if (bestResponse.hasAcceptedInEpoch()) {
    LOG.info("Using already-accepted recovery for segment " +
        "starting at txid " + segmentTxId + ": " +
        bestEntry);
  } else if (bestResponse.hasSegmentState()) {
    LOG.info("Using longest log: " + bestEntry);
  } else {
    // None of the responses to prepareRecovery() had a segment at the given
    // txid. This can happen for example in the following situation:
    // - 3 JNs: JN1, JN2, JN3
    // - writer starts segment 101 on JN1, then crashes before
    //   writing to JN2 and JN3
    // - during newEpoch(), we saw the segment on JN1 and decide to
    //   recover segment 101
    // - before prepare(), JN1 crashes, and we only talk to JN2 and JN3,
    //   neither of which has any entry for this log.
    // In this case, it is allowed to do nothing for recovery, since the
    // segment wasn't started on a quorum of nodes.

    // Sanity check: we should only get here if none of the responses had
    // a log. This should be a postcondition of the recovery comparator,
    // but a bug in the comparator might cause us to get here.
    for (PrepareRecoveryResponseProto resp : prepareResponses.values()) {
      assert !resp.hasSegmentState() :
          "One of the loggers had a response, but no best logger " +
          "was found.";
    }

    LOG.info("None of the responders had a log to recover: " +
        QuorumCall.mapToString(prepareResponses));
    return;
  }

  SegmentStateProto logToSync = bestResponse.getSegmentState();
  assert segmentTxId == logToSync.getStartTxId();

  // Sanity check: none of the loggers should be aware of a higher
  // txid than the txid we intend to truncate to
  for (Map.Entry<AsyncLogger, PrepareRecoveryResponseProto> e :
      prepareResponses.entrySet()) {
    AsyncLogger logger = e.getKey();
    PrepareRecoveryResponseProto resp = e.getValue();

    if (resp.hasLastCommittedTxId() &&
        resp.getLastCommittedTxId() > logToSync.getEndTxId()) {
      throw new AssertionError("Decided to synchronize log to " + logToSync +
          " but logger " + logger + " had seen txid " +
          resp.getLastCommittedTxId() + " committed");
    }
  }

  // Send acceptRecovery RPCs to the JNs (Phase 2a of Paxos).
  // When a JN receives acceptRecovery, it synchronizes its own log with
  // syncFromUrl and persists this log segment and epoch.
  // If a majority of JNs respond with success, the synchronization succeeds
  // (Phase 2b of Paxos).

  URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);

  // synchronize every JN from the best log
  QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
  loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
      "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");

  // If one of the loggers above missed the synchronization step above, but
  // we send a finalize() here, that's OK. It validates the log before
  // finalizing. Hence, even if it is not "in sync", it won't incorrectly
  // finalize.
  QuorumCall<AsyncLogger, Void> finalize =
      loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
  loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
      String.format("finalizeLogSegment(%s-%s)",
          logToSync.getStartTxId(),
          logToSync.getEndTxId()));
}
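
The selection rule that SegmentRecoveryComparator encodes can be restated as a small sketch: prefer a response that has already accepted a recovery proposal in a higher epoch; only when no response has accepted anything, prefer the longest segment. The field names below are illustrative, not the actual protobuf fields.

import java.util.Comparator;

class RecoveryChoiceSketch {
  static class PrepareResponse {
    Long acceptedInEpoch;   // null if this JN never accepted a recovery proposal
    long endTxId;           // last txid this JN holds for the segment
  }

  // Collections.max(responses, BEST_FIRST) plays the role of bestEntry above.
  static final Comparator<PrepareResponse> BEST_FIRST =
      Comparator.<PrepareResponse>comparingLong(
              r -> r.acceptedInEpoch == null ? -1L : r.acceptedInEpoch)
          .thenComparingLong(r -> r.endTxId);
}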

After QuorumJournalManager.recoverUnfinalizedSegments() completes, the previously incomplete log segments are all complete. Next, EditLogTailer.doTailEdits() runs: the former Standby NN first catches up on the former Active NN's EditLog and then applies it, at which point the in-memory state of the two NNs is truly consistent. This calls QuorumJournalManager.selectInputStreams() to read the EditLog from the JNs; currently HDFS only lets the Standby NN read and apply finalized edit logs. When the Standby NN reads the EditLog from the JNs, it first sends a getEditLogManifest() RPC to every JN to fetch the finalized edit log segments above a given txid; once a majority returns success, it merges those segments into one RedundantEditLogInputStream, so the Standby NN only needs to read the data from a single JN.

At this point the former Standby NN's transition is finished; it is now officially the Active NN, and from here on it simply writes the log as usual.

4. startLogSegment

This is also where the QuorumOutputStream is initialized.

The QJM sends a startLogSegment RPC to the JNs; if a majority of them respond with success, the call succeeds, and the AsyncLoggerSet is used to construct the QuorumOutputStream that will be used for writing the log.
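
The "wait for a write quorum" pattern behind startLogSegment() (and the other QJM calls above) can be sketched as follows: fire the RPC at every JN asynchronously, succeed as soon as a majority has answered successfully, and fail once a majority can no longer be reached. Timeouts and the real AsyncLogger API are omitted; this is only the shape of the idea.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class QuorumCallSketch {
  static void waitForWriteQuorum(List<CompletableFuture<Void>> calls) throws Exception {
    int majority = calls.size() / 2 + 1;
    AtomicInteger ok = new AtomicInteger();
    AtomicInteger failed = new AtomicInteger();
    CompletableFuture<Void> done = new CompletableFuture<>();
    for (CompletableFuture<Void> call : calls) {
      call.whenComplete((v, t) -> {
        if (t == null && ok.incrementAndGet() >= majority) {
          done.complete(null);                                // quorum reached
        } else if (t != null
            && failed.incrementAndGet() > calls.size() - majority) {
          done.completeExceptionally(t);                      // quorum impossible
        }
      });
    }
    done.get();   // blocks until a majority succeeds, or throws when it cannot
  }
}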

How the active NN updates the edit log:

  1. Initialize the QuorumOutputStream

    Already done during ActiveState.enterState().

  2. Write to the EditLog

    Log records are written into the QuorumOutputStream's double buffer through the calls below; the QuorumOutputStream performs the actual update (see the sketch after this list).

  3. Sync the log

    flushAndSync() calls AsyncLoggerSet.sendEdits(), which sends the corresponding edits to the JNs over the Journal RPC; as before, a majority of success responses counts as success. If a majority of the JNs report failure, the logSync fails and the NameNode aborts, because it can no longer write its log.
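
The double buffer mentioned in step 2 can be sketched like this: edits are appended to a "current" buffer while the previously filled "ready" buffer is shipped to the JNs, so writers do not block on the network flush. This mirrors the idea behind EditsDoubleBuffer / QuorumOutputStream but is only illustrative.

import java.io.ByteArrayOutputStream;

class EditsDoubleBufferSketch {
  private ByteArrayOutputStream bufCurrent = new ByteArrayOutputStream();
  private ByteArrayOutputStream bufReady = new ByteArrayOutputStream();

  // logEdits(): append a serialized edit to the buffer writers are filling.
  synchronized void writeOp(byte[] serializedEdit) {
    bufCurrent.write(serializedEdit, 0, serializedEdit.length);
  }

  // setReadyToFlush(): swap buffers; new edits keep landing in the (empty) current one.
  synchronized byte[] setReadyToFlush() {
    ByteArrayOutputStream tmp = bufReady;   // assumed empty, i.e. already flushed
    bufReady = bufCurrent;
    bufCurrent = tmp;
    return bufReady.toByteArray();          // this batch is what sendEdits() would ship
  }

  // flushAndSync(): once a majority of JNs acknowledge the shipped batch, discard it;
  // if a majority rejects it, the NameNode has to abort.
  synchronized void doneFlushing() {
    bufReady.reset();
  }
}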

4. The client chooses the Active NN

This is how the client is helped to decide which node is currently the primary.

On failover, the client first tries to connect to the first configured URL; if that fails it tries the other.

/**
 * A FailoverProxyProvider implementation which allows one to configure two URIs
 * to connect to during fail-over. The first configured address is tried first,
 * and on a fail-over event the other address is tried.
 */
public class ConfiguredFailoverProxyProvider<T> extends
    AbstractNNFailoverProxyProvider<T> {
  /**
   * Lazily initialize the RPC proxy object.
   */
  @Override
  public synchronized ProxyInfo<T> getProxy() {
    AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
    if (current.namenode == null) {
      try {
        current.namenode = factory.createProxy(conf,
            current.address, xface, ugi, false, getFallbackToSimpleAuth());
      } catch (IOException e) {
        LOG.error("Failed to create RPC proxy to NameNode", e);
        throw new RuntimeException(e);
      }
    }
    return new ProxyInfo<T>(current.namenode, current.address.toString());
  }
}
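
The way a client-side retry policy uses such a provider can be sketched as follows: invoke the current proxy, and on a failover-type failure switch to the other configured NameNode and retry. In HDFS this logic lives in the retry/failover invocation machinery; the sketch below only shows the shape, with hypothetical names.

import java.util.List;

class FailoverClientSketch<T> {
  interface Call<T, R> { R apply(T proxy) throws Exception; }

  private final List<T> namenodeProxies;   // e.g. proxies for nn1 and nn2
  private int current = 0;

  FailoverClientSketch(List<T> proxies) { this.namenodeProxies = proxies; }

  <R> R invokeWithFailover(Call<T, R> call, int maxFailovers) throws Exception {
    Exception last = null;
    for (int attempt = 0; attempt <= maxFailovers; attempt++) {
      try {
        return call.apply(namenodeProxies.get(current));
      } catch (Exception e) {               // e.g. the standby rejects the call
        last = e;
        current = (current + 1) % namenodeProxies.size();   // "performFailover()"
      }
    }
    throw last;
  }
}
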
5. The Snn synchronizes the Ann's metadata at startup

After the Active NN is up, the Standby NN can be brought up with these two commands:

bin/hdfs namenode -bootstrapStandby
sbin/hadoop-daemon.sh start namenode

The first command initializes the Standby NN. It does the following:

  1. talks to nn1 to fetch the namespace metadata and the checkpointed fsimage;
  2. fetches the EditLog from the JNs.

    The command fails, however, if the JNs were not initialized successfully and cannot serve the EditLog.

    Note: FSImage wraps the EditLog; in an HA deployment the EditLog is stored on the JNs.

/**
 * Thread which runs inside the NN when it's in Standby state,
 * periodically waking up to take a checkpoint of the namespace.
 * When it takes a checkpoint, it saves it to its local
 * storage and then uploads it to the remote NameNode.
 */
public class StandbyCheckpointer {
  // checkpointThread
}

// When it detects new edits, it downloads them, merges them into a new FSImage
// locally, and uploads the latest FSImage to the Active NN.
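
A rough sketch of the checkpoint loop that comment describes: wake up periodically, merge the tailed edits into a new fsimage on local storage, then upload it to the other NameNode. The trigger conditions (transaction count, elapsed time) and the HTTP upload are reduced to hypothetical method names.

class CheckpointLoopSketch implements Runnable {
  private final long checkpointPeriodMs;
  private volatile boolean running = true;

  CheckpointLoopSketch(long checkpointPeriodMs) {
    this.checkpointPeriodMs = checkpointPeriodMs;
  }

  @Override public void run() {
    while (running) {
      try {
        Thread.sleep(checkpointPeriodMs);
        saveNamespaceToLocalFsImage();    // merge tailed edits into a new fsimage
        uploadFsImageToRemoteNameNode();  // so the peer NN can serve a recent image
      } catch (InterruptedException e) {
        running = false;                  // stop() interrupts the thread
      }
    }
  }

  private void saveNamespaceToLocalFsImage() { /* illustrative placeholder */ }
  private void uploadFsImageToRemoteNameNode() { /* illustrative placeholder */ }
}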
