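About the author: Chen Zhuowen is a developer on the private cloud team of a Chinese game company, working mainly on SDN/NFV development.
Because of its length, the broad topic "Switch lifecycle in Openflowplugin" is split into several parts: creation of the switch lifecycle object ContextChain; Master election on the controller node and instantiation of the ContextChain/Context services; MastershipChangeService and ReconciliationFramework; a controller node becoming Slave; and the switch going offline.
This is the fourth article in the Openflowplugin (0.6.2) source code analysis series. It covers Master election on the controller node and instantiation of the ContextChain/Context services, and in my view it is the most central article of the series.
Previous articles:
Part 1: (1) ODL OpenflowPlugin startup flow source code analysis
Part 2: (2) ODL Openflowplugin switch-to-controller Handshake source code analysis
Part 3: (3) ODL Openflowplugin switch lifecycle object ContextChain creation source code analysis
Assumed reader: you have a basic grasp of the ideas behind Opendaylight or some hands-on experience, and you want to understand the openflowplugin source code in depth or modify it.
Recall the core point of the previous article: "After the switch completes the Handshake, Openflowplugin creates the Context objects (Device/Rpc/Statistics/Role) and the ContextChain object for the switch." That leads to the core question of this article. When a switch is configured with several controllers (for example with ovs set-controller), the Openflowplugin on every controller creates a ContextChain object for the switch. Which controller node then provides the switch's services to the upper layers, for example pushing flows?
Conclusion: the OpenFlow protocol supports multiple controllers, and a controller can hold one of three roles: master, slave, or equal. This article only discusses Master/Slave. When a controller node is the Master of a switch, the Openflowplugin on that controller provides the switch's services to upper-layer applications.
Generally speaking, master/slave mode is used for high availability of the control plane, and equal mode is used for load balancing across controllers.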
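1. Master election (Cluster Singleton Service)
The previous article, "ODL Openflowplugin switch lifecycle object ContextChain creation source code analysis", ended on a cliffhanger: the last line of the ContextChainHolderImpl.createContextChain method is the key to Master election: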

    contextChain.registerServices(singletonServiceProvider);

Let us dissect this line in detail. The ContextChainImpl.registerServices method is as follows:

    @Override
    public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
        registration = Objects.requireNonNull(clusterSingletonServiceProvider
                .registerClusterSingletonService(this));
        LOG.debug("Registered clustering services for node {}", deviceInfo);
    }

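The method registers the current ContextChainImpl object as a ClusterSingletonService. This is an opendaylight/mdsal interface: an election for the service is held among the controller cluster nodes, and the service ends up running only on the elected leader node, hence the name singleton service. Note that ContextChain implements the ClusterSingletonService interface.
In a cluster, the controller nodes hold an election. Without a cluster, the current node simply becomes master.
Note: Opendaylight provides Singleton Service and EntityOwnershipService for clustered environments; see the ODL website for details. I plan to analyze both services in depth in later notes.
By the design of Cluster Singleton Service, when a node becomes the leader for a service, the service's own instantiateServiceInstance method is the first thing called on that node. It follows that when Openflowplugin runs in a cluster, the switch must be configured with all cluster nodes as its controllers. Each controller then creates a ContextChain object and registers it as a singleton service, and the underlying election decides which node becomes the leader of that switch's ContextChain service and runs its instantiateServiceInstance method to instantiate the services.
Summary: Openflowplugin creates a ContextChain for every switch, and the ContextChain acts as a Cluster Singleton Service. In essence, electing the Master controller of a switch is the Cluster Singleton Service election of its ContextChain.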
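To make the idea concrete, here is a minimal in-memory sketch of a singleton service. It is not the mdsal API: SingletonService, ToyContextChain and ToyProvider are hypothetical names, and the "first registered node wins" policy is an assumption made only for illustration (the real election is decided by the cluster). It only shows the property that matters here: every node registers a candidate, but instantiateServiceInstance runs on exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;

final class SingletonElectionDemo {
    /** Toy counterpart of a cluster-singleton service; not the mdsal interface. */
    interface SingletonService {
        void instantiateServiceInstance();
    }

    /** Stand-in for the ContextChain that one controller node creates for a switch. */
    static final class ToyContextChain implements SingletonService {
        final String node;
        boolean active;

        ToyContextChain(String node) {
            this.node = node;
        }

        @Override
        public void instantiateServiceInstance() {
            active = true; // only ever called on the elected node
        }
    }

    /** Hypothetical provider; assumption: the first registered candidate wins. */
    static final class ToyProvider {
        private final List<ToyContextChain> candidates = new ArrayList<>();

        void register(ToyContextChain chain) {
            candidates.add(chain);
        }

        /** Picks a leader and starts the service on that node only. */
        String elect() {
            ToyContextChain leader = candidates.get(0);
            leader.instantiateServiceInstance();
            return leader.node;
        }

        long activeCount() {
            return candidates.stream().filter(c -> c.active).count();
        }
    }

    /** Registers one chain per node, elects, and reports "leader:activeCount". */
    static String run() {
        ToyProvider provider = new ToyProvider();
        for (String node : new String[] {"odl-1", "odl-2", "odl-3"}) {
            provider.register(new ToyContextChain(node));
        }
        String leader = provider.elect();
        return leader + ":" + provider.activeCount();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Three nodes register, but only one chain ends up active, which mirrors why only one controller serves a given switch.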
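2. Instantiating the ContextChain service instance
As described above, the ContextChain takes part in the election as a singleton service, and the node that becomes Leader runs its ContextChainImpl.instantiateServiceInstance method: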

    @Override
    @SuppressWarnings("checkstyle:IllegalCatch")
    public void instantiateServiceInstance() {
        try {
            contexts.forEach(OFPContext::instantiateServiceInstance);
            LOG.info("Started clustering services for node {}", deviceInfo);
        } catch (final Exception ex) {
            LOG.warn("Not able to start clustering services for node {}", deviceInfo);
            executorService.execute(() -> contextChainMastershipWatcher
                    .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
        }
    }

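As the code shows, the method calls instantiateServiceInstance on each of the Contexts that were passed in during the previous step. If any exception is thrown, it calls ContextChainHolderImpl.onNotAbleToStartMastershipMandatory (which destroys the ContextChain and its Contexts).
Calling a Context's instantiateServiceInstance method means calling GuardedContextImpl.instantiateServiceInstance: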

    @Override
    public void instantiateServiceInstance() {
        if (monitor.enterIf(isStartable)) {
            try {
                LOG.info("Starting {} service for node {}", this, getDeviceInfo());
                state = STARTING;
                delegate.instantiateServiceInstance();
                state = RUNNING;
            } finally {
                monitor.leave();
            }
        } else {
            throw new IllegalStateException("Service " + this + " has already been started");
        }
    }

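In effect this calls the instantiateServiceInstance methods of DeviceContext, RpcContext, StatisticsContext and RoleContext.
To state the conclusion up front: once the instantiateServiceInstance methods of all four Contexts have completed successfully, the current controller officially becomes the switch's Master and can trigger upper-layer northbound applications through a "hook".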
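The guard in GuardedContextImpl boils down to "the wrapped service may be started only once". The sketch below shows that idea with an AtomicReference instead of the Guava Monitor used in the real class; the Guarded class and its State enum are hypothetical simplifications, not the Openflowplugin types.

```java
import java.util.concurrent.atomic.AtomicReference;

final class GuardedStartDemo {
    enum State { NEW, STARTING, RUNNING }

    /** Simplified guard (assumption: CAS replaces the Monitor used in the real code). */
    static final class Guarded {
        private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
        private final Runnable delegate;

        Guarded(Runnable delegate) {
            this.delegate = delegate;
        }

        void start() {
            // Only the caller that moves NEW -> STARTING may run the delegate.
            if (!state.compareAndSet(State.NEW, State.STARTING)) {
                throw new IllegalStateException("Service has already been started");
            }
            delegate.run();
            state.set(State.RUNNING);
        }

        State state() {
            return state.get();
        }
    }

    /** Starts once, tries a second start, and reports "state:delegateCalls:secondStartRejected". */
    static String run() {
        int[] calls = {0};
        Guarded guarded = new Guarded(() -> calls[0]++);
        guarded.start();
        boolean rejected = false;
        try {
            guarded.start();
        } catch (IllegalStateException e) {
            rejected = true;
        }
        return guarded.state() + ":" + calls[0] + ":" + rejected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The second start is rejected and the delegate runs exactly once, which is the behaviour the IllegalStateException branch in the original method protects.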
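2.1 DeviceContext service instantiation
Let us walk through DeviceContextImpl.instantiateServiceInstance. Instantiating DeviceContextImpl involves a fair amount of logic:
(1) lazyTransactionManagerInitialization() creates the TransactionChainManager object, the DeviceFlowRegistryImpl object, the DeviceGroupRegistryImpl object and the DeviceMeterRegistryImpl object.
(2) Parse portStatusMessages and write the switch's nodeConnectors (ports) to operational YANG. Note: the Handshake has only just completed at this point, so there are probably no ports yet; a later step requests port information from the switch.
(3) Based on the device's OpenFlow version, select the matching initializer (OF10DeviceInitializer or OF13DeviceInitializer) and run the initialization, which:
- 1. writes the switch node to operational YANG (inventory node);
- 2. parses the capabilities and writes them to deviceContext.deviceState;
- 3. sends requests to the switch for its features, including table/group/meter/port description, and writes the results to operational YANG.
(4) Call DeviceFlowRegistryImpl.fill() to read the flows under the switch node's FlowCapableNode (from YANG). Note that DeviceFlowRegistryImpl is where ofp caches a complete copy of the flow table.
(5) Once FlowCapableNode has been read, the DeviceFlowRegistryCallback object is called back, which eventually calls ContextChainHolderImpl.onMasterRoleAcquired with the state INITIAL_FLOW_REGISTRY_FILL.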

    @Override
    @SuppressWarnings({"checkstyle:IllegalCatch"})
    public void instantiateServiceInstance() {
        lazyTransactionManagerInitialization();

        try {
            final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
                    .retrieveAndClearPortStatusMessages();

            // Parse portStatusMessages and write the switch's nodeConnectors (ports) to operational YANG
            portStatusMessages.forEach(this::writePortStatusMessage);
            submitTransaction();
        } catch (final Exception ex) {
            throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
                    deviceInfo.toString(), ex.toString()), ex);
        }

        /*
            Select the initializer (OF10DeviceInitializer or OF13DeviceInitializer) by the device's
            OpenFlow version and initialize:
                1. write the ovs node to operational YANG
                2. parse capabilities and write them to deviceContext.deviceState
                3. request table features/group features/meter features/port description from the switch
                   and write them to operational YANG
         */
        final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
                .lookup(deviceInfo.getVersion());

        if (initializer.isPresent()) {
            final Future<Void> initialize = initializer
                    .get()
                    .initialize(this, switchFeaturesMandatory, skipTableFeatures, writerProvider, convertorExecutor);

            try {
                initialize.get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
            } catch (TimeoutException ex) {
                initialize.cancel(true);
                throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
                        deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
            } catch (ExecutionException | InterruptedException ex) {
                throw new RuntimeException(
                        String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
            }
        } else {
            throw new RuntimeException(String.format("Unsupported version %s for device %s",
                    deviceInfo.getVersion(),
                    deviceInfo.toString()));
        }

        // Call DeviceFlowRegistryImpl.fill() to read the ovs node's FlowCapableNode (from YANG)
        final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
                getDeviceFlowRegistry().fill();

        // Once FlowCapableNode has been read, the DeviceFlowRegistryCallback is called back; it eventually calls
        // contextChainMastershipWatcher.onMasterRoleAcquired with the state INITIAL_FLOW_REGISTRY_FILL
        Futures.addCallback(deviceFlowRegistryFill,
                new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
                MoreExecutors.directExecutor());
    }

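Note that once the DeviceContext object has finished initializing, ContextChainHolderImpl.onMasterRoleAcquired is eventually called. To state the conclusion: every Context calls this method at the end of instantiating its service instance. Once all Contexts have made the call, the ContextChain and Contexts of the switch are fully instantiated and can provide service.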
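The timed wait on the initializer in the method above follows a common pattern: wait on a Future with a deadline, cancel it on timeout, and treat any failure as fatal. Here is a standalone sketch of that pattern using only the JDK. The awaitInit helper and its string results are hypothetical; the real method throws RuntimeException instead of returning a status.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedInitDemo {
    /** Waits for init up to timeoutMillis; cancels the future if the deadline passes. */
    static String awaitInit(Future<Void> init, long timeoutMillis) {
        try {
            init.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "initialized";
        } catch (TimeoutException e) {
            init.cancel(true); // give up on the pending initialization
            return "timeout";
        } catch (ExecutionException e) {
            return "failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "failed";
        }
    }

    /** Exercises the three outcomes and reports whether the stuck future was cancelled. */
    static String run() {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // never completes on its own
        CompletableFuture<Void> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("boom"));
        return awaitInit(done, 50) + "," + awaitInit(stuck, 50) + "," + awaitInit(broken, 50)
                + "," + stuck.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```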
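2.2 RpcContext service instantiation
The instantiateServiceInstance method of RpcContext does two things:
(1) It registers the various rpc implementations on the rpcContext for upper layers to call (for example SalFlowService for pushing flows). During RpcContext instantiation, all Rpc services are instantiated and registered. How the RPC services are registered and used is covered in the next note.
(2) After registering the rpc implementations, it calls ContextChainHolderImpl.onMasterRoleAcquired with the state RPC_REGISTRATION.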

    @Override
    public void instantiateServiceInstance() {
        // Create the sal services (rpc operations on the device, etc.) and register them on this
        // rpcContextImpl via registerRpcServiceImplementation. Includes SalFlowServiceImpl.
        // registers all OF services for role
        MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor);

        if (isStatisticsRpcEnabled && !deviceContext.canUseSingleLayerSerialization()) {
            // Register the statistics-related services
            MdSalRegistrationUtils.registerStatCompatibilityServices(
                    this,
                    deviceContext,
                    notificationPublishService,
                    convertorExecutor);
        }

        contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.RPC_REGISTRATION);
    }

Inside the MdSalRegistrationUtils.registerServices method it calls, you can see the various RPC services being instantiated and registered.

    public static void registerServices(@Nonnull final RpcContext rpcContext,
                                        @Nonnull final DeviceContext deviceContext,
                                        // ExtensionConverterManagerImpl, created with new in openflowPluginProvider
                                        final ExtensionConverterProvider extensionConverterProvider,
                                        final ConvertorExecutor convertorExecutor) {
        // TODO: Use multipart writer provider from device context
        final MultipartWriterProvider multipartWriterProvider = MultipartWriterProviderFactory
                .createDefaultProvider(deviceContext);

        // create service instances
        final SalFlowServiceImpl salFlowService = new SalFlowServiceImpl(rpcContext, deviceContext, convertorExecutor);
        final FlowCapableTransactionServiceImpl flowCapableTransactionService =
                new FlowCapableTransactionServiceImpl(rpcContext, deviceContext);
        final SalAsyncConfigServiceImpl salAsyncConfigService =
                new SalAsyncConfigServiceImpl(rpcContext, deviceContext);
        final SalGroupServiceImpl salGroupService =
                new SalGroupServiceImpl(rpcContext, deviceContext, convertorExecutor);
        final SalMeterServiceImpl salMeterService =
                new SalMeterServiceImpl(rpcContext, deviceContext, convertorExecutor);

        // register routed service instances
        rpcContext.registerRpcServiceImplementation(SalEchoService.class,
                new SalEchoServiceImpl(rpcContext, deviceContext));
        rpcContext.registerRpcServiceImplementation(SalFlowService.class, salFlowService);
        rpcContext.registerRpcServiceImplementation(FlowCapableTransactionService.class,
                flowCapableTransactionService);
        rpcContext.registerRpcServiceImplementation(SalAsyncConfigService.class, salAsyncConfigService);
        rpcContext.registerRpcServiceImplementation(SalMeterService.class, salMeterService);
        rpcContext.registerRpcServiceImplementation(SalGroupService.class, salGroupService);
        rpcContext.registerRpcServiceImplementation(SalTableService.class,
                new SalTableServiceImpl(rpcContext, deviceContext, convertorExecutor, multipartWriterProvider));
        rpcContext.registerRpcServiceImplementation(SalPortService.class,
                new SalPortServiceImpl(rpcContext, deviceContext, convertorExecutor));
        rpcContext.registerRpcServiceImplementation(PacketProcessingService.class,
                new PacketProcessingServiceImpl(rpcContext, deviceContext, convertorExecutor));
        rpcContext.registerRpcServiceImplementation(NodeConfigService.class,
                new NodeConfigServiceImpl(rpcContext, deviceContext));
        rpcContext.registerRpcServiceImplementation(OpendaylightFlowStatisticsService.class,
                OpendaylightFlowStatisticsServiceImpl.createWithOook(rpcContext, deviceContext, convertorExecutor));

        // register direct statistics gathering services
        rpcContext.registerRpcServiceImplementation(OpendaylightDirectStatisticsService.class,
                new OpendaylightDirectStatisticsServiceImpl(deviceContext.canUseSingleLayerSerialization()
                        ? SingleLayerDirectStatisticsProviderInitializer
                        .createProvider(rpcContext, deviceContext, convertorExecutor, multipartWriterProvider)
                        : MultiLayerDirectStatisticsProviderInitializer
                        .createProvider(rpcContext, deviceContext, convertorExecutor, multipartWriterProvider)));

        // register flat batch services
        rpcContext.registerRpcServiceImplementation(SalFlatBatchService.class, new SalFlatBatchServiceImpl(
                new SalFlowsBatchServiceImpl(salFlowService, flowCapableTransactionService),
                new SalGroupsBatchServiceImpl(salGroupService, flowCapableTransactionService),
                new SalMetersBatchServiceImpl(salMeterService, flowCapableTransactionService)
        ));

        // register experimenter services
        rpcContext.registerRpcServiceImplementation(SalExperimenterMessageService.class,
                new SalExperimenterMessageServiceImpl(rpcContext, deviceContext, extensionConverterProvider));
        rpcContext.registerRpcServiceImplementation(SalExperimenterMpMessageService.class,
                new SalExperimenterMpMessageServiceImpl(rpcContext, deviceContext, extensionConverterProvider));

        // register onf extension bundles
        rpcContext.registerRpcServiceImplementation(SalBundleService.class,
                new SalBundleServiceImpl(new SalExperimenterMessageServiceImpl(
                        rpcContext, deviceContext, extensionConverterProvider)));
    }

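The registration calls above all have the same shape: a service interface class is the key and an implementation object is the value. A minimal sketch of such a class-keyed registry follows. Registry, FlowService and EchoService are hypothetical names used only for illustration; the real RpcContext additionally registers each implementation as a routed rpc with MD-SAL, which this sketch leaves out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RpcRegistryDemo {
    /** Hypothetical service interfaces standing in for SalFlowService, SalEchoService, ... */
    interface FlowService {
        String addFlow(String flow);
    }

    interface EchoService {
        String echo();
    }

    /** Type-safe registry keyed by service interface (not the Openflowplugin class). */
    static final class Registry {
        private final Map<Class<?>, Object> implementations = new LinkedHashMap<>();

        <S> void register(Class<S> type, S implementation) {
            implementations.put(type, implementation);
        }

        <S> S lookup(Class<S> type) {
            return type.cast(implementations.get(type));
        }

        int size() {
            return implementations.size();
        }
    }

    /** Registers two services and calls them back through the registry. */
    static String run() {
        Registry registry = new Registry();
        registry.register(FlowService.class, flow -> "added " + flow);
        registry.register(EchoService.class, () -> "pong");
        return registry.lookup(FlowService.class).addFlow("f1")
                + "," + registry.lookup(EchoService.class).echo()
                + "," + registry.size();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keying by the interface class lets a caller ask for a service by type without a cast at the call site.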
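2.3 StatisticsContext service instantiation
Instantiating StatisticsContext provides the statistics collection service. This costs extra performance, of course, and collection can be switched off to improve performance; a later note covers this in detail.
Its instantiateServiceInstance method determines which data types need to be collected, based on the configuration and the switch's features, and finally calls the gatherDynamicData method to collect the dynamic data. When collection succeeds, InitialSubmitCallback is called back.
The instantiateServiceInstance method is as follows: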

    @Override
    public void instantiateServiceInstance() {
        final List<MultipartType> statListForCollecting = new ArrayList<>();

        // Add the data types to collect, based on what the device supports and what the config enables
        if (devState.isTableStatisticsAvailable() && config.isIsTableStatisticsPollingOn()) {
            statListForCollecting.add(MultipartType.OFPMPTABLE);
        }

        if (devState.isGroupAvailable() && config.isIsGroupStatisticsPollingOn()) {
            statListForCollecting.add(MultipartType.OFPMPGROUPDESC);
            statListForCollecting.add(MultipartType.OFPMPGROUP);
        }

        if (devState.isMetersAvailable() && config.isIsMeterStatisticsPollingOn()) {
            statListForCollecting.add(MultipartType.OFPMPMETERCONFIG);
            statListForCollecting.add(MultipartType.OFPMPMETER);
        }

        if (devState.isFlowStatisticsAvailable() && config.isIsFlowStatisticsPollingOn()) {
            statListForCollecting.add(MultipartType.OFPMPFLOW);
        }

        if (devState.isPortStatisticsAvailable() && config.isIsPortStatisticsPollingOn()) {
            statListForCollecting.add(MultipartType.OFPMPPORTSTATS);
        }

        if (devState.isQueueStatisticsAvailable() && config.isIsQueueStatisticsPollingOn()) {
            statListForCollecting.add(MultipartType.OFPMPQUEUE);
        }

        collectingStatType = ImmutableList.copyOf(statListForCollecting);
        // Collect data for the configured types (this sends requests to the switch);
        // on success InitialSubmitCallback is called back
        Futures.addCallback(gatherDynamicData(), new InitialSubmitCallback(), MoreExecutors.directExecutor());
    }

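The gatherDynamicData method collects the dynamic data. It checks that the switch is still online (not disconnected), then collects data according to the list that instantiateServiceInstance has just set up, sending requests to the switch. One call performs a single round of collection: for each data type to collect it sends a request to the switch, and it updates the collection timestamps in YANG. (The collection process itself is covered in another note.)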
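The single collection round is built by chaining one asynchronous step per data type, each step starting only after the previous one has finished. The sketch below shows that chaining with JDK CompletableFuture and Stream.reduce; the real code uses Guava ListenableFuture and Futures.transformAsync. The chain and gather helpers are hypothetical, and the "request" is reduced to appending the type name to a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class StatChainDemo {
    /** Appends one collection step that starts only after the previous step has completed. */
    static CompletableFuture<Boolean> chain(CompletableFuture<Boolean> previous,
                                            String type,
                                            List<String> requested) {
        return previous.thenCompose(ok -> {
            requested.add(type); // stand-in for sending a multipart request to the switch
            return CompletableFuture.completedFuture(ok);
        });
    }

    /** Folds the list of types into one future that completes after all steps, in order. */
    static CompletableFuture<Boolean> gather(List<String> types, List<String> requested) {
        CompletableFuture<Boolean> start = CompletableFuture.completedFuture(Boolean.TRUE);
        return types.stream().reduce(
                start,
                (future, type) -> chain(future, type, requested),
                // combiner is only used by parallel streams; kept for the three-argument reduce
                (left, right) -> left.thenCompose(ignored -> right));
    }

    /** Runs one round and reports "requestedTypes:result". */
    static String run() {
        List<String> requested = new ArrayList<>();
        boolean ok = gather(Arrays.asList("OFPMPTABLE", "OFPMPFLOW", "OFPMPPORTSTATS"), requested).join();
        return requested + ":" + ok;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```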

    private ListenableFuture<Boolean> gatherDynamicData() {
        // Statistics collection switch
        if (!isStatisticsPollingOn || !schedulingEnabled.get()) {
            LOG.debug("Statistics for device {} are not enabled.", getDeviceInfo().getNodeId().getValue());
            return Futures.immediateFuture(Boolean.TRUE);
        }

        return this.lastDataGatheringRef.updateAndGet(future -> {
            // write start timestamp to state snapshot container
            // Writes the collection start time to FlowCapableStatisticsGatheringStatus (flow-node-inventory.yang)
            StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceInfo, deviceContext);

            // recreate gathering future if it should be recreated
            final ListenableFuture<Boolean> lastDataGathering =
                    Objects.isNull(future) || future.isCancelled() || future.isDone()
                            ? Futures.immediateFuture(Boolean.TRUE)
                            : future;

            // build statistics gathering future
            // Check that the switch is online (not disconnected), then collect data for the types
            // configured in instantiateServiceInstance
            final ListenableFuture<Boolean> newDataGathering = collectingStatType.stream()
                    .reduce(lastDataGathering, this::statChainFuture,
                        (listenableFuture, asyn) -> Futures.transformAsync(listenableFuture, result -> asyn,
                                MoreExecutors.directExecutor()));

            // write end timestamp to state snapshot container
            Futures.addCallback(newDataGathering, new FutureCallback<Boolean>() {
                @Override
                public void onSuccess(@Nonnull final Boolean result) {
                    // Writes the end time to FlowCapableStatisticsGatheringStatus (flow-node-inventory.yang)
                    StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, result);
                }

                @Override
                public void onFailure(final Throwable throwable) {
                    if (!(throwable instanceof TransactionChainClosedException)) {
                        StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceInfo, deviceContext, false);
                    }
                }
            }, MoreExecutors.directExecutor());

            return newDataGathering;
        });
    }

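In the InitialSubmitCallback callback object, once gatherDynamicData has succeeded, the first thing called is ContextChainHolderImpl.onMasterRoleAcquired with the state INITIAL_GATHERING, meaning that the initial (first) round of data collection is complete.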

    private final class InitialSubmitCallback implements FutureCallback<Boolean> {
        @Override
        public void onSuccess(@Nullable final Boolean result) {
            contextChainMastershipWatcher
                    .onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_GATHERING);

            // With ReconciliationFramework, the continuous collection service is not started here yet
            if (!isUsingReconciliationFramework) {
                continueInitializationAfterReconciliation();
            }
        }

        @Override
        public void onFailure(@Nonnull final Throwable throwable) {
            contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo,
                                                                              "Initial gathering statistics "
                                                                                      + "unsuccessful: "
                                                                                      + throwable.getMessage());
        }
    }

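The callback also checks whether ReconciliationFramework is in use. If it is not, the continueInitializationAfterReconciliation method is called right away to start the continuous data collection service. If it is, continuous collection is not started at this point. Inside continueInitializationAfterReconciliation, ContextChainHolderImpl.onMasterRoleAcquired is called once more, this time with the state INITIAL_SUBMIT.
Note that ReconciliationFramework is an extension implemented by Openflowplugin for switch reconciliation applications. Its main purpose is to start upper-layer services in priority order, and it is one implementation of a hook through which applications learn that the underlying ofp has become the switch's Master. A later note analyzes it in depth.
So when ReconciliationFramework is in use, the InitialSubmitCallback callback does not start continuous collection immediately. Only after ReconciliationFramework has finished its processing is continueInitializationAfterReconciliation called to start the continuous data collection service. The details of this logic appear further below.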
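The two orderings can be sketched as follows. This is a simplified model, not the Openflowplugin code: the method names are hypothetical, and the states are plain strings collected into a list so the order in which they would be reported to onMasterRoleAcquired is visible.

```java
import java.util.ArrayList;
import java.util.List;

final class InitialGatheringDemo {
    /** Models InitialSubmitCallback.onSuccess: report INITIAL_GATHERING, then maybe continue. */
    static void onInitialGatheringSuccess(boolean usingReconciliationFramework, List<String> reported) {
        reported.add("INITIAL_GATHERING");
        if (!usingReconciliationFramework) {
            continueInitialization(reported);
        }
    }

    /** Stand-in for continueInitializationAfterReconciliation: start polling, report INITIAL_SUBMIT. */
    static void continueInitialization(List<String> reported) {
        reported.add("INITIAL_SUBMIT");
    }

    /** Runs both modes; with the framework, a separate step stands in for its completion callback. */
    static String run() {
        List<String> without = new ArrayList<>();
        onInitialGatheringSuccess(false, without);

        List<String> with = new ArrayList<>();
        onInitialGatheringSuccess(true, with);
        String beforeFramework = with.toString();
        with.add("framework-done");   // hypothetical marker for the framework finishing its work
        continueInitialization(with); // only now is continuous collection started
        return without + "|" + beforeFramework + "|" + with;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```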
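2.4 RoleContext service instantiation
Initializing the RoleContext service means sending a message to the switch to tell it that the current controller node has become its Master. In the implementation, when the OpenFlow version is 1.3 or later, the set-role rpc is called directly here to tell the switch that the current controller node is master (BECOMEMASTER).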

    @Override
    public void instantiateServiceInstance() {
        /*
            Tell the device that this node becomes master by calling the set-role rpc
         */
        final ListenableFuture<RpcResult<SetRoleOutput>> future = sendRoleChangeToDevice(OfpRole.BECOMEMASTER);
        changeLastRoleFuture(future);
        // Callback into contextChainHolderImpl.onMasterRoleAcquired()
        Futures.addCallback(future, new MasterRoleCallback(), MoreExecutors.directExecutor());
    }

When the set-role rpc call (BECOMEMASTER) succeeds, MasterRoleCallback is called back, and it eventually calls ContextChainHolderImpl.onMasterRoleAcquired with the state MASTER_ON_DEVICE.

    private final class MasterRoleCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
        @Override
        public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
            // Callback: set the state to MASTER_ON_DEVICE
            contextChainMastershipWatcher.onMasterRoleAcquired(
                    deviceInfo,
                    ContextChainMastershipState.MASTER_ON_DEVICE);
            LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo);
        }

        @Override
        public void onFailure(@Nonnull final Throwable throwable) {
            if (!(throwable instanceof CancellationException)) {
                contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
                        deviceInfo,
                        "Was not able to propagate MASTER role on device. Error: " + throwable.toString());
            }
        }
    }

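One detail of this callback is easy to miss: a cancelled role request is deliberately ignored, while any other failure aborts mastership. The sketch below reproduces the three outcomes with a JDK CompletableFuture; the real code uses Guava's FutureCallback. The outcome helper and its labels are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

final class MasterRoleCallbackDemo {
    /** Classifies an already completed set-role future the way the callback above would. */
    static String outcome(CompletableFuture<String> setRole) {
        String[] result = {"IGNORED"};
        setRole.whenComplete((value, error) -> {
            if (error == null) {
                result[0] = "MASTER_ON_DEVICE";          // success: report the state
            } else if (!(error instanceof CancellationException)) {
                result[0] = "NOT_ABLE_TO_START";         // real failure: give up mastership
            }                                            // cancellation: do nothing
        });
        return result[0];
    }

    /** Covers success, failure and cancellation. */
    static String run() {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("role set");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("switch rejected role"));
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(true);
        return outcome(ok) + "," + outcome(failed) + "," + outcome(cancelled);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```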
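3. The controller becomes the switch's Master
Once all the services have been instantiated, the controller officially becomes the switch's Master. As we have seen, each context (device/rpc/statistics/role) calls ContextChainHolderImpl.onMasterRoleAcquired with a different state when its initialization completes. Let us now look inside onMasterRoleAcquired. This method is the heart of deciding whether a switch's services have all been instantiated on the controller node, and it is the core method through which Openflowplugin triggers hooks for upper-layer applications.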

    @Override
    public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
                                     @Nonnull final ContextChainMastershipState mastershipState) {
        Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
            if (ownershipChangeListener.isReconciliationFrameworkRegistered()
                    && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) {
                if (contextChain.isMastered(mastershipState, true)) {
                    Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
                                        reconciliationFrameworkCallback(deviceInfo, contextChain),
                                        MoreExecutors.directExecutor());
                }
            } else if (contextChain.isMastered(mastershipState, false)) {
                LOG.info("Role MASTER was granted to device {}", deviceInfo);
                ownershipChangeListener.becomeMaster(deviceInfo);
                deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
            }
        });
    }

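Depending on whether ReconciliationFramework is in use, there are two ways of deciding whether the ContextChain has become master. Recall from the StatisticsContext section that with ReconciliationFramework, continuous collection is not started immediately, so onMasterRoleAcquired is not called again with the state INITIAL_SUBMIT at that point.
This is reflected in onMasterRoleAcquired. It calls the contextChain.isMastered method to decide whether the node has become Master (all services instantiated), and besides the current state it passes a second argument that says whether we are inside the ReconciliationFramework step.
If ReconciliationFramework is in use and the state passed in is not INITIAL_SUBMIT, we are still inside the ReconciliationFramework step. The state INITIAL_SUBMIT only arrives from a call to StatisticsContextImpl.continueInitializationAfterReconciliation, which happens either when ReconciliationFramework is not enabled (from within StatisticsContext) or after ReconciliationFramework has finished its processing.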

    if (ownershipChangeListener.isReconciliationFrameworkRegistered()
            && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState))

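Inside the ReconciliationFramework step the second argument is true; otherwise it is false.
3.1 Deciding that the controller node is Master
The contextChain.isMastered method decides whether the controller node has become the switch's Master. One small detail is worth noticing. During the ReconciliationFramework step (argument true), even if the node already qualifies as Master, the state of the ContextChain and Contexts is not changed to WORKING_MASTER right away. The method simply returns true to indicate isMaster, but the chain is not yet WORKING_MASTER. The reason is that ReconciliationFramework does extra processing and calls back into this method at the end, and only then is the state set to WORKING_MASTER. This needs to be read together with the later notes; consider it foreshadowing.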

    @Override
    public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
                              boolean inReconciliationFrameworkStep) {
        switch (mastershipState) {
            case INITIAL_SUBMIT: // statisticsContext
                LOG.debug("Device {}, initial submit OK.", deviceInfo);
                this.initialSubmitting.set(true);
                break;
            case MASTER_ON_DEVICE: // roleContext
                LOG.debug("Device {}, master state OK.", deviceInfo);
                this.masterStateOnDevice.set(true);
                break;
            case INITIAL_GATHERING: // statisticsContext
                LOG.debug("Device {}, initial gathering OK.", deviceInfo);
                this.initialGathering.set(true);
                break;
            case RPC_REGISTRATION: // rpcContext
                LOG.debug("Device {}, RPC registration OK.", deviceInfo);
                this.rpcRegistration.set(true);
                break;
            case INITIAL_FLOW_REGISTRY_FILL:
                // State passed in once deviceContext.instantiateServiceInstance has finished
                // Flow registry fill is not mandatory to work as a master
                LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
                this.registryFilling.set(true);
                break;
            case CHECK:
                // no operation
                break;
            default:
                // no operation
                break;
        }

        // result is true only once the contexts (deviceContext, rpcContext, statisticContext, roleContext)
        // have finished initializing (their instantiateServiceInstance methods have completed).
        // ReconciliationFramework and initialSubmitting are mutually exclusive: with ReconciliationFramework,
        // initialSubmitting is false (see the logic in statisticsContextImpl)
        final boolean result = initialGathering.get() && masterStateOnDevice.get() && rpcRegistration.get()
                && inReconciliationFrameworkStep || initialSubmitting.get();

        if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) {
            // Without ReconciliationFramework, this branch is entered once deviceContext, rpcContext,
            // statisticContext and roleContext have all been initialized
            LOG.info("Device {} is able to work as master{}", deviceInfo,
                     registryFilling.get() ? "." : " WITHOUT flow registry !!!");
            /*
                1. set the contextChainState to WORKING_MASTER
                2. set the state of each context (deviceContext, rpcContext, statisticContext, roleContext)
                   to WORKING_MASTER
             */
            changeMastershipState(ContextChainState.WORKING_MASTER);
        }

        return result;
    }

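The isMastered method sets the class field that corresponds to the state passed in, and then evaluates all the flags together. Once each context (device/rpc/statistics/role) has finished initializing, service instantiation in Openflowplugin is complete, which means the node has become the switch's Master, and the method returns true.
Look closely at the Master decision: inReconciliationFrameworkStep and initialSubmitting are mutually exclusive (recall the StatisticsContext flow above). Because && binds tighter than || in Java, the expression reads as (initialGathering && masterStateOnDevice && rpcRegistration && inReconciliationFrameworkStep) || initialSubmitting.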

    final boolean result = initialGathering.get() && masterStateOnDevice.get() && rpcRegistration.get()
            && inReconciliationFrameworkStep || initialSubmitting.get();

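Finally, if we are not inside the ReconciliationFramework step (the second argument) and the result is true, the state of the contextChain and of every context is changed to WORKING_MASTER.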
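To see how the flags and the second argument interact, here is a small model of the decision. It is a sketch under simplifying assumptions: the Chain class is hypothetical, an EnumSet replaces the individual AtomicBoolean fields, and the CHECK and INITIAL_FLOW_REGISTRY_FILL states are left out. The walk-through follows the ReconciliationFramework path: the three mandatory states arrive with the second argument set to true, and INITIAL_SUBMIT arrives later with false.

```java
import java.util.EnumSet;
import java.util.Set;

final class MastershipFlagsDemo {
    enum State { INITIAL_SUBMIT, MASTER_ON_DEVICE, INITIAL_GATHERING, RPC_REGISTRATION }

    /** Hypothetical, reduced model of the flag handling in isMastered. */
    static final class Chain {
        private final Set<State> seen = EnumSet.noneOf(State.class);
        boolean workingMaster;

        boolean isMastered(State state, boolean inReconciliationFrameworkStep) {
            seen.add(state);
            // Same grouping as the original: (three flags && inStep) || initialSubmit
            boolean result = (seen.contains(State.INITIAL_GATHERING)
                    && seen.contains(State.MASTER_ON_DEVICE)
                    && seen.contains(State.RPC_REGISTRATION)
                    && inReconciliationFrameworkStep)
                    || seen.contains(State.INITIAL_SUBMIT);
            if (!inReconciliationFrameworkStep && result) {
                workingMaster = true; // stands in for changeMastershipState(WORKING_MASTER)
            }
            return result;
        }
    }

    /** Replays the framework path and records each return value and the WORKING_MASTER flag. */
    static String run() {
        Chain chain = new Chain();
        StringBuilder out = new StringBuilder();
        out.append(chain.isMastered(State.RPC_REGISTRATION, true)).append(',');
        out.append(chain.isMastered(State.MASTER_ON_DEVICE, true)).append(',');
        out.append(chain.isMastered(State.INITIAL_GATHERING, true)).append(',');
        out.append(chain.workingMaster).append(',');
        out.append(chain.isMastered(State.INITIAL_SUBMIT, false)).append(',');
        out.append(chain.workingMaster);
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model the third call already returns true while the chain is not yet a working master; only the later INITIAL_SUBMIT call flips that flag, which matches the detail described in section 3.1.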
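Back in the ContextChainHolderImpl.onMasterRoleAcquired method:
1) With ReconciliationFramework, when isMastered returns true, the method ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo) is called.
An extra callback, reconciliationFrameworkCallback, is attached to it. This callback calls statisticsContext.continueInitializationAfterReconciliation to start continuous collection in statisticsContext. Recall that when statisticsContext is initialized with reconciliationFramework in use, starting continuous collection is skipped, precisely so that it can be started here once the ReconciliationFramework call has completed.
2) Without ReconciliationFramework, when isMastered returns true, ownershipChangeListener.becomeMaster is called, and then deviceManager notifies inventory that the device node has been added.
To state the conclusion up front: once the switch's ContextChain has been instantiated on the leader node, methods of MastershipChangeServiceManager are called. MastershipChangeServiceManager is the hook-triggering service that ofp offers to upper-layer applications. When instantiation is complete (isMastered), the upper-layer applications are triggered: with ReconciliationFramework the becomeMasterBeforeSubmittedDS method is called, otherwise the becomeMaster method is called.
Because of length, the next note continues with MastershipChangeServiceManager. The open question is: how does MastershipChangeServiceManager act as a hook that lets the upper layers learn that a node has become a switch's Master?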
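4. Summary
In this article we have seen that the election of a controller node as a switch's Master is done directly through the Cluster Singleton Service provided by the underlying ODL controller.
We also looked closely at what happens after a controller node is elected Master: each Context service is instantiated, namely Device, Rpc, Statistics and Role. Only when all Contexts have been instantiated can Openflowplugin provide the switch's services to northbound applications.
Finally, one point to take away: MastershipChangeServiceManager provides the hook through which the upper layers learn that a node has become a switch's Master. See you in the next article!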