Source code for orbit.py_linac.lattice.LinacDiagnosticsNodes

"""
This package is a collection of diagnostics nodes for linacs.
"""

import os
import math
import sys

# ---- MPI module function and classes
from orbit.core.orbit_mpi import mpi_comm, mpi_datatype, mpi_op, MPI_Allreduce

# import from orbit Python utilities
from orbit.utils import orbitFinalize
from orbit.utils import phaseNearTargetPhase, phaseNearTargetPhaseDeg
from orbit.utils import speed_of_light

# import from orbit c++ utilities
from orbit.core.orbit_utils import Polynomial, Function

# from LinacAccLattice import Sequence
from orbit.py_linac.lattice.LinacAccLatticeLib import Sequence
from orbit.py_linac.lattice.LinacAccNodes import BaseLinacNode, LinacNode


class LinacBPM(BaseLinacNode):
    """
    The linac BPM representation.

    It calculates the average x, y and phase of all particles in the bunch,
    and the amplitude signal, which is the amplitude of the Fourier harmonic
    of the phase histogram. To calculate this it needs the frequency [Hz] of
    the BPM's electronics and the normalization factor.

    At this moment, all functionality is implemented on the Python level,
    but in the future the operations with the particles' coordinates should
    be moved to the C++ level.

    All phases kept by this node are in degrees. The attribute
    ``synch_pahse`` keeps its historical (misspelled) name so that code
    reading the attribute directly keeps working.
    """

    def __init__(self, frequency=805.0e6, name="BPM"):
        """
        Creates the BPM node.

        frequency - the frequency [Hz] of the BPM's electronics
        name - the name of the node
        """
        BaseLinacNode.__init__(self, name)
        self.setType("BPM")
        # Use the caller-supplied frequency instead of a hard-coded value.
        self.frequency = frequency
        # The phase histogram covers 360 deg. The step is re-derived from
        # the integer number of bins so that n_bins * phase_step == 360.
        nominal_phase_step = 1.0  # in deg
        n_bins = int(360.0 / nominal_phase_step)
        self.phase_step = 360.0 / n_bins
        self.phase_hist_arr = [0.0] * n_bins
        self.x_avg = 0.0
        self.y_avg = 0.0
        self.phase_avg = 0.0  # in deg
        self.phase_max = 0.0  # in deg
        self.synch_pahse = 0.0  # in deg
        self.rms_phase = 0.0  # in deg
        self.norm_coeff = 1.0
        self.fourier_amp = 0.0
        self.fourier_phase = 0.0  # in deg
        self.amp = 0.0

    def setNormalizationCoefficient(self, norm_coeff):
        """Sets the coefficient that scales the Fourier amplitude into the BPM amplitude."""
        self.norm_coeff = norm_coeff

    def getNormalizationCoefficient(self):
        """Returns the coefficient that scales the Fourier amplitude into the BPM amplitude."""
        return self.norm_coeff

    def setFrequency(self, frequency):
        """Sets the frequency [Hz] of the BPM's electronics."""
        self.frequency = frequency

    def getFrequency(self):
        """Returns the frequency [Hz] of the BPM's electronics."""
        return self.frequency

    def _cleanPhaseHist(self):
        """Zeroes every bin of the phase histogram in place."""
        for ind in range(len(self.phase_hist_arr)):
            self.phase_hist_arr[ind] = 0.0

    def getAvgX(self):
        """Returns the average x of the last analyzed bunch."""
        return self.x_avg

    def getAvgY(self):
        """Returns the average y of the last analyzed bunch."""
        return self.y_avg

    def getPhaseRMS(self):
        """Returns the RMS phase spread (in deg) of the last analyzed bunch."""
        return self.rms_phase

    def getAvgPhase(self):
        """Returns the average phase (in deg) of the last analyzed bunch."""
        return self.phase_avg

    def getSynchPhase(self):
        """Returns the phase (in deg) of the synchronous particle of the last analyzed bunch."""
        return self.synch_pahse

    def getAmplitude(self):
        """Returns the Fourier amplitude multiplied by the normalization coefficient."""
        return self.amp

    def getFourierAmplitude(self):
        """Returns the amplitude of the Fourier harmonic of the phase histogram."""
        return self.fourier_amp

    def getFourierPhase(self):
        """Returns the phase (in deg) of the Fourier harmonic of the phase histogram."""
        return self.fourier_phase

    def getPeakPhase(self):
        """Returns the phase (in deg) of the histogram bin with the maximal content."""
        return self.phase_max

    def dumpPhaseHistorgam(self, file_out_name=None):
        """
        Dumps the phase histogram as text lines "index  phase[deg]  value".

        file_out_name - the name of the output file. If it is None the lines
                        are printed to the standard output.
        """
        lines = []
        for ind, val in enumerate(self.phase_hist_arr):
            # center of the bin, histogram starts at -180 deg
            phase = (ind + 0.5) * self.phase_step - 180.0
            st = " %3d " % ind
            st += " %7.2f %12.5g " % (phase, val)
            lines.append(st)
        if file_out_name is None:
            for st in lines:
                print(st)
            return
        # The context manager closes the file even if a write fails.
        with open(file_out_name, "w") as fl_out:
            for st in lines:
                fl_out.write(st + "\n")

    def track(self, paramsDict):
        """
        The LinacBPM class implementation of the AccNode class
        track(paramsDict) method. It analyzes paramsDict["bunch"].
        """
        bunch = paramsDict["bunch"]
        self.analyzeBunch(bunch)

    def analyzeBunch(self, bunch):
        """
        Calculates all BPM signals for the bunch and keeps them in the node.

        Here we assume that the macrosize is the same for each particle.
        Sums over particles are combined over the bunch's MPI communicator,
        so the results are the same on every rank. If the bunch is globally
        empty all signals are left at zero.
        """
        comm = bunch.getMPIComm()
        self._cleanPhaseHist()
        self.x_avg = 0.0
        self.y_avg = 0.0
        self.phase_avg = 0.0  # in deg
        self.phase_max = 0.0  # in deg
        self.synch_pahse = 0.0  # in deg
        # reset as well, otherwise an empty bunch reports the previous RMS
        self.rms_phase = 0.0  # in deg
        self.fourier_amp = 0.0
        self.fourier_phase = 0.0  # in deg
        self.amp = 0.0
        n_parts_global = bunch.getSizeGlobal()
        if n_parts_global == 0:
            return
        sync_part = bunch.getSyncParticle()
        beta = sync_part.beta()
        # conversion of the longitudinal coordinate z to the phase in deg
        z_to_phase = -360.0 * self.frequency / (speed_of_light * beta)
        synch_phase = 360.0 * sync_part.time() * self.frequency
        self.synch_pahse = phaseNearTargetPhaseDeg(synch_phase, 0.0)
        n_parts = bunch.getSize()
        # ---- averages
        x_avg = 0.0
        y_avg = 0.0
        phase_avg = 0.0
        for ind in range(n_parts):
            x_avg += bunch.x(ind)
            y_avg += bunch.y(ind)
            phase_avg += z_to_phase * bunch.z(ind)
        # ---- for parallel case
        (x_avg, y_avg, phase_avg) = MPI_Allreduce(
            (x_avg, y_avg, phase_avg), mpi_datatype.MPI_DOUBLE, mpi_op.MPI_SUM, comm
        )
        x_avg /= n_parts_global
        y_avg /= n_parts_global
        phase_avg /= n_parts_global
        # ---- RMS phase relative to the synchronous particle
        phase2_sum = 0.0
        for ind in range(n_parts):
            phase2_sum += (z_to_phase * bunch.z(ind) - phase_avg) ** 2
        # The local sums of squares have to be combined over all ranks
        # before the division by the global number of particles.
        (phase2_sum,) = MPI_Allreduce((phase2_sum,), mpi_datatype.MPI_DOUBLE, mpi_op.MPI_SUM, comm)
        self.rms_phase = math.sqrt(phase2_sum / n_parts_global)
        self.x_avg = x_avg
        self.y_avg = y_avg
        phase_avg += synch_phase
        self.phase_avg = phaseNearTargetPhaseDeg(phase_avg, 0.0)
        # ------- fill out histogram
        n_hist = len(self.phase_hist_arr)
        for ind in range(n_parts):
            phase = phaseNearTargetPhaseDeg(z_to_phase * bunch.z(ind), 0.0)
            # The modulo keeps the index inside [0, n_hist) whatever value
            # the phase wrapping returned, so no extra clamping is needed.
            phase_ind = int((180.0 + phase) / self.phase_step) % n_hist
            self.phase_hist_arr[phase_ind] += 1.0
        hist = list(MPI_Allreduce(self.phase_hist_arr, mpi_datatype.MPI_DOUBLE, mpi_op.MPI_SUM, comm))
        # ---- find the position of the max value
        total_sum = 0.0
        ind_max = -1
        max_val = 0
        for ind, val in enumerate(hist):
            if val > max_val:
                ind_max = ind
                max_val = val
            total_sum += val
        self.phase_max = (ind_max + 0.5) * self.phase_step
        self.phase_max -= 180.0
        self.phase_max += synch_phase
        self.phase_max = phaseNearTargetPhaseDeg(self.phase_max, 0.0)
        # ---- normalization: the integral of the histogram over the phase
        # ---- in radians becomes 1
        grad_to_rad_coeff = math.pi / 180.0
        phase_step_rad = self.phase_step * grad_to_rad_coeff
        n_local_coeff = 1.0 / (total_sum * phase_step_rad)
        for ind in range(n_hist):
            hist[ind] *= n_local_coeff
        # keep the normalized global histogram in the node (same list object)
        self.phase_hist_arr[:] = hist
        # ---- Fourier amplitude and phase
        sin_sum = 0.0
        cos_sum = 0.0
        for ind, val in enumerate(hist):
            # bin center counted from the start of the histogram, in radians
            phase = (ind + 0.5) * self.phase_step * grad_to_rad_coeff
            sin_sum += val * math.sin(phase)
            cos_sum += val * math.cos(phase)
        sin_sum *= phase_step_rad
        cos_sum *= phase_step_rad
        self.fourier_amp = math.sqrt(sin_sum**2 + cos_sum**2) / math.pi
        self.amp = self.norm_coeff * self.fourier_amp
        self.fourier_phase = -math.atan2(cos_sum, sin_sum) * 180.0 / math.pi - 90.0
        self.fourier_phase += synch_phase
        self.fourier_phase = phaseNearTargetPhaseDeg(self.fourier_phase, 0.0)