Source code for sardana.pool.poolmeasurementgroup

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for"""

__all__ = ["PoolMeasurementGroup"]

__docformat__ = 'restructuredtext'

import threading

try:
    from taurus.core.taurusvalidator import AttributeNameValidator as\
        TangoAttributeNameValidator
except ImportError:
    # TODO: For Taurus 4 compatibility
    from taurus.core.tango.tangovalidator import TangoAttributeNameValidator

from sardana import State, ElementType, \
    TYPE_EXP_CHANNEL_ELEMENTS, TYPE_TIMERABLE_ELEMENTS
from sardana.sardanaevent import EventType
from sardana.pool.pooldefs import (AcqMode, AcqSynchType, SynchParam, AcqSynch,
                                   SynchDomain)
from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolacquisition import PoolAcquisition
from sardana.pool.poolexternal import PoolExternalObject

from sardana.taurus.core.tango.sardana import PlotType, Normalization


# ----------------------------------------------
# Measurement Group Configuration information
# ----------------------------------------------
# dict <str, obj> with (at least) keys:
#    - 'timer' : the timer channel name / timer channel id
#    - 'monitor' : the monitor channel name / monitor channel id
#    - 'controllers' : dict<Controller, dict> where:
#        - key: ctrl
#        - value: dict<str, dict> with (at least) keys:
#                - 'timer' : the timer channel name / timer channel id
#                - 'monitor' : the monitor channel name / monitor channel id
#                - 'synchronization' : 'Gate'/'Software'
#                - 'channels' where value is a dict<str, obj> with (at least)
#                   keys:
#                    - 'id' : the channel name ( channel id )
#                    optional keys:
#                    - 'enabled' : True/False (default is True)
#                    any hints:
#                    - 'output' : True/False (default is True)
#                    - 'plot_type' : 'No'/'1D'/'2D' (default is 'No')
#                    - 'plot_axes' : list<str> 'where str is channel
#                                    name/'step#/'index#' (default is [])
#                    - 'label' : prefered label (default is channel name)
#                    - 'scale' : <float, float> with min/max (defaults to
#                                channel range if it is defined
#                    - 'plot_color' : int representing RGB
#    optional keys:
#    - 'label' : measurement group label (defaults to measurement group name)
#    - 'description' : measurement group description

# <MeasurementGroupConfiguration>
#  <timer>UxTimer</timer>
#  <monitor>CT1</monitor>
# </MeasurementGroupConfiguration>

# Example: 2 NI cards, where channel 1 of card 1 is wired to channel 1 of
# card 2 at configuration time we should set:

# ni0ctrl.setCtrlPar(0, 'synchronization', AcqSynch.SoftwareTrigger)
# ni0ctrl.setCtrlPar(0, 'timer', 1) # channel 1 is the timer
# ni0ctrl.setCtrlPar(0, 'monitor', 4) # channel 4 is the monitor
# ni1ctrl.setCtrlPar(0, 'synchronization', AcqSynch.HardwareTrigger)
# ni1ctrl.setCtrlPar(0, 'master', 0)

# when we count for 1.5 seconds:
# ni1ctrl.Load(1.5)
# ni0ctrl.Load(1.5)
# ni1ctrl.Start()
# ni0ctrl.Start()

"""

"""


def _to_fqdn(name, logger=None):
    full_name = name
    # try to use Taurus 4 to retrieve FQDN
    try:
        from taurus.core.tango.tangovalidator import TangoDeviceNameValidator
        full_name, _, _ = TangoDeviceNameValidator().getNames(name)
    # if Taurus3 in use just continue
    except ImportError:
        pass
    if full_name != name and logger:
        msg = ("PQDN full name is deprecated in favor of FQDN full name."
               " Re-apply configuration in order to upgrade.")
        logger.warning(msg)
    return full_name


[docs]class PoolMeasurementGroup(PoolGroupElement): DFT_DESC = 'General purpose measurement group' def __init__(self, **kwargs): self._state_lock = threading.Lock() self._monitor_count = None self._repetitions = 1 self._acquisition_mode = AcqMode.Timer self._config = None self._config_dirty = True self._moveable = None self._moveable_obj = None self._synchronization = [] # dict with channel and its acquisition synchronization # key: PoolBaseChannel; value: AcqSynch self._channel_to_acq_synch = {} # dict with controller and its acquisition synchronization # key: PoolController; value: AcqSynch self._ctrl_to_acq_synch = {} kwargs['elem_type'] = ElementType.MeasurementGroup PoolGroupElement.__init__(self, **kwargs) configuration = kwargs.get("configuration") self.set_configuration(configuration) # if the configuration was never "really" written e.g. newly created MG # just sets it now so the _channe_to_acq_synch and _ctrl_to_acq_synch # are properly populated # TODO: make it more elegant if configuration is None: configuration = self.get_configuration() self.set_configuration(configuration, propagate=0, to_fqdn=False) def _create_action_cache(self): acq_name = "%s.Acquisition" % self._name return PoolAcquisition(self, acq_name) def _calculate_states(self, state_info=None): state, status = PoolGroupElement._calculate_states(self, state_info) # check if software synchronizer is occupied synch_soft = self.acquisition._synch._synch_soft acq_sw = self.acquisition._sw_acq acq_0d = self.acquisition._0d_acq if state in (State.On, State.Unknown) \ and (synch_soft.is_started() or acq_sw._is_started() or acq_0d._is_started()): state = State.Moving status += "/nSoftware synchronization is in progress" return state, status
[docs] def on_element_changed(self, evt_src, evt_type, evt_value): name = evt_type.name if name == 'state': with self._state_lock: state, status = self._calculate_states() self.set_state(state, propagate=2) self.set_status("\n".join(status))
[docs] def get_pool_controllers(self): return self.get_acquisition().get_pool_controllers()
[docs] def get_pool_controller_by_name(self, name): name = name.lower() for ctrl in self.get_pool_controllers(): if ctrl.name.lower() == name or ctrl.full_name.lower() == name: return ctrl
[docs] def add_user_element(self, element, index=None): '''Override the base behavior, so the TriggerGate elements are silently skipped if used multiple times in the group''' user_elements = self._user_elements if element in user_elements: # skipping TriggerGate element if already present if element.get_type() is ElementType.TriggerGate: return return PoolGroupElement.add_user_element(self, element, index)
# ------------------------------------------------------------------------- # configuration # ------------------------------------------------------------------------- def _is_managed_element(self, element): element_type = element.get_type() return (element_type in TYPE_EXP_CHANNEL_ELEMENTS or element_type is ElementType.TriggerGate) """Fills the channel default values for the given channel dictionary""" def _build_channel_defaults(self, channel_data, channel): external_from_name = isinstance(channel, (str, unicode)) ndim = None if external_from_name: name = full_name = source = channel ndim = 0 # TODO: this should somehow verify the dimension else: name = channel.name full_name = channel.full_name source = channel.get_source() ndim = None ctype = channel.get_type() if ctype == ElementType.CTExpChannel: ndim = 0 elif ctype == ElementType.PseudoCounter: ndim = 0 elif ctype == ElementType.ZeroDExpChannel: ndim = 0 elif ctype == ElementType.OneDExpChannel: ndim = 1 elif ctype == ElementType.TwoDExpChannel: ndim = 2 elif ctype == ElementType.External: config = channel.get_config() if config is not None: ndim = int(config.data_format) elif ctype == ElementType.IORegister: ndim = 0 # Definitively should be initialized by measurement group # index MUST be here already (asserting this in the following line) channel_data['index'] = channel_data['index'] channel_data['name'] = channel_data.get('name', name) channel_data['full_name'] = channel_data.get('full_name', full_name) channel_data['source'] = channel_data.get('source', source) channel_data['enabled'] = channel_data.get('enabled', True) channel_data['label'] = channel_data.get('label', channel_data['name']) channel_data['ndim'] = ndim # Probably should be initialized by measurement group channel_data['output'] = channel_data.get('output', True) # Perhaps should NOT be initialized by measurement group channel_data['plot_type'] = channel_data.get('plot_type', PlotType.No) channel_data['plot_axes'] = channel_data.get('plot_axes', []) channel_data['conditioning'] = channel_data.get('conditioning', '') channel_data['normalization'] = channel_data.get( 'normalization', Normalization.No) return channel_data def _build_configuration(self): """Builds a configuration object from the list of elements""" config = {} user_elements = self.get_user_elements() ctrls = self.get_pool_controllers() # find the first CT first_timerable = None for elem in user_elements: if elem.get_type() in TYPE_TIMERABLE_ELEMENTS: first_timerable = elem break if first_timerable is None: raise Exception("It is not possible to construct a measurement " "group without at least one timer able channel " "(Counter/timer, 1D or 2D)") g_timer = g_monitor = first_timerable config['timer'] = g_timer config['monitor'] = g_monitor config['controllers'] = controllers = {} external_user_elements = [] for index, element in enumerate(user_elements): elem_type = element.get_type() if elem_type == ElementType.External: external_user_elements.append((index, element)) continue ctrl = element.controller ctrl_data = controllers.get(ctrl) if ctrl_data is None: controllers[ctrl] = ctrl_data = {} ctrl_data['channels'] = channels = {} if elem_type in TYPE_TIMERABLE_ELEMENTS: elements = ctrls[ctrl] if g_timer in elements: ctrl_data['timer'] = g_timer else: ctrl_data['timer'] = elements[0] if g_monitor in elements: ctrl_data['monitor'] = g_monitor else: ctrl_data['monitor'] = elements[0] ctrl_data['synchronization'] = AcqSynchType.Trigger ctrl_data['synchronizer'] = 'software' self._ctrl_to_acq_synch[ctrl] = AcqSynch.SoftwareTrigger self._channel_to_acq_synch[ element] = AcqSynch.SoftwareTrigger else: channels = ctrl_data['channels'] channels[element] = channel_data = {} channel_data['index'] = user_elements.index(element) channel_data = self._build_channel_defaults(channel_data, element) config['label'] = self.name config['description'] = self.DFT_DESC if len(external_user_elements) > 0: controllers['__tango__'] = ctrl_data = {} ctrl_data['channels'] = channels = {} for index, element in external_user_elements: channels[element] = channel_data = {} channel_data['index'] = index channel_data = self._build_channel_defaults( channel_data, element) return config
[docs] def set_configuration(self, config=None, propagate=1, to_fqdn=True): if config is None: config = self._build_configuration() else: # create a configuration based on a new configuration user_elem_ids = {} tg_elem_ids = [] pool = self.pool for c, c_data in config['controllers'].items(): synchronizer = c_data.get('synchronizer') acq_synch_type = c_data.get('synchronization') software = synchronizer == 'software' external = isinstance(c, (str, unicode)) # only timerable elements are configured with acq_synch acq_synch = None ctrl_enabled = False if not external and c.is_timerable(): acq_synch = AcqSynch.from_synch_type( software, acq_synch_type) for channel_data in c_data['channels'].values(): if external: element = _id = channel_data['full_name'] channel_data['source'] = _id else: full_name = channel_data['full_name'] if to_fqdn: full_name = _to_fqdn(full_name, logger=self) element = pool.get_element_by_full_name(full_name) _id = element.id channel_data = self._build_channel_defaults( channel_data, element) if channel_data["enabled"]: if acq_synch is not None: ctrl_enabled = True self._channel_to_acq_synch[element] = acq_synch if not software: tg_elem_ids.append(synchronizer.id) user_elem_ids[channel_data['index']] = _id if ctrl_enabled: self._ctrl_to_acq_synch[c] = acq_synch # sorted ids may not be consecutive (if a channel is disabled) indexes = sorted(user_elem_ids.keys()) user_elem_ids_list = [user_elem_ids[idx] for idx in indexes] user_elem_ids_list.extend(tg_elem_ids) self.set_user_element_ids(user_elem_ids_list) g_timer, g_monitor = config['timer'], config['monitor'] timer_ctrl_data = config['controllers'][g_timer.controller] if timer_ctrl_data['timer'] != g_timer: self.warning('controller timer and global timer mismatch. ' 'Using global timer') self.debug('For controller %s, timer is defined as channel %s. ' 'The global timer is set to channel %s which belongs ' 'to the same controller', g_timer.controller.name, timer_ctrl_data['timer'].name, g_timer.name) timer_ctrl_data['timer'] = g_timer monitor_ctrl_data = config['controllers'][g_monitor.controller] if monitor_ctrl_data['monitor'] != g_monitor: self.warning('controller monitor and global monitor mismatch. ' 'Using global monitor') self.debug('For controller %s, monitor is defined as channel %s. ' 'The global timer is set to channel %s which belongs ' 'to the same controller', g_monitor.controller.name, monitor_ctrl_data['monitor'].name, g_monitor.name) monitor_ctrl_data['monitor'] != g_monitor self._config = config self._config_dirty = True if not propagate: return self.fire_event(EventType("configuration", priority=propagate), config)
[docs] def set_configuration_from_user(self, cfg, propagate=1, to_fqdn=True): config = {} user_elements = self.get_user_elements() pool = self.pool timer_name = cfg.get('timer', user_elements[0].full_name) monitor_name = cfg.get('monitor', user_elements[0].full_name) if to_fqdn: timer_name = _to_fqdn(timer_name, logger=self) config['timer'] = pool.get_element_by_full_name(timer_name) if to_fqdn: monitor_name = _to_fqdn(monitor_name, logger=self) config['monitor'] = pool.get_element_by_full_name(monitor_name) config['controllers'] = controllers = {} for c_name, c_data in cfg['controllers'].items(): # backwards compatibility for measurement groups created before # implementing feature-372: # https://sourceforge.net/p/sardana/tickets/372/ # WARNING: this is one direction backwards compatibility - it just # reads channels from the units, but does not write channels to the # units back if 'units' in c_data: c_data = c_data['units']['0'] # discard controllers which don't have items (garbage) ch_count = len(c_data['channels']) if ch_count == 0: continue external = c_name.startswith('__') if external: ctrl = c_name else: if to_fqdn: c_name = _to_fqdn(c_name, logger=self) ctrl = pool.get_element_by_full_name(c_name) assert ctrl.get_type() == ElementType.Controller controllers[ctrl] = ctrl_data = {} # exclude external and not timerable elements if not external and ctrl.is_timerable(): timer_name = c_data['timer'] if to_fqdn: timer_name = _to_fqdn(timer_name, logger=self) timer = pool.get_element_by_full_name(timer_name) ctrl_data['timer'] = timer monitor_name = c_data['monitor'] if to_fqdn: monitor_name = _to_fqdn(monitor_name, logger=self) monitor = pool.get_element_by_full_name(monitor_name) ctrl_data['monitor'] = monitor synchronizer = c_data.get('synchronizer') # for backwards compatibility purposes # protect measurement groups without synchronizer defined if synchronizer is None: synchronizer = 'software' elif synchronizer != 'software': if to_fqdn: synchronizer = _to_fqdn(synchronizer, logger=self) synchronizer = pool.get_element_by_full_name(synchronizer) ctrl_data['synchronizer'] = synchronizer try: synchronization = c_data['synchronization'] except KeyError: # backwards compatibility for configurations before SEP6 synchronization = c_data['trigger_type'] msg = ("trigger_type configuration parameter is deprecated" " in favor of synchronization. Re-apply " "configuration in order to upgrade.") self.warning(msg) ctrl_data['synchronization'] = synchronization ctrl_data['channels'] = channels = {} for ch_name, ch_data in c_data['channels'].items(): if external: validator = TangoAttributeNameValidator() params = validator.getParams(ch_data['full_name']) params['pool'] = self.pool channel = PoolExternalObject(**params) else: if to_fqdn: ch_name = _to_fqdn(ch_name, logger=self) channel = pool.get_element_by_full_name(ch_name) channels[channel] = dict(ch_data) config['label'] = cfg.get('label', self.name) config['description'] = cfg.get('description', self.DFT_DESC) self.set_configuration(config, propagate=propagate, to_fqdn=to_fqdn)
[docs] def get_configuration(self): return self._config
[docs] def get_user_configuration(self): cfg = self.get_configuration() config = {} config['timer'] = cfg['timer'].full_name config['monitor'] = cfg['monitor'].full_name config['controllers'] = controllers = {} for c, c_data in cfg['controllers'].items(): ctrl_name = c if not isinstance(c, (str, unicode)): ctrl_name = c.full_name external = ctrl_name.startswith('__') controllers[ctrl_name] = ctrl_data = {} if not external and c.is_timerable(): if 'timer' in c_data: ctrl_data['timer'] = c_data['timer'].full_name if 'monitor' in c_data: ctrl_data['monitor'] = c_data['monitor'].full_name if 'synchronizer' in c_data: synchronizer = c_data['synchronizer'] if synchronizer != 'software': synchronizer = synchronizer.full_name ctrl_data['synchronizer'] = synchronizer if 'synchronization' in c_data: ctrl_data['synchronization'] = c_data['synchronization'] ctrl_data['channels'] = channels = {} for ch, ch_data in c_data['channels'].items(): channels[ch.full_name] = dict(ch_data) config['label'] = cfg['label'] config['description'] = cfg['description'] return config
[docs] def load_configuration(self, force=False): """Loads the current configuration to all involved controllers""" cfg = self.get_configuration() # g_timer, g_monitor = cfg['timer'], cfg['monitor'] for ctrl, ctrl_data in cfg['controllers'].items(): # skip external channels if isinstance(ctrl, str): continue # telling controller in which acquisition mode it will participate if not ctrl.is_online(): continue ctrl.set_ctrl_par('acquisition_mode', self.acquisition_mode) # @TODO: fix optimization and enable it again if ctrl.operator == self and not force and not self._config_dirty: continue ctrl.operator = self if ctrl.is_timerable(): # if ctrl == g_timer.controller: # ctrl.set_ctrl_par('timer', g_timer.axis) # if ctrl == g_monitor.controller: # ctrl.set_ctrl_par('monitor', g_monitor.axis) ctrl.set_ctrl_par('timer', ctrl_data['timer'].axis) ctrl.set_ctrl_par('monitor', ctrl_data['monitor'].axis) synchronization = self._ctrl_to_acq_synch.get(ctrl) self.debug('load_configuration: setting trigger_type: %s ' 'to ctrl: %s' % (synchronization, ctrl)) ctrl.set_ctrl_par('synchronization', synchronization) self._config_dirty = False
[docs] def get_timer(self): return self.get_configuration()['timer']
timer = property(get_timer) # ------------------------------------------------------------------------- # integration time # -------------------------------------------------------------------------
[docs] def get_integration_time(self): if len(self._synchronization) == 0: raise Exception("The synchronization group has not been" " initialized") elif len(self._synchronization) > 1: raise Exception("There are more than one synchronization groups") else: return self._synchronization[0][SynchParam.Active][ SynchDomain.Time]
[docs] def set_integration_time(self, integration_time, propagate=1): total_time = integration_time + self.latency_time synch = [{SynchParam.Delay: {SynchDomain.Time: 0}, SynchParam.Active: {SynchDomain.Time: integration_time}, SynchParam.Total: {SynchDomain.Time: total_time}, SynchParam.Repeats: 1}] self.set_synchronization(synch) self._integration_time = integration_time if not propagate: return self.fire_event(EventType("integration_time", priority=propagate), integration_time)
integration_time = property(get_integration_time, set_integration_time, doc="the current integration time") # ------------------------------------------------------------------------- # monitor count # -------------------------------------------------------------------------
[docs] def get_monitor_count(self): return self._monitor_count
[docs] def set_monitor_count(self, monitor_count, propagate=1): self._monitor_count = monitor_count if not propagate: return self.fire_event(EventType("monitor_count", priority=propagate), monitor_count)
monitor_count = property(get_monitor_count, set_monitor_count, doc="the current monitor count") # ------------------------------------------------------------------------- # acquisition mode # -------------------------------------------------------------------------
[docs] def get_acquisition_mode(self): return self._acquisition_mode
[docs] def set_acquisition_mode(self, acquisition_mode, propagate=1): self._acquisition_mode = acquisition_mode self._config_dirty = True # acquisition mode goes to configuration if not propagate: return self.fire_event(EventType("acquisition_mode", priority=propagate), acquisition_mode)
acquisition_mode = property(get_acquisition_mode, set_acquisition_mode, doc="the current acquisition mode") # ------------------------------------------------------------------------- # synchronization # -------------------------------------------------------------------------
[docs] def get_synchronization(self): return self._synchronization
[docs] def set_synchronization(self, synchronization, propagate=1): self._synchronization = synchronization self._config_dirty = True # acquisition mode goes to configuration if not propagate: return self.fire_event(EventType("synchronization", priority=propagate), synchronization)
synchronization = property(get_synchronization, set_synchronization, doc="the current acquisition mode") # ------------------------------------------------------------------------- # moveable # -------------------------------------------------------------------------
[docs] def get_moveable(self): return self._moveable
[docs] def set_moveable(self, moveable, propagate=1, to_fqdn=True): self._moveable = moveable if self._moveable != 'None' and self._moveable is not None: if to_fqdn: moveable = _to_fqdn(moveable, logger=self) self._moveable_obj = self.pool.get_element_by_full_name(moveable) self.fire_event(EventType("moveable", priority=propagate), moveable)
moveable = property(get_moveable, set_moveable, doc="moveable source used in synchronization") # ------------------------------------------------------------------------- # latency time # -------------------------------------------------------------------------
[docs] def get_latency_time(self): latency_time = 0 pool_ctrls = self.acquisition.get_pool_controllers() for pool_ctrl in pool_ctrls: if not pool_ctrl.is_timerable(): continue candidate = pool_ctrl.get_ctrl_par("latency_time") if candidate > latency_time: latency_time = candidate return latency_time
latency_time = property(get_latency_time, doc="latency time between two consecutive " "acquisitions") # ------------------------------------------------------------------------- # acquisition # -------------------------------------------------------------------------
[docs] def start_acquisition(self, value=None, multiple=1): self._aborted = False if not self._simulation_mode: # load configuration into controller(s) if necessary self.load_configuration() # determining the acquisition parameters kwargs = dict(head=self, config=self._config, multiple=multiple) acquisition_mode = self.acquisition_mode if acquisition_mode is AcqMode.Timer: kwargs['integ_time'] = self.get_integration_time() elif acquisition_mode is AcqMode.Monitor: kwargs['monitor'] = self._monitor kwargs['synchronization'] = self._synchronization kwargs['moveable'] = self._moveable_obj # start acquisition self.acquisition.run(**kwargs)
[docs] def set_acquisition(self, acq_cache): self.set_action_cache(acq_cache)
[docs] def get_acquisition(self): return self.get_action_cache()
acquisition = property(get_acquisition, doc="acquisition object")
[docs] def stop(self): self.acquisition._synch._synch_soft.stop() PoolGroupElement.stop(self)