该文章内容发布已经超过一年,请注意检查文章中内容是否过时。
在前一章中,我们了解了不同收集器中的指标样本是如何被监听器添加进去的。接下来,我们将归纳指标监听器 的创建位置,及它们对应统计的指标。
通过之前的分析,我们已经知道指标 注册事件多播器(RegistryMetricsEventMulticaster)中定义了并绑定了服务注册相关的指标。这种绑定操作同样存在于其它几个简单指标事件多播器(SimpleMetricsEventMulticaster)的几个实现中。
RegistrySubDispatcher (服务注册指标转发器)注册了服务注册相关指标:
MetadataSubDispatcher(元数据指标转发器)注册应用元数据相关指标
应用推送元数据相关计数 (APPLICATION_PUSH_…)
应用订阅元数据相关计数 (APPLICAITON_SUBSCRIBE_…)
服务订阅元数据相关计数 (SERVICE_SUBSCRIBE_…)
ConfigCenterSubDispatcher (配置中心指标转发器) 注册配置中心配置更新次数指标
DefaultSubDispatcher (默认转发器) 注册核心RPC调用次数指标
MetricsDispatcher
MetricsDispatcher 较为特殊,它负责 ApplicationModel 下所有 MetricsCollector(前文中提到的指标收集器) 的初始化注册工作,并将它们添加到自己的监听器列表中。
public class MetricsDispatcher extends SimpleMetricsEventMulticaster {
@SuppressWarnings({"rawtypes"})
public MetricsDispatcher(ApplicationModel applicationModel) {
ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
ExtensionLoader<MetricsCollector> extensionLoader = applicationModel.getExtensionLoader(MetricsCollector.class);
if (extensionLoader != null) {
List<MetricsCollector> customizeCollectors = extensionLoader
.getActivateExtensions();
for (MetricsCollector customizeCollector : customizeCollectors) {
beanFactory.registerBean(customizeCollector);
}
customizeCollectors.forEach(this::addListener);
}
}
}
需要注意,以上几个实现均继承自 SimpleMetricsEventMulticaster,因此它们都具有注册监听、转发事件的能力。它们将自己注册到对应领域的指标 Collector 中,并在收到指标事件时转发到自己注册的监听器中。
//SimpleMetricsEventMulticaster
public void addListener(MetricsListener<?> listener) {
listeners.add(listener);
}
public void publishEvent(MetricsEvent event) {
if (event instanceof EmptyEvent) {
return;
}
if (validateIfApplicationConfigExist(event)) return;
for (MetricsListener listener : listeners) {
if (listener.isSupport(event)) {
listener.onEvent(event);
}
}
}
//...
SubDispatcher 和 Collector 之间的对应关系:
剩下的问题就是这些监听器是如何被触发的。
可以发现三大中心的指标转发器都是在它们对应的Collector中创建的:
public ConfigCenterMetricsCollector(ApplicationModel applicationModel) {
...
super.setEventMulticaster(new ConfigCenterMetricsDispatcher(this));
...
}
public MetadataMetricsCollector(ApplicationModel applicationModel) {
...
super.setEventMulticaster(new MetadataMetricsEventMulticaster(this));
...
}
public RegistryMetricsCollector(ApplicationModel applicationModel) {
...
super.setEventMulticaster(new RegistryMetricsEventMulticaster(this));
...
}
这意味这想要通过它们发布事件,需要通过它们对应的 Collector
来访问。
如前文所述, MetricsDispatcher 在初始化时会尝试获取并加载所有 MetricsCollector 的SPI拓展,
三大中心的MetricsCollector (Metadata/Registry/ConfigCenter)也会在这里被初始化,并添加为 MetricsDispatcher 的监听器:
public MetricsDispatcher(ApplicationModel applicationModel) {
...
ExtensionLoader<MetricsCollector> extensionLoader = applicationModel.getExtensionLoader(MetricsCollector.class);
...
for (MetricsCollector customizeCollector : customizeCollectors) {
beanFactory.registerBean(customizeCollector);
}
customizeCollectors.forEach(this::addListener);
}
对于 MetricsDispatcher,它由 MetricsEventBus 创建。而 MetricsEventBus 自身作为指标相关消息的总线,会接收所有指标消息,并将它们转发给监听者。
MetricsEvenetBus 提供了三个方法来发布指标事件:
publish(MetricsEvent event)
,将事件发布给所有订阅者,只发布一次且不关心事件处理结果post(MetricsEvent event, Supplier<T> targetSupplier)
,将事件发布给所有订阅者,并根据是否产生异常判断事件成功或失败,调用 MetricsDispatcher 发布对应的事件。post(MetricsEvent event, Supplier<T> targetSupplier, Function<T, Boolean> trFunction)
,额外的 trFunction 可用于通过业务结果判断事件成功或失败。 targetSupplier
为业务操作函数,泛型T为业务结果类型。这三个方法均会通过 MetricsDispatcher 来转发事件。
在之前的分析中,我们知道 MetricsDispatcher 创建了所有 MetricsCollector 拓展,并将它们注册为自己的监听者。
因此,当 MetricsEventBus 接收到发布的信息时,它会将信息转发到所有 MetricsCollector 中。对于 CombMetricsCollector 的实现,它们又会调用自己创建的 MetricsEventMulticaster 再次转发消息,到具体指标的监听器。
之后,这些监听器就会根据自己的逻辑修改Collector中的指标计数。
接下来,我们将寻找指标事件发布的源头。
通过前文的分析,我们知道 MetricsEventBus 是所有指标事件发布的入口。具体来说,它有以下的用法:
我们将逐个分析每个用法。
AbstractDirectory
AbstractDirectory 在修改 Invoker 状态相关的操作完成后都会通过 MetricsEventBus 发布 refreshDirectoryEvent(服务目录更新事件,类型为 RegistryEvent ),将当前目录中各种状态 Invoker 实例的最新数量作为附件添加到 RegistryEvent 中。
//AbstractDirectory
public void recoverDisabledInvoker(Invoker<T> invoker) {
...
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}
protected void setInvokers(BitList<Invoker<T>> invokers) {
...
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}
protected void setInvokers(BitList<Invoker<T>> invokers) {
...
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
}
private Map<MetricsKey, Map<String, Integer>> getSummary() {
Map<MetricsKey, Map<String, Integer>> summaryMap = new HashMap<>();
//目录中可用的Invoker数量
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_VALID, groupByServiceKey(getValidInvokers()));
//目录中不可用的Invoker数量
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_DISABLE, groupByServiceKey(getDisabledInvokers()));
//目录中等待重连的Invoker数量
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_TO_RECONNECT, groupByServiceKey(getInvokersToReconnect()));
summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_ALL, groupByServiceKey(getInvokers()));
return summaryMap;
}
...
该事件最终会由 RegistryMetricsCollector 中的 RegistryMetricsDispatcher 转发到关系该事件的监听器中。事件和监听器之间通过 MetricsKey匹配 。
最终,MetricsKey 为 **DIRECTORY_METRIC_NUM_VALID**
的监听器会处理这个事件,并更新 Collector中的计数。
//DIRECTORY_METRIC_NUM_VALID 对应的 Listener。
MetricsCat APPLICATION_DIRECTORY_POST = new MetricsCat(MetricsKey.DIRECTORY_METRIC_NUM_VALID, (key, placeType, collector) -> AbstractMetricsListener.onEvent(key,
event ->
{
Map<MetricsKey, Map<String, Integer>> summaryMap = event.getAttachmentValue(ATTACHMENT_DIRECTORY_MAP);
summaryMap.forEach((metricsKey, map) ->
map.forEach(
(k, v) -> collector.setNum(metricsKey, event.appName(), k, v)));
}
));
这样,服务目录中不同状态 Invoker 的计数就通过 RegistryMetricsCollector 更新到了 ServiceStatComposite 中。
ServiceConfig
当通过 ServiceConfig 导出、注册一个服务时,它会发布一个服务导出事件。
//ServiceConfig
protected synchronized void doExport() {
...
doExportUrls();
exported();
}
private void doExportUrls() {
...
MetricsEventBus.post(RegistryEvent.toRsEvent(module.getApplicationModel(), getUniqueServiceName(), protocols.size() * registryURLs.size()),
//该函数会被同步执行,如果抛出异常则触发 MetricsEvent 的 onError方法,否则触发 onFinish
() -> {
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
if (!serverService) {
repository.registerService(pathKey, interfaceClass);
}
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
return null;
}
);
providerModel.setServiceUrls(urls);
}
事件发布时,该事件会被转发到 RegistryMetricsCollector,触发对应的 Listener 增加 SERVICE_REGISTER_METRIC_REQUESTS (当前服务级注册请求总数)的计数,然后执行定义的 provider 函数。根据是否抛出异常,之后执行 onError 方法 或 onFinish 方法,增加 SERVICE_REGISTER_METRIC_REQUESTS_FAILED (当前服务级注册请求失败总数)或SERVICE_REGISTER_METRIC_REQUESTS_SUCCEED (当前服务级注册请求成功总数) 的计数。
DefaultApplicationDeployer
它在应用部署过程中,初始化配置中心时,发布配置发生改变的事件。
private void startConfigCenter() {
...
compositeDynamicConfiguration.addConfiguration(
prepareEnvironment(configCenter)
);
...
}
private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) {
...
// Add metrics
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, configCenter.getConfigFile(), configCenter.getGroup(),configCenter.getProtocol(), ConfigChangeType.ADDED.name(), configMap.size()));
if (isNotEmpty(appGroup)) {
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, appConfigFile, appGroup, configCenter.getProtocol(), ConfigChangeType.ADDED.name(), appConfigMap.size()));
}
...
}
在 prepareEnvironment
方法中,会按配置中心设置的组(group)和当前应用程序的名称作为组名发布两次事件。该事件会被转发到 ConfigCenterMetricsCollector,增加 CONFIGCENTER_METRIC_TOTAL (配置中心配置变化次数)的计数。
ApolloDynamicConfiguration
动态配置功能的 Apollo 配置中心实现。
//ApolloDynamicConfiguration
@Override
public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
...
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, event.getKey(), event.getGroup(),ConfigCenterEvent.APOLLO_PROTOCOL, ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
}
当Apollo配置中心的配置发生变化时,它的 onChange
方法会被触发,并在最后发布一个 ConfigCenterEvent。该事件最终转发到ConfigCenterMetricsCollector 中,同样增加 CONFIGCENTER_METRIC_TOTAL 的计数。
NacosDynamicConfiguration
动态配置功能的 Nacos 配置中心实现。
//NacosDynamicConfiguration
@Override
public void innerReceive(String dataId, String group, String configInfo) {
...
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, event.getKey(), event.getGroup(),
ConfigCenterEvent.NACOS_PROTOCOL, ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
}
当Nacos配置中心的配置发生变化时,它的 innerReceive
方法被触发,发布一个 ConfigCenterEvent。它的处理流程和 ApolloDynamicConfiguration 一致,最终增加 CONFIGCENTER_METRIC_TOTAL 的计数。
ZookeeperDataListener
动态配置功能的 Zookeeper 实现。
//ZookeeperDataListener
@Override
public void dataChanged(String path, Object value, EventType eventType) {
...
MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, configChangeEvent.getKey(), configChangeEvent.getGroup(),
ConfigCenterEvent.ZK_PROTOCOL, ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
}
在指标收集层面,它的行为和前文中两个配置中心一致,此处不详细展开三个配置中心具体实现的异同。
AbstractMetadataReport
元数据报告接口的抽象实现。它的三个实现 (NacosMetadataReport、RedisMetadataReport、ZookeeperMetadataReport)均使用它的指标事件逻辑。
当订阅新服务并获取它的元数据时,它会发布一个 MetadataEvent 触发相关指标的修改。
private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
...
MetricsEventBus.post(metadataEvent, () ->
{
boolean result = true;
try {
...
doStoreProviderMetadata(providerMetadataIdentifier, data);
saveProperties(providerMetadataIdentifier, data, true, !syncReport);
...
} catch (Exception e) {
...
result = false;
}
return result;
}, aBoolean -> aBoolean
);
}
在前文中,我们提到 MetricsEventBus.post 的第二个参数是实际要进行的业务操作,第三个参数则是根据业务操作返回值判断操作是否成功的逻辑。
此处的业务操作是尝试存储目标服务的元数据。执行操作之前,会先发布事件,最终增加 STORE_PROVIDER_METADATA (尝试存储服务元数据次数)的计数。如果产生异常,会增加 STORE_PROVIDER_METADATA_ERROR (存储服务元数据失败) 的计数,否则增加STORE_PROVIDER_METADATA_SUCCEED(存储服务元数据成功) 的计数。