Source code for sardana.tango.pool.MeasurementGroup

#!/usr/bin/env python

##############################################################################
##
# This file is part of Sardana
##
# http://www.sardana-controls.org/
##
# Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
# Sardana is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
##
# Sardana is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
##
# You should have received a copy of the GNU Lesser General Public License
# along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""The sardana tango measurement group module"""

__all__ = ["MeasurementGroup", "MeasurementGroupClass"]

__docformat__ = 'restructuredtext'

import sys
import time

from PyTango import Except, DevVoid, DevLong, DevDouble, DevString, \
    DispLevel, DevState, AttrQuality, READ, READ_WRITE, SCALAR

from taurus.core.util.codecs import CodecFactory
from taurus.core.util.log import DebugIt

from sardana import State, SardanaServer
from sardana.sardanaattribute import SardanaAttribute
from sardana.pool import AcqMode
from sardana.pool.pooldefs import SynchDomain, SynchParam
from sardana.tango.core.util import exception_str
from sardana.tango.pool.PoolDevice import PoolGroupDevice, PoolGroupDeviceClass


[docs]class MeasurementGroup(PoolGroupDevice): def __init__(self, dclass, name): PoolGroupDevice.__init__(self, dclass, name)
[docs] def init(self, name): PoolGroupDevice.init(self, name)
[docs] def get_measurement_group(self): return self.element
[docs] def set_measurement_group(self, measurement_group): self.element = measurement_group
measurement_group = property(get_measurement_group, set_measurement_group) @DebugIt() def delete_device(self): PoolGroupDevice.delete_device(self) mg = self.measurement_group if mg is not None: mg.remove_listener(self.on_measurement_group_changed) @DebugIt() def init_device(self): PoolGroupDevice.init_device(self) # state and status are already set by the super class detect_evts = "latencytime", "moveable", "synchronization" non_detect_evts = "configuration", "integrationtime", "monitorcount", \ "acquisitionmode", "elementlist", "repetitions" self.set_change_events(detect_evts, non_detect_evts) self.Elements = list(self.Elements) for i in range(len(self.Elements)): try: self.Elements[i] = int(self.Elements[i]) except: pass mg = self.measurement_group if mg is None: full_name = self.get_full_name() name = self.alias or full_name self.measurement_group = mg = \ self.pool.create_measurement_group(name=name, full_name=full_name, id=self.Id, user_elements=self.Elements) mg.add_listener(self.on_measurement_group_changed) # force a state read to initialize the state attribute #state = self.measurement_group.state self.set_state(DevState.ON)
[docs] def on_measurement_group_changed(self, event_source, event_type, event_value): try: self._on_measurement_group_changed( event_source, event_type, event_value) except: msg = 'Error occured "on_measurement_group_changed(%s.%s): %s"' exc_info = sys.exc_info() self.error(msg, self.measurement_group.name, event_type.name, exception_str(*exc_info[:2])) self.debug("Details", exc_info=exc_info)
def _on_measurement_group_changed(self, event_source, event_type, event_value): # during server startup and shutdown avoid processing element # creation events if SardanaServer.server_state != State.Running: return timestamp = time.time() name = event_type.name name = name.replace('_', '') multi_attr = self.get_device_attr() attr = multi_attr.get_attr_by_name(name) quality = AttrQuality.ATTR_VALID priority = event_type.priority error = None if name == "state": event_value = self.calculate_tango_state(event_value) elif name == "status": event_value = self.calculate_tango_status(event_value) elif name == "acquisitionmode": event_value = AcqMode.whatis(event_value) elif name == "configuration": cfg = self.measurement_group.get_user_configuration() codec = CodecFactory().getCodec('json') _, event_value = codec.encode(('', cfg)) elif name == "synchronization": codec = CodecFactory().getCodec('json') _, event_value = codec.encode(('', event_value)) else: if isinstance(event_value, SardanaAttribute): if event_value.error: error = Except.to_dev_failed(*event_value.exc_info) timestamp = event_value.timestamp event_value = event_value.value self.set_attribute(attr, value=event_value, timestamp=timestamp, quality=quality, priority=priority, error=error, synch=False) def _synchronization_str2enum(self, synchronization): '''Translates synchronization data structure so it uses SynchDomain enums as keys instead of strings. ''' for group in synchronization: for param, conf in group.iteritems(): group.pop(param) param = SynchParam.fromStr(param) group[param] = conf # skip repeats cause its value is just a long number if param == SynchParam.Repeats: continue for domain, value in conf.iteritems(): conf.pop(domain) domain = SynchDomain.fromStr(domain) conf[domain] = value return synchronization
[docs] def always_executed_hook(self): pass
#state = to_tango_state(self.motor_group.get_state(cache=False))
[docs] def read_attr_hardware(self, data): pass
[docs] def read_IntegrationTime(self, attr): it = self.measurement_group.integration_time if it is None: it = float('nan') attr.set_value(it)
[docs] def write_IntegrationTime(self, attr): self.measurement_group.integration_time = attr.get_write_value()
[docs] def read_MonitorCount(self, attr): it = self.measurement_group.monitor_count if it is None: it = 0 attr.set_value(it)
[docs] def write_MonitorCount(self, attr): self.measurement_group.monitor_count = attr.get_write_value()
[docs] def read_AcquisitionMode(self, attr): acq_mode = self.measurement_group.acquisition_mode acq_mode_str = AcqMode.whatis(acq_mode) attr.set_value(acq_mode_str)
[docs] def write_AcquisitionMode(self, attr): acq_mode_str = attr.get_write_value() try: acq_mode = AcqMode.lookup[acq_mode_str] except KeyError: raise Exception("Invalid acquisition mode. Must be one of " + ", ".join(AcqMode.keys())) self.measurement_group.acquisition_mode = acq_mode
[docs] def read_Configuration(self, attr): cfg = self.measurement_group.get_user_configuration() codec = CodecFactory().getCodec('json') data = codec.encode(('', cfg)) attr.set_value(data[1])
[docs] def write_Configuration(self, attr): data = attr.get_write_value() cfg = CodecFactory().decode(('json', data), ensure_ascii=True) self.measurement_group.set_configuration_from_user(cfg)
[docs] def read_Repetitions(self, attr): repetitions = self.measurement_group.repetitions if repetitions is None: repetitions = int('nan') attr.set_value(repetitions)
[docs] def write_Repetitions(self, attr): self.measurement_group.repetitions = attr.get_write_value()
[docs] def read_Moveable(self, attr): moveable = self.measurement_group.moveable if moveable is None: moveable = 'None' attr.set_value(moveable)
[docs] def write_Moveable(self, attr): self.measurement_group.moveable = attr.get_write_value()
[docs] def read_Synchronization(self, attr): synchronization = self.measurement_group.synchronization codec = CodecFactory().getCodec('json') data = codec.encode(('', synchronization)) attr.set_value(data[1])
[docs] def write_Synchronization(self, attr): data = attr.get_write_value() synchronization = CodecFactory().decode(('json', data), ensure_ascii=True) # translate dictionary keys synchronization = self._synchronization_str2enum(synchronization) self.measurement_group.synchronization = synchronization
[docs] def read_LatencyTime(self, attr): latency_time = self.measurement_group.latency_time attr.set_value(latency_time)
[docs] def Start(self): try: self.wait_for_operation() except: raise Exception("Cannot acquire: already involved in an operation") self.measurement_group.start_acquisition()
[docs] def Stop(self): self.measurement_group.stop()
[docs] def StartMultiple(self, n): try: self.wait_for_operation() except: raise Exception("Cannot acquire: already involved in an operation") self.measurement_group.start_acquisition(multiple=n)
[docs]class MeasurementGroupClass(PoolGroupDeviceClass): # Class Properties class_property_list = { } # Device Properties device_property_list = { } device_property_list.update(PoolGroupDeviceClass.device_property_list) # Command definitions cmd_list = { 'Start': [[DevVoid, ""], [DevVoid, ""]], 'StartMultiple': [[DevLong, ""], [DevVoid, ""]], } cmd_list.update(PoolGroupDeviceClass.cmd_list) # Attribute definitions attr_list = { 'IntegrationTime': [[DevDouble, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.OPERATOR}], 'MonitorCount': [[DevLong, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.OPERATOR}], 'AcquisitionMode': [[DevString, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.OPERATOR}], 'Configuration': [[DevString, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.EXPERT}], 'Repetitions': [[DevLong, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.OPERATOR}], 'Moveable': [[DevString, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.EXPERT}], 'Synchronization': [[DevString, SCALAR, READ_WRITE], {'Memorized': "true", 'Display level': DispLevel.EXPERT}], 'LatencyTime': [[DevDouble, SCALAR, READ], {'Display level': DispLevel.EXPERT}], } attr_list.update(PoolGroupDeviceClass.attr_list) def _get_class_properties(self): ret = PoolGroupDeviceClass._get_class_properties(self) ret['Description'] = "Measurement group device class" ret['InheritedFrom'].insert(0, 'PoolGroupDevice') return ret