作者简介:鸿哥,硕士研究生,国内某通信设备公司软件研发工程师,主要从事云计算、SDN技术开发
一、架构及操作介绍
OVSDB南向插件主要负责与底层设备(OVSDB node)打交道,采用OVSDB管理协议,每一个底层设备都被当成一个OVSDB node,而OVSDB南向插件被当成OVSDB manager,对于OVSDB manager而言,OVSDB manager与OVSDB node的连接有主动和被动两种方式。比如,在ovs设备上输入命令:ovs-vsctl set-manager tcp:192.168.7.103:6640 表示OVSDB manager处于被动连接方式,而输入命令:ovs-vsctl set-manager ptcp:6640 表示OVSDB manager处于主动连接方式。下面分别对于上述两个命令做介绍:
1、ovs上输入ovs-vsctl set-manager tcp::6640 OVSDB manager采用被动模式,在odl控制器当中会发生以下一些系列操作:
- OVSDB node主动的连接OVSDB manager
- OVSDB Southbound plugin接受连接之后向md-sal的operational数据库添加OVSDB node节点
- OVSDB Southbound plugin发出请求,以获取OVSDB node的Schema和database
- OVSDB Southbound plugin使用Schema和database信息构造一个monitor request(注册监听器,用于监听OVSDB node数据库的变化,一旦OVSDB node上数据库有变化,马上会通过rpc的方式回调,这样OVSDB Southbound plugin就能实时知道OVSDB node上的数据库变化)。
2、ovs上输入ovs-vsctl set-manager ptcp:6640 OVSDB manager采用主动模式
- 1) 通过北向RESTCONF接口或者其他的OpenDaylight 插件添加一个OVSDB node节点,比如通过如下的restconf接口添加一个OVSDB node
- 因为在OVSDB Southbound plugin中监听了md-sal的configuration数据库,所以上一个步骤的添加数据操作被OVSDB Southbound plugin到了
- OVSDB Southbound plugin主动地连接OVSDB node,使用的是IP和PORT
- OVSDB Southbound plugin同时将OVSDB node数据添加到md-sal的operational数据库
- OVSDB Southbound plugin发出请求,以获取OVSDB node的Schema和database
- OVSDB Southbound plugin使用Schema和database信息构造一个monitor request(注册监听器,用于监听OVSDB node数据库的变化,一旦OVSDB node上数据库有变化,马上回通过rpc的方式回调,这样OVSDB Southbound plugin就能实时知道OVSDB node上的数据库变化)。
3、OVSDB manager主动连接模式下,添加bridge或者port,OVSDB Southbound plugin操作流程为:
- 通过北向RESTCONF接口或者其他的OpenDaylight 插件添加一个bridge或者port,其实修改是的config下的数据库
- OVSDB Southbound plugin因为注册了数据改变监听器,所以能够收到config下的数据改变通知
- OVSDB Southbound plugin构造transactions去更新OVSDB node的对应ovsdb数据库信息。
- OVSDB node反馈数据更新结果
- OVSDB Southbound plugin映射修改信息到md-sal的operational数据库
4、OVSDB manager被动连接模式下,接受OVSDB node数据库数据修改的通知
- 假如 OVSDB node数据库数据被修改,比如通过采用ovs-vsctl set等命令在ovs命令行修改数据库
- OVSDB node会构造一个数据修改的通知
- OVSDB Southbound plugin接收数据修改的通知之后相对于的修改md-sal的operational数据库
二、Yang 数据模型
在ovsdb-release-lithium-sr3\southbound\southbound-api当中有ovsdb.yang文件,该文件中定义了大部分ovsdb的数据表信息,但有些odl认为不关心的数据表没有定义,具体定义的各个数据信息可以参考/usr/share/openvswitch/vswitch.ovsschema的schema,或者通过
1 2 |
ovs-vsctl list open_vswitch ovs-vsctl list bridge |
进行查看。
三、相关依赖
Ovsdb-Southbound模块是依赖library模块进行ovsdb网络通信的,library尽量做到只跟ovsdb协议有关,与实际的ovsdb交互无关,包括序列化协议指令、调用底层netty框架等等。
四、代码实现
Ovsdb-Southbound的入口在southbound-impl文件夹的SouthboundImplModule.java文件的createInstance函数,在此函数当中new出一个SouthboundProvider对象,并在该类当中的onSessionInitiated进行各项初始化,注意在Ovsdb-Southbound以实现基于odl控制器集群代码编写,所以见到EntityOwnershipState不要感到奇怪。
在onSessionInitiated函数当中,先是new出TransactionInvokerImpl辅助类,主要是将Transaction的读写操作进行下封装,将很多读写操作放入到一个队列中处理,实现TransactionChain事物链。然后new出OvsdbConnectionManager,在OvsdbConnectionManager类中,主要负责通过ovsdb协议维护交换机与控制器的tcp连接。
由于交换机设备与控制器的连接既可以采用主动模式也可以采用被动模式,所以Ovsdb-Southbound在处理与设备的ovsdb连接当中实现了两套连接代码,在onSessionInitiated新建的OvsdbDataChangeListener就是专门处理控制器主动连接ovsdb设备的代码,可以简单先分析下,在类OvsdbDataChangeListener当中直接监听了CONFIGURATION下的/NetworkTopology/Topology/Node数据树,当在控制器上采用restconf下发连接ovsdb设备的命令之后,会引起/NetworkTopology/Topology/Node数据树的变化,进而进入到OvsdbDataChangeListener的onDataChanged函数。
在此函数中处理了odl控制器主动连接设备,更新数据,断开连接等等
接下来就是注册ovsdb的ovsdb-southbound-provider候选人,一旦判断当前控制器的ownershipState数据就绪,并且集群已经有主控制器(但不是自己)之后,开始注册ovsdb连接监听器,此时odl控制器是以被动形式等待设备来连接控制器。否则等待SouthboundPluginInstanceEntityOwnershipListener的ownershipChanged事件。
此时new出来的OvsdbConnectionService是在ovsdb的library模块,需要说明的是library模块作为ovsdb协议的抽象,包装了很多处理ovsdb协议细节类,调用者只需要传入具体如何建立连接的OvsdbConnectionManager对象给它即可,然后调用
ovsdbConnection.startOvsdbManager(SouthboundConstants.DEFAULT_OVSDB_PORT);开始注册建立tcp连接的handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public synchronized boolean startOvsdbManager(final int ovsdbListenPort) { if (!singletonCreated) { new Thread() { @Override public void run() { ovsdbManager(ovsdbListenPort); } }.start(); singletonCreated = true; return true; } else { return false; } } |
从上述代码可以看出新建连接是单例形式,即注册的控制器与设备连接handler(被动模式)只能有一个,同时还支持SSL加密安全。
一旦有设备主动连接过来,handleNewPassiveConnection函数就会被调用,最终会进入listener.connected(client);此时建立的连接由library模块通告给Ovsdb-Southbound模块(也就是刚刚注册给library的OvsdbConnectionManager对象)。
转到OvsdbConnectionManager类,connected函数被调用,
1 2 3 4 5 6 |
public void connected(@Nonnull final OvsdbClient externalClient) { OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient); // Register Cluster Ownership for ConnectionInfo registerEntityForOwnership(client); } |
对于OvsdbConnectionManager来说,tcp连接实例OvsdbConnectionInstance已经拿到,同时还会进行createTransactInvokers。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public void createTransactInvokers() { if (transactInvokers == null) { try { transactInvokers = new HashMap<>(); List<String> databases = getDatabases().get(); for (String database : databases) { DatabaseSchema dbSchema = getSchema(database).get(); if (dbSchema != null) { transactInvokers.put(dbSchema, new TransactInvokerImpl(this,dbSchema)); } } } catch (InterruptedException | ExecutionException e) { LOG.warn("Exception attempting to createTransactionInvokers {}: {}",connectionInfo,e); } } } |
在其中的getDatabases函数当中,其实是调用library当中的list_dbs协议指令(已被序列后的RPC指令)。
1 2 3 4 |
@Override public ListenableFuture<List<String>> getDatabases() { return rpc.list_dbs(); } |
并且调用getSchema获取ovsdb数据表。最后调用registerEntityForOwnership函数将当前odl控制器连接实例ovsdbConnectionInstance注册为odl集群连接的候选人。
一旦ovsdbConnectionInstance的集群选举出当前odl控制器的ovsdbConnectionInstance实例为主连接,则进入OvsdbConnectionManager的handleOwnershipChange函数,并调用ovsdbConnectionInstance.registerCallbacks()函数,此函数很关键。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public void registerCallbacks() { if ( this.callback == null) { if (this.initialCreateData != null ) { this.updateConnectionAttributes(); } try { //获取Database List<String> databases = getDatabases().get(); if (databases != null && !databases.isEmpty()) { this.callback = new OvsdbMonitorCallback(this,txInvoker); for (String database : databases) { //获取DatabaseSchema DatabaseSchema dbSchema = getSchema(database).get(); if (dbSchema != null) { //下发监控所有dbSchema表的monitor指令 monitorAllTables(database, dbSchema); } else { LOG.warn("No schema reported for database {} for key {}",database,connectionInfo); } } } else { LOG.warn("No databases reported for key {}",connectionInfo); } } catch (InterruptedException | ExecutionException e) { LOG.warn("Exception attempting to registerCallbacks {}: {}",connectionInfo,e); } } } |
在上述monitorAllTables当中,就是下发监听数据库所有表的monitor指令,这样一旦设备上的ovsdb数据库发生变化,就会自动以update的形式上报给控制器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
private void monitorAllTables(String database, DatabaseSchema dbSchema) { Set<String> tables = dbSchema.getTables(); if (tables != null) { List<MonitorRequest> monitorRequests = Lists.newArrayList(); for (String tableName : tables) { GenericTableSchema tableSchema = dbSchema.table(tableName, GenericTableSchema.class); Set<String> columns = tableSchema.getColumns(); MonitorRequestBuilder<GenericTableSchema> monitorBuilder = MonitorRequestBuilder.builder(tableSchema); for (String column : columns) { monitorBuilder.addColumn(column); } monitorRequests.add(monitorBuilder.with(new MonitorSelect(true, true, true, true)).build()); } this.callback.update(monitor(dbSchema, monitorRequests, callback),dbSchema); } else { LOG.warn("No tables for schema {} for database {} for key {}",dbSchema,database,connectionInfo); } } |
在这this.callback.update函数当中有点迷惑性,这里深入分析下,首先是tables是从database当中取出来的,目的就是为了下面的dbSchema.table拿到column(列字段),然后monitorRequests是需要监控的table请求指令列表。而monitor函数当中做了两件事,一是调用registerCallback进行了回调注册,另外一件事是拿回monitor响应result并进行数据格式转换transformingCallback。
重点看registerCallback(monitorHandle, callback, dbSchema);函数,此函数当中将monitorHandle、callback、schema放入hashmap,并且使用setupUpdateListener函数注册设备发送update事件的监听器,一旦有设备发出update事件上来,就会调用OvsdbRPC.Callback当中的update函数,进而使引起monitorCallBack.update(updates, callbackContext.schema);的调用。
由monitorCallBack.update(updates, callbackContext.schema);进入到OvsdbMonitorCallback类的update函数,所以会调用
txInvoker.invoke(new OvsdbOperationalCommandAggregator(key, result, dbSchema));该函数只是将该消息丢给TransactionInvokerImpl类的队列,最后由TransactionInvokerImpl类的线程run函数进行处理,比如写入datastore数据库等操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public void run() { while (runTask.get()) { forgetSuccessfulTransactions(); try { List<TransactionCommand> commands = extractCommands(); for (TransactionCommand command: commands) { final ReadWriteTransaction transaction = chain.newReadWriteTransaction(); recordPendingTransaction(command, transaction); command.execute(transaction); Futures.addCallback(transaction.submit(), new FutureCallback<Void>() { @Override public void onSuccess(final Void result) { successfulTransactionQueue.offer(transaction); } @Override public void onFailure(final Throwable throwable) { // NOOP - handled by failure of transaction chain } }); } } catch (Exception e) { LOG.warn("Exception invoking Transaction: ", e); } } } |
五、总结
Ovsdb协议用于设备与odl控制器交互,比如创建bridge、创建port等,目前还有些ovsdb字段并未反馈到odl控制器,可以按照需要自行添加。