Source code for penchy.server

"""
Initiates multiple JVM Benchmarks and accumulates the results.

 .. moduleauthor:: Fabian Hirschmann <fabian@hirschmann.email>

 :copyright: PenchY Developers 2011-2012, see AUTHORS
 :license: MIT License, see LICENSE
"""
import logging
import os
import signal
import threading
from penchy.compat import SimpleXMLRPCServer, nested

from penchy.maven import make_bootstrap_pom
from penchy.util import make_bootstrap_client
from penchy.node import Node


log = logging.getLogger(__name__)


class Server(object):
    """
    This class represents the server.

    It uploads the job to every node, listens on an XML-RPC endpoint for
    the results (or errors) the nodes report back and finally runs the
    server-side pipeline on the collected results.
    """
    # Serialises the handlers that touch the shared bookkeeping
    # (``results``, ``timers`` and the per-node expectations).
    _rcv_lock = threading.Lock()

    def __init__(self, config, job):
        """
        :param config: config module to use
        :type config: module
        :param job: module of job to execute
        :type job: module
        """
        self.config = config
        self.job = job.job
        self.job_file = job.__file__

        # The dict of results we will receive (SystemComposition : result)
        self.results = {}

        # The dict of Timers which implement timeouts (hashcode : Timer)
        self.timers = {}

        # Additional arguments to pass to the bootstrap client
        self.bootstrap_args = []

        # Nodes to upload to, keyed by the identifier of their node setting
        self.nodes = dict((composition.node_setting.identifier,
                           Node(composition.node_setting, job))
                          for composition in self.job.compositions)

        # Files to upload; each entry is the argument tuple for ``Node.put``
        self.uploads = (
            (job.__file__,),
            (self.config.__file__, 'config.py'))

        # Set up the listener
        self.server = SimpleXMLRPCServer(
            (config.SERVER_HOST, config.SERVER_PORT),
            allow_none=True)
        self.server.register_function(self.exp_rcv_data, 'rcv_data')
        self.server.register_function(self.exp_report_error, 'report_error')
        self.server.register_function(self.exp_set_timeout, 'set_timeout')

        # Timeout after which self.server.handle_request() returns. It has
        # to be nonzero because handle_request() runs in a loop until all
        # results have arrived: a composition may time out while we are
        # blocked in handle_request(), and without this timeout the call
        # would wait forever for a request that is never going to come.
        self.server.timeout = 2

        # Set up the thread which is deploying the job
        self.client_thread = self._setup_client_thread()

        # Signal handler
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _setup_client_thread(self):
        """
        Sets up the client thread.

        The thread is created as a daemon thread and is *not* started here.

        :returns: the thread object
        :rtype: :class:`threading.Thread`
        """
        thread = threading.Thread(target=self.run_clients)
        thread.daemon = True
        return thread

    def _signal_handler(self, signum, frame):
        """
        Handles signals sent to this process.

        On ``SIGTERM`` all nodes are closed and the listening server is
        shut down.

        :param signum: signal number as defined in the ``signal`` module
        :type signum: int
        :param frame: execution frame
        :type frame: frame object
        """
        log.info('Received signal %s ', signum)
        if signum == signal.SIGTERM:
            for node in self.nodes.values():
                node.close()
            self.server.server_close()

    def node_for(self, setting):
        """
        Find the Node for a given :class:`~penchy.jobs.job.NodeSetting`.

        :param setting: setting whose ``identifier`` selects the Node
        :type setting: :class:`~penchy.jobs.job.NodeSetting`
        :returns: the Node
        :rtype: :class:`~penchy.node.Node`
        :raises KeyError: if no Node is registered for the identifier
        """
        return self.nodes[setting.identifier]

    def composition_for(self, hashcode):
        """
        Find the :class:`~penchy.jobs.job.SystemComposition` for a given
        hashcode.

        :param hashcode: hashcode of the wanted composition
        :type hashcode: string
        :returns: the system composition
        :rtype: :class:`~penchy.jobs.job.SystemComposition`
        :raises ValueError: if no composition has this hashcode
        """
        for composition in self.job.compositions:
            if hashcode == composition.hash():
                return composition
        raise ValueError('Composition not found')

    def exp_rcv_data(self, hashcode, result):
        """
        Receive data from nodes.

        :param hashcode: the hashcode to identify the
                         :class:`~penchy.jobs.job.SystemComposition` by
        :type hashcode: string
        :param result: the result of the job
        :type result: dict
        """
        composition = self.composition_for(hashcode)

        with Server._rcv_lock:
            node = self.node_for(composition.node_setting)
            node.received(composition)
            self.results[composition] = result
            log.info('Received result. Waiting for %s more.',
                     self.remaining_compositions)

    def exp_report_error(self, hashcode, reason=None):
        """
        Deal with client-side errors. Call this for each
        composition for which a job failed.

        The composition is marked as received on its node, but no result
        is stored for it.

        :param hashcode: the hashcode to identify the
                         :class:`~penchy.jobs.job.SystemComposition`
        :type hashcode: string
        :param reason: reason for the error; please note that this is
                       used when you'd like to display an error right
                       away without waiting for the server to fetch the
                       logs from the node.
        :type reason: string
        """
        composition = self.composition_for(hashcode)

        with Server._rcv_lock:
            node = self.node_for(composition.node_setting)
            node.received(composition)
            if reason:
                node.log.error(reason)

    def exp_set_timeout(self, hashcode, timeout):
        """
        Sets the timeout for the composition identified by
        ``hashcode`` to ``timeout``.

        A previously started timer for the same hashcode is cancelled
        first; a ``timeout`` of zero or less therefore only cancels.

        :param hashcode: hashcode for composition
        :type hashcode: string
        :param timeout: timeout in seconds
        :type timeout: int
        """
        composition = self.composition_for(hashcode)

        with Server._rcv_lock:
            previous = self.timers.get(hashcode)
            if previous is not None:
                previous.cancel()

            if timeout > 0:
                timer = threading.Timer(timeout, self._on_timeout,
                                        args=(hashcode,))
                self.timers[hashcode] = timer
                timer.start()
                log.debug('Timeout set to %s for %s', timeout, composition)

    def _on_timeout(self, hashcode):
        """
        Called when a timeout occurs for the composition identified by
        ``hashcode``; kills the composition on its node.

        :param hashcode: hashcode of the composition
        :type hashcode: string
        """
        composition = self.composition_for(hashcode)
        node = self.node_for(composition.node_setting)

        with node.connection_required():
            node.kill_composition()

        log.error('%s timed out.', composition)

    @property
    def received_all_results(self):
        """
        Indicates whether we have received results for *all*
        :class:`~penchy.jobs.job.SystemComposition`.
        """
        return all(node.received_all_results
                   for node in self.nodes.values())

    @property
    def remaining_compositions(self):
        """
        Number of compositions we are still waiting for.
        """
        return sum(len(node.expected) for node in self.nodes.values())

    def run_clients(self):
        """
        Run the client on all nodes.

        Uploads the job, the configuration and the bootstrap files to
        each node and then starts PenchY there.
        """
        with nested(make_bootstrap_pom(), make_bootstrap_client()) \
                as (pom, bclient):
            for node in self.nodes.values():
                with node.connection_required():
                    for upload in self.uploads:
                        node.put(*upload)
                    node.put(pom.name, 'bootstrap.pom')
                    node.put(bclient.name, 'penchy_bootstrap')

                    arguments = self.bootstrap_args + [
                        os.path.basename(self.job_file),
                        'config.py',
                        node.setting.identifier]
                    node.execute_penchy(' '.join(arguments))

    def run(self):
        """
        Run the server component.

        Starts the deploying thread, serves requests until every node
        has reported all of its compositions and then runs the
        server-side pipeline if at least one result was received.
        """
        self.client_thread.start()
        try:
            while not self.received_all_results:
                self.server.handle_request()

            if self.results:
                self.run_pipeline()
            else:
                log.error('Received no results. Server-pipline was not executed.')
        except KeyboardInterrupt:
            log.warning('Keyboard Interrupt - Shutting down, please wait')
        finally:
            # Timer threads are not daemonic: a pending timeout would keep
            # the interpreter alive after shutdown and later fire against
            # nodes that have already been closed.
            for timer in list(self.timers.values()):
                timer.cancel()
            for node in self.nodes.values():
                node.close()

    def run_pipeline(self):
        """
        Starts the server-side pipeline on the results collected so far.
        """
        log.info('Run server-side pipeline.')
        self.job.filename = self.job_file
        # The pipeline pulls its input through ``job.receive``.
        self.job.receive = lambda: self.results
        self.job.run_server_pipeline()