import logging
from astropy import units as u
from astropy.coordinates import Angle
import numpy as np
from scipy.interpolate import interp1d
try:
from spectral_cube import SpectralCube
except ModuleNotFoundError:
# Error handling
pass
import os.path
directory = os.path.dirname(__file__)
track_dict = {
"3kf":"3kF_lbvRBD.dat", "3kpc_far":"3kF_lbvRBD.dat",
"3kn":"3kN_lbvRBD.dat", "3kpc_near":"3kN_lbvRBD.dat",
"aqr":"AqR_lbvRBD.dat", "aquila_rift":"AqR_lbvRBD.dat",
"aqs":"AqS_lbvRBD.dat", "aquila_spur":"AqS_lbvRBD.dat",
"cnn":"CnN_lbvRBD.dat", "connecting_near":"CnN_lbvRBD.dat",
"cnx":"CnX_lbvRBD.dat", "connecting_extension":"CnX_lbvRBD.dat",
"crn":"CrN_lbvRBD.dat", "carina_near":"CrN_lbvRBD.dat",
"crf":"CrF_lbvRBD.dat", "carina_far":"CrF_lbvRBD.dat",
"ctn":"CtN_lbvRBD.dat", "centaurus_near":"CtN_lbvRBD.dat",
"ctf":"CtF_lbvRBD.dat", "centaurus_far":"CtF_lbvRBD.dat",
"loc":"Loc_lbvRBD.dat", "local":"Loc_lbvRBD.dat",
"los":"LoS_lbvRBD.dat", "local_spur":"LoS_lbvRBD.dat",
"nor":"Nor_lbvRBD.dat", "norma":"Nor_lbvRBD.dat",
"osc":"OSC_lbvRBD.dat", "outer_centaurus":"OSC_lbvRBD.dat",
"outer_scutum":"OSC_lbvRBD.dat", "outer_scutum_centaurus":"OSC_lbvRBD.dat",
"outer":"Out_lbvRBD.dat", "out":"Out_lbvRBD.dat",
"per":"Per_lbvRBD.dat", "perseus":"Per_lbvRBD.dat",
"scn":"ScN_lbvRBD.dat", "scutum_near":"ScN_lbvRBD.dat",
"scf":"ScF_lbvRBD.dat", "scutum_far":"ScF_lbvRBD.dat",
"sgn":"SgN_lbvRBD.dat", "sagittarius_near":"SgN_lbvRBD.dat",
"sgf":"SgF_lbvRBD.dat", "sagittarius_far":"SgF_lbvRBD.dat"
}
def get_lbv_track(filename = None, reid_track = None, **kwargs):
"""
returns (longitude, latitude, velocity, radius, beta, distance) for spiral arm tracks
Parameters
----------
filename:'str', optional, must be keyword
Filename of custom LBV track to be read in
reid_track:'str', optional, must be keyword
Spiral Arm track from Reid et al. (2016) to read in
Options are not case sensitive:
3kpc Arm:
Far: "3kf", "3kpc_far"
Near: "3kn", "3kpc_near"
Aquila Rift: "AqR", "Aquila_Rift"
Aquila Spur: "AqS", "Aquila_Spur"
Connecting Arm:
Near: "CnN", "Connecting_Near"
Extension: "CnX", "Connecting_Extension"
Carina Arm:
Near: "CrN", "Carina_Near"
Far: "CrF", "Carina_Far"
Centaurus Arm:
Near: "CtN", "Centaurus_Near"
Far: "CtF", "Centaurus_Far"
Local Arm:
"Loc", "Local"
Spur?: "LoS", "Local_Spur"
Norma Arm:
"Nor", "Norma"
Outer Scutum Centaurus Arm:
"OSC", "Outer_Scutum", "Outer_Centaurus", "Outer_Scutum_Centaurus"
Outer Arm:
"Outer", "Out"
Perseus Arm:
"Per", "Perseus"
Scutum Arm:
Near: "ScN", "Scutum_Near"
Far: "ScF", "Scutum_Far"
Sagittarius Arm:
Near: "SgN", "Sagittarius_Near"
Far: "SgF", "Sagittarius_Far"
kwargs:
optional keywords passed to np.genfromtxt
"""
# Check for Reid Track Match:
if reid_track is not None:
try:
filename = os.path.join(directory, "data", "Reid16_SpiralArms", track_dict[reid_track.lower()])
except KeyError:
raise KeyError("Reid et al. (2016) Spiral Arm track for {} was not found!".format(reid_track))
else:
logging.warning("Not using provided data files - expecting columns to be ordered as lbv_RBD.")
# Try reading in data:
if not "comments" in kwargs:
# Default comments flag in Reid lbv_RBD Tracks
kwargs["comments"] = "!"
track = np.genfromtxt(filename, **kwargs)
# Check shape
if track.shape[1] != 6:
logging.warning("Track file did not have expected number of columns - adding in columns of 0s")
new_track = np.zeros((track.shape[0], 6))
for column in range(track.shape[1]):
new_track[:,column] = track[:,column]
track = new_track
return track
[docs]def get_spiral_slice(survey, track = None, filename = None,
brange = None, lrange = None,
interpolate = True, wrap_at_180 = False,
vel_width = None, return_track = False):
"""
Returns SkySurvey object isolated to velocity ranges corresponding to specified spiral arm
Parameters
----------
survey: 'whampy.skySurvey'
input skySurvey
track: 'np.ndarray', 'str', optional, must be keyword
if 'numpy array', lbv_RBD track data
if 'str', name of spiral arm from Reid et al. (2016)
filename: 'str', optional, must be keyword
filename of track file to read in using whampy.lbvTracks.get_lbv_track
brange: 'list', 'u.Quantity'
min,max latitude to restrict data to
Default of +/- 40 deg
lrange: 'list', 'u.Quantity'
min,max longitude to restrict data to
Default of full track extent
interpolate: 'bool', optional, must be keyword
if True, interpolates velocity to coordinate of pointings
if False, ... do nothing ... for now
Future: slices have velocities set by track
wrap_at_180: `bool`, optional, must be keyword
if True, wraps longitude angles at 180d
use if mapping accross Galactic Center
vel_width: `number`, `u.Quantity`, optional, must be keyword
velocity width to isolate in km/s
return_track: `bool`, optional, must be keyword
if True, will also return the spiral arm track as the second element of a tuple
"""
# Check Track Data
if track is not None:
if not track.__class__ is np.ndarray:
track = get_lbv_track(reid_track = track)
else:
if track.shape[1] < 2:
raise TypeError("track should have at least two columns (l,v) or up to 6 (lbvRBD).")
elif filename is None:
raise SyntaxError("No track or track filename provided!")
else:
track = get_lbv_track(filename = filename)
if wrap_at_180:
wrap_at = "180d"
else:
wrap_at = "360d"
# Check velocity width
if vel_width is None:
logging.warning("No Velocity width specified, using default of 16 km/s.")
vel_width = 16*u.km/u.s
elif not isinstance(vel_width, u.Quantity):
vel_width *= u.km/u.s
logging.warning("No units specified for vel_width, assuming km/s.")
#Extract Needed track informattion
if track.shape[1] == 2:
lon_track = track[:,0] * u.deg
lon_track = Angle(lon_track).wrap_at(wrap_at)
vel_track = track[:,1] * u.km/u.s
else: # Reid et al. (2016) format
lon_track = track[:,0] * u.deg
lon_track = Angle(lon_track).wrap_at(wrap_at)
vel_track = track[:,2] * u.km/u.s
# Set Default latitude range
if brange is None:
brange = [-40,40]*u.deg
elif isinstance(brange, u.Quantity):
brange = brange.to(u.deg)
else:
brange = brange * u.deg
logging.warning("No units specified for latitude range, assuming Degrees!")
# Set Longitude Range
if lrange is None:
lrange = [np.min(lon_track.value), np.max(lon_track.value)]*u.deg
elif isinstance(lrange, u.Quantity):
lrange = lrange.to(u.deg)
else:
lrange = lrange * u.deg
logging.warning("No units specified for longitude range, assuming Degrees!")
# Extract Relevant sky section
bounds = [lrange[0], lrange[1], brange[0], brange[1]]
sky_cut = survey.sky_section(bounds, wrap_at_180 = wrap_at_180)
# Restrict Velocity Space with a mask
wham_coords = sky_cut.get_SkyCoord()
if interpolate:
l_v_track = interp1d(lon_track, vel_track)
central_velocities = l_v_track(wham_coords.l.wrap_at(wrap_at)) * vel_track.unit
else: ## For now - always interpolate
l_v_track = interp1d(lon_track, vel_track)
central_velocities = l_v_track(wham_coords.l.wrap_at(wrap_at)) * vel_track.unit
# Get velocity masks
# Is there a better way to do this without a loop?
sky_cut["VEL_MASK"] = np.empty((len(sky_cut), len(sky_cut[0]["VELOCITY"])), dtype = bool)
for ell, vel in enumerate(central_velocities):
mask = sky_cut[ell]["VELOCITY"] <= (vel + vel_width/2).value
mask &= sky_cut[ell]["VELOCITY"] >= (vel - vel_width/2).value
sky_cut[ell]["VEL_MASK"] = mask
# Set Intensities
sky_cut["INTEN"], sky_cut["ERROR"] = sky_cut.moment(return_sigma = True, masked = True)
# Save Track info as attribute
sky_cut.lbv_RBD_track = track
if not return_track:
return sky_cut
else:
return sky_cut, track