Source code for sts.control_flow.base

# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import pox.lib.revent
from pox.lib.revent import EventMixin
from sts.replay_event import ControllerStateChange, PendingStateChange, DeterministicValue
from sts.syncproto.base import SyncTime
from sts.syncproto.sts_syncer import STSSyncCallback
from functools import partial

from collections import Counter

log = logging.getLogger("control_flow")

[docs]class ControlFlow(object): ''' Superclass of ControlFlow types '''
[docs] def __init__(self, simulation_cfg): self.simulation_cfg = simulation_cfg self.sync_callback = None self.invariant_check_name = None
[docs] def simulate(self): ''' Move the simulation forward!''' pass
[docs] def init_results(self, results_dir): ''' Set up event log files''' pass
[docs] def get_sync_callback(self): return self.sync_callback # ---------------------------------------- # # Callbacks for controller sync messages # # ---------------------------------------- #
[docs]class StateChange(pox.lib.revent.Event):
[docs] def __init__(self, pending_state_change): super(StateChange, self).__init__() self.pending_state_change = pending_state_change
[docs]class ReplaySyncCallback(STSSyncCallback, EventMixin): _eventMixin_events = set([StateChange])
[docs] def __init__(self, get_interpolated_time=None): ''' If get_interpolated_time is None, will always wait on deterministic values. If not None, will always invoke get_interpolated_time and respond immediately''' self.get_interpolated_time = get_interpolated_time # TODO(cs): move buffering functionality into the GodScheduler? Or a # separate class? # Python's Counter object is effectively a multiset self._pending_state_changes = Counter() # Each controller can have at most one outstanding state change (since # it's blocked) # { controller id -> function to send ACK message } self.cid2ack = {} # { controller id -> function to send deterministic value responses } self.cid2deterministic_value = {} self.log = logging.getLogger("synccallback")
def _pass_through_handler(self, state_change_event): state_change = state_change_event.pending_state_change # Pass through self.ack_pending_state_change(state_change) # Record replay_event = ControllerStateChange(state_change.controller_id, state_change.fingerprint, state_change.name, state_change.value, time=state_change.time) self.passed_through_events.append(replay_event)
[docs] def set_pass_through(self): '''Cause all pending state changes to pass through without being buffered''' self.passed_through_events = [] self.addListener(StateChange, self._pass_through_handler)
[docs] def unset_pass_through(self): '''Unset pass through mode, and return any events that were passed through since pass through mode was set''' self.removeListener(self._pass_through_handler) passed_events = self.passed_through_events self.passed_through_events = [] return passed_events
[docs] def state_change_pending(self, pending_state_change): ''' Return whether the PendingStateChange has been observed ''' return self._pending_state_changes[pending_state_change] > 0
[docs] def ack_pending_state_change(self, pending_state_change): ''' ACK the pending state change, and collect the PendingStateChange from our buffer''' self._pending_state_changes[pending_state_change] -= 1 if self._pending_state_changes[pending_state_change] <= 0: del self._pending_state_changes[pending_state_change] if pending_state_change.controller_id in self.cid2ack: # Send an ACK to the controller to let it proceed self.cid2ack[pending_state_change.controller_id]() del self.cid2ack[pending_state_change.controller_id]
[docs] def flush(self): ''' ACK any pending state changes ''' num_pending_state_changes = len(self._pending_state_changes) if num_pending_state_changes > 0: self.log.info("Flushing %d pending state changes" % num_pending_state_changes) self._pending_state_changes = Counter() for cid, ack in self.cid2ack.iteritems(): ack() self.cid2ack = {}
[docs] def state_change(self, sync_type, xid, controller, time, fingerprint, name, value): # TODO(cs): xid arguably shouldn't be known to STS pending_state_change = PendingStateChange(controller.cid, time, fingerprint, name, value) self._pending_state_changes[pending_state_change] += 1 if sync_type == "SYNC": cid = controller.cid if cid in self.cid2ack: raise RuntimeError("More than one outstanding ACKs for %s" % str(cid)) self.cid2ack[cid] =\ partial(controller.sync_connection.ack_sync_notification, "StateChange", xid) self.raiseEvent(StateChange(pending_state_change))
[docs] def pending_state_changes(self): ''' Return any pending state changes ''' return self._pending_state_changes.keys()
[docs] def pending_state_changes_with_counts(self): ''' Return any pending state changes ''' return self._pending_state_changes
[docs] def get_deterministic_value(self, controller, name, xid): # TODO(cs): xid arguably shouldn't be known to STS if name != "gettimeofday": raise ValueError("unsupported deterministic value: %s" % name) # TODO(cs): need to dynamically set get_interpolated_time to not None for # peek() if self.get_interpolated_time is not None: value = self.get_interpolated_time() controller.sync_connection.send_deterministic_value(xid, value) else: self.cid2deterministic_value[controller.cid] =\ partial(controller.sync_connection.send_deterministic_value, xid)
[docs] def pending_deterministic_value_request(self, controller_id): return controller_id in self.cid2deterministic_value
[docs] def send_deterministic_value(self, controller_id, value): self.cid2deterministic_value[controller_id](value) del self.cid2deterministic_value[controller_id]
[docs]class RecordingSyncCallback(STSSyncCallback):
[docs] def __init__(self, input_logger, record_deterministic_values=False): self.input_logger = input_logger self.record_deterministic_values = record_deterministic_values
[docs] def state_change(self, sync_type, xid, controller, time, fingerprint, name, value): # TODO(cs): xid arguably shouldn't be known to STS if self.input_logger is not None: self.input_logger.log_input_event(ControllerStateChange(controller.cid, fingerprint, name, value, time=time)) if sync_type == "SYNC": controller.sync_connection.ack_sync_notification("StateChange", xid)
[docs] def get_deterministic_value(self, controller, name, xid): # TODO(cs): xid arguably shouldn't be known to STS value = None if name == "gettimeofday": value = SyncTime.now() else: raise ValueError("unsupported deterministic value: %s" % name) # TODO(cs): implement Andi's improved gettime heuristic if self.record_deterministic_values: self.input_logger.log_input_event(DeterministicValue(controller.cid, name, value, time=value)) controller.sync_connection.send_deterministic_value(xid, value)