zookeeper源码分析之一服务端启动过程( 五 )

public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); registerJMX(); state = State.RUNNING; notifyAll(); } protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }3.2 集群启动
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true); } quorumPeer = new QuorumPeer(); quorumPeer.setTxnFactory(new FileTxnSnapLog( config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading( config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId()); quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setConfigFileName(config.getConfigFilename()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier()!=null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); } quorumPeer.initConfigInZKDatabase(); quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } }从上述代码可以看出,QuorumPeer的start()方法和join()方法是主流程 。
QuorumPeer继承了ZooKeeperThread,ZooKeeperThread继承自Thread,故QuorumPeer间接继承了Thread 。
@Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } loadDataBase(); startServerCnxnFactory(); try { adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } startLeaderElection(); super.start(); }3.2.1. 启动时先从内存数据库中恢复数据
private void loadDataBase() { try { zkDb.loadDataBase(); // load the epochs long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); } catch(FileNotFoundException e) { // pick a reasonable epoch number // this should only hAppen once when moving to a // new code version currentEpoch = epochOfZxid; LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch); writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); } if (epochOfZxid > currentEpoch) { throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); } try { acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); } catch(FileNotFoundException e) { // pick a reasonable epoch number // this should only happen once when moving to a // new code version acceptedEpoch = epochOfZxid; LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", acceptedEpoch); writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch); } if (acceptedEpoch < currentEpoch) { throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch)); } } catch(IOException ie) { LOG.error("Unable to load database on disk", ie); throw new RuntimeException("Unable to run quorum server ", ie); } }


推荐阅读