Source code for sardana.pool.poolacquisition

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool libray. It defines the class for an
acquisition"""

__all__ = ["AcquisitionState", "AcquisitionMap", "PoolCTAcquisition",
           "Pool0DAcquisition", "Channel", "PoolIORAcquisition"]

__docformat__ = 'restructuredtext'

import time
import datetime

from taurus.core.util.log import DebugIt
from taurus.core.util.enumeration import Enumeration

from sardana import SardanaValue, State, ElementType, TYPE_TIMERABLE_ELEMENTS
from sardana.sardanathreadpool import get_thread_pool
from sardana.pool import SynchParam, SynchDomain, AcqSynch
from sardana.pool.poolaction import ActionContext, PoolActionItem, PoolAction
from sardana.pool.poolsynchronization import PoolSynchronization

#: enumeration representing possible motion states
AcquisitionState = Enumeration("AcquisitionState", (
    "Stopped",
    #    "StoppedOnError",
    #    "StoppedOnAbort",
    "Acquiring",
    "Invalid"))

AS = AcquisitionState
AcquiringStates = AS.Acquiring,
StoppedStates = AS.Stopped,  # MS.StoppedOnError, MS.StoppedOnAbort

AcquisitionMap = {
    # AS.Stopped           : State.On,
    AS.Acquiring: State.Moving,
    AS.Invalid: State.Invalid,
}


def split_MGConfigurations(mg_cfg_in):
    """Split MeasurementGroup configuration with channels
    triggered by SW Trigger and channels triggered by HW trigger

    TODO: (technical debt) All the MeasurementGroup configuration
    logic should be encapsulate in a dedicated class instead of
    using a basic data structures like dict or lists...
    """
    ctrls_in = mg_cfg_in['controllers']
    mg_sw_cfg_out = {}
    mg_0d_cfg_out = {}
    mg_hw_cfg_out = {}
    mg_sw_cfg_out['controllers'] = ctrls_sw_out = {}
    mg_0d_cfg_out['controllers'] = ctrls_0d_out = {}
    mg_hw_cfg_out['controllers'] = ctrls_hw_out = {}
    for ctrl, ctrl_info in ctrls_in.items():
        external = isinstance(ctrl, str) and ctrl.startswith('__')
        # skipping external controllers e.g. Tango attributes
        if external:
            continue
        # splitting ZeroD based on the type
        if ctrl.get_ctrl_types()[0] == ElementType.ZeroDExpChannel:
            ctrls_0d_out[ctrl] = ctrl_info
        # ignoring PseudoCounter
        elif ctrl.get_ctrl_types()[0] == ElementType.PseudoCounter:
            pass
        # splitting rest of the channels based on the assigned trigger
        else:
            synchronizer = ctrl_info.get('synchronizer')
            if synchronizer is None or synchronizer == 'software':
                ctrls_sw_out[ctrl] = ctrl_info
            else:
                ctrls_hw_out[ctrl] = ctrl_info

    def find_master(ctrls, role):
        master_idx = float("+inf")
        master = None
        for ctrl_info in ctrls.values():
            element = ctrl_info[role]
            element_idx = ctrl_info["channels"][element]["index"]
            element_enabled = ctrl_info["channels"][element]["enabled"]
            # Find master only if is enabled
            if element_idx < master_idx and element_enabled:
                master = element
                master_idx = element_idx
        return master

    if len(ctrls_sw_out):
        mg_sw_cfg_out["timer"] = find_master(ctrls_sw_out, "timer")
        mg_sw_cfg_out["monitor"] = find_master(ctrls_sw_out, "monitor")
    if len(ctrls_hw_out):
        mg_hw_cfg_out["timer"] = find_master(ctrls_hw_out, "timer")
        mg_hw_cfg_out["monitor"] = find_master(ctrls_hw_out, "monitor")
    return (mg_hw_cfg_out, mg_sw_cfg_out, mg_0d_cfg_out)


def getTGConfiguration(MGcfg):
    '''Build TG configuration from complete MG configuration.

    TODO: (technical debt) All the MeasurementGroup configuration
    logic should be encapsulate in a dedicated class instead of
    using a basic data structures like dict or lists...

    :param MGcfg: configuration dictionary of the whole Measurement Group.
    :type MGcfg: dict<>
    :return: a configuration dictionary of TG elements organized by controller
    :rtype: dict<>
    '''

    # Create list with not repeated elements
    _tg_element_list = []

    for ctrl in MGcfg["controllers"]:
        tg_element = MGcfg["controllers"][ctrl].get('synchronizer', None)
        if (tg_element is not None and
                tg_element != "software" and
                tg_element not in _tg_element_list):
            _tg_element_list.append(tg_element)

    # Intermediate dictionary to organize each ctrl with its elements.
    ctrl_tgelem_dict = {}
    for tgelem in _tg_element_list:
        tg_ctrl = tgelem.get_controller()
        if tg_ctrl not in ctrl_tgelem_dict.keys():
            ctrl_tgelem_dict[tg_ctrl] = [tgelem]
        else:
            ctrl_tgelem_dict[tg_ctrl].append(tgelem)

    # Build TG configuration dictionary.
    TGcfg = {}
    TGcfg['controllers'] = {}

    for ctrl in ctrl_tgelem_dict:
        TGcfg['controllers'][ctrl] = ctrls = {}
        ctrls['channels'] = {}
        for tg_elem in ctrl_tgelem_dict[ctrl]:
            ch = ctrls['channels'][tg_elem] = {}
            ch['full_name'] = tg_elem.full_name
    # TODO: temporary returning tg_elements
    return TGcfg, _tg_element_list


def extract_integ_time(synchronization):
    """Extract integration time(s) from synchronization dict. If there is only
    one group in the synchronization than returns float with the integration
    time. Otherwise a list of floats with different integration times.

    TODO: (technical debt) All the MeasurementGroup synchronization
    logic should be encapsulate in a dedicated class instead of
    using a basic data structures like dict or lists...

    :param synchronization: group(s) where each group is described by
        SynchParam(s)
    :type synchronization: list(dict)
    :return list(float) or float
    """
    if len(synchronization) == 1:
        integ_time = synchronization[0][SynchParam.Active][SynchDomain.Time]
    else:
        integ_time = []
        for group in synchronization:
            active_time = group[SynchParam.Active][SynchDomain.Time]
            repeats = group[SynchParam.Repeats]
            integ_time += [active_time] * repeats
    return integ_time


def extract_repetitions(synchronization):
    """Extract repetitions from synchronization dict.

    TODO: (technical debt) All the MeasurementGroup synchronization
    logic should be encapsulate in a dedicated class instead of
    using a basic data structures like dict or lists...

    :param synchronization: group(s) where each group is described by
        SynchParam(s)
    :type synchronization: list(dict)
    :return: number of repetitions
    :rtype: int
    """
    repetitions = 0
    for group in synchronization:
        repetitions += group[SynchParam.Repeats]
    return repetitions


def is_value_error(value):
    if isinstance(value, SardanaValue) and value.error:
        return True
    return False


class PoolAcquisition(PoolAction):

    def __init__(self, main_element, name="Acquisition"):
        PoolAction.__init__(self, main_element, name)
        zerodname = name + ".0DAcquisition"
        hwname = name + ".HardwareAcquisition"
        swname = name + ".SoftwareAcquisition"
        synchname = name + ".Synchronization"

        self._sw_acq_config = None
        self._0d_config = None
        self._0d_acq = Pool0DAcquisition(main_element, name=zerodname)
        self._sw_acq = PoolAcquisitionSoftware(main_element, name=swname)
        self._hw_acq = PoolAcquisitionHardware(main_element, name=hwname)
        self._synch = PoolSynchronization(main_element, name=synchname)

    def set_sw_config(self, config):
        self._sw_acq_config = config

    def set_0d_config(self, config):
        self._0d_config = config

    def event_received(self, *args, **kwargs):
        timestamp = time.time()
        _, type_, value = args
        name = type_.name
        if name == "state":
            return
        t_fmt = '%Y-%m-%d %H:%M:%S.%f'
        t_str = datetime.datetime.fromtimestamp(timestamp).strftime(t_fmt)
        msg = '%s event with id: %d received at: %s' % (name, value, t_str)
        self.debug(msg)
        if name == "active":
            # this code is not thread safe, but for the moment we assume that
            # only one EventGenerator will work at the same time
            if self._sw_acq_config:
                if self._sw_acq._is_started() or self._sw_acq.is_running():
                    msg = ('Skipping trigger: software acquisition is still'
                           ' in progress.')
                    self.debug(msg)
                    return
                else:
                    self.debug('Executing software acquisition.')
                    args = ()
                    kwargs = self._sw_acq_config
                    kwargs['synch'] = True
                    kwargs['idx'] = value
                    self._sw_acq._started = True
                    get_thread_pool().add(self._sw_acq.run, *args, **kwargs)
            if self._0d_config:
                if self._0d_acq._is_started() or self._0d_acq.is_running():
                    msg = ('Skipping trigger: ZeroD acquisition is still in'
                           ' progress.')
                    self.debug(msg)
                    return
                else:
                    self.debug('Executing ZeroD acquisition.')
                    args = ()
                    kwargs = self._0d_config
                    kwargs['synch'] = True
                    kwargs['idx'] = value
                    self._0d_acq._started = True
                    self._0d_acq._stopped = False
                    self._0d_acq._aborted = False
                    get_thread_pool().add(self._0d_acq.run, *args, **kwargs)
        elif name == "passive":
            if self._0d_config and (self._0d_acq._is_started() or
                                    self._0d_acq.is_running()):
                self.debug('Stopping ZeroD acquisition.')
                self._0d_acq.stop_action()

    def is_running(self):
        return self._0d_acq.is_running() or\
            self._sw_acq.is_running() or\
            self._hw_acq.is_running() or\
            self._synch.is_running()

    def run(self, *args, **kwargs):
        for elem in self.get_elements():
            elem.put_state(None)
            # TODO: temporarily clear value buffers at the beginning of the
            # acquisition instead of doing it in the finish hook of each
            # acquisition sub-actions. See extensive explanation in the
            # constructor of PoolAcquisitionBase.
            try:
                elem.clear_value_buffer()
            except AttributeError:
                continue
            # clean also the pseudo counters, even the ones that do not
            # participate directly in the acquisition
            for pseudo_elem in elem.get_pseudo_elements():
                pseudo_elem.clear_value_buffer()
        config = kwargs['config']
        synchronization = kwargs["synchronization"]
        integ_time = extract_integ_time(synchronization)
        repetitions = extract_repetitions(synchronization)
        # TODO: this code splits the global mg configuration into
        # experimental channels triggered by hw and experimental channels
        # triggered by sw. Refactor it!!!!
        (hw_acq_cfg, sw_acq_cfg, zerod_acq_cfg) = split_MGConfigurations(
            config)
        synch_cfg, _ = getTGConfiguration(config)
        # starting continuous acquisition only if there are any controllers
        if len(hw_acq_cfg['controllers']):
            cont_acq_kwargs = dict(kwargs)
            cont_acq_kwargs['config'] = hw_acq_cfg
            cont_acq_kwargs['integ_time'] = integ_time
            cont_acq_kwargs['repetitions'] = repetitions
            self._hw_acq.run(*args, **cont_acq_kwargs)
        if len(sw_acq_cfg['controllers']) or len(zerod_acq_cfg['controllers']):
            self._synch.add_listener(self)
            if len(sw_acq_cfg['controllers']):
                sw_acq_kwargs = dict(kwargs)
                sw_acq_kwargs['config'] = sw_acq_cfg
                sw_acq_kwargs['integ_time'] = integ_time
                sw_acq_kwargs['repetitions'] = 1
                self.set_sw_config(sw_acq_kwargs)
            if len(zerod_acq_cfg['controllers']):
                zerod_acq_kwargs = dict(kwargs)
                zerod_acq_kwargs['config'] = zerod_acq_cfg
                self.set_0d_config(zerod_acq_kwargs)
        synch_kwargs = dict(kwargs)
        synch_kwargs['config'] = synch_cfg
        self._synch.run(*args, **synch_kwargs)

    def _get_action_for_element(self, element):
        elem_type = element.get_type()
        if elem_type in TYPE_TIMERABLE_ELEMENTS:
            main_element = self.main_element
            channel_to_acq_synch = main_element._channel_to_acq_synch
            acq_synch = channel_to_acq_synch.get(element)
            if acq_synch in (AcqSynch.SoftwareTrigger,
                             AcqSynch.SoftwareGate):
                return self._sw_acq
            elif acq_synch in (AcqSynch.HardwareTrigger,
                               AcqSynch.HardwareGate):
                return self._hw_acq
            else:
                # by default software synchronization is in use
                return self._sw_acq
        elif elem_type == ElementType.ZeroDExpChannel:
            return self._0d_acq
        elif elem_type == ElementType.TriggerGate:
            return self._synch
        else:
            raise RuntimeError("Could not determine action for element %s" %
                               element)

    def clear_elements(self):
        """Clears all elements from this action"""

    def add_element(self, element):
        """Adds a new element to this action.

        :param element: the new element to be added
        :type element: sardana.pool.poolelement.PoolElement"""
        action = self._get_action_for_element(element)
        action.add_element(element)

    def remove_element(self, element):
        """Removes an element from this action. If the element is not part of
        this action, a ValueError is raised.

        :param element: the new element to be removed
        :type element: sardana.pool.poolelement.PoolElement

        :raises: ValueError"""
        for action in self._get_acq_for_element(element):
            action.remove_element(element)

    def get_elements(self, copy_of=False):
        """Returns a sequence of all elements involved in this action.

        :param copy_of: If False (default) the internal container of
                        elements is returned. If True, a copy of the
                        internal container is returned instead
        :type copy_of: bool
        :return: a sequence of all elements involved in this action.
        :rtype: seq<sardana.pool.poolelement.PoolElement>"""
        return (self._hw_acq.get_elements() + self._sw_acq.get_elements() +
                self._0d_acq.get_elements() + self._synch.get_elements())

    def get_pool_controller_list(self):
        """Returns a list of all controller elements involved in this action.

        :return: a list of all controller elements involved in this action.
        :rtype: list<sardana.pool.poolelement.PoolController>"""
        return self._pool_ctrl_list

    def get_pool_controllers(self):
        """Returns a dict of all controller elements involved in this action.

        :return: a dict of all controller elements involved in this action.
        :rtype: dict<sardana.pool.poolelement.PoolController,
                seq<sardana.pool.poolelement.PoolElement>>"""
        ret = {}
        ret.update(self._hw_acq.get_pool_controllers())
        ret.update(self._sw_acq.get_pool_controllers())
        ret.update(self._0d_acq.get_pool_controllers())
        return ret

    def read_value(self, ret=None, serial=False):
        """Reads value information of all elements involved in this action

        :param ret: output map parameter that should be filled with value
                    information. If None is given (default), a new map is
                    created an returned
        :type ret: dict
        :param serial: If False (default) perform controller HW value requests
                       in parallel. If True, access is serialized.
        :type serial: bool
        :return: a map containing value information per element
        :rtype: dict<:class:~`sardana.pool.poolelement.PoolElement`,
                     :class:~`sardana.sardanavalue.SardanaValue`>"""
        # TODO: this is broken now - fix it
        ret = self._ct_acq.read_value(ret=ret, serial=serial)
        ret.update(self._0d_acq.read_value(ret=ret, serial=serial))
        return ret


class Channel(PoolActionItem):

    def __init__(self, acquirable, info=None):
        PoolActionItem.__init__(self, acquirable)
        if info:
            self.__dict__.update(info)

    def __getattr__(self, name):
        return getattr(self.element, name)


class PoolAcquisitionBase(PoolAction):
    """Base class for acquisitions with a generic start_action method.

    .. note::
        The PoolAcquisitionBase class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, main_element, name):
        PoolAction.__init__(self, main_element, name)
        self._channels = None
        # TODO: for the moment we can not clear value buffers at the end of
        # the acquisition. This is because of the pseudo counters that are
        # based on channels synchronized by hardware and software.
        # These two acquisition actions finish at different moment so the
        # pseudo counter will loose the value buffer of some of its physicals
        # if we clear the buffer at the end.
        # Whenever there will be solution for that, after refactoring of the
        # acquisition actions, uncomment this line
        # self.add_finish_hook(self.clear_value_buffers, True)

    def in_acquisition(self, states):
        """Determines if we are in acquisition or if the acquisition has ended
        based on the current unit trigger modes and states returned by the
        controller(s)

        :param states: a map containing state information as returned by
                       read_state_info
        :type states: dict<PoolElement, State>
        :return: returns True if in acquisition or False otherwise
        :rtype: bool"""
        for elem in states:
            s = states[elem][0][0]
            if self._is_in_action(s):
                return True

    @DebugIt()
    def start_action(self, *args, **kwargs):
        """Prepares everything for acquisition and starts it.
        :param acq_sleep_time: sleep time between state queries
        :param nb_states_per_value: how many state queries between readouts
        :param integ_time: integration time(s)
        :type integ_time: float or seq<float>
        :param repetitions: repetitions
        :type repetitions: int
        :param config: configuration dictionary (with information about
            involved controllers and channels)
        """
        pool = self.pool

        self._aborted = False
        self._stopped = False

        self._acq_sleep_time = kwargs.pop("acq_sleep_time",
                                          pool.acq_loop_sleep_time)
        self._nb_states_per_value = kwargs.pop("nb_states_per_value",
                                               pool.acq_loop_states_per_value)

        self._integ_time = integ_time = kwargs.get("integ_time")
        self._mon_count = mon_count = kwargs.get("monitor_count")
        self._repetitions = repetitions = kwargs.get("repetitions")
        if integ_time is None and mon_count is None:
            raise Exception("must give integration time or monitor counts")
        if integ_time is not None and mon_count is not None:
            msg = ("must give either integration time or monitor counts "
                   "(not both)")
            raise Exception(msg)

        _ = kwargs.get("items", self.get_elements())
        cfg = kwargs['config']
        # determine which is the controller which holds the master channel

        if integ_time is not None:
            master_key = 'timer'
            master_value = integ_time
        if mon_count is not None:
            master_key = 'monitor'
            master_value = -mon_count
        master = cfg[master_key]
        if master is None:
            self.main_element.set_state(State.Fault, propagate=2)
            msg = "master {0} is unknown (probably disabled)".format(
                master_key)
            raise RuntimeError(msg)
        master_ctrl = master.controller

        pool_ctrls_dict = dict(cfg['controllers'])
        pool_ctrls_dict.pop('__tango__', None)

        # controllers to be started (only enabled) in the right order
        pool_ctrls = []
        # controllers that will be read at the end of the action
        self._pool_ctrl_dict_loop = _pool_ctrl_dict_loop = {}
        # channels that are acquired (only enabled)
        self._channels = channels = {}

        # select only suitable e.g. enabled, timerable controllers & channels
        for ctrl, pool_ctrl_data in pool_ctrls_dict.items():
            # skip not timerable controllers e.g. 0D
            if not ctrl.is_timerable():
                continue
            ctrl_enabled = False
            elements = pool_ctrl_data['channels']
            for element, element_info in elements.items():
                # skip disabled elements
                if not element_info['enabled']:
                    continue
                # Add only the enabled channels
                channel = Channel(element, info=element_info)
                channels[element] = channel
                ctrl_enabled = True
            # check if the ctrl has enabled channels
            if ctrl_enabled:
                # enabled controller can no be offline
                if not ctrl.is_online():
                    self.main_element.set_state(State.Fault, propagate=2)
                    msg = "controller {0} is offline".format(ctrl.name)
                    raise RuntimeError(msg)
                pool_ctrls.append(ctrl)
                # only CT will be read in the loop, 1D and 2D not
                if ElementType.CTExpChannel in ctrl.get_ctrl_types():
                    _pool_ctrl_dict_loop[ctrl] = pool_ctrl_data

        # timer/monitor channels can not be disabled
        for pool_ctrl in pool_ctrls:
            ctrl = pool_ctrl.ctrl
            pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
            timer_monitor = pool_ctrl_data[master_key]
            if timer_monitor not in channels:
                self.main_element.set_state(State.Fault, propagate=2)
                msg = "timer/monitor ({0}) of {1} controller is "\
                      "disabled)".format(timer_monitor.name, pool_ctrl.name)
                raise RuntimeError(msg)

        # make sure the controller which has the master channel is the last to
        # be called
        pool_ctrls.remove(master_ctrl)
        pool_ctrls.append(master_ctrl)

        with ActionContext(self):
            # PreLoadAll, PreLoadOne, LoadOne and LoadAll
            for pool_ctrl in pool_ctrls:
                try:
                    ctrl = pool_ctrl.ctrl
                    pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
                    ctrl.PreLoadAll()
                    master = pool_ctrl_data[master_key]
                    axis = master.axis
                    try:
                        res = ctrl.PreLoadOne(axis, master_value, repetitions)
                    except TypeError:
                        msg = ("PreLoadOne(axis, value) is deprecated since "
                               "version 2.3.0. Use PreLoadOne(axis, value, "
                               "repetitions) instead.")
                        self.warning(msg)
                        res = ctrl.PreLoadOne(axis, master_value)
                    if not res:
                        msg = ("%s.PreLoadOne(%d) returned False" %
                               (pool_ctrl.name, axis))
                        raise Exception(msg)
                    try:
                        ctrl.LoadOne(axis, master_value, repetitions)
                    except TypeError:
                        msg = ("LoadOne(axis, value) is deprecated since "
                               "version 2.3.0. Use LoadOne(axis, value, "
                               "repetitions) instead.")
                        self.warning(msg)
                        ctrl.LoadOne(axis, master_value)
                    ctrl.LoadAll()
                except Exception, e:
                    self.debug(e, exc_info=True)
                    master.set_state(State.Fault, propagate=2)
                    msg = ("Load sequence of %s failed" % pool_ctrl.name)
                    raise Exception(msg)

            # PreStartAll on all enabled controllers
            for pool_ctrl in pool_ctrls:
                pool_ctrl.ctrl.PreStartAll()

            # PreStartOne & StartOne on all enabled elements
            for pool_ctrl in pool_ctrls:
                ctrl = pool_ctrl.ctrl
                pool_ctrl_data = pool_ctrls_dict[pool_ctrl]
                elements = pool_ctrl_data['channels'].keys()
                timer_monitor = pool_ctrl_data[master_key]
                # make sure that the timer/monitor is started as the last one
                elements.remove(timer_monitor)
                elements.append(timer_monitor)
                for element in elements:
                    try:
                        channel = channels[element]
                    except KeyError:
                        continue
                    axis = element.axis
                    ret = ctrl.PreStartOne(axis, master_value)
                    if not ret:
                        msg = ("%s.PreStartOne(%d) returns False" %
                               (pool_ctrl.name, axis))
                        raise Exception(msg)
                    try:
                        ctrl.StartOne(axis, master_value)
                    except Exception, e:
                        self.debug(e, exc_info=True)
                        element.set_state(State.Fault, propagate=2)
                        msg = ("%s.StartOne(%d) failed" %
                               (pool_ctrl.name, axis))
                        raise Exception(msg)

            # set the state of all elements to  and inform their listeners
            for channel in channels:
                channel.set_state(State.Moving, propagate=2)

            # StartAll on all enabled controllers
            for pool_ctrl in pool_ctrls:
                try:
                    pool_ctrl.ctrl.StartAll()
                except Exception, e:
                    self.debug(e, exc_info=True)
                    elements = pool_ctrl_data['channels'].keys()
                    for element in elements:
                        element.set_state(State.Fault, propagate=2)
                    msg = ("%s.StartAll() failed" % pool_ctrl.name)
                    raise Exception(msg)

    def clear_value_buffers(self):
        for channel in self._channels:
            channel.clear_value_buffer()


class PoolAcquisitionHardware(PoolAcquisitionBase):
    """Acquisition action for controllers synchronized by hardware

    .. note::
        The PoolAcquisitionHardware class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, main_element, name="AcquisitionHardware"):
        PoolAcquisitionBase.__init__(self, main_element, name)

    @DebugIt()
    def action_loop(self):
        i = 0

        states, values = {}, {}
        for element in self._channels:
            states[element] = None
            values[element] = None

        nap = self._acq_sleep_time
        nb_states_per_value = self._nb_states_per_value

        while True:
            self.read_state_info(ret=states)
            if not self.in_acquisition(states):
                break

            # read value every n times
            if not i % nb_states_per_value:
                self.read_value_loop(ret=values)
                for acquirable, value in values.items():
                    if is_value_error(value):
                        self.error("Loop read value error for %s" %
                                   acquirable.name)
                        acquirable.put_value(value)
                    else:
                        acquirable.extend_value_buffer(value)

            time.sleep(nap)
            i += 1

        with ActionContext(self):
            self.raw_read_state_info(ret=states)
            self.raw_read_value_loop(ret=values)

        for acquirable, state_info in states.items():
            # first update the element state so that value calculation
            # that is done after takes the updated state into account
            acquirable.set_state_info(state_info, propagate=0)
            if acquirable in values:
                value = values[acquirable]
                if is_value_error(value):
                    self.error("Loop final read value error for: %s" %
                               acquirable.name)
                    acquirable.put_value(value)
                else:
                    acquirable.extend_value_buffer(value, propagate=2)
            with acquirable:
                acquirable.clear_operation()
                state_info = acquirable._from_ctrl_state_info(state_info)
                acquirable.set_state_info(state_info, propagate=2)


class PoolAcquisitionSoftware(PoolAcquisitionBase):
    """Acquisition action for controllers synchronized by software

    .. note::
        The PoolAcquisitionSoftware class has been included in Sardana
        on a provisional basis. Backwards incompatible changes
        (up to and including removal of the module) may occur if
        deemed necessary by the core developers.
    """

    def __init__(self, main_element, name="AcquisitionSoftware", slaves=None):
        PoolAcquisitionBase.__init__(self, main_element, name)

        if slaves is None:
            slaves = ()
        self._slaves = slaves

    @DebugIt()
    def start_action(self, *args, **kwargs):
        """Prepares everything for acquisition and starts it.
        :param acq_sleep_time: sleep time between state queries
        :param nb_states_per_value: how many state queries between readouts
        :param integ_time: integration time(s)
        :type integ_time: float or seq<float>
        :param repetitions: repetitions
        :type repetitions: int
        :param config: configuration dictionary (with information about
            involved controllers and channels)
        :param index: trigger index that will be assigned to the acquired value
        :type index: int
        """

        PoolAcquisitionBase.start_action(self, *args, **kwargs)
        self.index = kwargs.get("idx")

    @DebugIt()
    def action_loop(self):
        states, values = {}, {}
        for element in self._channels:
            states[element] = None
            values[element] = None

        nap = self._acq_sleep_time
        nb_states_per_value = self._nb_states_per_value

        i = 0
        while True:
            self.read_state_info(ret=states)
            if not self.in_acquisition(states):
                break

            # read value every n times
            if not i % nb_states_per_value:
                self.read_value_loop(ret=values)
                for acquirable, value in values.items():
                    acquirable.put_value(value)

            time.sleep(nap)
            i += 1

        for slave in self._slaves:
            try:
                slave.stop_action()
            except Exception:
                self.warning("Unable to stop slave acquisition %s",
                             slave.getLogName())
                self.debug("Details", exc_info=1)

        with ActionContext(self):
            self.raw_read_state_info(ret=states)
            self.raw_read_value_loop(ret=values)

        for acquirable, state_info in states.items():
            # first update the element state so that value calculation
            # that is done after takes the updated state into account
            acquirable.set_state_info(state_info, propagate=0)
            if acquirable in values:
                value = values[acquirable]
                if is_value_error(value):
                    self.error("Loop final read value error for: %s" %
                               acquirable.name)
                acquirable.append_value_buffer(value, self.index)
            with acquirable:
                acquirable.clear_operation()
                state_info = acquirable._from_ctrl_state_info(state_info)
                acquirable.set_state_info(state_info, propagate=2)


[docs]class PoolCTAcquisition(PoolAcquisitionBase): def __init__(self, main_element, name="CTAcquisition", slaves=None): self._channels = None if slaves is None: slaves = () self._slaves = slaves PoolAcquisitionBase.__init__(self, main_element, name)
[docs] def get_read_value_loop_ctrls(self): return self._pool_ctrl_dict_loop
[docs] def in_acquisition(self, states): """Determines if we are in acquisition or if the acquisition has ended based on the current unit trigger modes and states returned by the controller(s) :param states: a map containing state information as returned by read_state_info :type states: dict<PoolElement, State> :return: returns True if in acquisition or False otherwise :rtype: bool""" for elem in states: s = states[elem][0][0] if self._is_in_action(s): return True
@DebugIt() def action_loop(self): i = 0 states, values = {}, {} for element in self._channels: states[element] = None # values[element] = None nap = self._acq_sleep_time nb_states_per_value = self._nb_states_per_value # read values to send a first event when starting to acquire with ActionContext(self): self.raw_read_value_loop(ret=values) for acquirable, value in values.items(): acquirable.put_value(value, propagate=2) while True: self.read_state_info(ret=states) if not self.in_acquisition(states): break # read value every n times if not i % nb_states_per_value: self.read_value_loop(ret=values) for acquirable, value in values.items(): acquirable.put_value(value) time.sleep(nap) i += 1 for slave in self._slaves: try: slave.stop_action() except Exception: self.warning("Unable to stop slave acquisition %s", slave.getLogName()) self.debug("Details", exc_info=1) with ActionContext(self): self.raw_read_state_info(ret=states) self.raw_read_value_loop(ret=values) for acquirable, state_info in states.items(): # first update the element state so that value calculation # that is done after takes the updated state into account acquirable.set_state_info(state_info, propagate=0) if acquirable in values: value = values[acquirable] acquirable.put_value(value, propagate=2) with acquirable: acquirable.clear_operation() state_info = acquirable._from_ctrl_state_info(state_info) acquirable.set_state_info(state_info, propagate=2)
class Pool0DAcquisition(PoolAction): def __init__(self, main_element, name="0DAcquisition"): self._channels = None PoolAction.__init__(self, main_element, name) def start_action(self, *args, **kwargs): """Prepares everything for acquisition and starts it. :param: config""" pool = self.pool self._index = kwargs.get("idx") # prepare data structures # TODO: rollback this change when a proper synchronization between # acquisition actions will be develop. # Now the meta acquisition action is resettung them to 0. # self._aborted = False # self._stopped = False self._acq_sleep_time = kwargs.pop("acq_sleep_time", pool.acq_loop_sleep_time) self._nb_states_per_value = \ kwargs.pop("nb_states_per_value", pool.acq_loop_states_per_value) items = kwargs.get("items") if items is None: items = self.get_elements() cfg = kwargs['config'] pool_ctrls_dict = dict(cfg['controllers']) pool_ctrls_dict.pop('__tango__', None) pool_ctrls = [] for ctrl in pool_ctrls_dict: if ElementType.ZeroDExpChannel in ctrl.get_ctrl_types(): pool_ctrls.append(ctrl) # Determine which channels are active self._channels = channels = {} for pool_ctrl in pool_ctrls: ctrl = pool_ctrl.ctrl pool_ctrl_data = pool_ctrls_dict[pool_ctrl] elements = pool_ctrl_data['channels'] for element, element_info in elements.items(): channel = Channel(element, info=element_info) channels[element] = channel with ActionContext(self): # set the state of all elements to and inform their listeners for channel in channels: channel.clear_buffer() channel.set_state(State.Moving, propagate=2) def in_acquisition(self, states): """Determines if we are in acquisition or if the acquisition has ended based on the current unit trigger modes and states returned by the controller(s) :param states: a map containing state information as returned by read_state_info :type states: dict<PoolElement, State> :return: returns True if in acquisition or False otherwise :rtype: bool""" for state in states: s = states[state][0] if self._is_in_action(s): return True def action_loop(self): states, values = {}, {} for element in self._channels: states[element] = None values[element] = None nap = self._acq_sleep_time while True: self.read_value(ret=values) for acquirable, value in values.items(): acquirable.put_current_value(value, propagate=0) if self._stopped or self._aborted: break time.sleep(nap) for element in self._channels: value = element.accumulated_value.value_obj element.append_value_buffer(value, self._index, propagate=2) with ActionContext(self): self.raw_read_state_info(ret=states) for acquirable, state_info in states.items(): # first update the element state so that value calculation # that is done after takes the updated state into account state_info = acquirable._from_ctrl_state_info(state_info) acquirable.set_state_info(state_info, propagate=0) with acquirable: acquirable.clear_operation() acquirable.set_state_info(state_info, propagate=2) def stop_action(self, *args, **kwargs): """Stop procedure for this action.""" self._stopped = True def abort_action(self, *args, **kwargs): """Aborts procedure for this action""" self._aborted = True class PoolIORAcquisition(PoolAction): def __init__(self, pool, name="IORAcquisition"): self._channels = None PoolAction.__init__(self, pool, name) def start_action(self, *args, **kwargs): pass def in_acquisition(self, states): return True pass @DebugIt() def action_loop(self): i = 0 states, values = {}, {} for element in self._channels: states[element] = None values[element] = None # read values to send a first event when starting to acquire self.read_value(ret=values) for acquirable, value in values.items(): acquirable.put_value(value, propagate=2) while True: self.read_state_info(ret=states) if not self.in_acquisition(states): break # read value every n times if not i % 5: self.read_value(ret=values) for acquirable, value in values.items(): acquirable.put_value(value) i += 1 time.sleep(0.01) self.read_state_info(ret=states) # first update the element state so that value calculation # that is done after takes the updated state into account for acquirable, state_info in states.items(): acquirable.set_state_info(state_info, propagate=0) # Do NOT send events before we exit the OperationContext, otherwise # we may be asked to start another action before we leave the context # of the current action. Instead, send the events in the finish hook # which is executed outside the OperationContext def finish_hook(*args, **kwargs): # read values and propagate the change to all listeners self.read_value(ret=values) for acquirable, value in values.items(): acquirable.put_value(value, propagate=2) # finally set the state and propagate to all listeners for acquirable, state_info in states.items(): acquirable.set_state_info(state_info, propagate=2) self.set_finish_hook(finish_hook)