RyuApp
RyuApp类是RYU封装好的APP基类,用户只需要继承该类,就可以方便地开发应用。而注册对应的observer和handler都使用@derocator的形式,使得开发非常的简单高效,这也是Python的优点之一吧。RyuApp类的定义在ryu/base/app_manager.py文件中。该文件实现了两个类RyuApp和AppManager。前者用于定义APP基类,为应用开发提供基本的模板,后者用于Application的管理,加载应用,运行应用,消息路由等功能。
app_manager.py文件中import了instpect和itertools module,从而使得开发更方便简洁。inspect模块提供了一些有用的方法,用于类型检测,获取内容,检测是否可迭代等功能。itertools则是一个关于迭代器的模块,可以提供丰富的迭代器类型,在数据处理上尤其有用。
_CONTEXT
这是一个极其难理解的概念。博主的理解是,_CONTEXT内存储着name:class的key value pairs。为什么需要存储这个内容?实际上这个_CONTEXT携带的信息是所有本APP需要依赖的APP。需要在启动本应用之前去启动,以满足依赖的,比如一个simple_switch.py的应用,如果没有OFPHandler应用作为数据收发和解析的基础的话,是无法运行的。具体文档如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
_CONTEXTS = {} """ A dictionary to specify contexts which this Ryu application wants to use. Its key is a name of context and its value is an ordinary class which implements the context. The class is instantiated by app_manager and the instance is shared among RyuApp subclasses which has _CONTEXTS member with the same key. A RyuApp subclass can obtain a reference to the instance via its __init__'s kwargs as the following. Example:: _CONTEXTS = { 'network': network.Network } def __init__(self, *args, *kwargs): self.network = kwargs['network'] """ |
_EVENTS
用于记录本应用会产生的event。但是当且仅当定义该event的语句在其他模块时才会被使用到。但是目前我还没有遇见过在哪里使用,如果你知道其正确的用法,恳请告知,相互学习。
self.__init__
__init__函数中初始化了许多重要的成员变量,如self.event_handler用于记录向外提供的事件处理句柄,而self.observer则刚好相反,用于通知app_manager本应用监听何种类型的事件。self.event是事件队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
def __init__(self, *_args, **_kwargs): super(RyuApp, self).__init__() self.name = self.__class__.__name__ self.event_handlers = {} # ev_cls -> handlers:list self.observers = {} # ev_cls -> observer-name -> states:set self.threads = [] self.events = hub.Queue(128) if hasattr(self.__class__, 'LOGGER_NAME'): self.logger = logging.getLogger(self.__class__.LOGGER_NAME) else: self.logger = logging.getLogger(self.name) self.CONF = cfg.CONF # prevent accidental creation of instances of this class outside RyuApp class _EventThreadStop(event.EventBase): pass self._event_stop = _EventThreadStop() self.is_active = True |
self.start
start函数将启动coroutine去处理_event_loop,并将其加入threads字典中,为什么名字叫threads呢?我也不知道。也许我理解错了?
self._event_loop
_event_loop函数用于启动事件处理循环,通过调用self.get_handlers(ev, state)函数来找到事件对应的handler,然后处理事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
def get_handlers(self, ev, state=None): """Returns a list of handlers for the specific event. :param ev: The event to handle. :param state: The current state. ("dispatcher") If None is given, returns all handlers for the event. Otherwise, returns only handlers that are interested in the specified state. The default is None. """ ev_cls = ev.__class__ handlers = self.event_handlers.get(ev_cls, []) if state is None: return handlers def _event_loop(self): while self.is_active or not self.events.empty(): ev, state = self.events.get() if ev == self._event_stop: continue handlers = self.get_handlers(ev, state) for handler in handlers: handler(ev) |
event dispatch
应用中可以通过@set_ev_cls修饰符去监听某些事件。当产生event时,通过event去get observer,得到对应的观察者,然后再使用self.send_event函数去发送事件。在这里,实际上就是直接往self.event队列中put event。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
def _send_event(self, ev, state): self.events.put((ev, state)) def send_event(self, name, ev, state=None): """ Send the specified event to the RyuApp instance specified by name. """ if name in SERVICE_BRICKS: if isinstance(ev, EventRequestBase): ev.src = self.name LOG.debug("EVENT %s->%s %s" % (self.name, name, ev.__class__.__name__)) SERVICE_BRICKS[name]._send_event(ev, state) else: LOG.debug("EVENT LOST %s->%s %s" % (self.name, name, ev.__class__.__name__)) def send_event_to_observers(self, ev, state=None): """ Send the specified event to all observers of this RyuApp. """ for observer in self.get_observers(ev, state): self.send_event(observer, ev, state) |
其他函数如注册handler函数:register_handler,注册监听函数:register_observer等都是非常简单直白的代码,不再赘述。
AppManager
AppManager类是RYU应用的调度中心。用于管理应用的添加删除,消息路由等等功能。
首先从启动函数开始介绍,我们可以看到run_apps函数中的代码和前文提到的main函数语句基本一样。首先获取一个对象,然后加载对应的apps,然后获取contexts,context中其实包含的是本应用所需要的依赖应用。所以在调用instantiate_apps函数时,将app_lists内的application和contexts中的services都实例化,然后启动协程去运行这些服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@staticmethod def run_apps(app_lists): """Run a set of Ryu applications A convenient method to load and instantiate apps. This blocks until all relevant apps stop. """ app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = app_mgr.instantiate_apps(**contexts) webapp = wsgi.start_service(app_mgr) if webapp: services.append(hub.spawn(webapp)) try: hub.joinall(services) finally: app_mgr.close() |
load_apps
首先从创建一个apps_lists的生成器(个人理解应该是生成器而非迭代器)。在while循环中,每次pop一个应用进行处理,然后将其本身和其context中的内容添加到services中,再去调用get_dependent_services函数获取其依赖应用,最后将所有的依赖services添加到app_lists中,循环至最终app_lists内元素全都pop出去,完成application的加载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
def load_apps(self, app_lists): app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)] while len(app_lists) > 0: app_cls_name = app_lists.pop(0) context_modules = map(lambda x: x.__module__, self.contexts_cls.values()) if app_cls_name in context_modules: continue LOG.info('loading app %s', app_cls_name) cls = self.load_app(app_cls_name) if cls is None: continue self.applications_cls[app_cls_name] = cls services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists]) |
create_contexts
context实例化函数将context中name:service class键值对的内容实例化成对应的对象,以便加入到services 列表中,从而得到加载。首先从列表中取出对应数据,然后判断是否时RyuApp的子类,是则实例化,否则直接赋值service class。load_app函数在读取的时候还会再次判断是否是RyuApp子类。
1 2 3 4 5 6 7 8 9 10 11 |
def create_contexts(self): for key, cls in self.contexts_cls.items(): if issubclass(cls, RyuApp): # hack for dpset context = self._instantiate(None, cls) else: context = cls() LOG.info('creating context %s', key) assert key not in self.contexts self.contexts[key] = context return self.contexts |
instantiate_apps
此函数调用了self._instantiate函数,在_instantiate函数中又调用了register_app()函数,此函数将app添加到SERVICE_BRICKS字典之中,然后继续调用了ryu.controller.handler 中的 register_instance函数,最终完成了应用的注册。此后继续调用self._update_bricks函数完成了服务链表的更新,最后启动了所有的应用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
def instantiate_apps(self, *args, **kwargs): for app_name, cls in self.applications_cls.items(): self._instantiate(app_name, cls, *args, **kwargs) self._update_bricks() self.report_bricks() threads = [] for app in self.applications.values(): t = app.start() if t is not None: threads.append(t) return threads def _instantiate(self, app_name, cls, *args, **kwargs): # for now, only single instance of a given module # Do we need to support multiple instances? # Yes, maybe for slicing. #LOG.info('instantiating app %s of %s', app_name, cls.__name__) if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None: ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS) if app_name is not None: assert app_name not in self.applications app = cls(*args, **kwargs) register_app(app) assert app.name not in self.applications self.applications[app.name] = app return app |
_update_bricks
此函数完成了更新service_bricks的功能。首先从获取到service实例,然后再获取到service中的方法,若方法有callers属性,即使用了@set_ev_cls的装饰符,拥有了calls属性。(caller类中的ev_source和dispatcher成员变量描述了产生该event的source module, dispatcher描述了event需要在什么状态下才可以被分发。如:HANDSHAKE_DISPATCHER,CONFIG_DISPATCHER等。)最后调用register_observer函数注册了observer。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
def _update_bricks(self): for i in SERVICE_BRICKS.values(): for _k, m in inspect.getmembers(i, inspect.ismethod): if not hasattr(m, 'callers'): continue for ev_cls, c in m.callers.iteritems(): if not c.ev_source: continue brick = _lookup_service_brick_by_mod_name(c.ev_source) if brick: brick.register_observer(ev_cls, i.name, c.dispatchers) # allow RyuApp and Event class are in different module for brick in SERVICE_BRICKS.itervalues(): if ev_cls in brick._EVENTS: brick.register_observer(ev_cls, i.name, c.dispatchers) |
ryu.controller.handler.register_instance
以上的部分介绍了App的注册,observer的注册,handler的查找和使用,但是,始终没有提到handler在何处注册。实际上,handler的注册在register_instance部分完成了。为什么他的位置在handler文件,而不在app_manager文件呢?个人认为可能是为了给其他非Ryu APP的模块使用吧。
1 2 3 4 5 6 |
def register_instance(i): for _k, m in inspect.getmembers(i, inspect.ismethod): # LOG.debug('instance %s k %s m %s', i, _k, m) if _has_caller(m): for ev_cls, c in m.callers.iteritems(): i.register_handler(ev_cls, m) |
Conclusion
总体而言,RYU使用了协程,在很大程度上提高了单核性能。同时也使用了许多高效的语句和库,使得代码量非常精简易读。优势方面,RYU开发门槛低,性能好,稳定读强,并迎合OpenStack编写,适合用于数据中心等云场景。劣势方面,RYU还没有实现分布式版本,在大规模网络中只能使用多个单节点分担负载。实现细节上还存在细微的问题,如虽然提供了存储依赖关系的数据结构和获取依赖关系的函数,但是并没有指定一个默认的依赖关系。不过这一点其实并不算大问题,甚至不是问题,因为开发者可以手动去指定。
认真读完RYU底层的实现代码,觉得学习一门语言需要学习的内容太多,而只有真正去使用时,才会真正的学会和理解。严谨的逻辑,优雅的编码风格,清晰的模块划分能让程序的可读性更高,代码可复用性更强。如果从一个产品的角度讲,RYU算是一个不错的产品,小而美。没有ONOS,OpenDaylight那样庞大,但是作为一个纯SDN控制器而言,用户体验算是非常好的一个了。
写完这篇之后,估计这个学期就不会再写了,非科研狗非产品狗非bababala狗的渣硕要开始预习期末考试了。希望未来的我会更好。
References
- itertools:python关于迭代器的库,参考https://docs.python.org/2/library/itertools.html
- contextlib:参考https://docs.python.org/2/library/contextlib.html
- yield:类似于return,但是返回的是一个生成器,参考https://docs.python.org/2/library/contextlib.html
- decorator:参考https://wiki.python.org/moin/PythonDecorators
- coolshell:Python修饰器的函数式编程,参考http://coolshell.cn/articles/11265.html