ZooKeeper源码分析之一:服务端启动过程(三)


1. 解析配置文件,默认的配置文件为上一级目录下的
config/zookeeper.properties或者config/zookeeper.cfg
/**
 * Reads a ZooKeeper configuration file and applies the settings found in it.
 *
 * <p>The work happens in up to three stages:
 * <ol>
 *   <li>The file at {@code path} is resolved through a
 *       {@code VerifyingFileFactory} (built with {@code warnForRelativePath()}
 *       and {@code failForNonExistingPath()}), loaded as {@link Properties}
 *       and handed to {@code parseProperties}. Once loading succeeds,
 *       {@code configFileStr} is set to {@code path}.</li>
 *   <li>If {@code dynamicConfigFileStr} is set, that file is loaded as well.
 *       It must not define a {@code version} key itself; a version taken from
 *       the file name (when there is one) is stored under that key before the
 *       properties go to {@code setupQuorumPeerConfig}.</li>
 *   <li>If a file named {@code configFileStr + nextDynamicConfigFileSuffix}
 *       exists, it is loaded and used to create {@code lastSeenQuorumVerifier}.
 *       An {@link IOException} at this stage is only logged as a warning.</li>
 * </ol>
 *
 * @param path the path of the configuration file
 * @throws ConfigException if the main or the dynamic configuration cannot be
 *         processed, or if the dynamic file contains a {@code version} entry
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    // Stage 1: main configuration file.
    try {
        File configFile = new VerifyingFileFactory.Builder(LOG)
                .warnForRelativePath()
                .failForNonExistingPath()
                .build()
                .create(path);

        Properties mainProps = new Properties();
        FileInputStream mainIn = new FileInputStream(configFile);
        try {
            mainProps.load(mainIn);
            configFileStr = path;
        } finally {
            mainIn.close();
        }

        parseProperties(mainProps);
    } catch (IOException | IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    // Nothing more to do when no dynamic configuration file is configured.
    if (dynamicConfigFileStr == null) {
        return;
    }

    // Stage 2: dynamic configuration file.
    try {
        Properties dynamicProps = new Properties();
        FileInputStream dynamicIn = new FileInputStream(dynamicConfigFileStr);
        try {
            dynamicProps.load(dynamicIn);
            if (dynamicProps.getProperty("version") != null) {
                throw new ConfigException("dynamic file shouldn't have version inside");
            }
            // When the file name carries no version nothing is set here;
            // the default version is then 0.
            String versionFromName = getVersionFromFilename(dynamicConfigFileStr);
            if (versionFromName != null) {
                dynamicProps.setProperty("version", versionFromName);
            }
        } finally {
            dynamicIn.close();
        }
        setupQuorumPeerConfig(dynamicProps, false);
    } catch (IOException | IllegalArgumentException e) {
        throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
    }

    // Stage 3: optional "next" dynamic configuration file.
    File nextConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
    if (!nextConfigFile.exists()) {
        return;
    }
    try {
        Properties nextProps = new Properties();
        FileInputStream nextIn = new FileInputStream(nextConfigFile);
        try {
            nextProps.load(nextIn);
        } finally {
            nextIn.close();
        }

        // Any key starting with "group" or "weight" marks the configuration
        // as hierarchical.
        boolean hierarchical = false;
        for (Entry<Object, Object> property : nextProps.entrySet()) {
            String name = property.getKey().toString().trim();
            if (name.startsWith("group") || name.startsWith("weight")) {
                hierarchical = true;
                break;
            }
        }
        lastSeenQuorumVerifier = createQuorumVerifier(nextProps, hierarchical);
    } catch (IOException e) {
        LOG.warn("NextQuorumVerifier is initiated to null");
    }
}
2. 启动安排清除任务
// Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config .getDataDir(), config.getDataLogDir(), config .getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start();调用start方法:
/** * Validates the purge configuration and schedules the purge task. Purge * task keeps the most recent <code>snapRetainCount</code> number of * snapshots and deletes the remaining for every <code>purgeInterval</code> * hour(s). * <p> * <code>purgeInterval</code> of <code>0</code> or * <code>negative integer</code> will not schedule the purge task. * </p> ** @see PurgeTxnLog#purge(File, File, int) */ public void start() { if (PurgeTaskStatus.STARTED == purgeTaskStatus) { LOG.warn("Purge task is already running."); return; } // Don't schedule the purge task with zero or negative purge interval. if (purgeInterval <= 0) { LOG.info("Purge task is not scheduled."); return; } timer = new Timer("PurgeTask", true); TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount); timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval)); purgeTaskStatus = PurgeTaskStatus.STARTED; }从上面代码可以看到,清除工作启动了一个定时器timer,PurgeTask继承实现了TimeTask(一个可以被定时器安排执行一次或者多次的task),PurgeTask的实现如下:
/**
 * Timer task that runs one purge pass over the configured directories by
 * delegating to {@code PurgeTxnLog.purge}.
 */
static class PurgeTask extends TimerTask {
    /** Directory passed to the purge as the log directory. */
    private File logsDir;
    /** Directory passed to the purge as the snapshot directory. */
    private File snapsDir;
    /** Number of snapshots the purge is asked to keep. */
    private int snapRetainCount;

    /**
     * @param dataDir directory holding the logs
     * @param snapDir directory holding the snapshots
     * @param count   number of snapshots to retain
     */
    public PurgeTask(File dataDir, File snapDir, int count) {
        this.logsDir = dataDir;
        this.snapsDir = snapDir;
        this.snapRetainCount = count;
    }

    /**
     * Logs the start of the purge, runs it, and logs completion. The
     * completion message is written even when the purge failed with an
     * exception.
     */
    @Override
    public void run() {
        LOG.info("Purge task started.");
        purgeAndLogFailure();
        LOG.info("Purge task completed.");
    }

    /**
     * Runs the purge and logs any exception instead of rethrowing it.
     * Presumably this keeps one failed purge from stopping later runs, since
     * an uncaught exception would terminate the java.util.Timer thread.
     */
    private void purgeAndLogFailure() {
        try {
            PurgeTxnLog.purge(this.logsDir, this.snapsDir, this.snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occured while purging.", e);
        }
    }
}
调用purge方法:
/**
 * Purges old snapshots and their logs, keeping the {@code num} most recent
 * snapshots together with the corresponding logs. If logs roll or a new
 * snapshot is created while this runs, those newest snapshots and any data
 * logs are excluded from the current purging cycle.
 *
 * @param dataDir the directory that holds the logs
 * @param snapDir the directory that holds the snapshots
 * @param num the number of snapshots to keep; must be at least 3
 * @throws IllegalArgumentException if {@code num} is smaller than 3
 * @throws IOException if one of the underlying snapshot/log operations
 *         reports one
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    // Refuse to keep fewer than three snapshots.
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog snapLog = new FileTxnSnapLog(dataDir, snapDir);
    // Look up the num most recent snapshots and retain exactly those.
    retainNRecentSnapshots(snapLog, snapLog.findNRecentSnapshots(num));
}


推荐阅读