作者简介:陈卓文,国内某游戏公司私有云团队开发者,主要从事SDN/NFV开发。
由于篇幅问题,我们将“Openflowplugin中Switch生命周期”这个大问题拆分为几个篇章:Switch生命周期对象ContextChain创建;控制节点的Master选举及ContextChain/Context服务实例化;MastershipChangeService以ReconciliationFramework;控制节点成为Slave;Switch下线过程。
本文为Openflowplugin(0.6.2)源码分析第三篇,也是Openflowplugin为上层北向应用提供服务的关键逻辑的开篇!
附:
第一篇:(一)ODL OpenflowPlugin启动流程源码分析
第二篇:(二)ODL Openflowplugin Switch连上控制器Handshake过程源码分析读者约定:基本掌握Opendaylight的思想/有一定实践经验,想要深入理解openflowplugin源码/想对openflowplugin源码修改。
回顾第二篇笔记,在Switch连上控制器Handshake完成后,会调用ContextChainHolderImpl.deviceConnected
方法,会为Switch创建其对应的ContextChain
对象则创建。
1 |
createContextChain(connectionContext); |
先给出总结:在Openflowplugin中,每个Switch都有唯一的ContextChain
对象,管理其在Openflowplugin的生命周期。
1.创建生命周期管理对象ContextChain
为连上的switch创建contextChain
对象,通过调用ContextChainHolderImpl.createContextChain
方法。在创建ContextChain前,会先为Switch创建四个对象DeviceContext、RpcContext、StatisticsContext、RoleContext,四个对象封装了Switch的四种类型操作:Device基本信息、Rpc请求、Statistics数据收集、Master/Slava角色调用。最后,将四个Context对象都传入ContextChain中管理。在下面会详细展开,每个步骤。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
void createContextChain(final ConnectionContext connectionContext) { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); final DeviceContext deviceContext = deviceManager.createContext(connectionContext); deviceContext.registerMastershipWatcher(this); LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); /* 创建此connection的rpcContextImpl对象 */ final RpcContext rpcContext = rpcManager.createContext(deviceContext); rpcContext.registerMastershipWatcher(this); LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); /* 创建此connetion的statisticsContextImpl对象 */ final StatisticsContext statisticsContext = statisticsManager .createContext(deviceContext, ownershipChangeListener.isReconciliationFrameworkRegistered()); statisticsContext.registerMastershipWatcher(this); LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); /* 创建此connection的RoleContextImpl对象 会创建SalRoleServiceImpl对象(包含roleService对象) */ final RoleContext roleContext = roleManager.createContext(deviceContext); roleContext.registerMastershipWatcher(this); LOG.debug("Role" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); /* 1.创建ContextChainImpl对象 2.设置deviceRemovedHandler 3.添加contextImpl到索引 */ final ContextChain contextChain = new ContextChainImpl(this, connectionContext, executorService); contextChain.registerDeviceRemovedHandler(deviceManager); contextChain.registerDeviceRemovedHandler(rpcManager); contextChain.registerDeviceRemovedHandler(statisticsManager); contextChain.registerDeviceRemovedHandler(roleManager); contextChain.registerDeviceRemovedHandler(this); contextChain.addContext(deviceContext); //会context被装进GuardedContextImpl contextChain.addContext(rpcContext); contextChain.addContext(statisticsContext); contextChain.addContext(roleContext); // 索引contextChain contextChainMap.put(deviceInfo, contextChain); // 创建完contextChainImpl说明已经成功连上, 从connecting索引中去掉这设备 connectingDevices.remove(deviceInfo); LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); // 效果是:设置ConnectionAdapterImpl中设置packetIn filter为false, 上面创建deviceContext开始时会设置值为true, 为了创建contextChain时不要接受PacketIn? deviceContext.onPublished(); contextChain.registerServices(singletonServiceProvider); } |
ContextChainHolderImpl.createContextChain
方法是Switch连上控制器Handshake完成后的核心调用,这个方法主要逻辑:
1.1 创建DeviceContext
创建deviceContext,且传入自身对象(ContextChainHolderImpl)引用
1 2 |
final DeviceContext deviceContext = deviceManager.createContext(connectionContext); deviceContext.registerMastershipWatcher(this); |
创建deviceContext
处理:
- 设置
ConnectionAdapterImpl
对象中设置packetIn filter为true,目的是为了过滤后续过程中收到底层switch的packetIn message;- 注:可以看到在
ContextChainHolderImpl.createContextChain
方法后面会将packetIn filter恢复为false
- 注:可以看到在
- 创建
OutboundQueueHandler
对象并传入对象引用到connectionAdapterImpl
和connectionContext
OutboundQueueHandler
对象用于Packet output先到Queue,再到ConnectionAdapterImpl
对象。另外一篇:《Openflowplugin outbound机制》会展开,在此不深入;
- 创建
DeviceContextImpl
对象 - 创建
OpenflowProtocolListenerFullImpl
对象,并传入引用到connectionAdapterImpl
对象(setMessageListener
和setAlienMessageListener
),用于处理底层switch传递给控制器的所有message。- 注意:在上两篇笔记中,当switch连上控制器时调用
ConnectionManagerImpl.onSwitchConnected()
方法,会同样会传入一个MessageListener对象OpenflowProtocolListenerInitialImpl
,此对象仅能出来Hello message,在Handshake阶段使用。而此时Handshake已经结束,重新传入OpenflowProtocolListenerFullImpl
用于处理所有message。
- 注意:在上两篇笔记中,当switch连上控制器时调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
@Override public DeviceContext createContext(@Nonnull final ConnectionContext connectionContext) { LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", connectionContext.getConnectionAdapter().getRemoteAddress(), connectionContext.getDeviceInfo().getNodeId()); // 过滤掉packetIn消息 connectionContext.getConnectionAdapter().setPacketInFiltering(true); final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion()); connectionContext.setOutboundQueueProvider(outboundQueueProvider); // 注册OutboundQueueHandler,用于output先到OutboundQueueHandler再会到ConnectionAdapterImpl final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration = connectionContext.getConnectionAdapter().registerOutboundQueueHandler( outboundQueueProvider, config.getBarrierCountLimit().getValue(), TimeUnit.MILLISECONDS.toNanos(config.getBarrierIntervalTimeoutLimit().getValue())); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); final DeviceContext deviceContext = new DeviceContextImpl( connectionContext, dataBroker, messageSpy, translatorLibrary, convertorExecutor, config.isSkipTableFeatures(), hashedWheelTimer, config.isUseSingleLayerSerialization(), deviceInitializerProvider, config.isEnableFlowRemovedNotification(), config.isSwitchFeaturesMandatory(), contextChainHolder); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); deviceContext.setNotificationPublishService(notificationPublishService); // 索引deviceContext deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext); updatePacketInRateLimiters(); // 创建message监听器 final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( connectionContext.getConnectionAdapter(), deviceContext); connectionContext.getConnectionAdapter().setMessageListener(messageListener); connectionContext.getConnectionAdapter().setAlienMessageListener(messageListener); return deviceContext; } |
1.2 创建RpcContext
创建rpcContext,且传入自身对象(ContextChainHolderImpl)引用
1 2 |
final RpcContext rpcContext = rpcManager.createContext(deviceContext); rpcContext.registerMastershipWatcher(this); |
创建rpcContext处理:
- 创建
RpcContextImpl
对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Override public RpcContext createContext(final @Nonnull DeviceContext deviceContext) { final RpcContextImpl rpcContext = new RpcContextImpl( rpcProviderRegistry, config.getRpcRequestsQuota().getValue(), deviceContext, extensionConverterProvider, //ExtensionConverterManagerImpl, 是在openflowPluginProvider中new的 convertorExecutor, notificationPublishService, config.isIsStatisticsRpcEnabled()); // 索引rpcContext contexts.put(deviceContext.getDeviceInfo(), rpcContext); return rpcContext; } |
1.3 创建StatisticsContext
创建statisticsContext,且传入自身对象(ContextChainHolderImpl)引用
1 2 |
final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext, ownershipChangeListener.isReconciliationFrameworkRegistered()); statisticsContext.registerMastershipWatcher(this); |
创建statisticsContext处理:
- 创建
MultipartWriterProvider
对象- 注意这里传入了deviceContext,是为了write到device的operational yang。
deviceContextImpl
中有写入device对应yang需要的TransactionChain
- 注意这里传入了deviceContext,是为了write到device的operational yang。
- 创建
StatisticsContextImpl
对象- 传入了
MultipartWriterProvider
对象,目的是StatisticsContextImpl
收集的数据可以写入device operational yang节点
- 传入了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
@Override public StatisticsContext createContext(@Nonnull final DeviceContext deviceContext, final boolean useReconciliationFramework) { // 创建MultipartWriterProvider final MultipartWriterProvider statisticsWriterProvider = MultipartWriterProviderFactory .createDefaultProvider(deviceContext); final StatisticsContext statisticsContext = new StatisticsContextImpl<>( deviceContext, converterExecutor, statisticsWriterProvider, executorService, config, !isStatisticsFullyDisabled && config.isIsStatisticsPollingOn(), useReconciliationFramework); contexts.put(deviceContext.getDeviceInfo(), statisticsContext); return statisticsContext; } |
1.4 创建RoleContext
创建roleContext,且传入自身对象(ContextChainHolderImpl)引用
1 2 |
final RoleContext roleContext = roleManager.createContext(deviceContext); roleContext.registerMastershipWatcher(this); |
创建roleContext处理:
- 创建
RoleContextImpl
对象; - 创建
SalRoleServiceImpl
对象,传入RoleContextImpl
。用于后续步骤,设置Switch的Master/Slave role。
1 2 3 4 5 6 7 8 9 10 11 12 |
@Override public RoleContext createContext(@Nonnull final DeviceContext deviceContext) { final DeviceInfo deviceInfo = deviceContext.getDeviceInfo(); final RoleContextImpl roleContext = new RoleContextImpl( deviceContext.getDeviceInfo(), timer, CHECK_ROLE_MASTER_TIMEOUT, config); // 给roleContext设置salRoleService. 创建salRoleServiceImpl会创建RoleService roleContext.setRoleService(new SalRoleServiceImpl(roleContext, deviceContext)); contexts.put(deviceInfo, roleContext); return roleContext; } |
1.5 创建ContextChain
在创建4个context后,创建ContextChainImpl
对象。然后向ContextChainImpl
对象注册/加入各个对象:
1.且给ContextChainImpl
对象注册registerDeviceRemovedHandler,用于后续Switch下线过程清除Manger中各个Context的索引。
2.给ContextChainImpl
对象添加context(addContext
)。
3.最后将ContextChainImpl
对象保存到ContextChainHolderImpl.contextChainMap
中,相当于一个索引。ContextChainHolderImpl
维护了各个Switch的ContextChain对象。
1 2 3 4 5 6 7 8 9 10 11 12 |
final ContextChain contextChain = new ContextChainImpl(this, connectionContext, executorService); contextChain.registerDeviceRemovedHandler(deviceManager); contextChain.registerDeviceRemovedHandler(rpcManager); contextChain.registerDeviceRemovedHandler(statisticsManager); contextChain.registerDeviceRemovedHandler(roleManager); contextChain.registerDeviceRemovedHandler(this); contextChain.addContext(deviceContext); contextChain.addContext(rpcContext); contextChain.addContext(statisticsContext); contextChain.addContext(roleContext); // 索引contextChain contextChainMap.put(deviceInfo, contextChain); |
注意addContext
是把传入的deviceContext/rpcContext/statisticsContext/roleContext封装到GuardedContextImpl
对象。GuardedContextImpl
对象仅做多一层封装,会记录具体Context的状态等。
1 2 3 4 |
@Override public <T extends OFPContext> void addContext(@Nonnull final T context) { contexts.add(new GuardedContextImpl(context)); } |
最后,完成创建ContextChainImpl
对象后,将设备从connectingDevices
索引中删除。表示已经在控制器connected。
1 |
connectingDevices.remove(deviceInfo); |
且设置ConnectionAdapterImpl中设置packetIn filter为false:
1 2 3 4 |
deviceContext.onPublished(); //onPublished方法具体: //primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); |
注意在上面创建deviceContext时会设置为ture。表示在contextChain创建过程中不处理packetIn message,完成创建contextChain后表示connect完成。
1.6 开始选举Master/Slave
上文基本展开了ContextChainHolderImpl.createContextChain
方法详细讲解,唯独缺少最后一行代码,这也是最关键一行!调用ContextChain的registerServices方法:
1 |
contextChain.registerServices(singletonServiceProvider); |
在展开这一行代码的逻辑之前,我们先回顾一下全文!可以看到,在Switch完成Handshake后,Openflowplugin会为switch创建ContextChain对象(包括各个Context)。考虑到多个控制器情况下,当Switch设置多个控制器(比如ovs:set-controller),那么哪个控制器能够对Switch操作,比如下发流表。因为在每个控制器,Openflowplugin都会为switch创建ContextChain对象。
Openflow协议,支持多个控制器,且控制器有角色有三类master、slave、equal。为了控制层高可用使用master/slave模式;为了控制器负载均衡使用equal模式;
在此,我们不讨论equal模式,我们讨论Master/slave模型。Openflowplugin默认是以集群模型实现的,假设存在三个控制器节点,那么就会选举出一个节点作为某一个Switch的Master(不同Switch,Master不一定相同);如果Openflowplugin作为单点控制器运行,那么它就是所有连上的Switch的Master。
假设在三节点的集群情况下,Switch连上所有控制节点,此时每个控制节点都有Switch的ContextChain对象,那么哪个节点是该Switch的Master?关键就是上面的这一行代码!由于篇幅问题,我们在下篇展开!
2.总结
在Switch完成Handshake后,Openflowplugin会为Switch创建各个Context对象(Device/Rpc/Statistics/Role),以及ContextChain对象。