# System Imports
import os
import socket
import threading
import logging
# Local Imports
from . import version
from . import bases
from . import common
from .common import server
from .common.log import RotatingMemoryHandler
from . import log_formatter
[docs]class InstrumentManager(object):
"""
Labtronyx InstrumentManager
Facilitates communication with instruments using all available interfaces.
:param server: Enable RPC endpoint
:type server: bool
:param server_port: RPC endpoint port
:type server_port: int
:param plugin_dirs: List of directories containing plugins
:type plugin_dirs: list
:param logger: Logger
:type logger: logging.Logger
"""
name = 'Labtronyx'
longname = 'Labtronyx Instrumentation Control Framework'
SERVER_PORT = 6780
ZMQ_PORT = 6781
def __init__(self, **kwargs):
# Configurable instance variables
self.server_port = kwargs.get('server_port', self.SERVER_PORT)
if 'logger' in kwargs:
self.logger = kwargs.get('logger', )
else:
self.logger = logging.getLogger('labtronyx')
# Memory logger
self._handler_mem = RotatingMemoryHandler(100)
self._handler_mem.setFormatter(log_formatter)
self.logger.addHandler(self._handler_mem)
# Initialize PYTHONPATH
self.rootPath = os.path.dirname(os.path.realpath(os.path.join(__file__, os.curdir)))
# if self.rootPath not in sys.path:
# sys.path.append(self.rootPath)
# Announce Version
self.logger.info(self.longname)
self.logger.info("Version: %s", version.ver_full)
self.logger.debug("Build Date: %s", version.build_date)
# Library paths to search for plugins
self.plugin_manager = common.plugin.plugin_manager
self.plugin_manager.logger = self.logger
dirs = [os.path.join(self.rootPath, dir) for dir in ['drivers', 'interfaces']]
dirs += kwargs.get('plugin_dirs', [])
for dir in dirs:
self.logger.info("Plugin search directory: %s", dir)
self.plugin_manager.search(dir)
# Start Interfaces
for i_fqn, i_cls in self.plugin_manager.getPluginsByBaseClass(bases.InterfaceBase).items():
self.enableInterface(i_fqn)
# Create the flask server app
self._server_app = server.create_server(self, self.server_port, logger=self.logger)
self._server_events = common.events.EventPublisher(self.ZMQ_PORT)
# Start Server
if kwargs.get('server', False):
if not self.server_start():
raise EnvironmentError("Unable to start Labtronyx Server")
def __del__(self):
try:
self.server_stop()
except:
pass
self._close()
def _close(self):
"""
Close all resources and interfaces
"""
# Disable all interfaces, close all resources
for inter_uuid, inter_obj in self.interfaces.items():
self.disableInterface(inter_uuid)
self.plugin_manager.destroyAllPluginInstances()
[docs] def getLog(self):
"""
Get the last 100 log entries
:return: list
"""
return self._handler_mem.getBuffer()
#===========================================================================
# Labtronyx Server
#===========================================================================
[docs] def server_start(self, new_thread=True):
"""
Start the API/RPC Server and Event publisher. If `new_thread` is True, the server will be started in a new
thread in a non-blocking fashion.
If a server is already running, it will be stopped and then restarted.
:returns: True if successful, False otherwise
:rtype: bool
"""
SERVER_THREAD_NAME = 'Labtronyx-Server'
# Clean out old server, if any exists
threads = [th.name for th in threading.enumerate()]
if SERVER_THREAD_NAME in threads:
self.server_stop()
# Server start command
from werkzeug.serving import run_simple
srv_start_cmd = lambda: run_simple(
hostname=self.getHostname(), port=self.server_port, application=self._server_app,
threaded=True, use_debugger=True
)
# Instantiate server
try:
# Start event publisher
self._server_events.start()
if new_thread:
server_thread = threading.Thread(name=SERVER_THREAD_NAME, target=srv_start_cmd)
server_thread.setDaemon(True)
server_thread.start()
return True
else:
srv_start_cmd()
except:
self.logger.exception("Exception during server start")
self.server_stop()
[docs] def server_stop(self):
"""
Stop the Server
"""
# Signal the event
self._publishEvent(common.events.EventCodes.manager.shutdown)
# Stop the event publisher
try:
self._server_events.stop()
except:
pass
# Shutdown server
try:
# Must use the REST API to shutdown
import urllib2
url = 'http://{0}:{1}/api/shutdown'.format(self.getHostname(), self.server_port)
resp = urllib2.Request(url)
handler = urllib2.urlopen(resp, timeout=0.5)
if handler.code == 200:
self.logger.debug('Server stopped')
else:
self.logger.error('Server stop returned code: %d', handler.code)
except:
pass
@staticmethod
[docs] def getVersion():
"""
Get the Labtronyx version
:rtype: dict{str: str}
"""
return {
'version': version.ver_sem,
'version_full': version.ver_full,
'build_date': version.build_date,
'git_revision': version.git_revision
}
[docs] def getAddress(self):
"""
Get the local IP Address
:returns: str
"""
return socket.gethostbyname(self.getHostname())
@staticmethod
[docs] def getHostname():
"""
Get the local hostname
:rtype: str
"""
return socket.gethostname()
############################################################################
# Plugin Operations
############################################################################
[docs] def addPluginSearchDirectory(self, path):
"""
Add a search path for plugins. Directory will be searched immediately and all discovered plugins will be
cataloged. If a search path has already been searched, it will not be searched again. Interfaces will not be
started automatically. Use :func:`enableInterface` to start the interface once it has been discovered.
:param path: Search directory
:type path: str
"""
self.plugin_manager.search(path)
[docs] def getPluginSearchDirectories(self):
"""
Get a list of all the searched paths for plugins.
:rtype: list
"""
return self.plugin_manager.directories
[docs] def getAttributes(self):
"""
Get the class attributes for all loaded plugins. Dictionary keys are the Fully Qualified Names (FQN) of the
plugins
:rtype: dict[str:dict]
"""
return self.plugin_manager.getAllPluginInfo()
[docs] def getProperties(self):
"""
Returns the property dictionaries for all resources
:rtype: dict[str:dict]
"""
ret = {}
ret.update({uuid: pObj.getProperties() for uuid, pObj in self.interfaces.items()})
ret.update({uuid: pObj.getProperties() for uuid, pObj in self.resources.items()})
ret.update({uuid: pObj.getProperties() for uuid, pObj in self.scripts.items()})
return ret
#===========================================================================
# Event Publishing
#===========================================================================
def _publishEvent(self, event, *args, **kwargs):
"""
Private method called internally to signal events. Calls handlers and dispatches event signal to subscribed
clients
:param event: event
:type event: str
"""
if hasattr(self, '_server_events'):
self._server_events.publishEvent(event, *args, **kwargs)
# ===========================================================================
# Interface Operations
# ===========================================================================
@property
def interfaces(self):
"""
:returns: Dictionary of interface plugin instances {UUID -> Interface Object}
:rtype: dict[str:labtronyx.bases.interface.InterfaceBase]
"""
return self.plugin_manager.getPluginInstancesByBaseClass(bases.InterfaceBase)
[docs] def listInterfaces(self):
"""
Get a list of interface names that are enabled
:returns: Interface names
:rtype: list[str]
"""
return [intObj.interfaceName for int_uuid, intObj in self.interfaces.items()]
[docs] def enableInterface(self, interfaceName, **kwargs):
"""
Enable or restart an interface. Use this method to pass parameters to an interface. If an interface with the
same name is already running, it will be stopped first. Each interface may only have one instance at a time
(Singleton pattern).
:param interfaceName: Interface Name (`interfaceName` attribute of plugin) or plugin FQN
:type interfaceName: str
:rtype: bool
:raises: KeyError
"""
if interfaceName in self.listInterfaces():
raise KeyError("Interface already enabled")
interfaceClasses = self.plugin_manager.getPluginsByBaseClass(bases.InterfaceBase)
interfacesByName = {v.interfaceName: k for k, v in interfaceClasses.items()}
# Resolve interfaceName as a plugin FQN
if interfaceName in interfaceClasses:
fqn = interfaceName
elif interfaceName in interfacesByName:
fqn = interfacesByName.get(interfaceName)
else:
raise KeyError("Interface not found or not available")
if self.plugin_manager.validatePlugin(fqn):
try:
# Instantiate interface
int_obj = self.plugin_manager.createPluginInstance(fqn, manager=self, logger=self.logger, **kwargs)
# Call the plugin hook to open the interface. Ensure interface opens correctly
if int_obj.open():
self.logger.info("Started Interface: %s", interfaceName)
self._publishEvent(common.events.EventCodes.interface.created, int_obj.interfaceName)
int_obj.enumerate()
return True
else:
self.logger.error("Interface %s failed to open", interfaceName)
self.disableInterface(int_obj.uuid)
return False
except:
self.logger.exception("Exception during enableInterface")
return False
else:
self.logger.error("Invalid plugin: %s", fqn)
return False
[docs] def disableInterface(self, interface):
"""
Disable an interface that is running.
:param interface: Interface Name (`interfaceName` attribute of plugin) or Interface UUID
:type interface: str
:rtype: bool
"""
interfaces = {plug_obj.uuid: plug_obj for plug_uuid, plug_obj
in self.plugin_manager.getPluginInstancesByBaseClass(bases.InterfaceBase).items()
if plug_obj.interfaceName == interface or plug_obj.uuid == interface}
for inter_uuid, inter in interfaces.items():
try:
# Call the plugin hook to close the interface
inter.close()
self.logger.info("Stopped Interface: %s" % inter.fqn)
self._publishEvent(common.events.EventCodes.interface.destroyed, inter.interfaceName)
self.plugin_manager.destroyPluginInstance(inter_uuid)
except:
self.logger.exception("Exception during interface close")
return False
return True
# ===========================================================================
# Resource Operations
# ===========================================================================
[docs] def refresh(self):
"""
Refresh all interfaces and resources. Attempts enumeration on all interfaces, then calls the `refresh`
method for all resources.
"""
for inter_uuid, inter_obj in self.interfaces.items():
# Discover any new resources
inter_obj.refresh()
@property
def resources(self):
all_resources = {}
for interfaceName, interfaceObj in self.interfaces.items():
for resID, resourceObj in interfaceObj.resources.items():
all_resources[resourceObj.uuid] = resourceObj
return all_resources
[docs] def listResources(self):
"""
Get a list of UUIDs for all resources
:deprecated: Deprecated by :func:`getProperties`
:rtype: list[str]
"""
return self.resources.keys()
[docs] def getResource(self, interfaceName, resID, driverName=None):
"""
Get a resource by name from the specified interface. Not supported by all interfaces, see interface
documentation for more details.
:param interfaceName: Interface name
:type interfaceName: str
:param resID: Resource Identifier
:type resID: str
:param driverName: Driver to load for resource
:type driverName: str
:returns: Resource object
:rtype: labtronyx.bases.resource.ResourceBase
:raises: InterfaceUnavailable
:raises: ResourceUnavailable
:raises: InterfaceError
"""
# NON-SERIALIZABLE
if interfaceName not in self.listInterfaces():
raise common.errors.InterfaceUnavailable('Interface not found')
int_obj = [intObj for int_uuid, intObj in self.interfaces.items() if intObj.interfaceName == interfaceName][0]
try:
# Call the interface getResource hook
res = int_obj.getResource(resID)
# Attempt to load the specified driver, but do not force
res.loadDriver(driverName)
return res
except NotImplementedError:
raise common.errors.InterfaceError('Operation not support by interface %s' % interfaceName)
[docs] def findResources(self, **kwargs):
"""
Get a list of resources/instruments that match the parameters specified.
Parameters can be any key found in the resource property dictionary, such as these:
:param uuid: Unique Resource Identifier (UUID)
:param interface: Interface
:param resourceID: Interface Resource Identifier (Port, Address, etc.)
:param resourceType: Resource Type (Serial, VISA, etc.)
:param deviceVendor: Instrument Vendor
:param deviceModel: Instrument Model Number
:param deviceSerial: Instrument Serial Number
:rtype: list[labtronyx.bases.resource.ResourceBase]
"""
# NON-SERIALIZABLE
matching_instruments = self.plugin_manager.searchPluginInstances(pluginType='resource', **kwargs)
return matching_instruments.values()
[docs] def findInstruments(self, **kwargs):
"""
Alias for :func:`findResources`
"""
# NON-SERIALIZABLE
return self.findResources(**kwargs)
# ===========================================================================
# Script Operations
# ===========================================================================
@property
def scripts(self):
"""
:returns: Dictionary of script plugin instances {UUID -> Script object}
:rtype: dict[str:labtronyx.bases.script.ScriptBase]
"""
return self.plugin_manager.getPluginInstancesByBaseClass(bases.ScriptBase)
[docs] def openScript(self, script_fqn, **kwargs):
"""
Create an instance of a script. The script must have already been loaded by the plugin manager. Any required
script parameters can be provided using keyword arguments.
:param script_fqn: Fully Qualified Name of the script plugin
:type script_fqn: str
:returns: Script Instance UUID
:rtype: str
:raises: KeyError
:raises: RuntimeError
"""
script_classes = self.plugin_manager.getPluginsByBaseClass(bases.ScriptBase)
if script_fqn not in script_classes:
raise KeyError("Script plugin could not be found")
if self.plugin_manager.validatePlugin(script_fqn):
scriptObj = self.plugin_manager.createPluginInstance(script_fqn, manager=self, logger=self.logger, **kwargs)
script_uuid = scriptObj.uuid
self._publishEvent(common.events.EventCodes.script.created, script_uuid)
return script_uuid
else:
raise RuntimeError("Script is invalid")
[docs] def runScript(self, script_uuid):
"""
Run a script that has been previously opened using :func:`openScript`. The script is run in a separate thread
:param script_uuid: Script Instance UUID
:type script_uuid: str
:raises: KeyError
:raises: ThreadError
"""
scriptObj = self.scripts.get(script_uuid)
if scriptObj is None:
raise KeyError("Script instance could not be found")
scriptThread = threading.Thread(target=scriptObj.start, name="script-"+script_uuid)
scriptThread.setDaemon(True)
scriptThread.start()
[docs] def stopScript(self, script_uuid):
"""
Stop a script that is currently running. Does not currently do anything.
:param script_uuid: Script Instance UUID
:type script_uuid: str
:raises: KeyError
"""
scriptObj = self.scripts.get(script_uuid)
if scriptObj is None:
raise KeyError("Script instance could not be found")
scriptObj.stop()
[docs] def closeScript(self, script_uuid):
"""
Destroy a script instance that is not currently running.
:param script_uuid: Script Instance UUID
:type script_uuid: str
:rtype: bool
"""
scriptObj = self.scripts.get(script_uuid)
if scriptObj is None:
return True
if scriptObj.isRunning():
return False
self.plugin_manager.destroyPluginInstance(script_uuid)