# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
# Copyright 2012-2013 Sam Whitlock
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
Classes representing events to be replayed. These events can be serialized to
events.trace JSON files.
Note about the JSON events.trace format:
All events are serialized to JSON with the Event.to_json() method.
All events have a fingerprint field, which is used to compute functional
equivalence between events across different replays of the trace.
The default format of the fingerprint field is a tuple (event class name,).
The specific format of the fingerprint field is documented in each class'
fingerprint() method.
The format of other additional fields is documented in
each event's __init__() method.
'''
from sts.util.console import msg
from sts.entities import Link
from sts.god_scheduler import PendingReceive, PendingSend
from sts.dataplane_traces.trace import DataplaneEvent
from sts.fingerprints.messages import *
from invariant_checker import InvariantChecker
from config.invariant_checks import name_to_invariant_check
import itertools
import abc
import logging
import time
import marshal
import types
import json
from collections import namedtuple
from sts.syncproto.base import SyncTime
from pox.lib.util import TimeoutError
log = logging.getLogger("events")
[docs]def dictify_fingerprint(fingerprint):
# Hack: convert Fingerprint objects into Fingerprint.to_dict()
mutable = list(fingerprint)
for i, e in enumerate(mutable):
if isinstance(mutable[i], Fingerprint):
mutable[i] = mutable[i].to_dict()
return tuple(mutable)
[docs]class Event(object):
''' Superclass for all event types. '''
__metaclass__ = abc.ABCMeta
# Create unique labels for events
_label_gen = itertools.count(1)
# Ensure globally unique labels
_all_label_ids = set()
[docs] def __init__(self, prefix="e", label=None, round=-1, time=None, dependent_labels=None,
prunable=True):
if label is None:
label_id = Event._label_gen.next()
label = prefix + str(label_id)
while label_id in Event._all_label_ids:
label_id = Event._label_gen.next()
label = prefix + str(label_id)
if time is None:
# TODO(cs): compress time for interactive mode?
time = SyncTime.now()
self.label = label
Event._all_label_ids.add(int(label[1:]))
self.round = round
self.time = time
# Add on dependent labels to appease log_processing.superlog_parser.
# TODO(cs): Replayer shouldn't depend on superlog_parser
self.dependent_labels = dependent_labels if dependent_labels else []
# Whether this event should be prunable by MCSFinder. Initialization
# inputs are not pruned.
self.prunable = True
# Whether the (internal) event timed out in the most recent round
self.timed_out = False
@property
[docs] def fingerprint(self):
''' All events must have a fingerprint. Fingerprints are used to compute
functional equivalence. '''
return (self.__class__.__name__,)
@abc.abstractmethod
[docs] def proceed(self, simulation):
'''Executes a single `round'. Returns a boolean that is true if the
Replayer may continue to the next Event, otherwise proceed() again
later.'''
pass
[docs] def to_json(self):
''' Convert the event to json format '''
fields = dict(self.__dict__)
fields['class'] = self.__class__.__name__
# fingerprints are accessed through @property, not in __dict__:
fields['fingerprint'] = dictify_fingerprint(self.fingerprint)
if '_fingerprint' in fields:
del fields['_fingerprint']
return json.dumps(fields)
def __hash__(self):
# Assumption: labels are unique
return self.label.__hash__()
def __eq__(self, other):
# Assumption: labels are unique
if type(self) != type(other):
return False
return self.label == other.label
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return self.__class__.__name__ + ":" + self.label
def __repr__(self):
s = self.__class__.__name__ + ":" + self.label \
+ ":" + str(self.fingerprint)
return s
# -------------------------------------------------------- #
# Semi-abstract classes for internal and external events #
# -------------------------------------------------------- #
[docs]class InternalEvent(Event):
'''An InternalEvent is one that happens within the controller(s) under
simulation. Derivatives of this class verify that the internal event has
occured during replay in its proceed method before it returns.'''
[docs] def __init__(self, label=None, round=-1, time=None, timeout_disallowed=False,
prunable=False):
super(InternalEvent, self).__init__(prefix='i', label=label, round=round, time=time,
prunable=prunable)
self.timeout_disallowed = timeout_disallowed
[docs] def proceed(self, simulation):
# There might be nothing happening for certain internal events, so default
# to just doing nothing for proceed (i.e. proceeding automatically).
pass
[docs] def disallow_timeouts(self):
self.timeout_disallowed = True
[docs]def assert_fields_exist(json_hash, *args):
''' assert that the fields exist in json_hash '''
fields = args
for field in fields:
if field not in json_hash:
raise ValueError("Field %s not in json_hash %s" % (field, str(json_hash)))
[docs]class SwitchFailure(InputEvent):
''' Crashes a switch, by disconnecting its TCP connection with the
controller(s).'''
[docs] def __init__(self, dpid, label=None, round=-1, time=None):
'''
Parameters:
- dpid: unique integer identifier of the switch.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(SwitchFailure, self).__init__(label=label, round=round, time=time)
self.dpid = dpid
[docs] def proceed(self, simulation):
software_switch = simulation.topology.get_switch(self.dpid)
simulation.topology.crash_switch(software_switch)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'dpid')
dpid = int(json_hash['dpid'])
return SwitchFailure(dpid, label=label, round=round, time=time)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, dpid) '''
return (self.__class__.__name__,self.dpid,)
[docs]class SwitchRecovery(InputEvent):
''' Recovers a crashed switch, by reconnecting its TCP connection with the
controller(s).'''
[docs] def __init__(self, dpid, label=None, round=-1, time=None):
'''
Parameters:
- dpid: unique integer identifier of the switch.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(SwitchRecovery, self).__init__(label=label, round=round, time=time)
self.dpid = dpid
[docs] def proceed(self, simulation):
software_switch = simulation.topology.get_switch(self.dpid)
try:
down_controller_ids = map(lambda c: c.cid,
simulation.controller_manager.down_controllers)
simulation.topology.recover_switch(software_switch,
down_controller_ids=down_controller_ids)
except TimeoutError:
# Controller is down... Hopefully control flow will notice soon enough
log.warn("Timed out on %s" % str(self.fingerprint))
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'dpid')
dpid = int(json_hash['dpid'])
return SwitchRecovery(dpid, round=round, label=label, time=time)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, dpid) '''
return (self.__class__.__name__,self.dpid,)
[docs]def get_link(link_event, simulation):
start_software_switch = simulation.topology.get_switch(link_event.start_dpid)
end_software_switch = simulation.topology.get_switch(link_event.end_dpid)
link = Link(start_software_switch, link_event.start_port_no,
end_software_switch, link_event.end_port_no)
return link
[docs]class LinkFailure(InputEvent):
''' Cuts a link between switches. This causes the switch to send an
ofp_port_status message to its parent(s). All packets forwarded over
this link will be dropped until a LinkRecovery occurs.'''
[docs] def __init__(self, start_dpid, start_port_no, end_dpid, end_port_no,
label=None, round=-1, time=None):
'''
Parameters:
- start_dpid: unique integer identifier of the first switch connected to the link.
- start_port_no: integer port number of the start switch's port.
- end_dpid: unique integer identifier of the second switch connected to the link.
- end_port_no: integer port number of the end switch's port to be created.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(LinkFailure, self).__init__(label=label, round=round, time=time)
self.start_dpid = start_dpid
self.start_port_no = start_port_no
self.end_dpid = end_dpid
self.end_port_no = end_port_no
[docs] def proceed(self, simulation):
link = get_link(self, simulation)
simulation.topology.sever_link(link)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'start_dpid', 'start_port_no', 'end_dpid',
'end_port_no')
start_dpid = int(json_hash['start_dpid'])
start_port_no = int(json_hash['start_port_no'])
end_dpid = int(json_hash['end_dpid'])
end_port_no = int(json_hash['end_port_no'])
return LinkFailure(start_dpid, start_port_no, end_dpid, end_port_no,
round=round, label=label, time=time)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, start dpid, start port_no, end dpid, end port_no) '''
return (self.__class__.__name__,
self.start_dpid, self.start_port_no,
self.end_dpid, self.end_port_no)
[docs]class LinkRecovery(InputEvent):
''' Recovers a failed link between switches. This causes the switch to send an
ofp_port_status message to its parent(s). '''
[docs] def __init__(self, start_dpid, start_port_no, end_dpid, end_port_no,
label=None, round=-1, time=None):
'''
Parameters:
- start_dpid: unique integer identifier of the first switch connected to the link.
- start_port_no: integer port number of the start switch's port.
- end_dpid: unique integer identifier of the second switch connected to the link.
- end_port_no: integer port number of the end switch's port to be created.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(LinkRecovery, self).__init__(label=label, round=round, time=time)
self.start_dpid = start_dpid
self.start_port_no = start_port_no
self.end_dpid = end_dpid
self.end_port_no = end_port_no
[docs] def proceed(self, simulation):
link = get_link(self, simulation)
simulation.topology.repair_link(link)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'start_dpid', 'start_port_no', 'end_dpid',
'end_port_no')
start_dpid = int(json_hash['start_dpid'])
start_port_no = int(json_hash['start_port_no'])
end_dpid = int(json_hash['end_dpid'])
end_port_no = int(json_hash['end_port_no'])
return LinkRecovery(start_dpid, start_port_no, end_dpid, end_port_no,
round=round, label=label, time=time)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, start dpid, start port, end dpid, end port)
'''
return (self.__class__.__name__,
self.start_dpid, self.start_port_no,
self.end_dpid, self.end_port_no)
[docs]class ControllerFailure(InputEvent):
''' Kills a controller process with `kill -9`'''
[docs] def __init__(self, controller_id, label=None, round=-1, time=None):
'''
Parameters:
- controller_id: unique string label for the controller to be killed.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(ControllerFailure, self).__init__(label=label, round=round, time=time)
self.controller_id = controller_id
[docs] def proceed(self, simulation):
controller = simulation.controller_manager.get_controller(self.controller_id)
simulation.controller_manager.kill_controller(controller)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist('controller_id')
controller_id = json_hash['controller_id']
return ControllerFailure(controller_id, round=round, label=label, time=time)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, controller id) '''
return (self.__class__.__name__,self.controller_id)
[docs]class ControllerRecovery(InputEvent):
''' Reboots a crashed controller by reinvoking its original command line
parameters'''
[docs] def __init__(self, controller_id, label=None, round=-1, time=None):
'''
Parameters:
- controller_id: unique string label for the controller.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(ControllerRecovery, self).__init__(label=label, round=round, time=time)
self.controller_id = controller_id
[docs] def proceed(self, simulation):
controller = simulation.controller_manager.get_controller(self.controller_id)
simulation.controller_manager.reboot_controller(controller)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist('controller_id')
controller_id = json_hash['controller_id']
return ControllerFailure(controller_id, round=round, label=label, time=time)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, controller id) '''
return (self.__class__.__name__,self.controller_id)
[docs]class HostMigration(InputEvent):
''' Migrates a host from one location in network to another. Creates a new
virtual port on the new switch, and takes down the old port on the old switch.
'''
[docs] def __init__(self, old_ingress_dpid, old_ingress_port_no,
new_ingress_dpid, new_ingress_port_no, host_id, label=None, round=-1, time=None):
'''
Parameters:
- old_ingress_dpid: unique integer identifier of the ingress switch the host is
moving away from.
- old_ingress_port_no: integer identifier of the port the host is moving
away from.
- new_ingress_dpid: unique integer identifier of the ingress switch the host is
moving to.
- new_ingress_port_no: integer identifier of the port the host is moving
to.
- host_id: unique integer identifier of the host.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(HostMigration, self).__init__(label=label, round=round, time=time)
self.old_ingress_dpid = old_ingress_dpid
self.old_ingress_port_no = old_ingress_port_no
self.new_ingress_dpid = new_ingress_dpid
self.new_ingress_port_no = new_ingress_port_no
self.host_id = host_id
[docs] def proceed(self, simulation):
simulation.topology.migrate_host(self.old_ingress_dpid,
self.old_ingress_port_no,
self.new_ingress_dpid,
self.new_ingress_port_no)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'old_ingress_dpid', 'old_ingress_port_no',
'new_ingress_dpid', 'new_ingress_port_no', 'host_id')
old_ingress_dpid = int(json_hash['old_ingress_dpid'])
old_ingress_port_no = int(json_hash['old_ingress_port_no'])
new_ingress_dpid = int(json_hash['new_ingress_dpid'])
new_ingress_port_no = int(json_hash['new_ingress_port_no'])
host_id = json_hash['host_id']
return HostMigration(old_ingress_dpid, old_ingress_port_no,
new_ingress_dpid, new_ingress_port_no,
host_id, round=round, label=label, time=time)
@property
[docs] def old_location(self):
return (self.old_ingress_dpid, self.old_ingress_port_no)
@property
[docs] def new_location(self):
return (self.new_ingress_dpid, self.new_ingress_port_no)
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, old dpid, old port, new dpid, new port, host id) '''
return (self.__class__.__name__,self.old_ingress_dpid,
self.old_ingress_port_no, self.new_ingress_dpid,
self.new_ingress_port_no, self.host_id)
[docs]class PolicyChange(InputEvent):
''' Not currently supported '''
[docs] def __init__(self, request_type, label=None, round=-1, time=None):
super(PolicyChange, self).__init__(label=label, round=round, time=time)
self.request_type = request_type
[docs] def proceed(self, simulation):
# TODO(cs): implement me, and add PolicyChanges to Fuzzer
pass
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'request_type')
request_type = json_hash['request_type']
return PolicyChange(request_type, round=round, label=label, time=time)
[docs]class TrafficInjection(InputEvent):
''' Injects a dataplane packet into the network at the given host's access link '''
[docs] def __init__(self, label=None, dp_event=None, host_id=None, round=-1, time=None, prunable=True):
'''
Parameters:
- dp_event: DataplaneEvent object encapsulating the packet contents and the
access link.
- host_id: unique integer label identifying the host that generated the
packet.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- prunable: whether this input event can be pruned during delta
debugging.
'''
super(TrafficInjection, self).__init__(label=label, round=round, time=time,
prunable=prunable)
self.dp_event = dp_event
self.host_id = host_id
[docs] def proceed(self, simulation):
# If dp_event is None, backwards compatibility
if self.dp_event is None:
if simulation.dataplane_trace is None:
raise RuntimeError("No dataplane trace specified!")
simulation.dataplane_trace.inject_trace_event()
else:
host = simulation.topology.link_tracker\
.interface2access_link[self.dp_event.interface].host
host.send(self.dp_event.interface, self.dp_event.packet)
return True
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, dp event, host_id)
The format of dp event is:
{"interface": HostInterface.to_json(), "packet": base 64 encoded packet contents}
See entities.py for the HostInterface json format.
'''
return (self.__class__.__name__, self.dp_event, self.host_id)
[docs] def to_json(self):
fields = {}
fields = dict(self.__dict__)
fields['class'] = self.__class__.__name__
fields['dp_event'] = self.dp_event.to_json()
fields['fingerprint'] = (self.__class__.__name__, self.dp_event.to_json())
return json.dumps(fields)
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
prunable = True
if 'prunable' in json_hash:
prunable = json_hash['prunable']
dp_event = None
if 'dp_event' in json_hash:
dp_event = DataplaneEvent.from_json(json_hash['dp_event'])
host_id = None
if 'host_id' in json_hash:
host_id = json_hash['host_id']
return TrafficInjection(label=label, dp_event=dp_event, host_id=host_id, time=time, round=round, prunable=prunable)
[docs]class WaitTime(InputEvent):
''' Causes the simulation to sleep for the specified number of seconds.
Controller processes continue running during this time.'''
[docs] def __init__(self, wait_time, label=None, round=-1, time=None):
'''
Parameters:
- wait_time: float representing how long to sleep in seconds.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(WaitTime, self).__init__(label=label, round=round, time=time)
self.wait_time = wait_time
[docs] def proceed(self, simulation):
log.info("WaitTime: pausing simulation for %f seconds" % (self.wait_time))
time.sleep(self.wait_time)
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'wait_time')
wait_time = json_hash['wait_time']
return WaitTime(wait_time, round=round, label=label, time=time)
[docs]class CheckInvariants(InputEvent):
''' Causes the simulation to pause itself and check the given invariant before
proceeding. '''
[docs] def __init__(self, label=None, round=-1, time=None,
invariant_check_name="InvariantChecker.check_correspondence"):
'''
Parameters:
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- invariant_check_name: unique name of the invariant to be checked. See
config.invariant_checks for an exhaustive list of possible invariant
checks.
'''
super(CheckInvariants, self).__init__(label=label, round=round, time=time)
# For backwards compatibility.. (invariants used to be specified as
# marshalled functions, not invariant check names)
self.legacy_invariant_check = not isinstance(invariant_check_name, basestring)
if self.legacy_invariant_check:
self.invariant_check = invariant_check_name
else:
# Otherwise, invariant check is specified as a name
self.invariant_check_name = invariant_check_name
if invariant_check_name not in name_to_invariant_check:
raise ValueError('''Unknown invariant check %s.\n'''
'''Invariant check name must be defined in config.invariant_checks''',
invariant_check_name)
self.invariant_check = name_to_invariant_check[invariant_check_name]
[docs] def proceed(self, simulation):
try:
violations = self.invariant_check(simulation)
except NameError as e:
raise ValueError('''Closures are unsupported for invariant check '''
'''functions.\n Use dynamic imports inside of your '''
'''invariant check code and define all globals '''
'''locally.\n NameError: %s''' % str(e))
if violations != []:
msg.fail("The following correctness violations occurred!: %s"
% str(violations))
if hasattr(simulation, "fail_to_interactive") and simulation.fail_to_interactive:
raise KeyboardInterrupt("fail to interactive")
else:
msg.interactive("No correctness violations!")
return True
[docs] def to_json(self):
fields = dict(self.__dict__)
fields['class'] = self.__class__.__name__
if self.legacy_invariant_check:
fields['invariant_check'] = marshal.dumps(self.invariant_check.func_code)\
.encode('base64')
fields['invariant_name'] = self.invariant_check.__name__
else:
fields['invariant_name'] = self.invariant_check_name
fields['invariant_check'] = None
fields['fingerprint'] = "N/A"
return json.dumps(fields)
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
invariant_check_name = "InvariantChecker.check_connectivity"
if 'invariant_name' in json_hash:
invariant_check_name = json_hash['invariant_name']
elif 'invariant_check' in json_hash:
# Legacy code (marshalled function)
# Assumes that the closure is empty
code = marshal.loads(json_hash['invariant_check'].decode('base64'))
invariant_check_name = types.FunctionType(code, globals())
return CheckInvariants(label=label, time=time, round=round,
invariant_check_name=invariant_check_name)
[docs]class ControlChannelBlock(InputEvent):
''' Simulates delay between switches and controllers by temporarily
queuing all messages sent on the switch<->controller TCP connection. No
messages will be sent over the connection until a ControlChannelUnblock
occurs. '''
[docs] def __init__(self, dpid, controller_id, label=None, round=-1, time=None):
'''
Parameters:
- dpid: unique integer identifier of the switch.
- controller_id: unique string label for the controller.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(ControlChannelBlock, self).__init__(label=label, round=round, time=time)
self.dpid = dpid
self.controller_id = controller_id
[docs] def proceed(self, simulation):
switch = simulation.topology.get_switch(self.dpid)
connection = switch.get_connection(self.controller_id)
if connection.io_worker.currently_blocked:
raise RuntimeError("Expected channel %s to not be blocked" % str(connection))
connection.io_worker.block()
return True
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, dpid, controller id)
'''
return (self.__class__.__name__,
self.dpid, self.controller_id)
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'dpid', 'controller_id')
dpid = json_hash['dpid']
controller_id = json_hash['controller_id']
return ControlChannelBlock(dpid, controller_id, round=round, label=label, time=time)
[docs]class ControlChannelUnblock(InputEvent):
''' Unblocks the control channel delay triggered by a ControlChannelUnblock.
All queued messages will be sent.'''
[docs] def __init__(self, dpid, controller_id, label=None, round=-1, time=None):
'''
Parameters:
- dpid: unique integer identifier of the switch.
- controller_id: unique string label for the controller.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
super(ControlChannelUnblock, self).__init__(label=label, round=round, time=time)
self.dpid = dpid
self.controller_id = controller_id
[docs] def proceed(self, simulation):
switch = simulation.topology.get_switch(self.dpid)
connection = switch.get_connection(self.controller_id)
if not connection.io_worker.currently_blocked:
raise RuntimeError("Expected channel %s to be blocked" % str(connection))
connection.io_worker.unblock()
return True
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format: (class name, dpid, controller id) '''
return (self.__class__.__name__,
self.dpid, self.controller_id)
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'dpid', 'controller_id')
dpid = json_hash['dpid']
controller_id = json_hash['controller_id']
return ControlChannelUnblock(dpid, controller_id, round=round, label=label, time=time)
[docs]class DataplaneDrop(InputEvent):
''' Removes an in-flight dataplane packet with the given fingerprint from
the network. '''
[docs] def __init__(self, fingerprint, label=None, round=-1, time=None,
passive=True):
'''
Parameters:
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- passive: whether we're using Replayer.DataplaneChecker
'''
super(DataplaneDrop, self).__init__(label=label, round=round, time=time)
# N.B. fingerprint is monkeypatched on to DpPacketOut events by BufferedPatchPanel
if fingerprint[0] != self.__class__.__name__:
fingerprint = list(fingerprint)
fingerprint.insert(0, self.__class__.__name__)
if type(fingerprint) == list:
fingerprint = (fingerprint[0], DPFingerprint(fingerprint[1]),
fingerprint[2], fingerprint[3])
self._fingerprint = fingerprint
# TODO(cs): passive is a bit of a hack, but this was easier.
self.passive = passive
[docs] def proceed(self, simulation):
# Handled by control_flow.replayer.DataplaneChecker
if self.passive:
return True
else:
dp_event = simulation.patch_panel.get_buffered_dp_event(self.fingerprint[1:])
if dp_event is not None:
simulation.patch_panel.drop_dp_event(dp_event)
return True
return False
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, DPFingerprint, switch dpid, port no)
See fingerprints/messages.py for format of DPFingerprint.
'''
return self._fingerprint
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'fingerprint')
fingerprint = json_hash['fingerprint']
return DataplaneDrop(fingerprint, round=round, label=label, time=time)
[docs] def to_json(self):
fields = dict(self.__dict__)
fields['class'] = self.__class__.__name__
fields['fingerprint'] = (self.fingerprint[0], self.fingerprint[1].to_dict(),
self.fingerprint[2], self.fingerprint[3])
del fields['_fingerprint']
return json.dumps(fields)
# TODO(cs): Temporary hack until we figure out determinism
[docs]class LinkDiscovery(InputEvent):
''' Deprecated '''
[docs] def __init__(self, controller_id, link_attrs, label=None, round=-1, time=None):
super(LinkDiscovery, self).__init__(label=label, round=round, time=time)
self._fingerprint = (self.__class__.__name__,
controller_id, tuple(link_attrs))
self.controller_id = controller_id
self.link_attrs = link_attrs
[docs] def proceed(self, simulation):
controller = simulation.controller_manager.get_controller(self.controller_id)
controller.sync_connection.send_link_notification(self.link_attrs)
return True
@property
[docs] def fingerprint(self):
return self._fingerprint
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'controller_id', 'link_attrs')
controller_id = json_hash['controller_id']
link_attrs = json_hash['link_attrs']
return LinkDiscovery(controller_id, link_attrs, round=round, label=label, time=time)
all_input_events = [SwitchFailure, SwitchRecovery, LinkFailure, LinkRecovery,
ControllerFailure, ControllerRecovery, HostMigration,
PolicyChange, TrafficInjection, WaitTime, CheckInvariants,
ControlChannelBlock, ControlChannelUnblock,
DataplaneDrop, LinkDiscovery]
# ----------------------------------- #
# Concrete classes of InternalEvents #
# ----------------------------------- #
[docs]class ControlMessageBase(InternalEvent):
'''
Logged whenever the GodScheduler decides to allow a switch to receive or
send an openflow packet.
'''
[docs] def __init__(self, dpid, controller_id, fingerprint, label=None, round=-1, time=None, timeout_disallowed=False):
'''
Parameters:
- dpid: unique integer identifier of the switch.
- controller_id: unique string label for the controller.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- timeout_disallowed: whether the replayer should wait indefinitely for
this event to occur. Defaults to False.
'''
# If constructed directly (not from json), fingerprint is the
# OFFingerprint, not including dpid and controller_id
super(ControlMessageBase, self).__init__(label=label, round=round, time=time, timeout_disallowed=timeout_disallowed)
self.dpid = dpid
self.controller_id = controller_id
if type(fingerprint) == list:
fingerprint = (fingerprint[0], OFFingerprint(fingerprint[1]),
fingerprint[2], tuple(fingerprint[3]))
if type(fingerprint) == dict or type(fingerprint) != tuple:
fingerprint = (self.__class__.__name__, OFFingerprint(fingerprint),
dpid, controller_id)
self._fingerprint = fingerprint
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, OFFingerprint, dpid, controller id)
See fingerprints/messages.py for OFFingerprint format.
'''
return self._fingerprint
[docs]class ControlMessageReceive(ControlMessageBase):
'''
Logged whenever the GodScheduler decides to allow a switch to receive an
openflow message.
'''
[docs] def proceed(self, simulation):
message_waiting = simulation.god_scheduler.message_receipt_waiting(self.pending_receive)
if message_waiting:
simulation.god_scheduler.schedule(self.pending_receive)
return True
return False
@property
[docs] def pending_receive(self):
return PendingReceive(self.dpid, self.controller_id, self.fingerprint[1])
def __str__(self):
return "ControlMessageReceive:%s c %s -> s %s [%s]" % (self.label, self.controller_id, self.dpid, self.fingerprint[1].human_str())
@staticmethod
[docs] def from_json(json_hash):
(label, time, round, timeout_disallowed) = extract_base_fields(json_hash)
assert_fields_exist(json_hash, 'dpid', 'controller_id', 'fingerprint')
dpid = json_hash['dpid']
controller_id = json_hash['controller_id']
fingerprint = json_hash['fingerprint']
return ControlMessageReceive(dpid, controller_id, fingerprint,
round=round, label=label, time=time, timeout_disallowed=timeout_disallowed)
[docs]class ControlMessageSend(ControlMessageBase):
'''
Logged whenever the GodScheduler decides to allow a switch to send an
openflow message.
'''
[docs] def proceed(self, simulation):
message_waiting = simulation.god_scheduler.message_send_waiting(self.pending_send)
if message_waiting:
simulation.god_scheduler.schedule(self.pending_send)
return True
return False
@property
[docs] def pending_send(self):
return PendingSend(self.dpid, self.controller_id, self.fingerprint[1])
def __str__(self):
return "ControlMessageSend:%s c %s -> s %s [%s]" % (self.label, self.dpid, self.controller_id, self.fingerprint[1].human_str())
@staticmethod
[docs] def from_json(json_hash):
(label, time, round, timeout_disallowed) = extract_base_fields(json_hash)
assert_fields_exist(json_hash, 'dpid', 'controller_id', 'fingerprint')
dpid = json_hash['dpid']
controller_id = json_hash['controller_id']
fingerprint = json_hash['fingerprint']
return ControlMessageSend(dpid, controller_id, fingerprint, round=round, label=label, time=time, timeout_disallowed=timeout_disallowed)
# TODO(cs): move me?
[docs]class PendingStateChange(namedtuple('PendingStateChange',
['controller_id', 'time', 'fingerprint',
'name', 'value'])):
def __new__(cls, controller_id, time, fingerprint, name, value):
controller_id = controller_id
if type(time) == list:
time = tuple(time)
if type(fingerprint) == list:
fingerprint = tuple(fingerprint)
if type(value) == list:
value = tuple(value)
return super(cls, PendingStateChange).__new__(cls, controller_id, time,
fingerprint, name, value)
def _get_regex(self):
# TODO(cs): if we add varargs to the signature, this needs to be changed
if type(self.fingerprint) == tuple:
# Skip over the class name
return self.fingerprint[1]
return self.fingerprint
def __hash__(self):
# TODO(cs): may need to add more context into the fingerprint to avoid
# ambiguity
return self._get_regex().__hash__() + self.controller_id.__hash__()
def __eq__(self, other):
if type(other) != type(self):
return False
return (self._get_regex() == other._get_regex() and
self.controller_id == other.controller_id)
def __ne__(self, other):
# NOTE: __ne__ in python does *NOT* by default delegate to eq
return not self.__eq__(other)
[docs]class ControllerStateChange(InternalEvent):
'''
Logged for any (visible) state change in the controller (e.g.
mastership change). Visibility into controller state changes is obtained
via syncproto.
'''
[docs] def __init__(self, controller_id, fingerprint, name, value, label=None, round=-1, time=None, timeout_disallowed=False):
'''
Parameters:
- controller_id: unique string label for the controller.
- name: The format string passed to the controller's logging library.
- value: An array of values for the format string.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- timeout_disallowed: whether the replayer should wait indefinitely for
this event to occur. Defaults to False.
'''
super(ControllerStateChange, self).__init__(label=label, round=round, time=time, timeout_disallowed=timeout_disallowed)
self.controller_id = controller_id
if type(fingerprint) == str or type(fingerprint) == unicode:
fingerprint = (self.__class__.__name__, fingerprint)
if type(fingerprint) == list:
fingerprint = tuple(fingerprint)
self._fingerprint = fingerprint
self.name = name
if type(value) == list:
value = tuple(value)
self.value = value
[docs] def proceed(self, simulation):
observed_yet = simulation.controller_sync_callback\
.state_change_pending(self.pending_state_change)
if observed_yet:
simulation.controller_sync_callback\
.ack_pending_state_change(self.pending_state_change)
return True
return False
@property
[docs] def pending_state_change(self):
return PendingStateChange(self.controller_id, self.time,
self._get_message_fingerprint(),
self.name, self.value)
def _get_message_fingerprint(self):
return self._fingerprint[1]
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, PendingStateChange.fingerprint, controller id)
PendingStateChange.fingerprint is the format string passed to the
controller's logging library (without interpolated values)
'''
# Somewhat confusing: the StateChange's fingerprint is self._fingerprint,
# but the overall fingerprint of this event needs to include the controller
# id
return tuple(list(self._fingerprint) + [self.controller_id])
@staticmethod
[docs] def from_pending_state_change(state_change):
return ControllerStateChange(state_change.controller_id,
state_change.fingerprint, state_change.name, state_change.value,
time=state_change.time)
@staticmethod
[docs] def from_json(json_hash):
(label, time, round, timeout_disallowed) = extract_base_fields(json_hash)
assert_fields_exist(json_hash, 'controller_id', '_fingerprint',
'name', 'value')
controller_id = json_hash['controller_id']
fingerprint = json_hash['_fingerprint']
name = json_hash['name']
value = json_hash['value']
return ControllerStateChange(controller_id, fingerprint, name, value,
round=round, label=label, time=time,
timeout_disallowed=timeout_disallowed)
[docs]class DeterministicValue(InternalEvent):
'''
Logged whenever the controller asks for a deterministic value (e.g.
gettimeofday()
'''
[docs] def __init__(self, controller_id, name, value, label=None, round=-1, time=None, timeout_disallowed=False):
'''
Parameters:
- controller_id: unique string label for the controller.
- name: name of the DeterministicValue request, e.g. "gettimeofday"
- value: the return value of the DeterministicValue request.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- timeout_disallowed: whether the replayer should wait indefinitely for
this event to occur. Defaults to False.
'''
super(DeterministicValue, self).__init__(label=label, round=round, time=time, timeout_disallowed=timeout_disallowed)
self.controller_id = controller_id
self.name = name
if name == "gettimeofday":
value = SyncTime(seconds=value[0], microSeconds=value[1])
elif type(value) == list:
value = tuple(value)
self.value = value
[docs] def proceed(self, simulation):
if simulation.controller_sync_callback\
.pending_deterministic_value_request(self.controller_id):
simulation.controller_sync_callback.send_deterministic_value(self.controller_id,
self.value)
return True
return False
@staticmethod
[docs] def from_json(json_hash):
(label, time, round, timeout_disallowed) = extract_base_fields(json_hash)
assert_fields_exist(json_hash, 'controller_id',
'name', 'value')
controller_id = json_hash['controller_id']
name = json_hash['name']
value = json_hash['value']
return DeterministicValue(controller_id, name, value, round=round,
label=label, time=time, timeout_disallowed=timeout_disallowed)
# TODO(cs): this should really be an input event. But need to make sure that
# it can be pruned safely
[docs]class ConnectToControllers(InternalEvent):
''' Logged at the beginning of the execution. Causes all switches to open
TCP connections their their parent controller(s).
'''
[docs] def proceed(self, simulation):
simulation.connect_to_controllers()
return True
@staticmethod
[docs] def from_json(json_hash):
(label, time, round, timeout_disallowed) = extract_base_fields(json_hash)
return ConnectToControllers(label=label, time=time, round=round,
timeout_disallowed=timeout_disallowed)
[docs]class DataplanePermit(InternalEvent):
''' DataplanePermit allows a packet to move from one port to another in the
dataplane. We basically just keep this around for bookkeeping purposes. During
replay, this let's us know which packets to let through, and which to drop.
'''
[docs] def __init__(self, fingerprint, label=None, round=-1, time=None,
passive=True):
'''
Parameters:
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
- passive: whether we're using Replayer.DataplaneChecker
'''
# N.B. fingerprint is monkeypatched onto DpPacketOut events by
# BufferedPatchPanel
super(DataplanePermit, self).__init__(label=label, round=round, time=time, )
if fingerprint[0] != self.__class__.__name__:
fingerprint = list(fingerprint)
fingerprint.insert(0, self.__class__.__name__)
if type(fingerprint) == list:
fingerprint = (fingerprint[0], DPFingerprint(fingerprint[1]),
fingerprint[2], fingerprint[3])
self._fingerprint = fingerprint
# TODO(cs): passive is a bit of a hack, but this was easier.
self.passive = passive
[docs] def proceed(self, simulation):
if self.passive:
return True
else:
dp_event = simulation.patch_panel.get_buffered_dp_event(self.fingerprint[1:])
if dp_event is not None:
simulation.patch_panel.permit_dp_event(dp_event)
return True
return False
@property
[docs] def fingerprint(self):
''' Fingerprint tuple format:
(class name, DPFingerprint, switch dpid, port no)
See fingerprints/messages.py for format of DPFingerprint.
'''
return self._fingerprint
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'fingerprint')
fingerprint = json_hash['fingerprint']
return DataplanePermit(fingerprint, label=label, round=round, time=time)
[docs] def to_json(self):
fields = dict(self.__dict__)
fields['class'] = self.__class__.__name__
fields['fingerprint'] = (self.fingerprint[0], self.fingerprint[1].to_dict(),
self.fingerprint[2], self.fingerprint[3])
del fields['_fingerprint']
return json.dumps(fields)
all_internal_events = [ControlMessageReceive, ControlMessageSend,
ConnectToControllers, ControllerStateChange,
DeterministicValue, DataplanePermit]
# Special events:
[docs]class SpecialEvent(Event):
[docs] def proceed(self, _):
raise RuntimeError("Should never be called!")
[docs]class InvariantViolation(SpecialEvent):
''' Class for logging violations as json dicts '''
[docs] def __init__(self, violations, label=None, round=-1, time=None):
'''
Parameters:
- violations: an array of strings specifying the invariant violation
fingerprints. Format of the strings depends on the invariant check.
Empty array means there were no violations.
- label: a unique label for this event. Internal event labels begin with 'i'
and input event labels begin with 'e'.
- time: the timestamp of when this event occured. Stored as a tuple:
[seconds since unix epoch, microseconds].
- round: optional integer. Indicates what simulation round this event occured
in.
'''
Event.__init__(self, label=label, round=round, time=time)
self.violations = [ str(v) for v in violations ]
@staticmethod
[docs] def from_json(json_hash):
(label, time, round) = extract_label_time(json_hash)
assert_fields_exist(json_hash, 'violations')
violations = json_hash['violations']
return InvariantViolation(violations, label=label, round=round, time=time)
all_special_events = [InvariantViolation]
all_events = all_input_events + all_internal_events + all_special_events
dp_events = set([DataplanePermit, DataplaneDrop])