# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
# Copyright 2012-2013 Sam Whitlock
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict, namedtuple
from sts.fingerprints.messages import *
import sts.replay_event
from pox.lib.revent import Event, EventMixin
import logging
log = logging.getLogger("god_scheduler")
class PendingMessage(Event):
    '''
    Event raised by GodScheduler each time a control message is buffered.

    pending_message -- the PendingReceive / PendingSend key under which the
                       message was buffered.
    send_event      -- True when the buffered message is a send, False (the
                       default) when it is a receipt.
    '''

    def __init__(self, pending_message, send_event=False):
        # TODO(cs): boolean flag is ugly. Should use subclasses, but EventMixin
        # doesn't support addListener() on super/subclasses.
        super(PendingMessage, self).__init__()
        self.send_event = send_event
        self.pending_message = pending_message
# TODO(cs): move me to another file?
class GodScheduler(EventMixin):
    '''
    Models asynchrony: chooses when switches get to process packets from
    controllers. Buffers packets until they are pulled off the buffer and chosen
    by god (control_flow.py) to be processed.

    Buffered messages are keyed by PendingReceive / PendingSend tuples
    (dpid, controller_id, fingerprint). Several messages may share one key, so
    every key maps to a FIFO list of (connection, ofp message) pairs. A
    PendingMessage event is raised each time a message is buffered.
    '''

    _eventMixin_events = set([PendingMessage])

    def __init__(self):
        # keep around a queue for each switch of pending openflow messages
        # waiting to arrive at the switches.
        # { pending receive -> [(connection, pending ofp)_1, (connection, pending ofp)_2, ...] }
        self.pendingreceive2conn_messages = defaultdict(list)
        # { pending send -> [(connection, pending ofp)_1, (connection, pending ofp)_2, ...] }
        self.pendingsend2conn_messages = defaultdict(list)
        # Replay events recorded while pass-through mode is on. Created here
        # so that unset_pass_through() does not trip over a missing attribute
        # when pass-through mode was never enabled.
        self.passed_through_events = []

    def _pass_through_handler(self, message_event):
        ''' handler for pass-through mode: immediately schedule the message
        carried by the PendingMessage event and record a replay event for it '''
        pending_message = message_event.pending_message
        # Pass through
        self.schedule(pending_message)
        # Record
        if message_event.send_event:
            replay_event_class = sts.replay_event.ControlMessageSend
        else:
            replay_event_class = sts.replay_event.ControlMessageReceive
        replay_event = replay_event_class(pending_message.dpid,
                                          pending_message.controller_id,
                                          pending_message.fingerprint)
        self.passed_through_events.append(replay_event)

    def set_pass_through(self):
        ''' Cause all pending messages (receipts and sends) to pass through
        immediately without staying buffered. Resets the list of recorded
        pass-through events. '''
        self.passed_through_events = []
        # Must be the same bound method that unset_pass_through() removes.
        self.addListener(PendingMessage, self._pass_through_handler)

    def unset_pass_through(self):
        '''Unset pass through mode, and return any events that were passed through
        since pass through mode was set'''
        self.removeListener(self._pass_through_handler)
        passed_events = self.passed_through_events
        self.passed_through_events = []
        return passed_events

    def message_receipt_waiting(self, pending_message):
        '''
        Return whether the pending message receive is available
        '''
        # `in` does not create a key in the defaultdict.
        return pending_message in self.pendingreceive2conn_messages

    def message_send_waiting(self, pending_message):
        '''
        Return whether the pending send is available
        '''
        return pending_message in self.pendingsend2conn_messages

    def schedule(self, pending_message):
        '''
        Cause the switch to process the pending message associated with
        the fingerprint and controller connection.

        pending_message is a PendingReceive or PendingSend key; anything that
        is not a PendingReceive is treated as a send. The oldest message
        buffered under that key is released to its connection.

        Raises ValueError if nothing is buffered under pending_message.
        '''
        receive = isinstance(pending_message, PendingReceive)
        if receive:
            waiting = self.message_receipt_waiting(pending_message)
            multiset = self.pendingreceive2conn_messages
        else:
            waiting = self.message_send_waiting(pending_message)
            multiset = self.pendingsend2conn_messages
        if not waiting:
            # pending_message is itself a tuple, so it has to be wrapped in a
            # 1-tuple; otherwise `%` unpacks its fields and raises TypeError.
            raise ValueError("No such pending message %s" % (pending_message,))
        queue = multiset[pending_message]
        (conn, message) = queue.pop(0)
        # Avoid memory leak:
        if not queue:
            del multiset[pending_message]
        if receive:
            conn.allow_message_receipt(message)
        else:
            conn.allow_message_send(message)

    # TODO(cs): make this a factory method that returns DefferedOFConnection objects
    # with bound god_scheduler.insert() method. (much cleaner API + separation of concerns)
    def insert_pending_receipt(self, dpid, controller_id, ofp_message, conn):
        ''' Called by DefferedOFConnection to insert messages into our buffer.
        Buffers (conn, ofp_message) under a PendingReceive key and raises a
        PendingMessage event for it. '''
        fingerprint = OFFingerprint.from_pkt(ofp_message)
        conn_message = (conn, ofp_message)
        pending_receive = PendingReceive(dpid, controller_id, fingerprint)
        self.pendingreceive2conn_messages[pending_receive].append(conn_message)
        self.raiseEventNoErrors(PendingMessage(pending_receive))

    # TODO(cs): make this a factory method that returns DefferedOFConnection objects
    # with bound god_scheduler.insert() method. (much cleaner API + separation of concerns)
    def insert_pending_send(self, dpid, controller_id, ofp_message, conn):
        ''' Called by DefferedOFConnection to insert messages into our buffer.
        Buffers (conn, ofp_message) under a PendingSend key and raises a
        PendingMessage event (send_event=True) for it. '''
        fingerprint = OFFingerprint.from_pkt(ofp_message)
        conn_message = (conn, ofp_message)
        pending_send = PendingSend(dpid, controller_id, fingerprint)
        self.pendingsend2conn_messages[pending_send].append(conn_message)
        self.raiseEventNoErrors(PendingMessage(pending_send, send_event=True))

    def pending_receives(self):
        ''' Return the message receipts which are waiting to be scheduled '''
        # Snapshot, so callers may schedule() while iterating the result.
        return list(self.pendingreceive2conn_messages.keys())

    def pending_sends(self):
        ''' Return the message sends which are waiting to be scheduled '''
        # Snapshot, so callers may schedule() while iterating the result.
        return list(self.pendingsend2conn_messages.keys())

    def flush(self):
        ''' Garbage collect any previous pending messages '''
        # Count buffered messages, not distinct keys: one key can hold
        # several messages.
        num_pending_messages = (
            sum(len(queue)
                for queue in self.pendingreceive2conn_messages.values()) +
            sum(len(queue)
                for queue in self.pendingsend2conn_messages.values()))
        if num_pending_messages > 0:
            log.info("Flushing %d pending messages", num_pending_messages)
        self.pendingreceive2conn_messages = defaultdict(list)
        self.pendingsend2conn_messages = defaultdict(list)
# Keys identifying a buffered control message. Both types carry the same
# fields; because namedtuples are plain tuples, a PendingReceive and a
# PendingSend with equal fields compare (and hash) equal, so code that needs
# to tell them apart must look at the type.
_PENDING_MESSAGE_FIELDS = ('dpid', 'controller_id', 'fingerprint')
PendingReceive = namedtuple('PendingReceive', _PENDING_MESSAGE_FIELDS)
PendingSend = namedtuple('PendingSend', _PENDING_MESSAGE_FIELDS)