# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Base class for all (python) controller-specific syncer modules, as well as
# the STS syncer. (Runs in both the STS process and the controller
# process(es))
import collections
import itertools
import logging
import time
import socket
# TODO(cs): specific to POX!
from pox.lib.ioworker.io_worker import JSONIOWorker
log = logging.getLogger("sync_connection")
[docs]def unpatched_time():
if hasattr(time, "_orig_time"):
return time._orig_time()
else:
return time.time()
[docs]class SyncTime(collections.namedtuple('SyncTime', ('seconds', 'microSeconds'))):
""" ValueObject that models the microsecond timestamps used in STS Sync Messages """
def __new__(cls, seconds, microSeconds):
return super(cls, SyncTime).__new__(cls, seconds, microSeconds)
@staticmethod
[docs] def now():
# if time.time has been patched by sts then we don't want to fall into this
# trap ourselves
if(hasattr(time, "_orig_time")):
now = time._orig_time()
else:
now = time.time()
return SyncTime( int(now), int((now * 1000000) % 1000000))
[docs] def as_float(self):
return float(self.seconds) + float(self.microSeconds) / 1e6
[docs]class SyncMessage(collections.namedtuple('SyncMessage', ('type', 'messageClass', 'time', 'xid', 'name', 'value', 'fingerPrint'))):
""" value object that models a message in the STS sync protocol """
def __new__(cls, type, messageClass, time=None, xid=None, name=None, value=None, fingerPrint=None):
if type not in ("ASYNC", "SYNC", "ACK", "REQUEST", "RESPONSE"):
raise ValueError("SyncMessage: type must one of (ASYNC, SYNC, ACK, REQUEST, RESPONSE)")
if (type == "RESPONSE" or type == "ACK") and xid is None:
raise ValueError("SyncMessage: xid must be given for messages of type %s" % type)
if time is None:
time = SyncTime.now()
elif isinstance(time, SyncTime):
pass
elif isinstance(time, list):
time = SyncTime(*time)
elif isinstance(time, dict):
time = SyncTime(**time)
else:
raise ValueError("Unknown type %s (repr %s) for time" % (time.__class__, repr(time)))
return super(cls, SyncMessage).__new__(cls, type=type, messageClass=messageClass, time=time, xid=xid, name=name, value=value, fingerPrint=fingerPrint)
[docs]class SyncIODelegate(object):
[docs] def __init__(self, io_master, socket):
self.io_master = io_master
self.io_worker = JSONIOWorker(self.io_master.create_worker_for_socket(socket))
[docs] def wait_for_message(self, timeout=None):
self.io_master.select(timeout)
[docs] def send(self, msg):
self.io_worker.send(msg)
[docs] def get_on_message_received(self):
return self.io_worker.on_json_received
[docs] def set_on_message_received(self, f):
self.io_worker.on_json_received = lambda io_worker, msg: f(msg)
[docs] def close(self):
self.io_worker.close()
on_message_received = property(get_on_message_received, set_on_message_received)
[docs]class SyncProtocolSpeaker(object):
""" speaks the sts sync protocol """
[docs] def __init__(self, handlers, io_delegate, collect_stats=True):
self.xid_generator = itertools.count(1)
self.io = io_delegate
self.sent_xids = set()
self.listener = SyncProtocolListener(handlers, io_delegate,
collect_stats=collect_stats)
[docs] def message_with_xid(self, message):
if message.xid:
return message
else:
return message._replace(xid=self.xid_generator.next())
[docs] def send(self, message):
''' Send a message you don't expect a response from '''
message = self.message_with_xid(message)
if((message.type, message.xid) in self.sent_xids):
raise RuntimeError("Error sending message %s: XID %d already sent" % (str(message), message.xid))
self.sent_xids.add( (message.type, message.xid) )
self.io.send(message._asdict())
return message
[docs] def async_notification(self, messageClass, fingerPrint, value):
# Don't really need an xid..
message = self.message_with_xid(SyncMessage(type="ASYNC",
messageClass=messageClass,
fingerPrint=fingerPrint,
value=value))
self.send(message)
[docs] def sync_notification(self, messageClass, fingerPrint, value):
message = self.message_with_xid(SyncMessage(type="SYNC",
messageClass=messageClass,
fingerPrint=fingerPrint,
value=value))
self.send(message)
return self.listener.wait_for_xaction(message)
[docs] def ack_sync_notification(self, messageClass, xid):
message = SyncMessage(type="ACK", messageClass=messageClass, xid=xid)
self.send(message)
[docs] def sync_request(self, messageClass, name, timeout=None):
''' Send a message you expect a response from.
Note: Blocks this thread until a response is received!'''
message = self.message_with_xid(SyncMessage(type="REQUEST", messageClass=messageClass, name=name))
self.send(message)
return self.listener.wait_for_xaction(message, timeout)
[docs]class SyncProtocolListener(object):
''' Speaker delegates to this class to wait on messages '''
[docs] def __init__(self, handlers, io_delegate, collect_stats=True,
delay_threshold_ms=3.0):
self.handlers = handlers
self.collect_stats = collect_stats
self.delay_threshold_ms = delay_threshold_ms
self.waiting_xids = {}
self.received_responses = {}
self.io = io_delegate
self.io.on_message_received = self.on_message_received
[docs] def on_message_received(self, msg_hash):
message = SyncMessage(**msg_hash)
key = (message.type, message.messageClass)
if (message.type == "RESPONSE" or message.type == "ACK") and message.xid in self.waiting_xids:
del self.waiting_xids[message.xid]
self.received_responses[message.xid] = message
return
if key not in self.handlers:
raise ValueError("%s: No message handler for: %s\nKnown handlers are: %s" % (type(self).__name__, str(key), ", ".join(map(lambda h: str(h), self.handlers))))
# dispatch message
self.handlers[key](message)
[docs] def wait_for_xaction(self, message, timeout=None):
xid = message.xid
self.waiting_xids[xid] = message
start = unpatched_time()
# Blocks this thread!
while not xid in self.received_responses:
if timeout:
now = unpatched_time()
if now - start > timeout:
raise socket.timeout()
to_wait = timeout - (now-start)
else:
to_wait = None
self.io.wait_for_message(to_wait)
if self.collect_stats:
end = unpatched_time()
ms_elapsed = (end - start) * 1000
if ms_elapsed > self.delay_threshold_ms:
if hasattr(log, "_orig_log"):
log._orig_log(logging.DEBUG, "Spent %.02f milliseconds waiting on %s" %
(ms_elapsed, str(message)), [])
else:
log._log(logging.DEBUG, "Spent %.02f milliseconds waiting on %s" %
(ms_elapsed, str(message)), [])
response = self.received_responses.pop(xid)
return response.value