#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Functions to make star clusters."""
__author__ = "Nathaniel Starkman"
__all__ = [
"make_Potential_Contenta2018",
"initialize_star_cluster_Contenta2018",
"make_noPBH_system",
"simulate_EriII",
]
##############################################################################
# IMPORTS
# BUILT-IN
from typing import Any, Union, Optional, Callable
from typing_extensions import Literal
# THIRD PARTY
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
# amuse
from amuse.ic.plummer import new_plummer_model
from amuse.io import write_set_to_file
from amuse.units import nbody_system
from amuse.units.quantities import Quantity
from amuse.community.bhtree.interface import BHTree
from amuse.community.sse.interface import SSE
# CUSTOM
# utilipy
from utilipy import LogFile
from utilipy.decorators import store_function_input
# amuse_util
from amuse_util import initialize_system, Systems
from amuse_util.units import (
amuse_units as amu,
astropy_units as apu,
to_astropy_decorator,
to_amuse,
to_amuse_decorator,
convert_units_decorator,
)
from amuse_util.ic import num_particles_from_mtot_given_mass_func
from amuse_util.ic.brokenimf import new_kroupa_mass_distribution
from amuse_util.ic.star_cluster import (
_initialize_star_cluster_unbound,
separate_bound_unbound,
SC_Tuple,
)
# PROJECT-SPECIFIC
from ... import plot
from ...analyze import make_summary_table
from ...potential import (
chandrasekhar_dynamical_friction,
DehnenSphericalPotential,
)
from ...utils import sim_utils
from ...utils.convert import (
Plummer_halfmass_to_virial_radius,
Plummer_virial_to_halfmass_radius,
)
##############################################################################
# PARAMETERS
_PLOT = True
_LOGFILE = LogFile(verbose=np.inf, header=False)
##############################################################################
# CODE
##############################################################################
[docs]@to_astropy_decorator(arguments=["mass", "a"])
def make_Potential_Contenta2018(
mass: Union[apu.Quantity, Quantity] = 4.79e8 * apu.solMass,
a: Union[apu.Quantity, Quantity] = 0.877 * apu.kpc,
cored: bool = True,
):
"""Make Eridanus II Potential from Contenta 2018.
Parameters
----------
mass: apu.Quantity or amu.Quantity, optional
a: apu.Quantity or amu.Quantity, optional
scale length
cored: bool
whether to create a cored (default) or cusped profile
corresponds to `alpha`=0 vs `1`
Returns
-------
System
with ``.gravity`` attribute
"""
if cored:
alpha = 0.0
else:
alpha = 1.0
return DehnenSphericalPotential(
mass=mass, a=a, alpha=alpha, to_system=True
)
# /def
#####################################################################
# initialize_star_cluster_Contenta2018
[docs]@to_amuse_decorator(
arguments=["Rhm", "position", "velocity", "smoothing_length", "timestep"]
)
def initialize_star_cluster_Contenta2018(
Mcluster: Union[int, amu.MSun],
potential: Any,
# object properties
Rhm: amu.parsec = 10 | amu.parsec,
position: amu.kpc = [0.14, 0, 0] | amu.kpc,
velocity=None,
vel_dir: amu.kms = [0, 1, 0],
# for evolution
evln_func=SSE,
# for gravity
gravity_func=BHTree,
gravity_kwargs={},
smoothing_length: amu.parsec = 0.1 | amu.parsec,
opening_angle: float = 0.6,
number_of_workers=8,
# util
timestep: amu.Myr = 25 | amu.kyr,
random=np.random.RandomState(0),
make_unbound=True,
# channel_attrs=["mass", "x", "y", "z", "vx", "vy", "vz"],
make_plot=False,
# logging
logger=_LOGFILE,
verbose=None,
# debugging
_scale_to_standard: bool = True,
_use_self_gravity: Union[Literal[0], Literal[1]] = 1,
):
"""Initialize Contenta (2018) star cluster.
Parameters
----------
Mcluster: amuse units.MSun
mass of cluster
if unitless, assumed to be number_of_particles
the number of particlues is derived using
`~num_particles_from_mtot_given_mass_func`
potential: galpy amuse potential
potential of galaxy about which cluster orbits
used for calculating a circular orbital velocity (Contenta 2018)
Rhm: amuse units.parsec
half-mass radius
pos: list amuse units.kpc
Galactocentric position
position: amuse distance 3-list quantity, optional (default [0,0,0] pc)
the position of the system in GC coordinates
velocity: amuse velocity quantity, optional (default None)
the velocity magnitude of the system in GC coordinates
None will calculate a circular velocity (magnitude) at the `position`
vel_dir: 3-list
velocity unit direction
evln_func: function, optional (default SeBa)
object evolution function
signature of function should be::
func(metallicity, **evln_kwargs)
the metallicity is 0.0008
smoothing_length: amuse distance quantity, optional (default 0 pc)
the smoothing length used in scaling and the gravity
opening_angle: float, optional (default 0.6)
number_of_workers: int, optional (default 1)
number of gravity workers
timestep: amuse time quantity, optional
the timestep for evolving the gravity and stellar evolution
default, 0.1 Gyr
random: True or random number generator, optional
(default True)
ex: np.random.default_rng(seed=0)
will default to random seed
include_unbound : bool
whether to also make the unbound cluster (default True)
Returns
-------
bound_system: System
bound_inputs: BoundArgument
BoundArgument to ``initialize_system``, which this function wraps
can be used with ``recreate_system`` to reconstruct a System.
unbound_system : System
if make_unbound is True
unbound_inputs : BoundArgument
if make_unbound is True
Raises
------
TypeError
if Mcluster does not have units of Mass and is not an integer
Notes
-----
metallicity=0.0008
"""
logger.record("Initialize star cluster from Contenta2018")
# call helper function _initialize_star_cluster_bound_Contenta2018
# returns system, inputs to ``initialize_system``,
# and inputs to _initialize_star_cluster_bound_Contenta2018
(system, inputs), _inputs = _initialize_star_cluster_bound_Contenta2018(
Mcluster=Mcluster,
potential=potential,
# object properties
Rhm=Rhm,
position=position,
velocity=velocity,
vel_dir=vel_dir,
# for evolution
evln_func=SSE,
# for gravity
gravity_func=BHTree,
gravity_kwargs={},
smoothing_length=smoothing_length,
opening_angle=opening_angle,
number_of_workers=number_of_workers,
# util
timestep=timestep,
random=random,
# channel_attrs=channel_attrs,
make_plot=make_plot,
# logging
logger=logger,
verbose=verbose,
# debugging
_scale_to_standard=_scale_to_standard,
_use_self_gravity=_use_self_gravity,
)
if make_unbound:
logger.record("\n\nFind unbound stars from star cluster.")
(
unbound_system,
unbound_inputs,
) = _initialize_star_cluster_unbound_Contenta2018(system, _inputs,)
else:
unbound_system, unbound_inputs = None, None
return SC_Tuple(
bound_system=system,
bound_inputs=inputs,
unbound_system=unbound_system,
unbound_inputs=unbound_inputs,
)
# /def
# -------------------------------------------------------------------
@store_function_input(store_inputs=True)
def _initialize_star_cluster_bound_Contenta2018(
Mcluster: (int, amu.MSun),
potential: Any,
# object properties
Rhm: amu.parsec,
position: amu.kpc,
velocity,
vel_dir: amu.kms,
# for evolution
evln_func: Callable,
# for gravity
gravity_func: Callable,
gravity_kwargs: dict,
smoothing_length: amu.parsec,
opening_angle: float,
number_of_workers: int,
# util
timestep: amu.Myr,
random,
make_plot: bool,
# logging
logger=_LOGFILE,
verbose=None,
# debugging
_scale_to_standard: bool = True,
_use_self_gravity: {0, 1} = 1,
):
"""Initialize Contenta (2018) star cluster.
Parameters
----------
Mcluster: amuse units.MSun
mass of cluster
if unitless, assumed to be number_of_particles
the number of particlues is derived using
number_of_stars_from_total_mass_given_mass_function
potential: galpy amuse potential
potential of galaxy about which cluster orbits
used for calculating a circular orbital velocity (Contenta 2018)
Rhm: amuse units.parsec
half-mass radius
pos: list amuse units.kpc
Galactocentric position
position: amuse distance 3-list quantity, optional (default [0,0,0] pc)
the position of the system in GC coordinates
velocity: amuse velocity quantity, optional (default None)
the velocity magnitude of the system in GC coordinates
None will calculate a circular velocity (magnitude) at the `position`
vel_dir: 3-list
velocity unit direction
evln_func: function, optional (default SeBa)
object evolution function
signature of function should be
func(`metallicity`, **`evln_kwargs`)
the metallicity is 0.0008
smoothing_length: amuse distance quantity, optional (default 0 pc)
the smoothing length used in scaling and the gravity
opening_angle: float, optional (default 0.6)
number_of_workers: int, optional (default 1)
number of gravity workers
timestep: amuse time quantity, optional (default .1 Gyr)
the timestep for evolving the gravity and stellar evolution
random: True or random number generator, optional (default True)
ex: np.random.default_rng(seed=0)
will default to random seed
# channel_attrs: list or None, optional
# default ['mass', 'x', 'y', 'z', 'vx', 'vy', 'vz']
Returns
-------
system: class
# cluster
.cluster
.cluster_gravity
# stars
.stellar
SSE with `metallicity`
has the particles .star_distribution
has channel .channel_from_stellar_to_cluster_gravity
.star_distribution
plummer_model w/ number_of_particles
does NOT have any channels
# channels
.channel_from_stellar_to_cluster_gravity
channel from star_evolution.particles to cluster_gravity
**NOT a channel from .stars to cluster_gravity
IS a channel from .stellar_evolution to cluster_gravity
.channel_from_cluster_gravity_to_cluster
channel from cluster_gravity to cluster
# util
.converter
nbody_to_si from .stars.masses.sum() to `Rcluster`
inputs: BoundArgument
BoundArgument to ``initialize_system``, which this function wraps
can be used with ``recreate_system`` to reconstruct a System.
Raises
------
TypeError
if Mcluster does not have units of Mass and is not an integer
Notes
-----
metallicity=0.0008
"""
logger.report(
f"""initialize_star_cluster_Contenta2018:
Mcluster: {Mcluster}
potential: {potential}
# object properties
Rhm: {Rhm}
position: {position}
velocity: {velocity}
vel_dir: {vel_dir}
# for evolution
evln_func: {evln_func}
# for gravity
gravity_func: {gravity_func} gravity_kwargs: {gravity_kwargs}
smoothing_length: {smoothing_length}
opening_angle: {opening_angle}
number_of_worker: {number_of_workers}
# util
timestep: {timestep}
random: {random}
# channel_attrs: {'deprecated'}
# debugging
_scale_to_standard: {_scale_to_standard}
_use_self_gravity: {_use_self_gravity}
""",
verbose=verbose,
)
# ----------------------------------------
# number_of_particles from Mcluster
try:
Mcluster.unit
except AttributeError:
# it's actually the number of stars
# which is what we really want
if not isinstance(Mcluster, int):
raise TypeError("number_of_particles (Mcluster) is not an integer")
N = Mcluster
Mcluster = None
else: # need to figure out number of stars
N, M, err = num_particles_from_mtot_given_mass_func(
to_amuse(Mcluster),
new_kroupa_mass_distribution,
tolerance=1e-7,
random=0,
imf_kwargs=dict(
mass_min=0.1 | amu.MSun, mass_max=100.0 | amu.MSun
),
)
logger.report(
f"Making cluster with {N} particles",
f"Making cluster with {N} particles, {M}, {err}",
verbose=verbose,
)
# ----------------------------------------
# figure out velocity
if velocity is None: # use circular velocity
Rg = position.length()
velocity = potential.circular_velocity(Rg) * vel_dir
logger.report(
f"Starting velocity of <{velocity}>", verbose=verbose,
)
# ----------------------------------------
# virial radius
Rvirial = Plummer_halfmass_to_virial_radius(Rhm).in_(amu.parsec)
logger.report(f"Rvirial = {Rvirial}", verbose=verbose, start_at=1)
# ----------------------------------------
# converter
if Mcluster is not None:
converter = nbody_system.nbody_to_si(Mcluster, Rvirial)
logger.report(
f"made converter: {Mcluster, Rvirial}", verbose=verbose,
)
else:
converter = None
logger.report(
f"cannot pre-make converter", verbose=verbose,
)
# ----------------------------------------
system, inputs = initialize_system(
N,
# for IMF
imf_func=new_kroupa_mass_distribution,
imf_args=[],
imf_kwargs=dict(mass_min=0.1 | amu.MSun, mass_max=100 | amu.MSun),
# for distribution function
distr_func=new_plummer_model,
distr_args=[],
distr_kwargs={},
# object properties
Rvirial=Rvirial,
position=position,
velocity=velocity,
obj_radius=0 | amu.AU,
# for stellar_evolution
evln_func=evln_func,
evln_kwargs={"metallicity": 0.0008},
# for gravity
gravity_func=gravity_func,
gravity_args=[],
gravity_kwargs=gravity_kwargs,
smoothing_length=smoothing_length, # @TODO
opening_angle=opening_angle,
number_of_workers=number_of_workers,
use_self_gravity=_use_self_gravity,
# util
converter=converter,
timestep=timestep,
random=random,
_num_particles_reconstruct=0,
# channel_attrs=channel_attrs,
# logging
logger=logger,
verbose=verbose,
# debugging
_scale_to_standard=_scale_to_standard,
# decorator
store_inputs=True, # to return inputs. Allows for fast reconstruction
)
# ----------------------------------------
# bound objects
try:
system.bound_radius_cutoff = (
3 # TODO something more robust, like 3x the virial radius
* system.particles.LagrangianRadii(
mf=[0.9], unit_converter=system.converter
)[0]
)
except:
logger.write("not calculating bound_radius_cutoff")
system.bound_radius_cutoff = np.inf | amu.pc
# ----------------------------------------
# diagnostic plot
if make_plot is not False and _PLOT:
p = system.particles
savefig = make_plot if isinstance(make_plot, str) else None
plot.plot_snapshot_vxvv(
p, timestamp=None, savefig=savefig, close=True, kind="cartesian"
)
logger.report(
"plotted Contenta (2018) initial star cluster",
f"",
sep="\n",
endsection=True,
verbose=verbose,
)
# ----------------------------------------
return system, inputs
# /def
# -------------------------------------------------------------------
def _initialize_star_cluster_unbound_Contenta2018(
bound_cluster, bound_inputs,
):
"""Initialize unbound system from Contenta (2018) star cluster.
.. todo::
use .no_PBH_in_galay._initialize_star_cluster_unbound::
.no_PBH_in_galay._initialize_star_cluster_unbound(
bound_cluster, bound_inputs,
bound_system_func=_initialize_star_cluster_bound_Contenta2018
)
Parameters
----------
bound_cluster
bound_inputs
smoothing_length: Quantity
zero_unit
Returns
-------
unbound_cluster
unbound_inputs
See Also
--------
_initialize_star_cluster_unbound
"""
return _initialize_star_cluster_unbound(
bound_cluster=bound_cluster,
bound_inputs=bound_inputs,
bound_system_func=_initialize_star_cluster_bound_Contenta2018,
init_arg=("Mcluster", 2),
)
# /def
#####################################################################
[docs]@convert_units_decorator(
to_astropy_args=["pot_mass", "pot_a"],
to_amuse_args=[
"sc_mass",
"sc_rhm",
"sc_position",
"sc_velocity",
"sc_vel_dir",
"sc_smoothing_length",
"sc_timestep",
"DF_rmin",
],
)
def make_noPBH_system(
# Potential
pot_mass: Union[apu.Quantity, Quantity] = 4.79e8 * apu.solMass,
pot_a: Union[apu.Quantity, Quantity] = 0.877 * apu.kpc,
# cluster
sc_mass: Quantity = 1.9e4 | amu.MSun,
sc_rhm: Quantity = 10 | amu.parsec,
sc_position: Quantity = [0.14, 0, 0] | amu.kpc,
sc_velocity=None,
sc_vel_dir: amu.kms = [0, 1, 0],
sc_evln_func=SSE,
sc_gravity_func=BHTree,
sc_gravity_kwargs: dict = {},
sc_smoothing_length: Quantity = 0.1 | amu.parsec,
sc_opening_angle: float = 0.6,
sc_number_of_workers: int = 8,
sc_timestep: Quantity = 25 | amu.kyr,
# dynamical friction
use_DF: bool = True,
DF_rmin: Quantity = 0.05 | amu.kpc,
# time
time=0 | amu.Myr,
# util
random=0,
logger=_LOGFILE,
verbose=None,
save: Optional[str] = None,
use_threading: bool = True,
):
"""Make Galaxy System.
DehnenSphericalPotential approximating a coreNFW potential
this should match the potential from Contenta2018
Parameters
----------
use_DF: bool
whether to use Dynamical Friction
save: bool
the folder location where to save all the initial conditions.
Returns
-------
Systems
galaxy
cluster
unbound
cdf_code
"""
if isinstance(random, (int, float)):
print('making RandomState')
random = np.random.RandomState(int(random))
###########################################################################
# Making Galaxy
logger.newsection(title="Making Galaxy", div="=")
#######################################################################
# DehnenSphericalPotential approximating a coreNFW potential
# this should match the potential from Contenta2018
logger.newsection(
title="Making Potential\n", div="-",
)
galaxy = make_Potential_Contenta2018(mass=pot_mass, a=pot_a, cored=True)
# plot
plot.plot_contenta18_fig2b(
pot_core=galaxy.gravity.pot,
# savefig="figures/contenta18_fig_2b.png",
)
logger.report(
(
"\nSmooth potential is a Dehnen Potential approximating a coreNFW."
"\nThis should match the potential from Contenta (2018)."
"\n"
),
verbose=verbose,
)
#######################################################################
# Star Cluster
# bound & unbound, so don't have to apply dynamical friction to everything
logger.newsection(
title="Making Star Cluster\n", div="-",
)
(
star_cluster,
cluster_inputs,
unbound_cluster,
unbound_inputs,
) = initialize_star_cluster_Contenta2018(
sc_mass, # mass
galaxy.gravity, # potential
# object properties
Rhm=sc_rhm,
position=sc_position,
velocity=sc_velocity,
vel_dir=sc_vel_dir,
# for evolution
evln_func=sc_evln_func,
# for gravity
gravity_func=sc_gravity_func,
gravity_kwargs=sc_gravity_kwargs,
smoothing_length=sc_smoothing_length,
opening_angle=sc_opening_angle,
number_of_workers=sc_number_of_workers,
# util
timestep=sc_timestep,
random=random,
make_unbound=True,
# channel_attrs=_SC_CHANNEL_ATTRS,
make_plot="figures/star_cluster_Contenta2018.pdf",
# logger
logger=logger,
verbose=verbose,
# debugging
_scale_to_standard=True,
_use_self_gravity=1, # True
)
# figure
if _PLOT:
plot.plot_snapshot_vxvv(
star_cluster.particles,
timestamp="0 Myr",
savefig="figures/initial_star_cluster.png",
close=True,
)
# TODO table of summary statistics
logger.report(
"",
verbose=verbose,
)
# TODO table of summary statistics
logger.report(
f"initial unbound cluster star: {len(unbound_cluster.particles)} particles",
verbose=verbose
# "![initial star cluster]" "(../figures/initial_unbound_cluster.png)",
)
#######################################################################
# Dynamical Friction
# The Chandrasekhar dynamical friction is implemented approximately
# instead of performing the expensive calculation for each star,
# the cluster is approximated by its total mass and mean position
# the dynamical friction is then applied to each star
if use_DF:
cdf_code = chandrasekhar_dynamical_friction(
GMs=star_cluster.particles.total_mass().in_(amu.MSun),
rhm=Plummer_virial_to_halfmass_radius(
star_cluster.particles.virial_radius().in_(amu.pc)
),
dens=galaxy.gravity.pot,
rmin=DF_rmin,
)
# TODO some test statistics
# TODO save CDF code
logger.report(f"made dynamical friction", verbose=True)
# TODO just for testing
separate_bound_unbound(
star_cluster,
unbound_cluster,
star_cluster.bound_radius_cutoff,
cdf_code=cdf_code,
)
# /if
#######################################################################
# Collective System
# Gravity Bridges
# stars in cluster_code depend on gravity from galaxy_code & dyn fric
# approximating that the stars do not feel the unbound stars
# 1) make bridges dict
cluster_bridge = [
"galaxy.gravity",
]
if use_DF: # add cdf to bridge
cluster_bridge.append("cdf_code")
print(cluster_bridge) # TODO delete
internal_bridges = {
"cluster.gravity": cluster_bridge,
"unbound.gravity": ["galaxy.gravity", ],
"galaxy.gravity": [],
}
# 2) make the Systems
kw = {}
if use_DF:
kw.update({"cdf_code": cdf_code})
system = Systems(
# systems
galaxy=galaxy,
cluster=star_cluster,
unbound=unbound_cluster,
**kw,
# util
use_threading=use_threading,
internal_bridges=internal_bridges,
timestep="cluster.gravity.parameters.timestep",
)
# ---------------------------------------------------
# deprecated
# if use_DF:
# system.gravity.add_system(
# system.cluster.gravity, (system.galaxy.gravity, system.cdf_code)
# )
# else:
# system.gravity.add_system(
# system.cluster.gravity, (system.galaxy.gravity,)
# )
# # the unbound stars do not have the dyn fric, for computational speed
# # approximating that the unbound stars do not feel the bound stars
# system.gravity.add_system(system.unbound.gravity, (system.galaxy.gravity,))
# # galaxy_code still needs to be added so it evolves with time
# system.gravity.add_system(system.galaxy.gravity,)
# # Set how often to update external potential
# system.gravity.timestep = system.cluster.gravity.parameters.timestep
# ---------------------------------------------------
# set current time
system.gravity.time = time
system.gravity.synchronize_model() # TODO check this sets all the times
logger.write("Made System")
if save is not None:
# pickle.dump(system, save, use_dill=True)
optional_components = []
if use_DF:
optional_components.append("cdf_code")
sim_utils.save_init_conds(
drct=save,
system=system,
star_inputs=cluster_inputs,
unbound_inputs=unbound_inputs,
optional_components=optional_components,
)
return system
# /def
#####################################################################
[docs]@convert_units_decorator(
to_amuse_args=[
"timestep",
"timestop",
"time_snap",
"dynfric_time_update",
],
)
def simulate_EriII(
system,
timestep,
timestop,
time_snap,
dynfric_time_update,
logger=_LOGFILE,
verbose=None,
):
"""Simulate Eridanus II without PBH.
.. todo::
consolidate time stuff into single argument
use ``run_system.simulate_EriII_no_PBH`` instead
Parameters
----------
system : `System`
timestep : Quantity
timestop : Quantity
time_snap : Quantity
dynfric_time_update : Quantity
logger : LogFile, optional
verbose : int, optional
"""
logger.report(
f"Starting simulation @ time {system.gravity.time}",
verbose=verbose,
startsection=True,
)
###########################################################################
# Getting System
time = system.time
use_DF = True if hasattr(system, "cdf_code") else False
logger.write("use DF = ", use_DF)
###########################################################################
# Evolve System
logger.report("Evolving System", verbose=verbose)
last_snap = time
last_dynfic_update = time
# save data
write_set_to_file(
system.cluster.particles,
f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
format="csv",
# attribute_names=list(attributes.keys()),
# attribute_types=list(attributes.values()),
)
write_set_to_file(
system.unbound.particles,
f"output/snaps/unbound/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
format="csv",
# attribute_names=list(attributes.keys()),
# attribute_types=list(attributes.values()),
)
# --------------------
print(time, timestep, timestop, last_snap, last_dynfic_update)
tq = tqdm(
desc="Elapsed Simulation Time",
total=int(
(timestop - time).value_in(amu.Gyr) / timestep.value_in(amu.Gyr)
),
)
while time <= timestop:
# evolve stellar, part 1
system.cluster.evolution.evolve_model(time + timestep / 2.0)
system.cluster.evolution.channel_to.gravity.copy_attributes(["mass"])
system.cluster.evolution.channel_to.particles.copy()
# gravity setup (dynamical friction)
if use_DF:
xcm, ycm, zcm = system.cluster.particles.center_of_mass()
(
vxcm,
vycm,
vzcm,
) = system.cluster.particles.center_of_mass_velocity()
system.cdf_code.set_rv(xcm, ycm, zcm, vxcm, vycm, vzcm)
# evolve gravity
system.gravity.evolve_model(time + timestep)
system.gravity.synchronize_model()
system.cluster.gravity.channel_to.particles.copy()
system.cluster.gravity.channel_to.evolution.copy()
system.unbound.gravity.channel_to.particles.copy()
# evolve stellar, the second
system.cluster.evolution.evolve_model(time + timestep)
system.cluster.evolution.channel_to.gravity.copy_attributes(["mass"])
system.cluster.evolution.channel_to.particles.copy()
# set the time
time = system.gravity.model_time
# ---------
# dynamical friction
# every _DYNFRIC_TIME_UPDATE time steps the system is updated to:
# - stop tracking unbound particles
if use_DF:
if time - last_dynfic_update > dynfric_time_update:
# TODO replace with after test
separate_bound_unbound(
system.cluster,
system.unbound,
system.cluster.bound_radius_cutoff,
cdf_code=system.cdf_code,
)
# TODO test that cdf code is properly updating
print("GMS:", system.cluster.particles.total_mass())
print(
"rhm: ",
system.cluster.particles.LagrangianRadii(
mf=[0.5], unit_converter=system.cluster.converter
)[0],
)
last_dynfic_update = time # reset counter
# /if
# ---------
# snapshot
if time - last_snap > time_snap:
write_set_to_file(
system.cluster.particles,
f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
format="csv",
# attribute_names=list(attributes.keys()),
# attribute_types=list(attributes.values()),
)
write_set_to_file(
system.unbound.particles,
f"output/snaps/unbound/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
format="csv",
# attribute_names=list(attributes.keys()),
# attribute_types=list(attributes.values()),
)
if _PLOT:
# plot
_temp = f"{time.value_in(amu.Myr):06.1f}".replace(".", "_")
plot.plot_snapshot_vxvv(
system.cluster.particles,
timestamp=f"{time.value_in(amu.Myr):06.1f} Myr",
savefig=f"figures/vxvv/snapshot_{_temp}Myr.png",
close=True,
kind="cartesian",
)
# TODO plot of whole system
# update time
last_snap = time # reset counter
# /if
# update tqdm
tq.update(n=1)
# /while
tq.close() # close progress bar
system.cluster.gravity.channel_to.particles.copy() # final copy of properties
system.gravity.stop() # ensure code stops running
# --------------------
# save final output
time = system.gravity.model_time
print(
"Making ",
f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
)
write_set_to_file(
system.cluster.particles,
f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
format="csv",
# attribute_names=list(attributes.keys()),
# attribute_types=list(attributes.values()),
)
write_set_to_file(
system.unbound.particles,
f"output/snaps/unbound/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat",
format="csv",
# attribute_names=list(attributes.keys()),
# attribute_types=list(attributes.values()),
)
if _PLOT:
# plot
_temp = f"{time.value_in(amu.Myr):06.1f}".replace(".", "_")
plot.plot_snapshot_vxvv(
system.cluster.particles,
timestamp=f"{time.value_in(amu.Myr):06.1f} Myr",
savefig=f"figures/vxvv/snapshot_{_temp}Myr.png",
close=True,
kind="cartesian",
)
# TODO plot of whole system
# --------------------
# figure
# plot.plot_snapshot_vxvv(
# system.cluster.particles,
# timestamp=f"{time.value_in(amu.Myr):06.1f} Myr",
# savefig="figures/final_star_cluster.png",
# close=True, kind="cartesian",
# )
# logger.write(
# ("![final star cluster]" "(../figures/final_star_cluster.png)")
# )
# --------------------
# analysis table
df = make_summary_table(
"output/snaps/cluster",
potential=system.galaxy.gravity.pot,
save=True,
overwrite=True,
)
try:
make_summary_table(
"output/snaps/unbound",
potential=system.galaxy.gravity.pot,
save=True,
overwrite=True,
)
except Exception as e:
print(e)
pass
if _PLOT:
# mtot
plot.plot_all_mtot(df)
plt.savefig("figures/mtot.png")
plt.close()
# rtidal
plot.plot_all_rtidal(df)
plt.savefig("figures/rtidal.png")
plt.close()
# rgc
plot.plot_all_rgc(df)
plt.savefig("figures/rgc.png")
plt.close()
# movies
# plot.make_xy_movie_from_snapshots(
# "output",
# "figures/Eri_no_PBH_xy_movie.mp4",
# fps=20,
# title=f"movie: xy ({__date__})",
# # comment='',
# overwrite=True,
# )
# plt.close()
# plot.make_rlim_movie_from_snapshots(
# "output",
# "figures/Eri_no_PBH_rlim_movie.mp4",
# fps=20,
# title=f"movie: xy ({__date__})",
# # comment='',
# overwrite=True,
# )
# plt.close()
return
# /def
##############################################################################
# END
# /def