博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Elasticsearch 6.1.0 启动过程
阅读量:6079 次
发布时间:2019-06-20

本文共 50191 字,大约阅读时间需要 167 分钟。

开篇

 这篇文章主要目的是想梳理下elasticserach在启动过程中的核心步骤,宏观上讲解清楚elasticsearch启动过程中都做了哪些事情。

 原本想通过流程图来进行画,后来网上有人通过xmind来分析整个过程,发现也能够讲解的非常清楚,因此同样采用xmind来自上而下讲解整个过程。

启动过程图

说明:

  • 1.通过XMind记录ES启动流程的整个过程。
  • 2.阅读顺序从上往下,标红色旗子的部分是核心流程
  • 3.核心流程我概括为:配置加载;Bootstrap 初始化; Bootstrap setup过程;Bootstrap start过程。
  • 4.每个步骤当中细分下去很多逻辑,这里只讲解能够串联整个过程的逻辑。

elasticsearch 启动过程

配置加载过程

Bootstrap 初始化

  • Elasticsearch的一个重要作用是解析命令参数。
  • 执行带 -h 参数的Elasticsearch启动命令。
  • Elasticsearch的构造函数如下所示,跟帮助信息是一致的。
// elasticsearch启动命令帮助     Elasticsearch() {        super("starts elasticsearch", () -> {});         versionOption = parser.acceptsAll(Arrays.asList("V", "version"),            "Prints elasticsearch version information and exits");        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),            "Starts Elasticsearch in the background")            .availableUnless(versionOption);        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),            "Creates a pid file in the specified path on start")            .availableUnless(versionOption)            .withRequiredArg()            .withValuesConvertedBy(new PathConverter());        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),            "Turns off standard output/error streams logging in console")            .availableUnless(versionOption)            .availableUnless(daemonizeOption);    }

Elasticsearch.main过程

// elasticsearch启动入口函数    public static void main(final String[] args) throws Exception {        System.setSecurityManager(new SecurityManager() {            @Override            public void checkPermission(Permission perm) {            }        });        LogConfigurator.registerErrorListener();        final Elasticsearch elasticsearch = new Elasticsearch();        int status = main(args, elasticsearch, Terminal.DEFAULT);        if (status != ExitCodes.OK) {            exit(status);        }    }    // 调用elasticsearch对象的main函数    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {        return elasticsearch.main(args, terminal);    }
  • 1.创建 SecurityManager 安全管理器
  • 2.LogConfigurator.registerErrorListener() 注册侦听器
  • 3.创建Elasticsearch对象
  • 4.进入elasticsearch.main()过程。

elasticsearch.main过程

说明:

  • Elasticsearch继承关系如上图。
  • elasticsearch.main()过程当中会调用EnvironmentAwareCommand、Command等类。
class Elasticsearch extends EnvironmentAwareCommand {    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {        return elasticsearch.main(args, terminal);    }}public abstract class Command implements Closeable {    public final int main(String[] args, Terminal terminal) throws Exception {        if (addShutdownHook()) {            shutdownHookThread = new Thread(() -> {                try {                    this.close();                } catch (final IOException e) {                    try (                        StringWriter sw = new StringWriter();                        PrintWriter pw = new PrintWriter(sw)) {                        e.printStackTrace(pw);                        terminal.println(sw.toString());                    } catch (final IOException impossible) {                    }                }            });            Runtime.getRuntime().addShutdownHook(shutdownHookThread);        }        beforeMain.run();        try {            mainWithoutErrorHandling(args, terminal);        } catch (OptionException e) {            return ExitCodes.USAGE;        } catch (UserException e) {            return e.exitCode;        }        return ExitCodes.OK;    }    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {        final OptionSet options = parser.parse(args);        if (options.has(helpOption)) {            printHelp(terminal);            return;        }        if (options.has(silentOption)) {            terminal.setVerbosity(Terminal.Verbosity.SILENT);        } else if (options.has(verboseOption)) {            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);        } else {            terminal.setVerbosity(Terminal.Verbosity.NORMAL);        }        execute(terminal, options);    }}
  • 5.elasticsearch.main()直接进入Command.main()方法。
  • 6.Command.main()给Runtime类添加一个hook线程,该线程作用是:当Runtime异常关闭时打印异常信息。
  • 7.Command.mainWithoutErrorHandling 方法,根据命令行参数打印或者设置参数,然后执行命令。
  • 8.进入EnvironmentAwareCommand.execute()方法。

EnvironmentAwareCommand.execute过程

public abstract class EnvironmentAwareCommand extends Command {    protected void execute(Terminal terminal, OptionSet options) throws Exception {        final Map
settings = new HashMap<>(); for (final KeyValuePair kvp : settingOption.values(options)) { if (kvp.value.isEmpty()) { throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty"); } if (settings.containsKey(kvp.key)) { final String message = String.format( Locale.ROOT, "setting [%s] already set, saw [%s] and [%s]", kvp.key, settings.get(kvp.key), kvp.value); throw new UserException(ExitCodes.USAGE, message); } settings.put(kvp.key, kvp.value); } putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data"); putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home"); putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs"); execute(terminal, options, createEnv(terminal, settings)); } protected Environment createEnv(final Terminal terminal, final Map
settings) throws UserException { final String esPathConf = System.getProperty("es.path.conf"); if (esPathConf == null) { throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set"); } return InternalSettingsPreparer.prepareEnvironment(Settings.EMPTY, terminal, settings, getConfigPath(esPathConf)); } protected abstract void execute(Terminal terminal, OptionSet options, Environment env) throws Exception;}
  • 9.EnvironmentAwareCommand.execute,确保 es.path.data, es.path.home, es.path.logs 等参数已设置,否则从 System.properties 中读取。
  • 10.createEnv最后返回一个 Environment 对象,包含plugins,bin,lib,modules等目录下的文件信息
  • 11.进入elasticsearch的execute()方法。

prepareEnvironment过程

public class InternalSettingsPreparer {    public static Environment prepareEnvironment(Settings input, Terminal terminal,                                                  Map
properties, Path configPath) { // just create enough settings to build the environment, to get the config dir Settings.Builder output = Settings.builder(); initializeSettings(output, input, properties); Environment environment = new Environment(output.build(), configPath); if (Files.exists(environment.configFile().resolve("elasticsearch.yaml"))) { throw new SettingsException("elasticsearch.yaml was deprecated in 5.5.0 and must be renamed to elasticsearch.yml"); } if (Files.exists(environment.configFile().resolve("elasticsearch.json"))) { throw new SettingsException("elasticsearch.json was deprecated in 5.5.0 and must be converted to elasticsearch.yml"); } output = Settings.builder(); // start with a fresh output Path path = environment.configFile().resolve("elasticsearch.yml"); if (Files.exists(path)) { try { output.loadFromPath(path); } catch (IOException e) { throw new SettingsException("Failed to load settings from " + path.toString(), e); } } // re-initialize settings now that the config file has been loaded initializeSettings(output, input, properties); finalizeSettings(output, terminal); environment = new Environment(output.build(), configPath); // we put back the path.logs so we can use it in the logging configuration file output.put(Environment.PATH_LOGS_SETTING.getKey(), environment.logsFile().toAbsolutePath().normalize().toString()); return new Environment(output.build(), configPath); }}public class Environment { public Environment(final Settings settings, final Path configPath) { final Path homeFile; if (PATH_HOME_SETTING.exists(settings)) { homeFile = PathUtils.get(PATH_HOME_SETTING.get(settings)).normalize(); } else { throw new IllegalStateException(PATH_HOME_SETTING.getKey() + " is not configured"); } if (configPath != null) { configFile = configPath.normalize(); } else { configFile = homeFile.resolve("config"); } pluginsFile = homeFile.resolve("plugins"); List
dataPaths = PATH_DATA_SETTING.get(settings); final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); if (DiscoveryNode.nodeRequiresLocalStorage(settings)) { if (dataPaths.isEmpty() == false) { dataFiles = new Path[dataPaths.size()]; dataWithClusterFiles = new Path[dataPaths.size()]; for (int i = 0; i < dataPaths.size(); i++) { dataFiles[i] = PathUtils.get(dataPaths.get(i)); dataWithClusterFiles[i] = dataFiles[i].resolve(clusterName.value()); } } else { dataFiles = new Path[]{homeFile.resolve("data")}; dataWithClusterFiles = new Path[]{homeFile.resolve("data").resolve(clusterName.value())}; } } else { if (dataPaths.isEmpty()) { dataFiles = dataWithClusterFiles = EMPTY_PATH_ARRAY; } else { final String paths = String.join(",", dataPaths); throw new IllegalStateException("node does not require local storage yet path.data is set to [" + paths + "]"); } } if (PATH_SHARED_DATA_SETTING.exists(settings)) { sharedDataFile = PathUtils.get(PATH_SHARED_DATA_SETTING.get(settings)).normalize(); } else { sharedDataFile = null; } List
repoPaths = PATH_REPO_SETTING.get(settings); if (repoPaths.isEmpty()) { repoFiles = EMPTY_PATH_ARRAY; } else { repoFiles = new Path[repoPaths.size()]; for (int i = 0; i < repoPaths.size(); i++) { repoFiles[i] = PathUtils.get(repoPaths.get(i)); } } // this is trappy, Setting#get(Settings) will get a fallback setting yet return false for Settings#exists(Settings) if (PATH_LOGS_SETTING.exists(settings)) { logsFile = PathUtils.get(PATH_LOGS_SETTING.get(settings)).normalize(); } else { logsFile = homeFile.resolve("logs"); } if (PIDFILE_SETTING.exists(settings)) { pidFile = PathUtils.get(PIDFILE_SETTING.get(settings)).normalize(); } else { pidFile = null; } binFile = homeFile.resolve("bin"); libFile = homeFile.resolve("lib"); modulesFile = homeFile.resolve("modules"); Settings.Builder finalSettings = Settings.builder().put(settings); finalSettings.put(PATH_HOME_SETTING.getKey(), homeFile); if (PATH_DATA_SETTING.exists(settings)) { finalSettings.putList(PATH_DATA_SETTING.getKey(), dataPaths); } finalSettings.put(PATH_LOGS_SETTING.getKey(), logsFile.toString()); this.settings = finalSettings.build(); }}
  • 12.createEnv最后返回一个 Environment 对象。

elasticsearch.execute过程

class Elasticsearch extends EnvironmentAwareCommand {    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {        if (options.nonOptionArguments().isEmpty() == false) {                   }        if (options.has(versionOption)) {            terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())                    + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()                    + ", JVM: " + JvmInfo.jvmInfo().version());            return;        }        final boolean daemonize = options.has(daemonizeOption);        final Path pidFile = pidfileOption.value(options);        final boolean quiet = options.has(quietOption);        try {            init(daemonize, pidFile, quiet, env);        } catch (NodeValidationException e) {        }    }    void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)        throws NodeValidationException, UserException {        try {            Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);        } catch (BootstrapException | RuntimeException e) {        }    }}
  • 13.Elasticsearch.execute ,读取daemonize, pidFile,quiet 的值,并 确保配置的临时目录(temp)是有效目录
  • 14.进入Bootstrap初始化阶段

Bootstrap init过程

static void init(            final boolean foreground,            final Path pidFile,            final boolean quiet,            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {        BootstrapInfo.init();        INSTANCE = new Bootstrap();        final SecureSettings keystore = loadSecureSettings(initialEnv);        final Environment environment = createEnvironment(foreground,                                                           pidFile, keystore, initialEnv.settings(), initialEnv.configFile());        try {            LogConfigurator.configure(environment);        } catch (IOException e) {            throw new BootstrapException(e);        }        if (environment.pidFile() != null) {            try {                PidFile.create(environment.pidFile(), true);            } catch (IOException e) {                throw new BootstrapException(e);            }        }        final boolean closeStandardStreams = (foreground == false) || quiet;        try {            if (closeStandardStreams) {                final Logger rootLogger = ESLoggerFactory.getRootLogger();                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);                if (maybeConsoleAppender != null) {                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);                }                closeSystOut();            }            checkLucene();            Thread.setDefaultUncaughtExceptionHandler(                new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));            INSTANCE.setup(true, environment);            try {                IOUtils.close(keystore);            } catch (IOException e) {                throw new BootstrapException(e);            }            INSTANCE.start();            if (closeStandardStreams) {                closeSysError();            }        } catch (NodeValidationException | RuntimeException e) {            throw e;        }    }
  • 15.创建Bootstrap对象, INSTANCE = new Bootstrap()。
  • 16.初始化Bootstrap对象,INSTANCE.setup(true, environment)。
  • 17.启动Bootstrap对象INSTANCE.start()。

Bootstrap new过程

Bootstrap() {        keepAliveThread = new Thread(new Runnable() {            @Override            public void run() {                try {                    keepAliveLatch.await();                } catch (InterruptedException e) {                    // bail out                }            }        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");        keepAliveThread.setDaemon(false);        // keep this thread alive (non daemon thread) until we shutdown        Runtime.getRuntime().addShutdownHook(new Thread() {            @Override            public void run() {                keepAliveLatch.countDown();            }        });    }
  • 18.创建Bootstrap对象,该类构造函数会创建一个用户线程添加到Runtime Hook中,进行 countDown 操作。

Bootstrap setup过程

private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {        Settings settings = environment.settings();        try {            spawner.spawnNativePluginControllers(environment);        } catch (IOException e) {            throw new BootstrapException(e);        }        initializeNatives(                environment.tmpFile(),                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));        // initialize probes before the security manager is installed        initializeProbes();        if (addShutdownHook) {            Runtime.getRuntime().addShutdownHook(new Thread() {                @Override                public void run() {                    try {                        IOUtils.close(node, spawner);                        LoggerContext context = (LoggerContext) LogManager.getContext(false);                        Configurator.shutdown(context);                    } catch (IOException ex) {                        throw new ElasticsearchException("failed to stop node", ex);                    }                }            });        }        try {            JarHell.checkJarHell();        } catch (IOException | URISyntaxException e) {        }        IfConfig.logIfNecessary();        try {            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));        } catch (IOException | NoSuchAlgorithmException e) {        }        node = new Node(environment) {            @Override            protected void validateNodeBeforeAcceptingRequests(                final BootstrapContext context,                final BoundTransportAddress boundTransportAddress, List
checks) throws NodeValidationException { BootstrapChecks.check(context, boundTransportAddress, checks); } }; }
  • 19.为每个插件生成生成本机控制类,spawner.spawnNativePluginControllers(environment);尝试为给定模块生成控制器(native Controller)守护程序。 生成的进程将通过其stdin,stdout和stderr流保持与此JVM的连接,但对此包之外的代码不能使用对这些流的引用。
  • 20.初始化本地资源 initializeNatives()。
  • 21.使用 JarHell 检查重复的 jar 文件 JarHell.checkJarHell()。
  • 22.创建 node 节点 new Node(environment),核心!!!

Bootstrap start过程

private void start() throws NodeValidationException {        node.start();        keepAliveThread.start();    }
  • 23.启动node对象,node.start()。
  • 24.启动守护进程,keepAliveThread.start()。

node创建过程

protected Node(final Environment environment, Collection
> classpathPlugins) { try { Settings tmpSettings = Settings.builder().put(environment.settings()) .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build(); try { nodeEnvironment = new NodeEnvironment(tmpSettings, environment); resourcesToClose.add(nodeEnvironment); } catch (IOException ex) { } final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings); Logger logger = Loggers.getLogger(Node.class, tmpSettings); final String nodeId = nodeEnvironment.nodeId(); tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId); final JvmInfo jvmInfo = JvmInfo.jvmInfo(); this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins); this.settings = pluginsService.updatedSettings(); localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId()); this.environment = new Environment(this.settings, environment.configFile()); final List
> executorBuilders = pluginsService.getExecutorBuilders(settings); final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0])); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); DeprecationLogger.setThreadContext(threadPool.getThreadContext()); resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext())); final List
> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings()); final List
additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter()); for (final ExecutorBuilder
builder : threadPool.builders()) { additionalSettings.addAll(builder.getRegisteredSettings()); } client = new NodeClient(settings, threadPool); ..................}
  • 25.创建一个 NodeEnvironment 对象保存节点环境信息,如各种数据文件的路径
  • 26.读取JVM信息
  • 27.创建 PluginsService 对象,创建过程中会读取并加载所有的模块和插件
  • 28.创建一个最终的 Environment 对象
  • 29.创建线程池 ThreadPool 后面各类对象基本都是通过线程来提供服务,这个线程池可以管理各类线程
  • 30.创建 节点客户端 NodeClient
protected Node(final Environment environment, Collection
> classpathPlugins) { .................. final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class)); AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class)); final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter); scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings()); resourcesToClose.add(resourceWatcherService); final NetworkService networkService = new NetworkService( getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))); List
clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); clusterService.addListener(scriptModule.getScriptService()); resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, listener::onNewInfo); final UsageService usageService = new UsageService(settings);}
  • 31.创建各种服务类对象 ResourceWatcherService、NetworkService、ClusterService、IngestService、ClusterInfoService、UsageService、MonitorService、CircuitBreakerService、MetaStateService、IndicesService、MetaDataIndexUpgradeService、TemplateUpgradeService、TransportService、ResponseCollectorService、SearchTransportService、NodeService、SearchService、PersistentTasksClusterService,这些服务类是的功能可以根据名称做一个大概的判断。
protected Node(final Environment environment, Collection
> classpathPlugins) { .................. ModulesBuilder modules = new ModulesBuilder(); // plugin modules must be added here, before others or we can get crazy injection errors... for (Module pluginModule : pluginsService.createGuiceModules()) { modules.add(pluginModule); } final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); modules.add(new GatewayModule()); ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService); modules.add(actionModule); final RestController restController = actionModule.getRestController(); final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController); Collection
>> customMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class) .stream().map(Plugin::getCustomMetaDataUpgrader) .collect(Collectors.toList()); Collection
>> indexTemplateMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getIndexTemplateMetaDataUpgrader) .collect(Collectors.toList()); Collection
> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class) .stream().map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders); final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader); new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService); final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); final Consumer
httpBind; final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { httpServerTransport = networkModule.getHttpServerTransportSupplier().get(); httpBind = b -> { b.bind(HttpServerTransport.class).toInstance(httpServerTransport); }; } else { httpBind = b -> { b.bind(HttpServerTransport.class).toProvider(Providers.of(null)); }; httpServerTransport = null; } final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), clusterModule.getAllocationService()); this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService);
  • 32.ModulesBuilder类加入各种模块 ScriptModule、AnalysisModule、SettingsModule、pluginModule、ClusterModule、IndicesModule、SearchModule、GatewayModule、RepositoriesModule、ActionModule、NetworkModule、DiscoveryModule
protected Node(final Environment environment, Collection
> classpathPlugins) { .................. modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(Client.class).toInstance(client); b.bind(NodeClient.class).toInstance(client); b.bind(Environment.class).toInstance(this.environment); b.bind(ThreadPool.class).toInstance(threadPool); b.bind(NodeEnvironment.class).toInstance(nodeEnvironment); b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService); b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService); b.bind(BigArrays.class).toInstance(bigArrays); b.bind(ScriptService.class).toInstance(scriptModule.getScriptService()); b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry()); b.bind(IngestService.class).toInstance(ingestService); b.bind(UsageService.class).toInstance(usageService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), responseCollectorService)); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays, scriptModule.getScriptService())); b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService())); b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService); b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); { RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class).toInstance( new PeerRecoverySourceService(settings, transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class).toInstance( new PeerRecoveryTargetService(settings, threadPool, transportService, recoverySettings, clusterService)); } httpBind.accept(b); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); } ); injector = modules.createInjector(); clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class)); List
pluginLifecycleComponents = pluginComponents.stream() .filter(p -> p instanceof LifecycleComponent) .map(p -> (LifecycleComponent) p).collect(Collectors.toList()); pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream() .map(injector::getInstance).collect(Collectors.toList())); resourcesToClose.addAll(pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key
>() {}), () -> clusterService.localNode().getId()); if (NetworkModule.HTTP_ENABLED.get(settings)) { logger.debug("initializing HTTP handlers ..."); actionModule.initRestHandlers(() -> clusterService.state().nodes()); } success = true; } catch (IOException ex) { } finally { } }
  • 33.elasticsearch里面的组件基本都进行进行了模块化管理,elasticsearch对guice进行了封装通过ModulesBuilder类构建es的模块.

node启动过程

public Node start() throws NodeValidationException {        if (!lifecycle.moveToStarted()) {            return this;        }        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));        logger.info("starting ...");        pluginLifecycleComponents.forEach(LifecycleComponent::start);        injector.getInstance(MappingUpdatedAction.class).setClient(client);        injector.getInstance(IndicesService.class).start();        injector.getInstance(IndicesClusterStateService.class).start();        injector.getInstance(SnapshotsService.class).start();        injector.getInstance(SnapshotShardsService.class).start();        injector.getInstance(RoutingService.class).start();        injector.getInstance(SearchService.class).start();        nodeService.getMonitorService().start();        final ClusterService clusterService = injector.getInstance(ClusterService.class);        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);        nodeConnectionsService.start();        clusterService.setNodeConnectionsService(nodeConnectionsService);        injector.getInstance(ResourceWatcherService.class).start();        injector.getInstance(GatewayService.class).start();        Discovery discovery = injector.getInstance(Discovery.class);        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);        // Start the transport service now so the publish address will be added to the local disco node in ClusterService        TransportService transportService = injector.getInstance(TransportService.class);        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));        transportService.start();        assert localNodeFactory.getNode() != null;        assert transportService.getLocalNode().equals(localNodeFactory.getNode())            : "transportService has a different local node than the factory provided";        final MetaData onDiskMetadata;        try {            // we load the global state here (the persistent part of the cluster state stored on disk) to            // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.            if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {                onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();            } else {                onDiskMetadata = MetaData.EMPTY_META_DATA;            }            assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null        } catch (IOException e) {            throw new UncheckedIOException(e);        }        validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata),              transportService.boundAddress(), pluginsService            .filterPlugins(Plugin            .class)            .stream()            .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));        clusterService.addStateApplier(transportService.getTaskManager());        // start after transport service so the local disco is known        discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService        clusterService.start();        assert clusterService.localNode().equals(localNodeFactory.getNode())            : "clusterService has a different local node than the factory provided";        transportService.acceptIncomingRequests();        discovery.startInitialJoin();        // tribe nodes don't have a master so we shouldn't register an observer         s        final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);        if (initialStateTimeout.millis() > 0) {            final ThreadPool thread = injector.getInstance(ThreadPool.class);            ClusterState clusterState = clusterService.state();            ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService,                                                               null, logger, thread.getThreadContext());            if (clusterState.nodes().getMasterNodeId() == null) {                logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);                final CountDownLatch latch = new CountDownLatch(1);                observer.waitForNextChange(new ClusterStateObserver.Listener() {                    @Override                    public void onNewClusterState(ClusterState state) { latch.countDown(); }                    @Override                    public void onClusterServiceClose() {                        latch.countDown();                    }                    @Override                    public void onTimeout(TimeValue timeout) {                        logger.warn("timed out while waiting for initial discovery state - timeout: {}",                            initialStateTimeout);                        latch.countDown();                    }                }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);                try {                    latch.await();                } catch (InterruptedException e) {                    throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");                }            }        }        if (NetworkModule.HTTP_ENABLED.get(settings)) {            injector.getInstance(HttpServerTransport.class).start();        }        if (WRITE_PORTS_FILE_SETTING.get(settings)) {            if (NetworkModule.HTTP_ENABLED.get(settings)) {                HttpServerTransport http = injector.getInstance(HttpServerTransport.class);                writePortsFile("http", http.boundAddress());            }            TransportService transport = injector.getInstance(TransportService.class);            writePortsFile("transport", transport.boundAddress());        }        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);        return this;    }
  • 34.通过 injector 获取各个类的对象,调用 start() 方法启动(实际进入各个类的中 doStart 方法): LifecycleComponent、IndicesService、IndicesClusterStateService、SnapshotsService、SnapshotShardsService、RoutingService、SearchService、MonitorService、NodeConnectionsService、ResourceWatcherService、GatewayService、Discovery、TransportService
  • 35.IndicesService:索引管理
    IndicesClusterStateService:跨集群同步

SnapshotsService:负责创建快照

SnapshotShardsService:此服务在数据和主节点上运行,并控制这些节点上当前快照的分片。
RoutingService:侦听集群状态,当它收到集群改变事件将验证集群状态,路由表可能会更新
SearchService:搜索服务
MonitorService:监控
NodeConnectionsService:此组件负责在节点添加到群集状态后连接到节点,并在删除它们时断开连接。 此外,它会定期检查所有连接是否仍处于打开状态,并在需要时还原它们。 请注意,如果节点断开/不响应ping,则此组件不负责从群集中删除节点。 这是由NodesFaultDetection完成的。 主故障检测由链接MasterFaultDetection完成。
ResourceWatcherService:通用资源观察器服务
GatewayService:网关

  • 36.集群发现与监控等,启动 HttpServerTransport, 绑定服务端口。

线程池

public ThreadPool(final Settings settings, final ExecutorBuilder
... customBuilders) { super(settings); assert Node.NODE_NAME_SETTING.exists(settings); final Map
builders = new HashMap<>(); final int availableProcessors = EsExecutors.numberOfProcessors(settings); final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200)); builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1)); builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); for (final ExecutorBuilder
builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); } builders.put(builder.name(), builder); } this.builders = Collections.unmodifiableMap(builders); threadContext = new ThreadContext(settings); final Map
executors = new HashMap<>(); for (@SuppressWarnings("unchecked") final Map.Entry
entry : builders.entrySet()) { final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings); final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext); if (executors.containsKey(executorHolder.info.getName())) { } logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info)); executors.put(entry.getKey(), executorHolder); } executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); this.executors = unmodifiableMap(executors); this.scheduler = Scheduler.initScheduler(settings); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName( settings, "[timer]"), estimatedTimeInterval.millis()); this.cachedTimeThread.start(); }

线程池类型 ThreadPoolType

fixed(固定):fixed线程池拥有固定数量的线程来处理请求,在没有空闲线程时请求将被挂在队列中。queue_size参数可以控制在没有空闲线程时,能排队挂起的请求数

fixed_auto_queue_size:此类型为实验性的,将被更改或删除,不关注
scaling(弹性):scaling线程池拥有的线程数量是动态的,这个数字介于core和max参数的配置之间变化。keep_alive参数用来控制线程在线程池中空闲的最长时间
direct:此类线程是一种不支持关闭的线程,就意味着一旦使用,则会一直存活下去.

一些重要的线程池

generic:用于通用的请求(例如:后台节点发现),线程池类型为 scaling。

index:用于index/delete请求,线程池类型为 fixed, 大小的为处理器数量,队列大小为200,最大线程数为 1 + 处理器数量。
search:用于count/search/suggest请求。线程池类型为 fixed, 大小的为 int((处理器数量 3) / 2) +1,队列大小为1000。*
get:用于get请求。线程池类型为 fixed,大小的为处理器数量,队列大小为1000。
analyze:用于analyze请求。线程池类型为 fixed,大小的1,队列大小为16
write:用于单个文档的 index/delete/update 请求以及 bulk 请求,线程池类型为 fixed,大小的为处理器数量,队列大小为200,最大线程数为 1 + 处理器数量。
snapshot:用于snaphost/restore请求。线程池类型为 scaling,线程保持存活时间为5分钟,最大线程数为min(5, (处理器数量)/2)。
warmer:用于segment warm-up请求。线程池类型为 scaling,线程保持存活时间为5分钟,最大线程数为min(5, (处理器数量)/2)。
refresh:用于refresh请求。线程池类型为 scaling,线程空闲保持存活时间为5分钟,最大线程数为min(10, (处理器数量)/2)。
listener:主要用于Java客户端线程监听器被设置为true时执行动作。线程池类型为 scaling,最大线程数为min(10, (处理器数量)/2)。

PluginsService插件服务

public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";    public static final String ES_PLUGIN_POLICY = "plugin-security.policy";    public static PluginInfo readFromProperties(final Path path) throws IOException {        final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);        final Properties props = new Properties();        try (InputStream stream = Files.newInputStream(descriptor)) {            props.load(stream);        }        final String name = props.getProperty("name");        if (name == null || name.isEmpty()) {            throw new IllegalArgumentException(                    "property [name] is missing in [" + descriptor + "]");        }        final String description = props.getProperty("description");        if (description == null) {            throw new IllegalArgumentException(                    "property [description] is missing for plugin [" + name + "]");        }        final String version = props.getProperty("version");        if (version == null) {            throw new IllegalArgumentException(                    "property [version] is missing for plugin [" + name + "]");        }        final String esVersionString = props.getProperty("elasticsearch.version");        if (esVersionString == null) {            throw new IllegalArgumentException(                    "property [elasticsearch.version] is missing for plugin [" + name + "]");        }        final Version esVersion = Version.fromString(esVersionString);        if (esVersion.equals(Version.CURRENT) == false) {            final String message = String.format(                    Locale.ROOT,                    "plugin [%s] is incompatible with version [%s]; was designed for version [%s]",                    name,                    Version.CURRENT.toString(),                    esVersionString);            throw new IllegalArgumentException(message);        }        final String javaVersionString = props.getProperty("java.version");        if (javaVersionString == null) {            throw new IllegalArgumentException(                    "property [java.version] is missing for plugin [" + name + "]");        }        JarHell.checkVersionFormat(javaVersionString);        JarHell.checkJavaVersion(name, javaVersionString);        final String classname = props.getProperty("classname");        if (classname == null) {            throw new IllegalArgumentException(                    "property [classname] is missing for plugin [" + name + "]");        }        final String hasNativeControllerValue = props.getProperty("has.native.controller");        final boolean hasNativeController;        if (hasNativeControllerValue == null) {            hasNativeController = false;        } else {            switch (hasNativeControllerValue) {                case "true":                    hasNativeController = true;                    break;                case "false":                    hasNativeController = false;                    break;                default:                    final String message = String.format(                        Locale.ROOT,                        "property [%s] must be [%s], [%s], or unspecified but was [%s]",                        "has_native_controller",                        "true",                        "false",                        hasNativeControllerValue);                    throw new IllegalArgumentException(message);            }        }        final String requiresKeystoreValue = props.getProperty("requires.keystore", "false");        final boolean requiresKeystore;        try {            requiresKeystore = Booleans.parseBoolean(requiresKeystoreValue);        } catch (IllegalArgumentException e) {            throw new IllegalArgumentException("property [requires.keystore] must be [true] or [false]," +                                               " but was [" + requiresKeystoreValue + "]", e);        }        return new PluginInfo(name, description, version, classname, hasNativeController, requiresKeystore);    }
  • 读取模块的配置文件 plugin-descriptor.properties,解析出内容并存储到 Map中。
  • 分别校验 name, description, version, elasticsearch.version, java.version, classname, extended.plugins, has.native.controller, requires.keystore 这些配置项,缺失或者不按要求则抛出异常。
  • 根据配置项构造一个 PluginInfo 对象返回。

参考文章

转载地址:http://qyegx.baihongyu.com/

你可能感兴趣的文章
默认虚拟主机设置
查看>>
php中的短标签 太坑人了
查看>>
[译] 可维护的 ETL:使管道更容易支持和扩展的技巧
查看>>
### 继承 ###
查看>>
数组扩展方法之求和
查看>>
astah-professional-7_2_0安装
查看>>
函数是对象-有属性有方法
查看>>
uva 10107 - What is the Median?
查看>>
Linux下基本栈溢出攻击【转】
查看>>
c# 连等算式都在做什么
查看>>
使用c:forEach 控制5个换行
查看>>
java web轻量级开发面试教程摘录,java web面试技巧汇总,如何准备Spring MVC方面的面试...
查看>>
使用ansible工具部署ceph
查看>>
linux系列博文---->深入理解linux启动运行原理(一)
查看>>
Android反编译(一) 之反编译JAVA源码
查看>>
结合当前公司发展情况,技术团队情况,设计一个适合的技术团队绩效考核机制...
查看>>
python-45: opener 的使用
查看>>
cad图纸转换完成的pdf格式模糊应该如何操作?
查看>>
Struts2与Struts1区别
查看>>
网站内容禁止复制解决办法
查看>>