天天看點

Ryu的安裝+使用+源碼分析+二層交換機

ryu的安裝

安裝RYU,需要安裝一些python的套件:

python-eventlet

python-routes

python-webob

python-paramiko

安裝RYU主要有兩種方式:

1、pip安裝

pip install ryu
           

2、下載下傳源檔案安裝

Ryu的安裝+使用+源碼分析+二層交換機
git clone git://github.com/osrg/ryu.git
cd ryu
sudo pip install -r tools/pip-requires
sudo python setup.py install 
           

ryu的使用

進入ryu目錄,輸入

ryu-manager yourapp.py

運作相應的app

Ryu的安裝+使用+源碼分析+二層交換機

比如:

ryu-manager simple_switch.py
           
Ryu的安裝+使用+源碼分析+二層交換機

ryu的源碼分析

下面介紹ryu/ryu目錄下的主要目錄内容。

base

ryu ryu base

Ryu的安裝+使用+源碼分析+二層交換機

base中有一個非常重要的檔案:app_manager.py,其作用是RYU應用的管理中心。用于加載RYU應用程式,接受從APP發送過來的資訊,同時也完成消息的路由。

其主要的函數有app注冊、登出、查找、并定義了RYUAPP基類,定義了RYUAPP的基本屬性。包含name, threads, events, event_handlers和observers等成員,以及對應的許多基本函數。如:start(), stop()等。

這個檔案中還定義了AppManager基類,用于管理APP。定義了加載APP等函數。不過如果僅僅是開發APP的話,這個類可以不必關心

class RyuApp(object):
    """
    The base class for Ryu applications.*#ryu應用程式的基類*
    
*ryuapp子類執行個體化ryu-manager加載所有ryu應用程式子產品的請求,_init_應該調用ryuapp,_init
_帶着一些參數。在_init_中發送一些事件是不合法的*
    RyuApp subclasses are instantiated after ryu-manager loaded
    all requested Ryu application modules.
    __init__ should call RyuApp.__init__ with the same arguments.
    It's illegal to send any events in __init__.
    
*執行個體屬性“name”是ryu程式中用于消息發送的類的名稱*
    The instance attribute 'name' is the name of the class used for
    message routing among Ryu applications.  (Cf. send_event)
    It's set to __class__.__name__ by RyuApp.__init__.
    It's discouraged for subclasses to override this.
    """

    _CONTEXTS = {}
    *"""
    ryu應用程式希望使用字典指定的上下文呢。關鍵是上下文的名字和值是一個普通類繼承上下文。這個類被執行個體化app_manager和執行個體在衆多ryuapp子類中共享包含_contexts member帶有相同的key。一個ryuapp子類可以通過執行個體獲得對執行個體的引用*
    A dictionary to specify contexts which this Ryu application wants to use.
    Its key is a name of context and its value is an ordinary class
    which implements the context.  The class is instantiated by app_manager
    and the instance is shared among RyuApp subclasses which has _CONTEXTS
    member with the same key.  A RyuApp subclass can obtain a reference to
    the instance via its __init__'s kwargs as the following.

    Example::

        _CONTEXTS = {
            'network': network.Network
        }

        def __init__(self, *args, *kwargs):
            self.network = kwargs['network']
    """

    _EVENTS = []
    """
    *一系列事件類由ryuapp子類産生。這應該是指定的目前僅當事件類都從ryuapp類不同的python子產品定義*
    A list of event classes which this RyuApp subclass would generate.
    This should be specified if and only if event classes are defined in
    a different python module from the RyuApp subclass is.
    """

    OFP_VERSIONS = None*設定openflow版本*
    """
   *列出該應用支援openflow的版本。預設的是架構支援的所有版本*
    A list of supported OpenFlow versions for this RyuApp.
    The default is all versions supported by the framework.

    Examples::

        OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION,
                        ofproto_v1_2.OFP_VERSION]
*如果在系統内加載多個應用,則使用交叉點版本*
    If multiple Ryu applications are loaded in the system,
    the intersection of their OFP_VERSIONS is used.
    """

    @classmethod
    def context_iteritems(cls):
        """
        Return iterator over the (key, contxt class) of application context
        """
        return iter(cls._CONTEXTS.items())

    def __init__(self, *_args, **_kwargs):
        super(RyuApp, self).__init__()
        self.name = self.__class__.__name__
        self.event_handlers = {}        # ev_cls -> handlers:list
        self.observers = {}     # ev_cls -> observer-name -> states:set
        self.threads = []
        self.main_thread = None
        self.events = hub.Queue(128)
        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
        if hasattr(self.__class__, 'LOGGER_NAME'):
            self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
        else:
            self.logger = logging.getLogger(self.name)
        self.CONF = cfg.CONF

        # prevent accidental creation of instances of this class outside RyuApp
        class _EventThreadStop(event.EventBase):
            pass
        self._event_stop = _EventThreadStop()
        self.is_active = True

    def start(self):
        """
        Hook that is called after startup initialization is done.
        *當初始化完成後被調用*
        """
        self.threads.append(hub.spawn(self._event_loop))

    def stop(self):
        if self.main_thread:
            hub.kill(self.main_thread)
        self.is_active = False
        self._send_event(self._event_stop, None)
        hub.joinall(self.threads)

    def set_main_thread(self, thread):
        """
        Set self.main_thread so that stop() can terminate it.

        Only AppManager.instantiate_apps should call this function.
        """
        self.main_thread = thread

    def register_handler(self, ev_cls, handler):
        assert callable(handler)
        self.event_handlers.setdefault(ev_cls, [])
        self.event_handlers[ev_cls].append(handler)

    def unregister_handler(self, ev_cls, handler):
        assert callable(handler)
        self.event_handlers[ev_cls].remove(handler)
        if not self.event_handlers[ev_cls]:
            del self.event_handlers[ev_cls]

    def register_observer(self, ev_cls, name, states=None):
        states = states or set()
        ev_cls_observers = self.observers.setdefault(ev_cls, {})
        ev_cls_observers.setdefault(name, set()).update(states)

    def unregister_observer(self, ev_cls, name):
        observers = self.observers.get(ev_cls, {})
        observers.pop(name)

    def unregister_observer_all_event(self, name):
        for observers in self.observers.values():
            observers.pop(name, None)

    def observe_event(self, ev_cls, states=None):
        brick = _lookup_service_brick_by_ev_cls(ev_cls)
        if brick is not None:
            brick.register_observer(ev_cls, self.name, states)

    def unobserve_event(self, ev_cls):
        brick = _lookup_service_brick_by_ev_cls(ev_cls)
        if brick is not None:
            brick.unregister_observer(ev_cls, self.name)

    def get_handlers(self, ev, state=None):
        """Returns a list of handlers for the specific event.

        :param ev: The event to handle.
        :param state: The current state. ("dispatcher")
                      If None is given, returns all handlers for the event.
                      Otherwise, returns only handlers that are interested
                      in the specified state.
                      The default is None.
        """
        ev_cls = ev.__class__
        handlers = self.event_handlers.get(ev_cls, [])
        if state is None:
            return handlers

        def test(h):
            if not hasattr(h, 'callers') or ev_cls not in h.callers:
                # dynamically registered handlers does not have
                # h.callers element for the event.
                return True
            states = h.callers[ev_cls].dispatchers
            if not states:
                # empty states means all states
                return True
            return state in states

        return filter(test, handlers)

    def get_observers(self, ev, state):
        observers = []
        for k, v in self.observers.get(ev.__class__, {}).items():
            if not state or not v or state in v:
                observers.append(k)

        return observers

    def send_request(self, req):
        """
        Make a synchronous request.
        Set req.sync to True, send it to a Ryu application specified by
        req.dst, and block until receiving a reply.
        Returns the received reply.
        The argument should be an instance of EventRequestBase.
        産生一個同步請求。設定req.sync為真,發送到一個指定的ryu應用通過req.dst,然後直到收到一個回複block。傳回接收到的回複,參數應該被evenrequestbase執行個體。
        """

        assert isinstance(req, EventRequestBase)
        req.sync = True
        req.reply_q = hub.Queue()
        self.send_event(req.dst, req)
        # going to sleep for the reply
        return req.reply_q.get()

    def _event_loop(self):
        while self.is_active or not self.events.empty():
            ev, state = self.events.get()
            self._events_sem.release()
            if ev == self._event_stop:
                continue
            handlers = self.get_handlers(ev, state)
            for handler in handlers:
                try:
                    handler(ev)
                except hub.TaskExit:
                    # Normal exit.
                    # Propagate upwards, so we leave the event loop.
                    raise
                except:
                    LOG.exception('%s: Exception occurred during handler processing. '
                                  'Backtrace from offending handler '
                                  '[%s] servicing event [%s] follows.',
                                  self.name, handler.__name__, ev.__class__.__name__)

    def _send_event(self, ev, state):
        self._events_sem.acquire()
        self.events.put((ev, state))

    def send_event(self, name, ev, state=None):
        """
        Send the specified event to the RyuApp instance specified by name.
        *發送指定的事件給ryuapp通過指定的名稱執行個體化*
        """

        if name in SERVICE_BRICKS:
            if isinstance(ev, EventRequestBase):
                ev.src = self.name
            LOG.debug("EVENT %s->%s %s",
                      self.name, name, ev.__class__.__name__)
            SERVICE_BRICKS[name]._send_event(ev, state)
        else:
            LOG.debug("EVENT LOST %s->%s %s",
                      self.name, name, ev.__class__.__name__)

    def send_event_to_observers(self, ev, state=None):
        """
        Send the specified event to all observers of this RyuApp.
        *發送指定的事件給ryuapp的監測者*
        """

        for observer in self.get_observers(ev, state):
            self.send_event(observer, ev, state)


    def reply_to_request(self, req, rep):
        """
        *通過send_request發送同步請求回複。第一個參數是evenrequesbase的執行個體,第二個參數是eventreplybase執行個體*
        Send a reply for a synchronous request sent by send_request.
        The first argument should be an instance of EventRequestBase.
        The second argument should be an instance of EventReplyBase.
        """

        assert isinstance(req, EventRequestBase)
        assert isinstance(rep, EventReplyBase)
        rep.dst = req.src
        if req.sync:
            req.reply_q.put(rep)
        else:
            self.send_event(rep.dst, rep)

*close()是拆卸方法。close是方法的名稱。被python上下文管理器選擇使用*
    def close(self):
        """
        teardown method.
        The method name, close, is chosen for python context manager
        """
        pass
           

controller

controller檔案夾中許多非常重要的檔案,如events.py, ofp_handler.py, controller.py等。其中controller.py中定義了OpenFlowController基類。用于定義OpenFlow的控制器,用于處理交換機和控制器的連接配接等事件,同時還可以産生事件和路由事件。其事件系統的定義,可以檢視events.py和ofp_events.py。

在ofp_handler.py中定義了基本的handler(應該怎麼稱呼呢?句柄?處理函數?),完成了基本的如:握手,錯誤資訊處理和keep alive 等功能。更多的如packet_in_handler應該在app中定義。

在dpset.py檔案中,定義了交換機端的一些消息,如端口狀态資訊等,用于描述和操作交換機。如添加端口,删除端口等操作。

其他的檔案不再贅述。

lib

lib中定義了我們需要使用到的基本的資料結構,如dpid, mac和ip等資料結構。在lib/packet目錄下,還定義了許多網絡協定,如ICMP, DHCP, MPLS和IGMP等協定内容。而每一個資料包的類中都有parser和serialize兩個函數。用于解析和序列化資料包。

lib目錄下,還有ovs, netconf目錄,對應的目錄下有一些定義好的資料類型,不再贅述。

ofproto

在這個目錄下,基本分為兩類檔案,一類是協定的資料結構定義,另一類是協定解析,也即資料包處理函數檔案。如ofproto_v1_0.py是1.0版本的OpenFlow協定資料結構的定義,而ofproto_v1_0_parser.py則定義了1.0版本的協定編碼和解碼。具體内容不贅述,實作功能與協定相同。

topology

包含了switches.py等檔案,基本定義了一套交換機的資料結構。event.py定義了交換上的事件。dumper.py定義了擷取網絡拓撲的内容。最後api.py向上提供了一套調用topology目錄中定義函數的接口。

contrib

這個檔案夾主要存放的是開源社群貢獻者的代碼。我沒看過。

cmd

定義了RYU的指令系統,具體不贅述。

services

完成了BGP和vrrp的實作。具體我還沒有使用這個子產品。

tests

tests目錄下存放了單元測試以及整合測試的代碼,有興趣的讀者可以自行研究。

開發你自己的RYU應用程式

大概浏覽了一下RYU的源代碼,相信看過OpenDaylight的同學會發現,太輕松了!哈哈,我想我真的不喜歡maven, osgi, xml, yang以及java,但是不能不承認OpenDaylight還是很牛逼的,在學習的讀者要堅持啊!

開發RYU的APP,真的再簡單不過了。先來最簡單的:

from ryu.base import app_manager

class L2Switch(app_manager.RyuApp):

def init(self, *args, **kwargs):

super(L2Switch, self).init(*args, **kwargs)

首先,我們從ryu.base import app_manager,在前面我們也提到過這個檔案中定義了RyuApp基類。我們在開發APP的時候隻需要繼承這個基類,就獲得你想要的一個APP的一切了。于是,我們就不用去注冊了?!是的,不需要了!

儲存檔案,可以取一個名字為L2Switch.py。

現在你可以運作你的APP了。快得有點不敢相信吧!但是目前什麼都沒有,運作之後,馬上就會結束,但起碼我們的代碼沒有報錯。

運作:

ryu-manager L2Switch.py

繼續往裡面添加内容:

from ryu.base import app_manager

from ryu.controller import ofp_event

from ryu.controller.handler import MAIN_DISPATCHER

from ryu.controller.handler import set_ev_cls

class L2Switch(app_manager.RyuApp):

def init(self, *args, **kwargs):

super(L2Switch, self).init(*args, **kwargs)

@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, ev):
    msg = ev.msg
    datapath = msg.datapath
    ofp = datapath.ofproto
    ofp_parser = datapath.ofproto_parser

    actions = [ofp_parser.OFPActionOutput(ofp.OFPP_FLOOD)]
    out = ofp_parser.OFPPacketOut(
        datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port,
        actions=actions)
    datapath.send_msg(out)
           

其中ofp_event完成了事件的定義,進而我們可以在函數中注冊handler,監聽事件,并作出回應。

packet_in_handler方法用于處理packet_in事件。@set_ev_cls修飾符用于告知RYU,被修飾的函數應該被調用。(翻譯得有點爛這句)

set_ev_cls第一個參數表示事件發生時應該調用的函數,第二個參數告訴交換機隻有在交換機握手完成之後,才可以被調用。

下面分析具體的資料操作:

ev.msg:每一個事件類ev中都有msg成員,用于攜帶觸發事件的資料包。

msg.datapath:已經格式化的msg其實就是一個packet_in封包,msg.datapath直接可以獲得packet_in封包的datapath結構。datapath用于描述一個交換網橋。也是和控制器通信的實體單元。datapath.send_msg()函數用于發送資料到指定datapath。通過datapath.id可獲得dpid資料,在後續的教程中會有使用。

datapath.ofproto對象是一個OpenFlow協定資料結構的對象,成員包含OpenFlow協定的資料結構,如動作類型OFPP_FLOOD。

datapath.ofp_parser則是一個按照OpenFlow解析的資料結構。

actions是一個清單,用于存放action list,可在其中添加動作。

通過ofp_parser類,可以構造構造packet_out資料結構。括弧中填寫對應字段的指派即可。

如果datapath.send_msg()函數發送的是一個OpenFlow的資料結構,RYU将把這個資料發送到對應的datapath。

至此,一個簡單的HUB已經完成。

RYU進階——二層交換機

在以上的基礎之上,繼續修改就可以完成二層交換機的功能。具體代碼如下:

import struct
import logging
 
from ryu.base import app_manager
from ryu.controller import mac_to_port
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_0
from ryu.lib.mac import haddr_to_bin
from ryu.lib.packet import packet
from ryu.lib.packet import ethernet
 
class L2Switch(app_manager.RyuApp):
 
    OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]#define the version of OpenFlow
 
    def __init__(self, *args, **kwargs):
        super(L2Switch, self).__init__(*args, **kwargs)
        self.mac_to_port = {}
 
    def add_flow(self, datapath, in_port, dst, actions):
        ofproto = datapath.ofproto
 
        match = datapath.ofproto_parser.OFPMatch(
            in_port = in_port, dl_dst = haddr_to_bin(dst))
 
        mod = datapath.ofproto_parser.OFPFlowMod(
            datapath = datapath, match = match, cookie = 0,
            command = ofproto.OFPFC_ADD, idle_timeout = 10,hard_timeout = 30,
            priority = ofproto.OFP_DEFAULT_PRIORITY,
            flags =ofproto.OFPFF_SEND_FLOW_REM, actions = actions)
 
        datapath.send_msg(mod)
 
    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
    def packet_in_handler(self, ev):
        msg = ev.msg
        datapath = msg.datapath
        ofproto = datapath.ofproto
 
        pkt = packet.Packet(msg.data)
        eth = pkt.get_protocol(ethernet.ethernet)
 
        dst = eth.dst
        src = eth.src
 
        dpid = datapath.id    #get the dpid
        self.mac_to_port.setdefault(dpid, {})
 
        self.logger.info("packet in %s %s %s %s", dpid, src, dst , msg.in_port)
        #To learn a mac address to avoid FLOOD next time.
 
        self.mac_to_port[dpid][src] = msg.in_port
 
 
        out_port = ofproto.OFPP_FLOOD
 
        #Look up the out_port 
        if dst in self.mac_to_port[dpid]:
            out_port = self.mac_to_port[dpid][dst]
 
        ofp_parser = datapath.ofproto_parser
 
        actions = [ofp_parser.OFPActionOutput(out_port)]
 
        if out_port != ofproto.OFPP_FLOOD:
            self.add_flow(datapath, msg.in_port, dst, actions)
 
 
        #We always send the packet_out to handle the first packet.
        packet_out = ofp_parser.OFPPacketOut(datapath = datapath, buffer_id = msg.buffer_id,
            in_port = msg.in_port, actions = actions)
        datapath.send_msg(packet_out)
    #To show the message of ports' status.
    @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)
    def _port_status_handler(self, ev):
        msg = ev.msg
        reason = msg.reason
        port_no = msg.desc.port_no
 
        ofproto = msg.datapath.ofproto
 
        if reason == ofproto.OFPPR_ADD:
            self.logger.info("port added %s", port_no)
        elif reason == ofproto.OFPPR_DELETE:
            self.logger.info("port deleted %s", port_no)
        elif reason == ofproto.OFPPR_MODIFY:
            self.logger.info("port modified %s", port_no)
        else:
            self.logger.info("Illeagal port state %s %s", port_no, reason)
           

packet_in_handler函數的函數體:

ev.msg:每一個事件類ev中都有msg成員,用于攜帶觸發事件的資料包,(此處是packet_in的消息對象,存儲着消息的資料部分)

msg.datapath:已經格式化的msg其實就是一個packet_in封包,msg.datapath直接可以獲得packet_in封包的datapath結構。datapath用于描述一個交換網橋。也是和控制器通信的實體單元。

datapath.send_msg():函數用于發送資料到指定datapath。

datapath.ofproto對象是一個OpenFlow協定資料結構的對象,成員包含OpenFlow協定的資料結構,如動作類型OFPP_FLOOD。

datapath.ofp_parser則是一個按照OpenFlow解析的資料結構。

actions是一個清單,用于存放action list,可在其中添加動作。

通過ofp_parser類,可以構造構造packet_out資料結構。括弧中填寫對應字段的指派即可。如果datapath.send_msg()函數發送的是一個OpenFlow的資料結構,RYU将把這個資料發送到對應的datapath。

每當RYU控制器收到OpenFlow協定中的packet_in消息時,packet_in_handler函數就會被調用,因為這個函數被注冊到裝飾器set_ev_cls中,并且裝飾器将packet_in_handler注冊到packet_in消息上,每當收到packet_in消息時就調用該函數。

set_ev_cls裝飾器的參數:

第一個參數指觸發函數的調用事件,這裡指packet_in事件。

第二個參數指交換機的狀态。比如,當交換機處于與控制器協商(negotiation)階段時,可能你想忽略此時的packet_in消息,那我們就可以使用MAIN_DISPATCHER作為參數來表明當協商完成後該函數才被調用(即握手後才可以被調用)。

作者:墨痕hz

連結:https://www.jianshu.com/p/2b3bffa31ecb

來源:簡書

著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

解釋的原文連結

https://blog.csdn.net/Jiana_Feng/article/details/107861130添加連結描述

*def init(self, *args, *kwargs):

編寫python script的時候,經常需要使用def init(self, *args, **kwargs): 其含義代表什麼?

這種寫法代表這個方法接受任意個數的參數

如果是沒有指定key的參數,比如單單‘apple’,‘people’,即為無指定,則會以list的形式放在args變量裡面

如果是有指定key的參數,比如item=‘apple’這種形式,即為有指定,則會以dict的形式放在kwargs變量裡面

For example:

Ryu的安裝+使用+源碼分析+二層交換機

相信代碼中的注釋已經足以讓讀者了解這個程式。完成之後,運作:

ryu-manager L2Switch.py
           

然後可以使用Mininet進行pingall測試,成功!

轉載自:李呈部落格@李呈,http://www.muzixing.com/pages/2014/09/20/ryuru-men-jiao-cheng.html