#!/usr/bin/env python
# --------------------------------------------------------------
# The functions and classes for lattice modifications with
# different types of error nodes. You do not need these classes
# if you apply errors to only a handful of nodes, but they will
# be useful if, for instance, you want to apply a certain type of error
# with certain error parameters to all quads in the lattice.
# At this moment, there are the following types:
# 1. Coordinate displacement error nodes. They shift the coordinates
# of the macro-particles at some point (like the start of a lattice node)
# and then back at another point downstream (or at the end of the node).
# The shift of the energy can be used to model errors in the dipole fields
# in the bend nodes.
# --------------------------------------------------------------
import math
import sys
import os
import time
import random
# ---- we need MPI for the Gaussian-distributed errors to be sure the lattices
# ---- are the same across all relevant nodes (the same communicator)
from orbit.core.orbit_mpi import mpi_comm, mpi_datatype, mpi_op, MPI_Bcast
# import from orbit Python utilities
from orbit.utils import orbitFinalize
from orbit.utils import NamedObject
from orbit.utils import TypedObject
# import general accelerator elements and lattice
from orbit.lattice import AccNode
from orbit.py_linac.lattice import Quad
from orbit.py_linac.lattice import DCorrectorH, DCorrectorV
from orbit.py_linac.lattice import Bend
# import error controllers from orbit.py_linac.errors package
from orbit.py_linac.errors import ErrorCntrlCoordDisplacement
from orbit.py_linac.errors import ErrorCntrlBendField
from orbit.py_linac.errors import ErrorCntrlLongitudinalDisplacement
from orbit.py_linac.errors import ErrorCntrlStraightRotationX
from orbit.py_linac.errors import ErrorCntrlStraightRotationY
from orbit.py_linac.errors import ErrorCntrlStraightRotationZ
class ErrorForNodesModification(NamedObject, TypedObject):
    """
    The base abstract class for set of separate nodes modification
    with two error nodes: one at the entrance and one at the exit
    of the lattice node. The lattice is specified for possible
    needs in the future when we will introduce errors for some
    section of the lattice as a whole. Right now we are working
    only with nodes.

    Subclasses provide the concrete controller type through
    _getInstanceOfErrorController() and push their parameters to the
    controllers in updateErrorParameters().
    """

    def __init__(self, name="no_name", type_in="ErrorForNodesModification"):
        NamedObject.__init__(self, name)
        TypedObject.__init__(self, type_in)
        # Lattice nodes registered through addLatticeNodes().
        self.nodes = []
        # Error controllers, in creation order.
        self.error_controllers = []
        # Maps a lattice node to the controller most recently created for it.
        self.node_to_cntrl_dict = {}
        # Lattice passed to the most recent addLatticeNodes() call (may be None).
        self.lattice = None

    def _getInstanceOfErrorController(self):
        """
        This is abstract method. The subclasses should implement this method
        according the particular error type.
        """
        return None

    def updateErrorParameters(self):
        """
        This is abstract method. The sub-classes should implement this method
        according the particular error type. It updates the error defining
        parameters for all registered error controllers.
        """
        pass

    def getLatticeNodes(self):
        """
        Returns the list of lattice nodes to which we applied errors.
        """
        return self.nodes

    def getErrorControllers(self):
        """
        Returns the array of error controllers.
        """
        return self.error_controllers

    def addLatticeNodes(self, nodes, lattice=None):
        """
        Registers the given nodes and creates one error controller for
        each of them. The lattice (optional) is remembered and passed to
        every new controller. After that the current error parameters are
        pushed to all registered controllers.
        """
        self.lattice = lattice
        # Materialize once so that any iterable (not only a list) is accepted.
        new_nodes = list(nodes)
        self.nodes += new_nodes
        # Only the nodes added by THIS call get a controller. Looping over
        # self.nodes would attach a second controller to every node that was
        # registered by an earlier call.
        for node in new_nodes:
            errCntrl = self._getInstanceOfErrorController()
            errCntrl.setName("ErrCntrl:" + errCntrl.getShortTypeName() + ":" + node.getName())
            errCntrl.setLattice(lattice)
            errCntrl.setOneNodeParent(node)
            self.error_controllers.append(errCntrl)
            self.node_to_cntrl_dict[node] = errCntrl
        self.updateErrorParameters()
class CoordinateDisplacementNodesModification(ErrorForNodesModification):
    """
    This class applies the coordinate displacement errors to the set of nodes.
    The parameters of displacements are translated to all error controllers.
    It could be fixed (all nodes will have the same displacement) or
    random values distributed around 0. by Gaussian with one sigma.
    """

    def __init__(self, name="no_name"):
        ErrorForNodesModification.__init__(self, name, "CoordinateDisplacementNodesModification")
        # ---- these parameters can be interpreted as just values or sigmas
        # ---- for Gaussian distributions
        self.param_dict = {"dx": 0.0, "dxp": 0.0, "dy": 0.0, "dyp": 0.0, "dz": 0.0, "dE": 0.0}

    def _getInstanceOfErrorController(self):
        """
        Returns the instance of ErrorCntrlCoordDisplacement error controller.
        """
        return ErrorCntrlCoordDisplacement()

    def updateErrorParameters(self):
        """
        This is an implementation of parent class abstract method.
        It updates the error defining parameters for all registered
        error controllers.
        """
        for key, value in self.param_dict.items():
            for errCntrl in self.error_controllers:
                errCntrl.setDisplacementParameter(key, value)

    def getCoordinateDisplacementParameters(self):
        """
        Returns tuple (dx, dxp, dy, dyp, dz, dE) with coordinate shift parameters.
        """
        return tuple(self.param_dict[key] for key in ("dx", "dxp", "dy", "dyp", "dz", "dE"))

    def _reportUnknownKey(self, method_name, key):
        """
        Reports an unknown displacement key through orbitFinalize.
        The key is converted with str() so that a non-string key does not
        raise TypeError while the message is being assembled.
        """
        lines = [
            "Class CoordinateDisplacementNodesModification - key-value problem",
            "Method " + method_name + ":",
            "You are trying to set value for key=" + str(key),
            "Keys could be only = (dx, dxp, dy, dyp, dz, dE)",
        ]
        orbitFinalize(os.linesep.join(lines) + os.linesep)

    def setDisplacementParameter(self, key, value):
        """
        Sets the error value for a particular coordinate for all nodes.
        The coordinate is defined by key parameter, which should be one of
        (dx, dxp, dy, dyp, dz, dE). An unknown key is reported through
        orbitFinalize and nothing is changed.
        """
        if key not in self.param_dict:
            self._reportUnknownKey("setDisplacementParameter", key)
            # Guard in case orbitFinalize returns control to the caller.
            return
        self.param_dict[key] = value
        self.updateErrorParameters()

    def setGaussDistributedDisplacementParameter(self, key, value, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error value for a particular coordinate for all nodes.
        The coordinate is defined by key parameter. The absolute value of
        the "value" parameter is used as the sigma of a Gaussian centered
        at 0 and is stored in the parameters dictionary. Each controller
        gets its own sample; samples with absolute value above
        sigma*cut_off_level are redrawn. The accepted sample is broadcast
        from rank 0 of the communicator comm before it is applied.
        """
        value = abs(value)
        if key not in self.param_dict:
            self._reportUnknownKey("setGaussDistributedDisplacementParameter", key)
            # Guard in case orbitFinalize returns control to the caller.
            return
        self.param_dict[key] = value
        # ---------------------------------------------------------------------
        main_rank = 0
        for errCntrl in self.error_controllers:
            value_tmp = random.gauss(0.0, value)
            while abs(value_tmp) > abs(value) * cut_off_level:
                value_tmp = random.gauss(0.0, value)
            value_tmp = MPI_Bcast(value_tmp, mpi_datatype.MPI_DOUBLE, main_rank, comm)
            errCntrl.setDisplacementParameter(key, value_tmp)
class LongitudinalDisplacementNodesModification(ErrorForNodesModification):
    """
    This class shifts lattice nodes in longitudinal directions.
    The shift length is sent to all error controllers.
    It could be fixed (all nodes will have the same displacement) or
    random values distributed around 0. by Gaussian with one sigma.
    """

    def __init__(self, name="no_name"):
        ErrorForNodesModification.__init__(self, name, "LongitudinalDisplacementNodesModification")
        # ---- this parameter can be interpreted as just a value or a sigma
        # ---- for Gaussian distributions.
        # ---- It starts at zero (no shift); the constructor has no
        # ---- shift_length argument to take a value from.
        self.shift_length = 0.0

    def _getInstanceOfErrorController(self):
        """
        Returns the instance of ErrorCntrlLongitudinalDisplacement error controller.
        """
        return ErrorCntrlLongitudinalDisplacement()

    def updateErrorParameters(self):
        """
        This is an implementation of parent class abstract method.
        It updates the error defining parameters for all registered
        error controllers.
        """
        for errCntrl in self.error_controllers:
            errCntrl.setShiftLength(self.shift_length)

    def getShiftLength(self):
        """
        Returns the shift-length parameter.
        """
        return self.shift_length

    def setShiftLength(self, shift_length):
        """
        Sets the shift-length parameter and sends it to all error controllers.
        """
        self.shift_length = shift_length
        self.updateErrorParameters()

    def setGaussDistributedShiftLength(self, shift_length, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error shift_length for all nodes.
        The shift_length parameter is the sigma of a Gaussian centered at 0.
        Each controller gets its own sample; samples with absolute value
        above abs(shift_length)*cut_off_level are redrawn. The accepted
        sample is broadcast from rank 0 of the communicator comm before it
        is applied. The stored shift-length parameter is not changed.
        """
        main_rank = 0
        for errCntrl in self.error_controllers:
            shift_length_tmp = random.gauss(0.0, shift_length)
            while abs(shift_length_tmp) > abs(shift_length) * cut_off_level:
                shift_length_tmp = random.gauss(0.0, shift_length)
            # Broadcast the sampled value, not the sigma: broadcasting the
            # sigma would give every node the same non-random shift.
            shift_length_tmp = MPI_Bcast(shift_length_tmp, mpi_datatype.MPI_DOUBLE, main_rank, comm)
            errCntrl.setShiftLength(shift_length_tmp)
class StraightRotationZ_NodesModification(ErrorForNodesModification):
    """
    This class rotate lattice nodes around z-axis directions.
    The rotating angle is sent to all error controllers.
    It could be fixed (all nodes will have the same angle) or
    random values distributed around 0. by Gaussian with one sigma.
    """

    def __init__(self, name="no_name"):
        ErrorForNodesModification.__init__(self, name, "StraightRotationZ_NodesModification")
        # ---- this parameter can be interpreted as just a value or a sigma
        # ---- for Gaussian distributions
        self.angle = 0.0

    def _getInstanceOfErrorController(self):
        """
        Returns the instance of ErrorCntrlStraightRotationZ error controller.
        """
        return ErrorCntrlStraightRotationZ()

    def updateErrorParameters(self):
        """
        This is an implementation of parent class abstract method.
        It updates the error defining parameters for all registered
        error controllers.
        """
        for errCntrl in self.error_controllers:
            errCntrl.setRotationAngle(self.angle)

    def getRotationAngle(self):
        """
        Returns the angle parameter.
        """
        return self.angle

    def setRotationAngle(self, angle):
        """
        Sets the angle parameter and sends it to all error controllers.
        """
        self.angle = angle
        self.updateErrorParameters()

    def setGaussDistributedAngle(self, angle, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error angle for all nodes.
        The angle parameter is the sigma of a Gaussian centered at 0.
        Each controller gets its own sample; samples with absolute value
        above abs(angle)*cut_off_level are redrawn. The accepted sample is
        broadcast from rank 0 of the communicator comm before it is applied.
        The stored angle parameter is not changed.
        """
        main_rank = 0
        for errCntrl in self.error_controllers:
            angle_tmp = random.gauss(0.0, angle)
            while abs(angle_tmp) > abs(angle) * cut_off_level:
                angle_tmp = random.gauss(0.0, angle)
            # Broadcast the sampled value, not the sigma: broadcasting the
            # sigma would give every node the same non-random angle.
            angle_tmp = MPI_Bcast(angle_tmp, mpi_datatype.MPI_DOUBLE, main_rank, comm)
            errCntrl.setRotationAngle(angle_tmp)
class StraightRotationX_NodesModification(ErrorForNodesModification):
    """
    This class rotate lattice nodes around x-axis directions.
    The rotating angle is sent to all error controllers.
    It could be fixed (all nodes will have the same angle) or
    random values distributed around 0. by Gaussian with one sigma.
    """

    def __init__(self, name="no_name"):
        ErrorForNodesModification.__init__(self, name, "StraightRotationX_NodesModification")
        # ---- this parameter can be interpreted as just a value or a sigma
        # ---- for Gaussian distributions
        self.angle = 0.0

    def _getInstanceOfErrorController(self):
        """
        Returns the instance of ErrorCntrlStraightRotationX error controller.
        """
        return ErrorCntrlStraightRotationX()

    def updateErrorParameters(self):
        """
        This is an implementation of parent class abstract method.
        It updates the error defining parameters for all registered
        error controllers: the rotation angle and the base length, which
        is taken from the length of the controller's parent node.
        If the entrance and exit parent nodes of a controller differ,
        the problem is reported through orbitFinalize.
        """
        for errCntrl in self.error_controllers:
            entr_parent_node = errCntrl.getEntanceNodeParent()
            exit_parent_node = errCntrl.getExitNodeParent()
            if entr_parent_node != exit_parent_node:
                lines = [
                    "Class StraightRotationX_NodesModification update parameters problem!",
                    "For this type of Error Node Controllers parent node is one, not two!",
                    "Entrance parent node=" + entr_parent_node.getName(),
                    "Exit parent node=" + exit_parent_node.getName(),
                    "Stop",
                ]
                orbitFinalize(os.linesep.join(lines) + os.linesep)
                # Guard in case orbitFinalize returns control to the caller.
                return
            errCntrl.setRotationAngle(self.angle)
            errCntrl.setBaseLength(entr_parent_node.getLength())

    def getRotationAngle(self):
        """
        Returns the angle parameter.
        """
        return self.angle

    def setRotationAngle(self, angle):
        """
        Sets the angle parameter and sends it to all error controllers.
        """
        self.angle = angle
        self.updateErrorParameters()

    def setGaussDistributedAngle(self, angle, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error angle for all nodes.
        The angle parameter is the sigma of a Gaussian centered at 0.
        Each controller gets its own sample; samples with absolute value
        above abs(angle)*cut_off_level are redrawn. The accepted sample is
        broadcast from rank 0 of the communicator comm before it is applied.
        The stored angle parameter is not changed.
        """
        main_rank = 0
        for errCntrl in self.error_controllers:
            angle_tmp = random.gauss(0.0, angle)
            while abs(angle_tmp) > abs(angle) * cut_off_level:
                angle_tmp = random.gauss(0.0, angle)
            # Broadcast the sampled value, not the sigma: broadcasting the
            # sigma would give every node the same non-random angle.
            angle_tmp = MPI_Bcast(angle_tmp, mpi_datatype.MPI_DOUBLE, main_rank, comm)
            errCntrl.setRotationAngle(angle_tmp)
class StraightRotationY_NodesModification(ErrorForNodesModification):
    """
    This class rotate lattice nodes around y-axis directions.
    The rotating angle is sent to all error controllers.
    It could be fixed (all nodes will have the same angle) or
    random values distributed around 0. by Gaussian with one sigma.
    """

    def __init__(self, name="no_name"):
        ErrorForNodesModification.__init__(self, name, "StraightRotationY_NodesModification")
        # ---- this parameter can be interpreted as just a value or a sigma
        # ---- for Gaussian distributions
        self.angle = 0.0

    def _getInstanceOfErrorController(self):
        """
        Returns the instance of ErrorCntrlStraightRotationY error controller.
        """
        return ErrorCntrlStraightRotationY()

    def updateErrorParameters(self):
        """
        This is an implementation of parent class abstract method.
        It updates the error defining parameters for all registered
        error controllers: the rotation angle and the base length, which
        is taken from the length of the controller's parent node.
        If the entrance and exit parent nodes of a controller differ,
        the problem is reported through orbitFinalize.
        """
        for errCntrl in self.error_controllers:
            entr_parent_node = errCntrl.getEntanceNodeParent()
            exit_parent_node = errCntrl.getExitNodeParent()
            if entr_parent_node != exit_parent_node:
                lines = [
                    "Class StraightRotationY_NodesModification update parameters problem!",
                    "For this type of Error Node Controllers parent node is one, not two!",
                    "Entrance parent node=" + entr_parent_node.getName(),
                    "Exit parent node=" + exit_parent_node.getName(),
                    "Stop",
                ]
                orbitFinalize(os.linesep.join(lines) + os.linesep)
                # Guard in case orbitFinalize returns control to the caller.
                return
            errCntrl.setRotationAngle(self.angle)
            errCntrl.setBaseLength(entr_parent_node.getLength())

    def getRotationAngle(self):
        """
        Returns the angle parameter.
        """
        return self.angle

    def setRotationAngle(self, angle):
        """
        Sets the angle parameter and sends it to all error controllers.
        """
        self.angle = angle
        self.updateErrorParameters()

    def setGaussDistributedAngle(self, angle, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error angle for all nodes.
        The angle parameter is the sigma of a Gaussian centered at 0.
        Each controller gets its own sample; samples with absolute value
        above abs(angle)*cut_off_level are redrawn. The accepted sample is
        broadcast from rank 0 of the communicator comm before it is applied.
        The stored angle parameter is not changed.
        """
        main_rank = 0
        for errCntrl in self.error_controllers:
            angle_tmp = random.gauss(0.0, angle)
            while abs(angle_tmp) > abs(angle) * cut_off_level:
                angle_tmp = random.gauss(0.0, angle)
            # Broadcast the sampled value, not the sigma: broadcasting the
            # sigma would give every node the same non-random angle.
            angle_tmp = MPI_Bcast(angle_tmp, mpi_datatype.MPI_DOUBLE, main_rank, comm)
            errCntrl.setRotationAngle(angle_tmp)
# ------------------------------------------------------------------
# The magnet field errors application classes
# ------------------------------------------------------------------
class QuadFieldsErrorsDeployment(NamedObject, TypedObject):
    """
    Class will apply the errors to the fields of the quads.
    The initial "dB/dr" value of each registered quad is remembered, so
    the relative errors are always applied to the initial field and the
    fields can be restored later.
    """

    def __init__(self, name="no_name", type_in="QuadFieldsErrorsDeployment"):
        NamedObject.__init__(self, name)
        TypedObject.__init__(self, type_in)
        # ---- self.quad_and_field_arr[ind] = [[quad,field_init],...]
        self.quad_and_field_arr = []

    def addQuads(self, quads):
        """
        Add quads to the inner array of quads.
        Objects that are not instances of Quad are ignored.
        """
        for quad in quads:
            self.addQuad(quad)

    def addQuad(self, quad):
        """
        Add one quad to the inner array of quads together with its
        current "dB/dr" parameter. An object that is not an instance
        of Quad is ignored.
        """
        if isinstance(quad, Quad):
            self.quad_and_field_arr.append([quad, quad.getParam("dB/dr")])

    def restoreFields(self):
        """
        Restores the "dB/dr" parameter of all registered quads to the
        values they had when the quads were added.
        """
        for quad, field_init in self.quad_and_field_arr:
            quad.setParam("dB/dr", field_init)

    def setGaussDistributedRealtiveErrors(self, relative_error, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error fields for all quads.
        Changes will be different for all quads.
        The relative_error parameter is the sigma of a Gaussian centered
        at 0; samples with absolute value above
        abs(relative_error)*cut_off_level are redrawn. Each accepted sample
        is broadcast from rank 0 of the communicator comm before the new
        field field_init*(1 + error) is set.
        """
        main_rank = 0
        for quad, field_init in self.quad_and_field_arr:
            rel_err = random.gauss(0.0, relative_error)
            while abs(rel_err) > abs(relative_error) * cut_off_level:
                rel_err = random.gauss(0.0, relative_error)
            rel_err = MPI_Bcast(rel_err, mpi_datatype.MPI_DOUBLE, main_rank, comm)
            quad.setParam("dB/dr", field_init * (1.0 + rel_err))

    def setGaussDistributedRealtiveErrorToGroup(self, relative_error, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Sets the random generated error fields for all quads in the string.
        Changes will be the same for all quads. We assume one PS for all quads.
        The single relative error is sampled in the same way as in
        setGaussDistributedRealtiveErrors(...) and broadcast from rank 0
        of the communicator comm.
        """
        rel_err = random.gauss(0.0, relative_error)
        while abs(rel_err) > abs(relative_error) * cut_off_level:
            rel_err = random.gauss(0.0, relative_error)
        main_rank = 0
        # MPI_Bcast is imported by name from orbit.core.orbit_mpi; there is
        # no "orbit_mpi" name in this module to qualify it with.
        rel_err = MPI_Bcast(rel_err, mpi_datatype.MPI_DOUBLE, main_rank, comm)
        for quad, field_init in self.quad_and_field_arr:
            quad.setParam("dB/dr", field_init * (1.0 + rel_err))
class BendFieldNodesModification(ErrorForNodesModification):
    """
    Applies field errors to bend nodes. The error is described by one
    relative field change value which is sent to every registered
    ErrorCntrlBendField controller.
    """

    def __init__(self, name="no_name"):
        ErrorForNodesModification.__init__(self, name, "BendFieldNodesModification")
        # ---- relative change of the bend field shared by all controllers
        self.relative_field_change = 0.0

    def _getInstanceOfErrorController(self):
        """
        Creates and returns a new ErrorCntrlBendField error controller.
        """
        controller = ErrorCntrlBendField()
        return controller

    def updateErrorParameters(self):
        """
        Implements the abstract method of the parent class: sends the
        current relative field change to all registered error controllers.
        """
        field_change = self.relative_field_change
        for controller in self.error_controllers:
            controller.setRelativeFieldChange(field_change)

    def addBends(self, bends):
        """
        Registers an array of bend nodes.
        """
        self.addLatticeNodes(bends)

    def addBend(self, bend):
        """
        Registers one bend node.
        """
        self.addLatticeNodes([bend])

    def setRelativeFieldChange(self, relative_field_change):
        """
        Stores the relative change of the bend field and sends it
        to all registered error controllers.
        """
        self.relative_field_change = relative_field_change
        self.updateErrorParameters()

    def getRelativeFieldChange(self):
        """
        Returns the stored relative change of the bend field.
        """
        return self.relative_field_change

    def setGaussDistributedRelativeFieldError(self, relative_error, cut_off_level=3.0, comm=mpi_comm.MPI_COMM_WORLD):
        """
        Generates one random relative field error and applies it to all
        registered bends. The relative_error parameter is the sigma of a
        Gaussian centered at 0; a sample is redrawn while its absolute
        value is above abs(relative_error)*cut_off_level. The accepted
        sample is broadcast from rank 0 of the communicator comm.
        """
        while True:
            sampled_error = random.gauss(0.0, relative_error)
            # accept the sample as soon as it is not beyond the cut-off
            if not abs(sampled_error) > abs(relative_error) * cut_off_level:
                break
        root_rank = 0
        self.relative_field_change = MPI_Bcast(sampled_error, mpi_datatype.MPI_DOUBLE, root_rank, comm)
        self.updateErrorParameters()