Source code for sts.util.deferred_io

# Copyright 2011-2013 Colin Scott
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
Created on Feb 25, 2012

@author: aw, cs
'''
import logging
import Queue

# Module-level logger. getLogger() is called without a name, so this is the
# root logger rather than a logger scoped to this module.
log = logging.getLogger()

class DeferredIOWorker(object):
    '''
    Wrapper class for RecocoIOWorkers.

    Rather than actually sending/receiving right away, queue the data. Then
    there are separate methods for actually sending the data via the wrapped
    io_worker.

    The worker starts out unblocked, in which case data passes straight
    through in both directions. After block() is called, outgoing and
    incoming data are held in queues until unblock() flushes them.

    io_worker: io_worker to wrap. It must provide set_receive_handler(),
               send(), peek_receive_buf(), consume_receive_buf(), fileno(),
               close() and a socket attribute.
    '''

    def __init__(self, io_worker):
        '''
        Wrap io_worker and register ourselves as its receive handler.

        io_worker: io_worker to wrap
        '''
        self._io_worker = io_worker
        # Thread-safe read and write queues of indefinite length
        # TODO(cs): no need for thread safety anymore
        self._receive_queue = Queue.Queue()
        self._send_queue = Queue.Queue()
        # Read buffer that we present to clients
        self._receive_buf = ""
        # Whether this control channel is currently blocked. If False, passes
        # through packets.
        self._currently_blocked = False
        # Callback installed by the client through set_receive_handler().
        # Stays None until then, so data arriving early is buffered instead
        # of raising AttributeError.
        self._client_receive_handler = None
        # Register with the wrapped worker last, so that our own state is
        # fully initialised before the callback can possibly be invoked.
        self._io_worker.set_receive_handler(self.io_worker_receive_handler)

    def block(self):
        ''' Stop allowing data through until unblock() is called '''
        self._currently_blocked = True

    def unblock(self):
        '''
        Allow data through, and flush buffers.

        Queued outgoing data is sent first, then queued incoming data is
        delivered to the client; each queue is drained in FIFO order.
        '''
        self._currently_blocked = False
        while not self._send_queue.empty():
            data = self._send_queue.get()
            self._actual_send(data)
        while not self._receive_queue.empty():
            data = self._receive_queue.get()
            self._actual_receive(data)

    def send(self, data):
        '''
        Send data from the client side. Fire and forget.

        While blocked the data is queued; otherwise it is handed to the
        wrapped io_worker immediately.
        '''
        if self._currently_blocked:
            self._send_queue.put(data)
        else:
            self._actual_send(data)

    def _actual_send(self, data):
        ''' Pass data on to the wrapped io_worker. '''
        self._io_worker.send(data)

    def _actual_receive(self, data):
        '''
        Append data to the client-visible receive buffer and notify the
        client's receive handler, if one has been installed.
        '''
        self._receive_buf += data
        if self._client_receive_handler is not None:
            self._client_receive_handler(self)

    def set_receive_handler(self, block):
        '''
        Called by client.

        block: callable invoked with this DeferredIOWorker as its only
               argument whenever data is added to the receive buffer.
        '''
        self._client_receive_handler = block

    def peek_receive_buf(self):
        ''' Called by client. Return the receive buffer without consuming it. '''
        return self._receive_buf

    def consume_receive_buf(self, l):
        '''
        Called by client to consume receive buffer.

        l: number of leading elements to drop from the buffer; must not
           exceed the current buffer length.
        '''
        assert len(self._receive_buf) >= l
        self._receive_buf = self._receive_buf[l:]

    def io_worker_receive_handler(self, io_worker):
        '''
        Called from io_worker (recoco thread, after the Select loop pushes
        onto io_worker).

        Drains io_worker's receive buffer completely, then either queues the
        data (while blocked) or delivers it to the client right away.
        '''
        # Consume everything immediately
        data = io_worker.peek_receive_buf()
        io_worker.consume_receive_buf(len(data))
        if self._currently_blocked:
            # thread-safe queue
            self._receive_queue.put(data)
        else:
            self._actual_receive(data)

    @property
    def currently_blocked(self):
        '''
        Return whether data is currently being held back (True after
        block(), False after unblock()).
        '''
        return self._currently_blocked

    # ------- Delegation functions. ---------
    # TODO(cs): is there a more pythonic way to implement delegation?

    def fileno(self):
        ''' Return the wrapped io_worker's fileno(). '''
        # thread safety shouldn't matter here
        return self._io_worker.fileno()

    def close(self):
        ''' Close the wrapped io_worker. '''
        # The call parentheses matter: a bare `self._io_worker.close` only
        # looks the method up and never closes anything.
        self._io_worker.close()

    @property
    def socket(self):
        ''' The wrapped io_worker's socket attribute. '''
        return self._io_worker.socket