Source code for sts.syncproto.sts_syncer

# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This is STS's end of the sync protocol. Listens to all controller-specific
# syncers and dispatches messages to STS handlers.

from sts.syncproto.base import SyncProtocolSpeaker, SyncMessage, SyncTime, SyncIODelegate

from pox.lib.util import parse_openflow_uri, connect_socket_with_backoff

import logging
log = logging.getLogger("sts_sync_proto")

[docs]class STSSyncProtocolSpeaker(SyncProtocolSpeaker):
[docs] def __init__(self, controller, state_master, io_delegate): if state_master is None: raise ValueError("state_master is null") self.state_master = state_master self.controller = controller handlers = { ("ASYNC", "StateChange"): self._log_async_state_change, ("SYNC", "StateChange"): self._log_sync_state_change, ("REQUEST", "DeterministicValue"): self._get_deterministic_value } SyncProtocolSpeaker.__init__(self, handlers, io_delegate)
def _log_async_state_change(self, message): self.state_master.state_change("ASYNC", message.xid, self.controller, message.time, message.fingerPrint, message.name, message.value) def _log_sync_state_change(self, message): # Note: control_flow needs to register a handler on state_master to ACK the # controller self.state_master.state_change("SYNC", message.xid, self.controller, message.time, message.fingerPrint, message.name, message.value) def _get_deterministic_value(self, message): self.state_master.get_deterministic_value(self.controller, message.name, message.xid)
[docs]class STSSyncConnection(object): """ A connection to a controller with the sts sync protocol """
[docs] def __init__(self, controller, state_master, sync_uri): self.controller = controller (self.mode, self.host, self.port) = parse_openflow_uri(sync_uri) if state_master is None: raise ValueError("state_master is null") self.state_master = state_master self._on_disconnect = [] self.io_delegate = None self.speaker = None
[docs] def on_disconnect(self, func): self._on_disconnect.append(func)
[docs] def connect(self, io_master): if self.mode != "tcp": raise RuntimeError("only tcp (active) mode supported by now") socket = connect_socket_with_backoff(self.host, self.port) self.io_delegate = SyncIODelegate(io_master, socket) self.speaker = STSSyncProtocolSpeaker(controller=self.controller, state_master=self.state_master, io_delegate=self.io_delegate)
[docs] def disconnect(self): self.io_delegate.close() for handler in self._on_disconnect: handler(self)
[docs] def close(self): self.disconnect()
[docs] def get_nom_snapshot(self): if self.speaker: return self.speaker.sync_request("NOMSnapshot", "", timeout=10) else: log.warn("STSSyncConnection: not connected. cannot handle requests")
[docs] def ack_sync_notification(self, messageClass, xid): if self.speaker: return self.speaker.ack_sync_notification(messageClass, xid) else: log.warn("STSSyncConnection: not connected. cannot ACK")
[docs] def send_deterministic_value(self, xid, value): if self.speaker: msg = SyncMessage(type="RESPONSE", messageClass="DeterministicValue", time=value, xid=xid, value=value) return self.speaker.send(msg) else: log.warn("STSSyncConnection: not connected. cannot ACK")
[docs]class STSSyncConnectionManager(object): """the connection manager for the STS sync protocols. TODO: finish"""
[docs] def __init__(self, io_master, state_master): self.io_master = io_master self.sync_connections = [] if state_master is None: raise ValueError("state_master is null") self.state_master = state_master
[docs] def connect(self, controller, sync_uri): s = STSSyncConnection(controller=controller, state_master=self.state_master, sync_uri=sync_uri) s.connect(self.io_master) s.on_disconnect(self.remove_connection) self.sync_connections.append(s) return s
[docs] def remove_connection(self, connection): if connection in self.sync_connections: self.sync_connections.remove(connection)
[docs]class STSSyncCallback(object): """ override with your favorite functionality """
[docs] def state_change(self, type, controller, time, fingerprint, name, value): log.info("{}: controller: {} time: {} fingerprint: {} name: {} value: {}"\ .format(type, controller, time, fingerprint, name, value))
[docs] def get_deterministic_value(self, controller, name): if name == "gettimeofday": return SyncTime.now()