编者按:RYU使用了协程,在很大程度上提高了单核性能。同时也使用了许多高效的语句和库,使得代码精简易读。本文以RYU真实运行流程作为主线,从main函数入手,详细讲述OFPHandler,Controller,RyuApp和AppManager等类的实现细节。
每接触一个控制器我都会习惯性的把控制器的源码读一读,走一走处理流程,RYU也不例外。本篇博文将从main函数入手,讲述RYU的ryuapp基类细节、app_manager类如何load apps,注册并运行application,Event的产生以及分发,还有最重要的应用ofp_handler。文章将以RYU真实运行流程作为主线,详细讲述RYU如何运作。如果文中出现理解错的地方,敬请指出,万分感谢!转载请声明原出处。
main()
RYU的main函数在ryu/cmd/manager.py文件中。main函数中CONF部分已经在在前一篇《RYU oslo学习总结》已经有所介绍,所以这次关注的重点的是后续部分,如app_manager如何工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
def main(args=None, prog=None): try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) log.init_log() if CONF.pid_file: import os with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.app # keep old behaivor, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts)) webapp = wsgi.start_service(app_mgr) if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services) finally: app_mgr.close() |
首先从CONF文件中读取出app list。如果ryu-manager 命令任何参数,则默认应用为ofp_handler应用。紧接着实例化一个AppManager对象,调用load_apps函数将应用加载。调用create_contexts函数创建对应的contexts, 然后调用instantiate_apps函数将app_list和context中的app均实例化。启动wsgi架构,提供web应用。最后将所有的应用作为任务,作为coroutine的task去执行,joinall使得程序必须等待所有的task都执行完成才可以退出程序。最后调用close函数,关闭程序,释放资源。以下的部分将以主函数中出现的调用顺序为依据,展开讲解。
OFPHandler
上文说到,如果没有捕获Application输入,那么默认启动的应用是OFPHandler应用。该应用主要用于处理OpenFlow消息。在start函数初始化运行了一个OpenFlowController实例。OpenFlowController类将在后续介绍。
1 2 3 |
def start(self): super(OFPHandler, self).start() return hub.spawn(OpenFlowController()) |
OFPHandler应用完成了基本的消息处理,如hello_handler:用于处理hello报文,协议版本的协商。其处理并不复杂,但是值得注意的一点是装饰器:Decorator的使用。
1 2 3 4 5 |
@set_ev_handler(ofp_event.EventOFPHello, HANDSHAKE_DISPATCHER) def hello_handler(self, ev): self.logger.debug('hello ev %s', ev) msg = ev.msg datapath = msg.datapath |
Decorator
如果你已经了解Decorator,可以直接跳过本部分。装饰器是什么?Python Decorator coolshell上的介绍Python修饰器的函数式编程 Python Decorator可以看作是一种声明,一种修饰。以下举例参考自Coolshell。举例如下:
1 2 3 |
@decorator def foo(): pass |
实际上等同于foo = decorator(foo), 而且它还被执行了。举个例子:
1 2 3 4 5 6 |
def keyword(fn): print "you %s me!" % fn.__name__[::-1].upper() @keyword def evol(): pass |
运行之后,就会输出。。。
多个decorator:
1 2 3 4 |
@decorator_a @decorator_b def foo(): pass |
这相当于:
1 |
foo = decorator_a(decorator_b(foo)) |
而带参数的decorator:
1 2 3 |
@decorator(arg1, arg2) def foo(): pass |
相当于
1 |
foo = decorator(arg1,arg2)(foo) |
decorator(arg1,arg2)将生成一个decorator。
class式的 Decorator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
class myDecorator(object): def __init__(self, fn): print "inside myDecorator.__init__()" self.fn = fn def __call__(self): self.fn() print "inside myDecorator.__call__()" @myDecorator def aFunction(): print "inside aFunction()" print "Finished decorating aFunction()" aFunction() |
@decorator使用时,__init__被调用,当function被调用是,执行__call__函数,而不执行function,所以在__call__函数中需要写出self.fn = fn,更多内容可以直接访问Python Decorator Library。
OpenFlowController
前一部分提到OFPHandle的start函数会将OpenFlowController启动。本小节介绍OpenFlowController类。该类的定义在ryu/cmd/controller.py文件中。OpenFlowController.__call__()函数启动了server_loop()函数,该函数实例化了hub.py中的StreamServer类,并将handler函数初始化为datapath_connection_factory函数,并调用serve_forever(),不断进行socket的监听。StreamServer定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, spawn='default', **ssl_args): assert backlog is None assert spawn == 'default' if ':' in listen_info[0]: self.server = eventlet.listen(listen_info, family=socket.AF_INET6) else: self.server = eventlet.listen(listen_info) if ssl_args: def wrap_and_handle(sock, addr): ssl_args.setdefault('server_side', True) handle(ssl.wrap_socket(sock, **ssl_args), addr) self.handle = wrap_and_handle else: self.handle = handle def serve_forever(self): while True: sock, addr = self.server.accept() spawn(self.handle, sock, addr) |
Datapath
Datapath类在RYU中极为重要,每当一个datapath实体与控制器建立连接时,就会实例化一个Datapath的对象。 该类中不仅定义了许多的成员变量用于描述一个datapath,还管理控制器与该datapath通信的数据收发。其中_recv_loop函数完成数据的接收与解析,事件的产生与分发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
@_deactivate def _recv_loop(self): buf = bytearray() required_len = ofproto_common.OFP_HEADER_SIZE count = 0 while self.is_active: ret = self.socket.recv(required_len) if len(ret) == 0: self.is_active = False break buf += ret while len(buf) >= required_len: (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) required_len = msg_len if len(buf) < required_len: break msg = ofproto_parser.msg(self, version, msg_type, msg_len, xid, buf) # 解析报文 # LOG.debug('queue msg %s cls %s', msg, msg.__class__) if msg: ev = ofp_event.ofp_msg_to_ev(msg) # 产生事件 self.ofp_brick.send_event_to_observers(ev, self.state) # 事件分发 dispatchers = lambda x: x.callers[ev.__class__].dispatchers handlers = [handler for handler in self.ofp_brick.get_handlers(ev) if self.state in dispatchers(handler)] for handler in handlers: handler(ev) buf = buf[required_len:] required_len = ofproto_common.OFP_HEADER_SIZE # We need to schedule other greenlets. Otherwise, ryu # can't accept new switches or handle the existing # switches. The limit is arbitrary. We need the better # approach in the future. count += 1 if count > 2048: count = 0 hub.sleep(0) |
@_deactivate修饰符作用在于在Datapath断开连接之后,将其状态is_active置为False。self.ofp_brick.send_event_to_observers(ev, self.state) 语句完成了事件的分发。self.brick的初始化语句可以在self.__init__函数中找到:
1 |
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event') |
由上可知,self.ofp_brick实际上是由service_brick(中文可以成为:服务链表?)中的“ofp_event”服务赋值的。在每一个app中,使用@set_ev_cls(ev_cls,dispatchers)时,就会将实例化ofp_event模块,执行文件中最后一句:
1 |
handler.register_service('ryu.controller.ofp_handler') |
register_service函数实体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def register_service(service): """ Register the ryu application specified by 'service' as a provider of events defined in the calling module. If an application being loaded consumes events (in the sense of set_ev_cls) provided by the 'service' application, the latter application will be automatically loaded. This mechanism is used to e.g. automatically start ofp_handler if there are applications consuming OFP events. """ frm = inspect.stack()[1] m = inspect.getmodule(frm[0]) m._SERVICE_NAME = service |
其中inspect.stack()[1]返回了调用此函数的caller, inspect.getmodule(frm[0])返回了该caller的模块,当前例子下,module=ofp_event。
我们可以通过ryu-manager --verbose来查看到输出信息,从而印证这一点。
1 2 3 4 5 6 7 8 9 |
muzi@muzi-OptiPlex-390:~/ryu/ryu/app$ ryu-manager --verbose loading app ryu.controller.ofp_handler instantiating app ryu.controller.ofp_handler of OFPHandler BRICK ofp_event CONSUMES EventOFPErrorMsg CONSUMES EventOFPEchoRequest CONSUMES EventOFPPortDescStatsReply CONSUMES EventOFPHello CONSUMES EventOFPSwitchFeatures |
所以当运行ofp_handler应用时,就会注册ofp_event service,为后续的应用提供服务。分发事件之后,还要处理自身订阅的事件,所以首先找到符合当前state的caller,然后调用handler。_caller类可以在handler.py文件中找到,包含dispatchers和ev_source两个成员变量。前者用于描述caller需要的state,后者是event产生者的模块名称。
对应的发送循环由_send_loop完成。self.send_p是一个深度为16的发送queue。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@_deactivate def _send_loop(self): try: while self.is_active: buf = self.send_q.get() self.socket.sendall(buf) finally: q = self.send_q # first, clear self.send_q to prevent new references. self.send_q = None # there might be threads currently blocking in send_q.put(). # unblock them by draining the queue. try: while q.get(block=False): pass except hub.QueueEmpty: pass |
serve函数完成了发送循环的启动和接受循环的启动。启动一个coroutine去执行self._send_loop(), 然后马上主动发送hello报文到datapath(可以理解为交换网桥:Bridge),最后执行self._recv_loop()。
1 2 3 4 5 6 7 8 9 10 11 12 |
def serve(self): send_thr = hub.spawn(self._send_loop) # send hello message immediately hello = self.ofproto_parser.OFPHello(self) self.send_msg(hello) try: self._recv_loop() finally: hub.kill(send_thr) hub.joinall([send_thr]) |
而serve函数又在datapath_connection_factory函数中被调用。当然向外提供完整功能的API就是这个。所以在OpenFlowController类中可以看到在初始化server实例的时候,handler赋值为datapath_connection_factory。其中使用到的contextlib module具体内容不作介绍,读者可自行学习。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
def datapath_connection_factory(socket, address): LOG.debug('connected socket:%s address:%s', socket, address) with contextlib.closing(Datapath(socket, address)) as datapath: try: datapath.serve() except: # Something went wrong. # Especially malicious switch can send malformed packet, # the parser raise exception. # Can we do anything more graceful? if datapath.id is None: dpid_str = "%s" % datapath.id else: dpid_str = dpid_to_str(datapath.id) LOG.error("Error in the datapath %s from %s", dpid_str, address) raise |
到此为止,OFPHandler应用的功能实现介绍完毕。RYU启动时,需要启动OFPHandler,才能完成数据的收发和解析。更多的上层应用逻辑都是在此基础之上进行的。若要开发APP则需要继承RyuApp类,并完成observer监听事件,以及注册handler去完成事件处理。