OpenDaylight系列文章(八):深入理解MD-SAL(源码篇)


前面我们已经创建过RPC和Notification的小示例,本文将结合ODL Carbon版本的源码,带您深入理解这些示例背后的原理。DataStore将在后续的文章中做深入剖析,本文暂不介绍。

## Blueprint ##
前面的文章中已经简单介绍过Blueprint了,其实与spring配置xml文件区别并不是太大,这里重点说明MD-SAL对Blueprint的扩展以及如何根据接口找到相关实现类。

MD-SAL对Blueprint的扩展
首先我们汇总一些常用的扩展,很多我们在前面的Notification和RPC实验中已经遇到过。
1) Global RPC注册(第17行)
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
             xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">

<bean id="fooRpcService" class="org.opendaylight.app.FooRpcServiceImpl">
<!-- constructor args -->
</bean>

<odl:rpc-implementation ref="fooRpcService"/>
</blueprint>

2) Global RPC获取(第9行)
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
             xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">

<odl:rpc-service id="fooRpcService" interface="org.opendaylight.app.FooRpcService"/>

<bean id="bar" class="org.opendaylight.app.Bar">
<argument ref="fooRpcService"/>
</bean>
</blueprint>

3) Routed RPC注册(第17行)
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
             xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">

<bean id="fooRoutedRpcService" class="org.opendaylight.app.FooRoutedRpcServiceImpl">
<!-- constructor args -->
</bean>

<odl:routed-rpc-implementation id="fooRoutedRpcServiceReg" ref="fooRoutedRpcService"/>

<bean id="bar" class="org.opendaylight.app.Bar">
<argument ref="fooRoutedRpcServiceReg"/>
</bean>

</blueprint>

4) NotificationListenser订阅(第17行)
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
             xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">

<bean id="fooListener" class="org.opendaylight.app.FooNotificationListener">
<!-- constructor args -->
</bean>

<odl:notification-listener ref="fooListener"/>

</blueprint>


源码实现
MD-SAL对Blueprint扩展的源码在controller\opendaylight\blueprint\src\main\java\org\opendaylight\controller\blueprint目录下。
bundle启动在BlueprintBundleTracker.java中的start方法中:
public void start(BundleContext context) {
    LOG.info("Starting {}", getClass().getSimpleName());

    restartService = new BlueprintContainerRestartServiceImpl();

    bundleContext = context;

    registerBlueprintEventHandler(context);

    registerNamespaceHandler(context);

    bundleTracker = new BundleTracker<>(context, Bundle.ACTIVE, this);

    …
}

此处调用registerNamespaceHandler注册了一个ODL命名空间的解析,进入registerNamespaceHandler方法,可以看到实际注册的是OpendaylightNamespaceHandler类的一个实例。
在OpendaylightNamespaceHandler类中的parse函数,就是解析xml文件中odl命名空间的地方。

public Metadata parse(final Element element, final ParserContext context) {
    LOG.debug("In parse for {}", element);

    if (nodeNameEquals(element, RpcImplementationBean.RPC_IMPLEMENTATION)) {
        return parseRpcImplementation(element, context);
    } else if (nodeNameEquals(element, RoutedRpcMetadata.ROUTED_RPC_IMPLEMENTATION)) {
        return parseRoutedRpcImplementation(element, context);
    } else if (nodeNameEquals(element, RPC_SERVICE)) {
        return parseRpcService(element, context);
    } else if (nodeNameEquals(element, NotificationListenerBean.NOTIFICATION_LISTENER)) {
        return parseNotificationListener(element, context);
    } else if (nodeNameEquals(element, CLUSTERED_APP_CONFIG)) {
        return parseClusteredAppConfig(element, context);
    } else if (nodeNameEquals(element, SPECIFIC_SERVICE_REF_LIST)) {
        return parseSpecificReferenceList(element, context);
    } else if (nodeNameEquals(element, STATIC_REFERENCE)) {
        return parseStaticReference(element, context);
    } else if (nodeNameEquals(element, ACTION_SERVICE)) {
        return parseActionService(element, context);
    } else if (nodeNameEquals(element, ActionProviderBean.ACTION_PROVIDER)) {
        return parseActionProvider(element, context);
    }

    throw new ComponentDefinitionException("Unsupported standalone element: " + element.getNodeName());
}

以odl:rpc-implementation为例,进入了第一个分支parseRpcImplementation
private static Metadata parseRpcImplementation(final Element element, final ParserContext context) {
    registerRpcRegistryServiceRefBean(context);

    MutableBeanMetadata metadata = context.createMetadata(MutableBeanMetadata.class);
    metadata.setId(context.generateId());
    metadata.setScope(BeanMetadata.SCOPE_SINGLETON);
    metadata.setActivation(ReferenceMetadata.ACTIVATION_EAGER);
    metadata.setRuntimeClass(RpcImplementationBean.class);
    metadata.setInitMethod("init");
    metadata.setDestroyMethod("destroy");
    metadata.addProperty("bundle", createRef(context, "blueprintBundle"));
    metadata.addProperty("rpcRegistry", createRef(context, RPC_REGISTRY_NAME));
    metadata.addProperty("implementation", createRef(context, element.getAttribute(REF_ATTR)));

    if (element.hasAttribute(INTERFACE)) {
        metadata.addProperty("interfaceName", createValue(context, element.getAttribute(INTERFACE)));
    }

    LOG.debug("parseRpcImplementation returning {}", metadata);
    return metadata;
}


上面解析时,用到了RpcImplementationBean的init方法,我们可以看到对应的代码中最后是调用addRpcImplementation完成了Global RPC的注册。
其余的解析与之类似,一般在对应的init或者create方法中就能找到最后实际调用的方法。

根据接口找实现类
这里说明一下怎样根据接口去找实现类,后文分析代码实现时,就不再赘述如何找到实现类了。
在写Notification代码时,会引用到一个API:
org.opendaylight.controller.sal.binding.api.NotificationService
要找到实现类,可以先在相关Blueprint中搜索这个API,果然在controller\opendaylight\md-sal\sal-binding-broker\src\main\resources\org\opendaylight\blueprint\binding-broker.xml中有所发现:
<service ref="bindingNotificationProviderService" odl:type="default">
<interfaces>
  <value>org.opendaylight.controller.sal.binding.api.NotificationProviderService</value>
  <value>org.opendaylight.controller.sal.binding.api.NotificationService</value>
</interfaces>
</service>


通常注册为一个接口的实现类时,可能是类似这样的结构:
<reference id="notificationService" interface="org.opendaylight.controller.md.sal.binding.api.NotificationPublishService"/>

而上面的interfaces则将bindingNotificationProviderService注册为多个接口的实现类。
下一步就是要找到bindingNotificationProviderService是哪儿来的了,继续在当前文件中搜索,
就可以看到bean的生成过程,这里的class就是我们要寻找的实现类了。
<bean id="bindingNotificationProviderService" class="org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceWithInterestListeners">
<argument ref="bindingNotificationPublishAdapter"/>
<argument ref="bindingNotificationServiceAdapter"/>
<argument ref="domNotificationListenerRegistry"/>
</bean>


## Notification ##
订阅Notification
关于Notification,我们先从订阅说起,然后再说发送,这样更容易理解。
在上面的示例中,我们已经找到了Notification的实现类:HeliumNotificationProviderServiceWithInterestListeners, 这个类有着很复杂的继承和实现关系,为了便于理解,我们先把类之间的关系图整理出来,这样看代码的时候就不会太乱,下图3中实线表示extends,虚线表示implements。

1111.png

根据2.2中的源码,可以找到Notification注册的时候,是调用了HeliumNotificationServiceAdapter中的registerNotificationListener方法:
public ListenerRegistration<NotificationListener> registerNotificationListener(
        final NotificationListener listener) {
    return notificationService.registerNotificationListener(listener);
}


继续进入BindingDOMNotificationServiceAdapter
public <T extends NotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener) {
    final BindingDOMNotificationListenerAdapter domListener = new BindingDOMNotificationListenerAdapter(codec, listener);
    final ListenerRegistration<BindingDOMNotificationListenerAdapter> domRegistration =
    domNotifService.registerNotificationListener(domListener, domListener.getSupportedNotifications());
    return new ListenerRegistrationImpl<>(listener, domRegistration);
}


这里的domNotifService.registerNotificationListener 实际上是调用DOMNotificationRouter的registerNotificationListener
public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(final T listener, final Collection<SchemaPath> types) {
    …
    if (!types.isEmpty()) {
        final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b = ImmutableMultimap.builder();
        b.putAll(listeners);

        for (final SchemaPath t : types) {
            b.put(t, reg);
        }

        replaceListeners(b.build());
    }

    return reg;
}


从代码上看到,实际上订阅的过程就是将这个listener放到一个Map中,可以想象到publish的时候就是从这个Map中查找到对应的listener实现消息通知。

发送Notification
在前文的demo中,Notification最后只调用了一个publish方法就轻松实现了消息的发送,那么publish背后是怎样实现的呢?
根据上面的类图,publish方法调用的是HeliumNotificationProviderServiceAdapter中的publish方法,代码追下去,发现也进入了DOMNotificationRouter类,调用的是putNotification方法。
public ListenableFuture<?> putNotification(final DOMNotification notification) throws InterruptedException {
    final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers = listeners.get(notification.getType());
    if (subscribers.isEmpty()) {
        return NO_LISTENERS;
    }

    final long seq = disruptor.getRingBuffer().next();
    return publish(seq, notification, subscribers);
}


和我们刚才的想法一致,的确是从刚才的Map中找到了subscribers。下面要做的就是,将消息发送给这些subscribers。
这里有个特别之处,就是调用的disruptor.getRingBuffer().next()产生了一个序列号,为什么要产生序列号?disruptor又是干什么的?
在DOMNotificationRouter中,我们找到了disruptor赋值的地方:
private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
    this.executor = Preconditions.checkNotNull(executor);

    disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth, executor, ProducerType.MULTI, strategy);
    disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
    disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
    disruptor.start();
}


disruptor使用观察者模式,主动将消息发送给消费者,而不是等消费者从队列中取;在无锁的情况下, 实现queue(环形,RingBuffer)的并发操作, 性能远高于BlockingQueue。
具体的理论知识可以参看http://ifeve.com/disruptor/
这里我们简单的理解为发送者给需要发送的消息贴上了唯一的序列号,并将这个消息放到一个高效率的环形队列中,这样消息就生产完成了。
有生产就有消费,消费者的定义是在disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS)这一行,这是一个匿名类,调用event.deliverNotification()将消息分发出去。
private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS = new EventHandler<DOMNotificationRouterEvent>() {
    @Override
    public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) throws Exception {
        event.deliverNotification();

    }
}

最后的处理是在DOMNotificationRouterEvent中对订阅者做循环发送:
void deliverNotification() {
LOG.trace("Start delivery of notification {}", notification);
for (ListenerRegistration<? extends DOMNotificationListener> r : subscribers) {
final DOMNotificationListener listener = r.getInstance();
if (listener != null) {
try {
LOG.trace("Notifying listener {}", listener);
listener.onNotification(notification);
LOG.trace("Listener notification completed");
} catch (Exception e) {
LOG.error("Delivery of notification {} caused an error in listener {}", notification, listener, e);
}
}
}
LOG.trace("Delivery completed");
}

Notification总结
上面的代码可以看到,Notification是顺序投递的,即必须前面的消息发送出去才能继续处理后面的消息,因此我们建议如果Notification消费者的逻辑过于复杂,需要在处理函数中单独开一个线程去处理,避免影响后续的消息投递。
哪些场景需要使用Notification呢,这里我们总结出几个典型的应用场景:
1) 消息的发送者和消费者需要解耦
2) 发送者不明确谁是消费者
3) 发送者不关心谁来消费
4) 发送者不关心消费者对消息的处理结果
5) 不同的消费者需要对消息做不同的处理
6) 消息的发送和接收是异步的

## RPC ##
RPC可以简单理解为一个点对点的消息,由消费者发送请求,提供者响应请求。这里需要注意一下:MD-SAL理论上只提供非同步的API,同步的API需要借助java.util.concurrent.Future来实现。
RPC的分类
RPC分为两种类型:
1) Global RPC:一个控制器上只有一个实现,如果注册多次,以最早的注册实现为准。前文的例子中就是一个典型的Global RPC。
2) Routed RPC:一个控制器上可以有多个实现,需要注册Path,不同的Path对应不同的实现。后面会讲解一个Routed RPC的示例。
这两种类型都支持集群跨节点调用,但Global RPC优先使用本节点的,只有在本节点未注册的情况下才会调用其他节点,而RoutedPRC则是根据Path去查找。
RPC Provider
下面讲解一下RPC Provider注册的大致流程:
controller\opendaylight\md-sal\sal-binding-broker\src\main\resources\org\opendaylight\blueprint\binding-broker.xml中找到实现类是HeliumRpcProviderRegistry

<bean id="bindingRpcRegistry" class="org.opendaylight.controller.md.sal.binding.compat.HeliumRpcProviderRegistry" >
<argument ref="bindingRpcServiceAdapter"/>
<argument ref="bindingRpcProviderAdapter"/>
</bean>

在这个类中有三个方法:
@Override
public <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(final Class<T> type, final T impl)
        throws IllegalStateException {
    return new CompositeRoutedRpcRegistration<>(type,impl,providerAdapter);
}

@Override
public <T extends RpcService> RpcRegistration<T> addRpcImplementation(final Class<T> type, final T impl)
        throws IllegalStateException {
    final ObjectRegistration<T> reg = providerAdapter.registerRpcImplementation(type, impl);
    return new DelegatedRootRpcRegistration<>(type,reg);
}

@Override
public <T extends RpcService> T getRpcService(final Class<T> type) {
    return consumerRegistry.getRpcService(type);
}


addRoutedRpcImplementation:RoutedRpc注册时调用。
addRpcImplementation:GlobalRpc注册时调用。
getRpcService:消费者获取RPC服务时调用,不区分GlobalRpc和RoutedRpc都使用此接口获取RPC服务。

分析一下addRpcImplementation的代码(addRoutedRpcImplementation的处理类似,但多了一个Path处理)。
providerAdapter.registerRpcImplementation调用的是
BindingDOMRpcProviderServiceAdapter的registerRpcImplementation-> register方法:
private <S extends RpcService, T extends S> ObjectRegistration<T> register(final Class<S> type, final T implementation, final Collection<YangInstanceIdentifier> rpcContextPaths) {
    final Map<SchemaPath, Method> rpcs = codec.getRpcMethodToSchemaPath(type).inverse();

    final BindingDOMRpcImplementationAdapter adapter = new BindingDOMRpcImplementationAdapter(codec.getCodecRegistry(), type, rpcs, implementation);
    final Set<DOMRpcIdentifier> domRpcs = createDomRpcIdentifiers(rpcs.keySet(), rpcContextPaths);
    final DOMRpcImplementationRegistration<?> domReg = domRpcRegistry.registerRpcImplementation(adapter, domRpcs);
    return new BindingRpcAdapterRegistration<>(implementation, domReg);
}


domRpcRegistry.registerRpcImplementation进入DOMRpcRouter的registerRpcImplementation:
public synchronized <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(
        final T implementation, final Set<DOMRpcIdentifier> rpcs) {
    final DOMRpcRoutingTable oldTable = routingTable;
    final DOMRpcRoutingTable newTable = oldTable.add(implementation, rpcs);
    routingTable = newTable;

    listenerNotifier.execute(() -> notifyAdded(newTable, implementation));

    return new AbstractDOMRpcImplementationRegistration<T>(implementation) {
        @Override
        protected void removeRegistration() {
            removeRpcImplementation(getInstance(), rpcs);
        }
    };
}


这里主要做了两件事:
1) 更新routingTable,可以猜到这其实就是个Map,后面消费者获取服务的时候就是从routingTable里去查找的;
2) 调用notifyAdded会通知集群其他节点本节点的RoutingTable有更新,跟下代码瞧瞧:
private synchronized void notifyAdded(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
    for (Registration<?> l : listeners) {
        l.addRpc(newTable, impl);
    }
}


这里的listeners就是sal-remoterpc-connector这个bundle里的RpcListener类,看看addRpc干了些啥:

void addRpc(final DOMRpcRoutingTable newTable, final DOMRpcImplementation impl) {
        final T l = getInstance();
        if (!l.acceptsImplementation(impl)) {
            return;
        }

        final Map<SchemaPath, Set<YangInstanceIdentifier>> rpcs = Verify.verifyNotNull(newTable.getRpcs(l));
        final MapDifference<SchemaPath, Set<YangInstanceIdentifier>> diff = Maps.difference(prevRpcs, rpcs);

        final Collection<DOMRpcIdentifier> added = new ArrayList<>();
        for (Entry<SchemaPath, Set<YangInstanceIdentifier>> e : diff.entriesOnlyOnRight().entrySet()) {
            added.addAll(Collections2.transform(e.getValue(), i -> DOMRpcIdentifier.create(e.getKey(), i)));
        }
        for (Entry<SchemaPath, ValueDifference<Set<YangInstanceIdentifier>>> e : diff.entriesDiffering().entrySet()) {
            for (YangInstanceIdentifier i : Sets.difference(e.getValue().rightValue(), e.getValue().leftValue())) {
                added.add(DOMRpcIdentifier.create(e.getKey(), i));
            }
        }

        prevRpcs = rpcs;
        if (!added.isEmpty()) {
            l.onRpcAvailable(added);
        }
    }

在onRpcAvailable中最后调用tell方法将更新消息发送出去:
public void onRpcAvailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
    Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
    LOG.debug("Adding registration for [{}]", rpcs);

    rpcRegistry.tell(new AddOrUpdateRoutes(rpcs), ActorRef.noSender());
}


此处用到了两个技术点:
1) 使用了Akka的Actor模型,消息的接收者为RpcRegistry的handleCommand函数,根据不同的消息类型做处理,本次收到的消息为AddOrUpdateRoutes,即更新本地的缓存信息。
2) 将信息同步到集群其他节点也是通过Actor模型来传递的,不同节点之间信息的一致性则通过Gossip协议来保证。
对Actor模型和Gossip协议感兴趣的读者可自行搜索研究下,这里不做具体介绍了。

RPC Consumer
RPC Provider注册后,RPC Consumer消费时,首先需要获取RPC服务,通过HeliumRpcProviderRegistry的getRpcService进入BindingDOMRpcServiceAdapter,返回值是一个代理。

public <T extends RpcService> T getRpcService(final Class<T> rpcService) {
    Preconditions.checkArgument(rpcService != null, "Rpc Service needs to be specied.");
    return (T) proxies.getUnchecked(rpcService).getProxy();
}

RPC Consumer根据获取到的服务再调用到具体的RPC方法时,会使用这个代理的invoke函数找出具体的实现。
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {

    final RpcInvocationStrategy rpc = rpcNames.get(method);
    if (rpc != null) {
        if (method.getParameterTypes().length == 0) {
            return rpc.invokeEmpty();
        }
        if (args.length != 1) {
            throw new IllegalArgumentException("Input must be provided.");
        }
        return rpc.invoke((DataObject) args[0]);
    }

    if (isObjectMethod(method)) {
        return callObjectMethod(proxy, method, args);
    }
    throw new UnsupportedOperationException("Method " + method.toString() + "is unsupported.");
}


rpc.invoke调用到RpcServiceAdapter的invoke0
private ListenableFuture<RpcResult<?>> invoke0(final SchemaPath schemaPath, final NormalizedNode<?, ?> input) {
    final CheckedFuture<DOMRpcResult, DOMRpcException> result = delegate.invokeRpc(schemaPath, input);
    if(result instanceof LazyDOMRpcResultFuture) {
        return ((LazyDOMRpcResultFuture) result).getBindingFuture();
    }

    return transformFuture(schemaPath, result, codec.getCodecFactory());
}


这里的delegate.invokeRpc就进入了DOMRpcRouter的routingTable去查找RPC Provider的注册了。后面查找的过程就不再赘述了。

Routed RPC
前面说过,Routed RPC注册时调用addRoutedRpcImplementation方法,与Global RPC类似,因此这里就不再解析源码了,我们以一个示例来说明Routed RPC,便于大家更好的理解。

定义Yang文件
以源码目录controller\opendaylight\md-sal\samples\clustering-test-app\model\src\main\yang\car-purchase.yang为例来说明。这里定义了一个RPC buy-car,与Global RPC的定义很类似,但多了一个路由字段person,且这个字段必须是instance-identifier类型的。
typedef person-ref {
  type instance-identifier;
  description "A reference that points to an people:people/person in the data tree.";
}
identity person-context {
    description "A person-context is a classifier for person elements which allows an RPC to provide a service on behalf of a particular element in the data tree.";
}


rpc buy-car {
      description
        "buy a new car";
      input {
        leaf person {
          ext:context-reference "person:person-context";
          type person:person-ref;
          description "A reference to a particular person.";
        }

        leaf car-id {
          type car:car-id;
          description "identifier of car.";
        }
        leaf person-id {
          type person:person-id;
          description "identifier of person.";
        }
      }
    }


Routed RPC注册
controller\opendaylight\md-sal\samples\clustering-test-app\provider\src\main\resources\org\opendaylight\blueprint\ cluster-test-app.xml中注册了是一个Routed RPC,这个注册就是调用前面说的addRoutedRpcImplementation方法。

<bean id="purchaseCarProvider" class="org.opendaylight.controller.clustering.it.provider.PurchaseCarProvider" >
<property name="notificationProvider" ref="notificationService"/>
</bean>

<odl:routed-rpc-implementation id="carPurchaseRpcReg" ref="purchaseCarProvider"/>

但不要以为这样就完成了注册,这仅仅是第一步而已,此时在节点上调用buy-car是无法成功的,必须调用registerPath后才能完成Routed RPC注册。
下面我们继续看下是如何完成Routed RPC注册的。
首先,cluster-test-app.xml里还注册了一个Global RPC,将carPurchaseRpcReg作为一个属性传入。

<bean id="peopleProvider" class="org.opendaylight.controller.clustering.it.provider.PeopleProvider" >
<property name="dataProvider" ref="dataBroker"/>
<property name="rpcRegistration" ref="carPurchaseRpcReg"/>
</bean>

在peopleProvider中有一个addPerson的Global RPC,前面是根据入参的key去构造一个InstanceIdentifier<Person>类型的personId,当入库成功后,就用personId注册Routed RPC的Path,这样就完成注册了。
public Future<RpcResult<Void>> addPerson(final AddPersonInput input) {
log.info("RPC addPerson : adding person [{}]", input);

PersonBuilder builder = new PersonBuilder(input);
final Person person = builder.build();
final SettableFuture<RpcResult<Void>> futureResult = SettableFuture.create();

// Each entry will be identifiable by a unique key, we have to create that identifier
final InstanceIdentifier.InstanceIdentifierBuilder<Person> personIdBuilder =
    InstanceIdentifier.<People>builder(People.class)
        .child(Person.class, person.getKey());
final InstanceIdentifier<Person> personId = personIdBuilder.build();
// Place entry in data store tree
WriteTransaction tx = dataProvider.newWriteOnlyTransaction();
tx.put(LogicalDatastoreType.CONFIGURATION, personId, person, true);

Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
  @Override
  public void onSuccess(final Void result) {
    log.info("RPC addPerson : person added successfully [{}]", person);
    rpcRegistration.registerPath(PersonContext.class, personId);
    log.info("RPC addPerson : routed rpc registered for instance ID [{}]", personId);
    futureResult.set(RpcResultBuilder.<Void>success().build());
  }

  @Override
  public void onFailure(final Throwable t) {
    log.error(String.format("RPC addPerson : person addition failed [%s]", person), t);
    futureResult.set(RpcResultBuilder.<Void>failed()
        .withError(RpcError.ErrorType.APPLICATION, t.getMessage()).build());
  }
});


Routed RPC调用
假设ODL有三个集群节点A、B、C
在A节点上调用addPersion,添加一个名叫Jack的人,此时调用rpcRegistration.registerPath注册A节点就是PersonId为Jack的节点。
在B节点上调用addPersion,添加一个名叫Tom的人,此时调用rpcRegistration.registerPath注册B节点就是PersonId为Tom的节点。
在C节点上调用addPersion,添加一个名叫Paul的人,此时调用rpcRegistration.registerPath注册C节点就是PersonId为Paul节点。

2224.png

调用RPC时,只要填写的路由字段正确,从无论在哪个节点上调用buyCar方法都可以成功。
例如,在B节点上给Jack买一辆车牌号为10000的车,最后实际上是调用了A节点上的buyCar方法,参考代码如下:
public void buyCarExample()
{
    final PersonBuilder personBuilder = new PersonBuilder();
    personBuilder.setKey(new PersonKey(new PersonId("Jack")));
    final InstanceIdentifier.InstanceIdentifierBuilder<Person> personIdBuilder =
            InstanceIdentifier.<People> builder(People.class).child(Person.class, personBuilder.build().getKey());

    final BuyCarInputBuilder builder = new BuyCarInputBuilder();
    builder.setPerson(new PersonRef(personIdBuilder.build()));
    builder.setCarId(new CarId("10000"));
    builder.setPersonId(new PersonId("Jack"));

    carPurchaseService.buyCar(builder.build());
}


典型应用
Routed PRC在集群场景下使用较多,我们可以看看openflowplugin组件中下发流表的接口:

rpc add-flow {
    description "Adding flow to openflow device.";
    input {
        uses tr:transaction-metadata;
        leaf flow-ref {
            type types:flow-ref;
        }
        uses node-flow;
    }
    output {
        uses tr:transaction-aware;
    }
}

其中flow-ref的定义如下:
typedef flow-ref {
    type instance-identifier;
}


这个定义符合之前的描述,所以add-flow就是一个典型的Routed RPC。
实际应用的场景如下:

1414.png

如图5所示,ODL三个节点组成集群,OVS1、OVS2、OVS3分别和ODL A、ODL B、ODL C建立了OpenFlow连接。现在需要给OVS2下发一条流表,对于应用层(APP)来说,其实并不关心是从哪个节点下发的,可以在任意节点调用add-flow,例如图中在ODL A上给OVS2下发流表,实际上是使用了Routed RPC从ODL A路由到ODL B,再从ODL B下发给OVS2。

## 总结 ##

本章对MD-SAL中的Notification和RPC做了深入理解,由于篇幅所限,只能讲述主要的流程和原理,对于从事应用开发的小伙伴们只要大概知道原理即可,如果对底层实现有兴趣的小伙伴们就要自己去阅读源码细细品味了。

——未来网络 Zebra Decoder,2018.07.05出品

0 个评论

要回复文章请先登录注册