上一篇(https://www.sdnlab.com/16316.html)简单分析了openflowjava到openflowplugin(介绍的hello消息),本篇介绍如何从openflowplugin到openflowjava,即介绍flowmod消息。
一、ModelDrivenSwitchImpl核心类
在上篇中提到了类ModelDrivenSwitchImpl。可以说这个类是openflow交换机的抽象。几乎所有操作都需要调用类的方法,比如:addflow,addgroup等。简单看一下构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext sessionContext) { super(identifier, sessionContext); this.nodeId = nodeId; messageService = sessionContext.getMessageDispatchService(); version = sessionContext.getPrimaryConductor().getVersion(); final NotificationProviderService rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService(); //绑定session和设置服务 这里的session就是上一篇提到的session,此处会把session // 与某一个底层交换机进行绑定 rpcTaskContext = new OFRpcTaskContext(); rpcTaskContext.setSession(sessionContext); rpcTaskContext.setMessageService(messageService); rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService); //超时时间默认是1分钟 rpcTaskContext.setMaxTimeout(maxTimeout); rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit); //设置rpc线程池,这个池中有10个线程,目的下发流表等 rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool()); rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager().getMessageSpy()); } |
二、Flowmod下发流程
Flowmod分为三种,add、update、remove。这三种类型操作类似,这里以addflow为例进行分析。
代码addflow非常简单,但是有几点需要说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
@Override public Future<RpcResult<AddFlowOutput>> addFlow(final AddFlowInput input) { LOG.debug("Calling the FlowMod RPC method on MessageDispatchService"); // use primary connection SwitchConnectionDistinguisher cookie = null; //构造flowmod和barrier消息,表现形式是task OFRpcTask<AddFlowInput, RpcResult<UpdateFlowOutput>> task = OFRpcTaskFactory.createAddFlowTask(rpcTaskContext, input, cookie); //将task放到线程中执行 这个地方线程就是构造函数中提到的那个具有10个线程的线程池 ListenableFuture<RpcResult<UpdateFlowOutput>> result = task.submit(); // result是UpdateFlowOutput类型,返回值要求AddFlowOutput,因此需要将result // 进行转换。无论是updateflow,removeflow都需要进行一次转换。 // 此处的Future.transform是一个接口,我们只需要实现回调函数即可第二个参数 return Futures.transform(result, OFRpcFutureResultTransformFactory.createForAddFlowOutput()); } public static Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>> createForAddFlowOutput() { return new Function<RpcResult<UpdateFlowOutput>,RpcResult<AddFlowOutput>>(){ @Override public RpcResult<AddFlowOutput> apply(RpcResult<UpdateFlowOutput> input){ // ODL在代码中进行封装统一,所有flowmod操作(Add,Update,Remove),结果均以 // 都是以UpdateFlowOutput形式返回 // 在调用方法getResult时候,如果结果已经存在则立即返回,如果没有结果则进行等待 UpdateFlowOutput updateFlowOutput = input.getResult(); // 将updateflowoutput转成AddFlowOutput AddFlowOutputBuilder addFlowOutput = new AddFlowOutputBuilder(); addFlowOutput.setTransactionId(updateFlowOutput.getTransactionId()); AddFlowOutput result = addFlowOutput.build(); RpcResult<AddFlowOutput> rpcResult = assembleRpcResult(input, result); LOG.debug(MSG_ADD_FLOW_RPC); return rpcResult; } }; } |
上面简单分析了一下最外层逻辑处理,归为两点:
1)创建返回类型UpdateOutput的task并提交到线程池。
2) 接收返回类型UpdateOutput的future并且转成对应的Output,如AddOutput、RemoveOutput。
下面开始flowmod下发流程分析,逻辑层次比较深入还是先放一张流程图:
由上图可知,flowmod下发流程层次很深,这里只分析一下几个方法:
1、toFlowModInput方法,该方法可以说是将业务数据转成标准flowmod消息入口。如果需要扩展flowmod消息,则需要从这个函数着手修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
private static FlowModInputBuilder toFlowModInput(Flow flow, short version, BigInteger datapathid) { // 此处代码采用的设计模式--建造者设计模式 FlowModInputBuilder flowMod = new FlowModInputBuilder(); // 填充flowmod消息头 salToOFFlowCookie(flow, flowMod); salToOFFlowCookieMask(flow, flowMod); salToOFFlowTableId(flow, flowMod); salToOFFlowCommand(flow, flowMod); salToOFFlowIdleTimeout(flow, flowMod); salToOFFlowHardTimeout(flow, flowMod); salToOFFlowPriority(flow, flowMod); salToOFFlowBufferId(flow, flowMod); salToOFFlowOutPort(flow, flowMod); salToOFFlowOutGroup(flow, flowMod); // convert and inject flowFlags FlowFlagReactor.getInstance().convert(flow.getFlags(), version, flowMod, datapathid); // 下面的match域和instructions域往往是我们最关心的因为扩展flowmod消息就是修改这两个地方 这个地方需要特别注意!!! MatchReactor.getInstance().convert(flow.getMatch(), version, flowMod, datapathid); if (flow.getInstructions() != null) { flowMod.setInstruction(toInstructions(flow, version, datapathid)); flowMod.setAction(getActions(version, datapathid, flow)); } flowMod.setVersion(version); return flowMod; } |
代码MatchReactor.getInstance().convert(…),最终会调用到MatchConvertorImpl.convert这个函数中,就有我们常见的协议字段,如:inport,ipv4等。这个函数主要是组建oxm。这些内容没有什么逻辑,只要按照标准协议填写即可,不再累赘阐述。
方法toInstructions,getActions也类似,按照标准协议填写即可。
以上内容主要想说明,当我们扩展flowmod消息的时候,能够知道在哪里修改即可。
2、方法MessageDispatchServiceImpl.flowMod
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
@Override public Future<RpcResult<UpdateFlowOutput>> flowMod(……) { LOG.debug("Calling OFLibrary flowMod"); Future<RpcResult<Void>> response = null; try {// 下发flowmod,返回值为Future,这个future是一个RPC,在下面代码会进行解析 response = getConnectionAdapter(cookie).flowMod(input); } catch (ConnectionException e) { return RpcResultUtil.getRpcErrorFuture(e); } // 将future放到线程中,如果接收到返回值,则调用apply方法。Apply方法会把RPC结果转成UpdateOutput类型 ListenableFuture<RpcResult<UpdateFlowOutput>> xidResult = Futures.transform( JdkFutureAdapters.listenInPoolThread(response), new Function<RpcResult<Void>, RpcResult<UpdateFlowOutput>>() { @Override public RpcResult<UpdateFlowOutput> apply(final RpcResult<Void> inputArg) { // 实例化UpdateFlowOutput,将RPC结果放到这个实例中,返回给业务层,由业务进行后续处理(如果是addflow则后续处理就是把UpdateFlowOutput转成AddFlowOutput) UpdateFlowOutputBuilder flowModOutput = new UpdateFlowOutputBuilder(); BigInteger bigIntXid = BigInteger.valueOf(input.getXid()); flowModOutput.setTransactionId(new TransactionId(bigIntXid)); UpdateFlowOutput result = flowModOutput.build(); RpcResult<UpdateFlowOutput> rpcResult = RpcResultBuilder .<UpdateFlowOutput>status(inputArg.isSuccessful()) .withResult(result).withRpcErrors(inputArg.getErrors()) .build(); return rpcResult; } }); return xidResult; } |
3、简单分析netty中write方法,此方法功能是将数据包发到网络中。
在说write方法之前,简单说一下netty背景知识吧。netty为了降低应用程开发帮我们做了很多东西,比如说常见的tcp粘包、丢包问题,最简编解码,异步IO等。应用程序基本不要关心这些底层内容,不像C/C++语言开发时,需要由应用程序自己处理粘包,丢包以及自己构造异步IO,这些内容如果处理不好,会带来很大问题。
然而关于编解码问题,netty只是做了最基本的序列化工作,比如说分割符,定长编解码等,对于咱们Openflow协议这种编解码,netty是没有提供的,而且也不应该由netty提供。
我们来看一下write方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; // byte数组用于保存序列化后数据 try { if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); try { encode(ctx, cast, buf); // encode这个接口用于序列化操作,也就是有openflowjava实现,实际指向为OFEncoder.encode。 } finally { ReferenceCountUtil.release(cast); } if (buf.isReadable()) { ctx.write(buf, promise); // 如果上面序列化操作正确,则会进入此分支,netty会把bytebuf放到队列中,等待时机将数据发送到网络中。 } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } …. } |
针对上面的ctx.write(buf, promise);这行代码再多说两句,这个函数只是将消息放到netty队列中,择机发到网络中。那么择机发送是什么时候发送呢?一般有两个条件:
1、队列已满。
2、超时。
然而应用程序若想立即将数据包发送到网络中则需要调用方法writeAndFlush。在这里讨论这些内容,主要因为某些应用要求实时性比较高的时候,所以可以考虑这种方式。
三、再谈序列化OFEncoder.encode
在上一篇中,我们提到了序列化,这个在简单过一下吧。OFEncoder是序列化入口,所有序列化操作都需要调用该类的方法,或者是调用encode方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@Override protected void encode(ChannelHandlerContext ctx, MessageListenerWrapper wrapper, ByteBuf out) throws Exception { LOGGER.trace("Encoding"); try { // 根据报文内容,在找序列化工厂中查找对应序列化对象,然后进行序列化操作。 // 其中参数wrapper是就是来自netty流水线处理后的数据 // 参数out就是用于保存序列化后数据 serializationFactory.messageToBuffer(wrapper.getMsg().getVersion(), out, wrapper.getMsg()); if(wrapper.getMsg() instanceof FlowModInput){ statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT); } statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS); } catch(Exception e) { LOGGER.warn("Message serialization failed ", e); statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL); Future<Void> newFailedFuture = ctx.newFailedFuture(e); wrapper.getListener().operationComplete(newFailedFuture); out.clear(); return; } } |
关于序列化工厂注册内容,请参考上一篇文章(https://www.sdnlab.com/16316.html)。
四、Session存储关系图
控制器所有操作都是建立在session基础之上的,换句话说就是建立在连接之上的,下图就是session相关组织结构图。
1.单例模式保存所有session对象
SessionManagerOFImpl是一个单例,保存当前有效session会话。从hello握手成功session一直存在,直到交换机与控制器断链后。
2.SessionListener有两个方法,onSessionAdded,onSessionRemoved。这两方法从便可知道用于管理添加、删除session对象。这两个方法主要是操作HashMap即ConcurrentHashMap<SwitchSessionKeyOF, SessionContext> sessionLot。这个hashmap中每一个元素都是一个有效的连接,也就是说有一个元素存在就对应着一个交换机。
3.Hashmap中键值value是SessionContext,这个上下文包含了一个ModelDrivenSwitchImpl,实则是ODL对交换机的抽象。也就是说ODL用类ModelDrivenSwitchImpl来表示一个交换机。抽象出来的ModelDrivenSwitchImpl进行再次包装,插入到SessionContext的notifaction队列当中。以便能够处理后续的notification消息。
4.在ModelDrivenSwitchImpl中有一个属性OFRpcTaskContext,这个属性是关联SessionContext以及Notification服务等。控制器给交换机发送的消息中大部分都是采用RPC形式,所以rpcTaskContext是至关重要的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
protected ModelDrivenSwitchImpl(final NodeId nodeId, final InstanceIdentifier<Node> identifier, final SessionContext sessionContext) { super(identifier, sessionContext); this.nodeId = nodeId; messageService = sessionContext.getMessageDispatchService(); version = sessionContext.getPrimaryConductor().getVersion(); final NotificationProviderService rpcNotificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService(); // rpcTaskContext实例化 rpcTaskContext = new OFRpcTaskContext(); rpcTaskContext.setSession(sessionContext); // 关联session rpcTaskContext.setMessageService(messageService); //关联消息分发服务 //关联notifaction服务 rpcTaskContext.setRpcNotificationProviderService(rpcNotificationProviderService); rpcTaskContext.setMaxTimeout(maxTimeout); rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit); //设置线程池,默认线程池中有10个线程 rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool()); rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager().getMessageSpy()); } |
以上全部内容基本上(加上前一篇)就是ODL代码从上到下,从下到上,openflow消息处理流程。有些内容分析不到位,请各位网友多多批评指出,欢迎大家及时讨论。
作者简介:
徐小冰:毕业于河北大学,主要从事嵌入式软件开发,虚拟化,SDN。目前基于ODL和Open vSwitch进行二次开发,希望与广大网友一起探讨学习。作者系OpenDaylihgt群(194240432)资深活跃用户,@IT难人
--------------华丽的分割线------------------
本文系《SDNLAB原创文章奖励计划》投稿文章,该计划旨在鼓励广大从业人员在SDN/NFV/Cloud网络领域创新技术、开源项目、产业动态等方面进行经验和成果的文字传播、分享、交流。有意向投稿的同学请通过官方唯一指定投稿通道进行文章投递,投稿细则请参考《SDNLAB原创文章奖励计划》