Source code for eridanus_pbh.scripts.Contenta18.Contenta18

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Functions to make star clusters."""

__author__ = "Nathaniel Starkman"


__all__ = [
    "make_Potential_Contenta2018",
    "initialize_star_cluster_Contenta2018",
    "make_noPBH_system",
    "simulate_EriII",
]


##############################################################################
# IMPORTS

# BUILT-IN

from typing import Any, Union, Optional, Callable
from typing_extensions import Literal


# THIRD PARTY

import numpy as np
from tqdm import tqdm

import matplotlib.pyplot as plt

# amuse
from amuse.ic.plummer import new_plummer_model
from amuse.io import write_set_to_file
from amuse.units import nbody_system
from amuse.units.quantities import Quantity
from amuse.community.bhtree.interface import BHTree
from amuse.community.sse.interface import SSE


# CUSTOM

# utilipy
from utilipy import LogFile
from utilipy.decorators import store_function_input

# amuse_util
from amuse_util import initialize_system, Systems
from amuse_util.units import (
    amuse_units as amu,
    astropy_units as apu,
    to_astropy_decorator,
    to_amuse,
    to_amuse_decorator,
    convert_units_decorator,
)
from amuse_util.ic import num_particles_from_mtot_given_mass_func
from amuse_util.ic.brokenimf import new_kroupa_mass_distribution
from amuse_util.ic.star_cluster import (
    _initialize_star_cluster_unbound,
    separate_bound_unbound,
    SC_Tuple,
)


# PROJECT-SPECIFIC

from ... import plot
from ...analyze import make_summary_table
from ...potential import (
    chandrasekhar_dynamical_friction,
    DehnenSphericalPotential,
)
from ...utils import sim_utils
from ...utils.convert import (
    Plummer_halfmass_to_virial_radius,
    Plummer_virial_to_halfmass_radius,
)


##############################################################################
# PARAMETERS

_PLOT = True
_LOGFILE = LogFile(verbose=np.inf, header=False)


##############################################################################
# CODE
##############################################################################


[docs]@to_astropy_decorator(arguments=["mass", "a"]) def make_Potential_Contenta2018( mass: Union[apu.Quantity, Quantity] = 4.79e8 * apu.solMass, a: Union[apu.Quantity, Quantity] = 0.877 * apu.kpc, cored: bool = True, ): """Make Eridanus II Potential from Contenta 2018. Parameters ---------- mass: apu.Quantity or amu.Quantity, optional a: apu.Quantity or amu.Quantity, optional scale length cored: bool whether to create a cored (default) or cusped profile corresponds to `alpha`=0 vs `1` Returns ------- System with ``.gravity`` attribute """ if cored: alpha = 0.0 else: alpha = 1.0 return DehnenSphericalPotential( mass=mass, a=a, alpha=alpha, to_system=True )
# /def ##################################################################### # initialize_star_cluster_Contenta2018
[docs]@to_amuse_decorator( arguments=["Rhm", "position", "velocity", "smoothing_length", "timestep"] ) def initialize_star_cluster_Contenta2018( Mcluster: Union[int, amu.MSun], potential: Any, # object properties Rhm: amu.parsec = 10 | amu.parsec, position: amu.kpc = [0.14, 0, 0] | amu.kpc, velocity=None, vel_dir: amu.kms = [0, 1, 0], # for evolution evln_func=SSE, # for gravity gravity_func=BHTree, gravity_kwargs={}, smoothing_length: amu.parsec = 0.1 | amu.parsec, opening_angle: float = 0.6, number_of_workers=8, # util timestep: amu.Myr = 25 | amu.kyr, random=np.random.RandomState(0), make_unbound=True, # channel_attrs=["mass", "x", "y", "z", "vx", "vy", "vz"], make_plot=False, # logging logger=_LOGFILE, verbose=None, # debugging _scale_to_standard: bool = True, _use_self_gravity: Union[Literal[0], Literal[1]] = 1, ): """Initialize Contenta (2018) star cluster. Parameters ---------- Mcluster: amuse units.MSun mass of cluster if unitless, assumed to be number_of_particles the number of particlues is derived using `~num_particles_from_mtot_given_mass_func` potential: galpy amuse potential potential of galaxy about which cluster orbits used for calculating a circular orbital velocity (Contenta 2018) Rhm: amuse units.parsec half-mass radius pos: list amuse units.kpc Galactocentric position position: amuse distance 3-list quantity, optional (default [0,0,0] pc) the position of the system in GC coordinates velocity: amuse velocity quantity, optional (default None) the velocity magnitude of the system in GC coordinates None will calculate a circular velocity (magnitude) at the `position` vel_dir: 3-list velocity unit direction evln_func: function, optional (default SeBa) object evolution function signature of function should be:: func(metallicity, **evln_kwargs) the metallicity is 0.0008 smoothing_length: amuse distance quantity, optional (default 0 pc) the smoothing length used in scaling and the gravity opening_angle: float, optional (default 0.6) number_of_workers: int, optional (default 1) number of gravity workers timestep: amuse time quantity, optional the timestep for evolving the gravity and stellar evolution default, 0.1 Gyr random: True or random number generator, optional (default True) ex: np.random.default_rng(seed=0) will default to random seed include_unbound : bool whether to also make the unbound cluster (default True) Returns ------- bound_system: System bound_inputs: BoundArgument BoundArgument to ``initialize_system``, which this function wraps can be used with ``recreate_system`` to reconstruct a System. unbound_system : System if make_unbound is True unbound_inputs : BoundArgument if make_unbound is True Raises ------ TypeError if Mcluster does not have units of Mass and is not an integer Notes ----- metallicity=0.0008 """ logger.record("Initialize star cluster from Contenta2018") # call helper function _initialize_star_cluster_bound_Contenta2018 # returns system, inputs to ``initialize_system``, # and inputs to _initialize_star_cluster_bound_Contenta2018 (system, inputs), _inputs = _initialize_star_cluster_bound_Contenta2018( Mcluster=Mcluster, potential=potential, # object properties Rhm=Rhm, position=position, velocity=velocity, vel_dir=vel_dir, # for evolution evln_func=SSE, # for gravity gravity_func=BHTree, gravity_kwargs={}, smoothing_length=smoothing_length, opening_angle=opening_angle, number_of_workers=number_of_workers, # util timestep=timestep, random=random, # channel_attrs=channel_attrs, make_plot=make_plot, # logging logger=logger, verbose=verbose, # debugging _scale_to_standard=_scale_to_standard, _use_self_gravity=_use_self_gravity, ) if make_unbound: logger.record("\n\nFind unbound stars from star cluster.") ( unbound_system, unbound_inputs, ) = _initialize_star_cluster_unbound_Contenta2018(system, _inputs,) else: unbound_system, unbound_inputs = None, None return SC_Tuple( bound_system=system, bound_inputs=inputs, unbound_system=unbound_system, unbound_inputs=unbound_inputs, )
# /def # ------------------------------------------------------------------- @store_function_input(store_inputs=True) def _initialize_star_cluster_bound_Contenta2018( Mcluster: (int, amu.MSun), potential: Any, # object properties Rhm: amu.parsec, position: amu.kpc, velocity, vel_dir: amu.kms, # for evolution evln_func: Callable, # for gravity gravity_func: Callable, gravity_kwargs: dict, smoothing_length: amu.parsec, opening_angle: float, number_of_workers: int, # util timestep: amu.Myr, random, make_plot: bool, # logging logger=_LOGFILE, verbose=None, # debugging _scale_to_standard: bool = True, _use_self_gravity: {0, 1} = 1, ): """Initialize Contenta (2018) star cluster. Parameters ---------- Mcluster: amuse units.MSun mass of cluster if unitless, assumed to be number_of_particles the number of particlues is derived using number_of_stars_from_total_mass_given_mass_function potential: galpy amuse potential potential of galaxy about which cluster orbits used for calculating a circular orbital velocity (Contenta 2018) Rhm: amuse units.parsec half-mass radius pos: list amuse units.kpc Galactocentric position position: amuse distance 3-list quantity, optional (default [0,0,0] pc) the position of the system in GC coordinates velocity: amuse velocity quantity, optional (default None) the velocity magnitude of the system in GC coordinates None will calculate a circular velocity (magnitude) at the `position` vel_dir: 3-list velocity unit direction evln_func: function, optional (default SeBa) object evolution function signature of function should be func(`metallicity`, **`evln_kwargs`) the metallicity is 0.0008 smoothing_length: amuse distance quantity, optional (default 0 pc) the smoothing length used in scaling and the gravity opening_angle: float, optional (default 0.6) number_of_workers: int, optional (default 1) number of gravity workers timestep: amuse time quantity, optional (default .1 Gyr) the timestep for evolving the gravity and stellar evolution random: True or random number generator, optional (default True) ex: np.random.default_rng(seed=0) will default to random seed # channel_attrs: list or None, optional # default ['mass', 'x', 'y', 'z', 'vx', 'vy', 'vz'] Returns ------- system: class # cluster .cluster .cluster_gravity # stars .stellar SSE with `metallicity` has the particles .star_distribution has channel .channel_from_stellar_to_cluster_gravity .star_distribution plummer_model w/ number_of_particles does NOT have any channels # channels .channel_from_stellar_to_cluster_gravity channel from star_evolution.particles to cluster_gravity **NOT a channel from .stars to cluster_gravity IS a channel from .stellar_evolution to cluster_gravity .channel_from_cluster_gravity_to_cluster channel from cluster_gravity to cluster # util .converter nbody_to_si from .stars.masses.sum() to `Rcluster` inputs: BoundArgument BoundArgument to ``initialize_system``, which this function wraps can be used with ``recreate_system`` to reconstruct a System. Raises ------ TypeError if Mcluster does not have units of Mass and is not an integer Notes ----- metallicity=0.0008 """ logger.report( f"""initialize_star_cluster_Contenta2018: Mcluster: {Mcluster} potential: {potential} # object properties Rhm: {Rhm} position: {position} velocity: {velocity} vel_dir: {vel_dir} # for evolution evln_func: {evln_func} # for gravity gravity_func: {gravity_func} gravity_kwargs: {gravity_kwargs} smoothing_length: {smoothing_length} opening_angle: {opening_angle} number_of_worker: {number_of_workers} # util timestep: {timestep} random: {random} # channel_attrs: {'deprecated'} # debugging _scale_to_standard: {_scale_to_standard} _use_self_gravity: {_use_self_gravity} """, verbose=verbose, ) # ---------------------------------------- # number_of_particles from Mcluster try: Mcluster.unit except AttributeError: # it's actually the number of stars # which is what we really want if not isinstance(Mcluster, int): raise TypeError("number_of_particles (Mcluster) is not an integer") N = Mcluster Mcluster = None else: # need to figure out number of stars N, M, err = num_particles_from_mtot_given_mass_func( to_amuse(Mcluster), new_kroupa_mass_distribution, tolerance=1e-7, random=0, imf_kwargs=dict( mass_min=0.1 | amu.MSun, mass_max=100.0 | amu.MSun ), ) logger.report( f"Making cluster with {N} particles", f"Making cluster with {N} particles, {M}, {err}", verbose=verbose, ) # ---------------------------------------- # figure out velocity if velocity is None: # use circular velocity Rg = position.length() velocity = potential.circular_velocity(Rg) * vel_dir logger.report( f"Starting velocity of <{velocity}>", verbose=verbose, ) # ---------------------------------------- # virial radius Rvirial = Plummer_halfmass_to_virial_radius(Rhm).in_(amu.parsec) logger.report(f"Rvirial = {Rvirial}", verbose=verbose, start_at=1) # ---------------------------------------- # converter if Mcluster is not None: converter = nbody_system.nbody_to_si(Mcluster, Rvirial) logger.report( f"made converter: {Mcluster, Rvirial}", verbose=verbose, ) else: converter = None logger.report( f"cannot pre-make converter", verbose=verbose, ) # ---------------------------------------- system, inputs = initialize_system( N, # for IMF imf_func=new_kroupa_mass_distribution, imf_args=[], imf_kwargs=dict(mass_min=0.1 | amu.MSun, mass_max=100 | amu.MSun), # for distribution function distr_func=new_plummer_model, distr_args=[], distr_kwargs={}, # object properties Rvirial=Rvirial, position=position, velocity=velocity, obj_radius=0 | amu.AU, # for stellar_evolution evln_func=evln_func, evln_kwargs={"metallicity": 0.0008}, # for gravity gravity_func=gravity_func, gravity_args=[], gravity_kwargs=gravity_kwargs, smoothing_length=smoothing_length, # @TODO opening_angle=opening_angle, number_of_workers=number_of_workers, use_self_gravity=_use_self_gravity, # util converter=converter, timestep=timestep, random=random, _num_particles_reconstruct=0, # channel_attrs=channel_attrs, # logging logger=logger, verbose=verbose, # debugging _scale_to_standard=_scale_to_standard, # decorator store_inputs=True, # to return inputs. Allows for fast reconstruction ) # ---------------------------------------- # bound objects try: system.bound_radius_cutoff = ( 3 # TODO something more robust, like 3x the virial radius * system.particles.LagrangianRadii( mf=[0.9], unit_converter=system.converter )[0] ) except: logger.write("not calculating bound_radius_cutoff") system.bound_radius_cutoff = np.inf | amu.pc # ---------------------------------------- # diagnostic plot if make_plot is not False and _PLOT: p = system.particles savefig = make_plot if isinstance(make_plot, str) else None plot.plot_snapshot_vxvv( p, timestamp=None, savefig=savefig, close=True, kind="cartesian" ) logger.report( "plotted Contenta (2018) initial star cluster", f"![Contenta (2018) initial star cluster]({savefig})", sep="\n", endsection=True, verbose=verbose, ) # ---------------------------------------- return system, inputs # /def # ------------------------------------------------------------------- def _initialize_star_cluster_unbound_Contenta2018( bound_cluster, bound_inputs, ): """Initialize unbound system from Contenta (2018) star cluster. .. todo:: use .no_PBH_in_galay._initialize_star_cluster_unbound:: .no_PBH_in_galay._initialize_star_cluster_unbound( bound_cluster, bound_inputs, bound_system_func=_initialize_star_cluster_bound_Contenta2018 ) Parameters ---------- bound_cluster bound_inputs smoothing_length: Quantity zero_unit Returns ------- unbound_cluster unbound_inputs See Also -------- _initialize_star_cluster_unbound """ return _initialize_star_cluster_unbound( bound_cluster=bound_cluster, bound_inputs=bound_inputs, bound_system_func=_initialize_star_cluster_bound_Contenta2018, init_arg=("Mcluster", 2), ) # /def #####################################################################
[docs]@convert_units_decorator( to_astropy_args=["pot_mass", "pot_a"], to_amuse_args=[ "sc_mass", "sc_rhm", "sc_position", "sc_velocity", "sc_vel_dir", "sc_smoothing_length", "sc_timestep", "DF_rmin", ], ) def make_noPBH_system( # Potential pot_mass: Union[apu.Quantity, Quantity] = 4.79e8 * apu.solMass, pot_a: Union[apu.Quantity, Quantity] = 0.877 * apu.kpc, # cluster sc_mass: Quantity = 1.9e4 | amu.MSun, sc_rhm: Quantity = 10 | amu.parsec, sc_position: Quantity = [0.14, 0, 0] | amu.kpc, sc_velocity=None, sc_vel_dir: amu.kms = [0, 1, 0], sc_evln_func=SSE, sc_gravity_func=BHTree, sc_gravity_kwargs: dict = {}, sc_smoothing_length: Quantity = 0.1 | amu.parsec, sc_opening_angle: float = 0.6, sc_number_of_workers: int = 8, sc_timestep: Quantity = 25 | amu.kyr, # dynamical friction use_DF: bool = True, DF_rmin: Quantity = 0.05 | amu.kpc, # time time=0 | amu.Myr, # util random=0, logger=_LOGFILE, verbose=None, save: Optional[str] = None, use_threading: bool = True, ): """Make Galaxy System. DehnenSphericalPotential approximating a coreNFW potential this should match the potential from Contenta2018 Parameters ---------- use_DF: bool whether to use Dynamical Friction save: bool the folder location where to save all the initial conditions. Returns ------- Systems galaxy cluster unbound cdf_code """ if isinstance(random, (int, float)): print('making RandomState') random = np.random.RandomState(int(random)) ########################################################################### # Making Galaxy logger.newsection(title="Making Galaxy", div="=") ####################################################################### # DehnenSphericalPotential approximating a coreNFW potential # this should match the potential from Contenta2018 logger.newsection( title="Making Potential\n", div="-", ) galaxy = make_Potential_Contenta2018(mass=pot_mass, a=pot_a, cored=True) # plot plot.plot_contenta18_fig2b( pot_core=galaxy.gravity.pot, # savefig="figures/contenta18_fig_2b.png", ) logger.report( ( "\nSmooth potential is a Dehnen Potential approximating a coreNFW." "\nThis should match the potential from Contenta (2018)." "\n![contenta18_fig_2b](figures/contenta18_fig_2b.png)" ), verbose=verbose, ) ####################################################################### # Star Cluster # bound & unbound, so don't have to apply dynamical friction to everything logger.newsection( title="Making Star Cluster\n", div="-", ) ( star_cluster, cluster_inputs, unbound_cluster, unbound_inputs, ) = initialize_star_cluster_Contenta2018( sc_mass, # mass galaxy.gravity, # potential # object properties Rhm=sc_rhm, position=sc_position, velocity=sc_velocity, vel_dir=sc_vel_dir, # for evolution evln_func=sc_evln_func, # for gravity gravity_func=sc_gravity_func, gravity_kwargs=sc_gravity_kwargs, smoothing_length=sc_smoothing_length, opening_angle=sc_opening_angle, number_of_workers=sc_number_of_workers, # util timestep=sc_timestep, random=random, make_unbound=True, # channel_attrs=_SC_CHANNEL_ATTRS, make_plot="figures/star_cluster_Contenta2018.pdf", # logger logger=logger, verbose=verbose, # debugging _scale_to_standard=True, _use_self_gravity=1, # True ) # figure if _PLOT: plot.plot_snapshot_vxvv( star_cluster.particles, timestamp="0 Myr", savefig="figures/initial_star_cluster.png", close=True, ) # TODO table of summary statistics logger.report( "![initial star cluster](../figures/initial_star_cluster.png)", verbose=verbose, ) # TODO table of summary statistics logger.report( f"initial unbound cluster star: {len(unbound_cluster.particles)} particles", verbose=verbose # "![initial star cluster]" "(../figures/initial_unbound_cluster.png)", ) ####################################################################### # Dynamical Friction # The Chandrasekhar dynamical friction is implemented approximately # instead of performing the expensive calculation for each star, # the cluster is approximated by its total mass and mean position # the dynamical friction is then applied to each star if use_DF: cdf_code = chandrasekhar_dynamical_friction( GMs=star_cluster.particles.total_mass().in_(amu.MSun), rhm=Plummer_virial_to_halfmass_radius( star_cluster.particles.virial_radius().in_(amu.pc) ), dens=galaxy.gravity.pot, rmin=DF_rmin, ) # TODO some test statistics # TODO save CDF code logger.report(f"made dynamical friction", verbose=True) # TODO just for testing separate_bound_unbound( star_cluster, unbound_cluster, star_cluster.bound_radius_cutoff, cdf_code=cdf_code, ) # /if ####################################################################### # Collective System # Gravity Bridges # stars in cluster_code depend on gravity from galaxy_code & dyn fric # approximating that the stars do not feel the unbound stars # 1) make bridges dict cluster_bridge = [ "galaxy.gravity", ] if use_DF: # add cdf to bridge cluster_bridge.append("cdf_code") print(cluster_bridge) # TODO delete internal_bridges = { "cluster.gravity": cluster_bridge, "unbound.gravity": ["galaxy.gravity", ], "galaxy.gravity": [], } # 2) make the Systems kw = {} if use_DF: kw.update({"cdf_code": cdf_code}) system = Systems( # systems galaxy=galaxy, cluster=star_cluster, unbound=unbound_cluster, **kw, # util use_threading=use_threading, internal_bridges=internal_bridges, timestep="cluster.gravity.parameters.timestep", ) # --------------------------------------------------- # deprecated # if use_DF: # system.gravity.add_system( # system.cluster.gravity, (system.galaxy.gravity, system.cdf_code) # ) # else: # system.gravity.add_system( # system.cluster.gravity, (system.galaxy.gravity,) # ) # # the unbound stars do not have the dyn fric, for computational speed # # approximating that the unbound stars do not feel the bound stars # system.gravity.add_system(system.unbound.gravity, (system.galaxy.gravity,)) # # galaxy_code still needs to be added so it evolves with time # system.gravity.add_system(system.galaxy.gravity,) # # Set how often to update external potential # system.gravity.timestep = system.cluster.gravity.parameters.timestep # --------------------------------------------------- # set current time system.gravity.time = time system.gravity.synchronize_model() # TODO check this sets all the times logger.write("Made System") if save is not None: # pickle.dump(system, save, use_dill=True) optional_components = [] if use_DF: optional_components.append("cdf_code") sim_utils.save_init_conds( drct=save, system=system, star_inputs=cluster_inputs, unbound_inputs=unbound_inputs, optional_components=optional_components, ) return system
# /def #####################################################################
[docs]@convert_units_decorator( to_amuse_args=[ "timestep", "timestop", "time_snap", "dynfric_time_update", ], ) def simulate_EriII( system, timestep, timestop, time_snap, dynfric_time_update, logger=_LOGFILE, verbose=None, ): """Simulate Eridanus II without PBH. .. todo:: consolidate time stuff into single argument use ``run_system.simulate_EriII_no_PBH`` instead Parameters ---------- system : `System` timestep : Quantity timestop : Quantity time_snap : Quantity dynfric_time_update : Quantity logger : LogFile, optional verbose : int, optional """ logger.report( f"Starting simulation @ time {system.gravity.time}", verbose=verbose, startsection=True, ) ########################################################################### # Getting System time = system.time use_DF = True if hasattr(system, "cdf_code") else False logger.write("use DF = ", use_DF) ########################################################################### # Evolve System logger.report("Evolving System", verbose=verbose) last_snap = time last_dynfic_update = time # save data write_set_to_file( system.cluster.particles, f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", format="csv", # attribute_names=list(attributes.keys()), # attribute_types=list(attributes.values()), ) write_set_to_file( system.unbound.particles, f"output/snaps/unbound/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", format="csv", # attribute_names=list(attributes.keys()), # attribute_types=list(attributes.values()), ) # -------------------- print(time, timestep, timestop, last_snap, last_dynfic_update) tq = tqdm( desc="Elapsed Simulation Time", total=int( (timestop - time).value_in(amu.Gyr) / timestep.value_in(amu.Gyr) ), ) while time <= timestop: # evolve stellar, part 1 system.cluster.evolution.evolve_model(time + timestep / 2.0) system.cluster.evolution.channel_to.gravity.copy_attributes(["mass"]) system.cluster.evolution.channel_to.particles.copy() # gravity setup (dynamical friction) if use_DF: xcm, ycm, zcm = system.cluster.particles.center_of_mass() ( vxcm, vycm, vzcm, ) = system.cluster.particles.center_of_mass_velocity() system.cdf_code.set_rv(xcm, ycm, zcm, vxcm, vycm, vzcm) # evolve gravity system.gravity.evolve_model(time + timestep) system.gravity.synchronize_model() system.cluster.gravity.channel_to.particles.copy() system.cluster.gravity.channel_to.evolution.copy() system.unbound.gravity.channel_to.particles.copy() # evolve stellar, the second system.cluster.evolution.evolve_model(time + timestep) system.cluster.evolution.channel_to.gravity.copy_attributes(["mass"]) system.cluster.evolution.channel_to.particles.copy() # set the time time = system.gravity.model_time # --------- # dynamical friction # every _DYNFRIC_TIME_UPDATE time steps the system is updated to: # - stop tracking unbound particles if use_DF: if time - last_dynfic_update > dynfric_time_update: # TODO replace with after test separate_bound_unbound( system.cluster, system.unbound, system.cluster.bound_radius_cutoff, cdf_code=system.cdf_code, ) # TODO test that cdf code is properly updating print("GMS:", system.cluster.particles.total_mass()) print( "rhm: ", system.cluster.particles.LagrangianRadii( mf=[0.5], unit_converter=system.cluster.converter )[0], ) last_dynfic_update = time # reset counter # /if # --------- # snapshot if time - last_snap > time_snap: write_set_to_file( system.cluster.particles, f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", format="csv", # attribute_names=list(attributes.keys()), # attribute_types=list(attributes.values()), ) write_set_to_file( system.unbound.particles, f"output/snaps/unbound/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", format="csv", # attribute_names=list(attributes.keys()), # attribute_types=list(attributes.values()), ) if _PLOT: # plot _temp = f"{time.value_in(amu.Myr):06.1f}".replace(".", "_") plot.plot_snapshot_vxvv( system.cluster.particles, timestamp=f"{time.value_in(amu.Myr):06.1f} Myr", savefig=f"figures/vxvv/snapshot_{_temp}Myr.png", close=True, kind="cartesian", ) # TODO plot of whole system # update time last_snap = time # reset counter # /if # update tqdm tq.update(n=1) # /while tq.close() # close progress bar system.cluster.gravity.channel_to.particles.copy() # final copy of properties system.gravity.stop() # ensure code stops running # -------------------- # save final output time = system.gravity.model_time print( "Making ", f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", ) write_set_to_file( system.cluster.particles, f"output/snaps/cluster/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", format="csv", # attribute_names=list(attributes.keys()), # attribute_types=list(attributes.values()), ) write_set_to_file( system.unbound.particles, f"output/snaps/unbound/snapshot_{time.value_in(amu.Myr):06.1f}Myr.dat", format="csv", # attribute_names=list(attributes.keys()), # attribute_types=list(attributes.values()), ) if _PLOT: # plot _temp = f"{time.value_in(amu.Myr):06.1f}".replace(".", "_") plot.plot_snapshot_vxvv( system.cluster.particles, timestamp=f"{time.value_in(amu.Myr):06.1f} Myr", savefig=f"figures/vxvv/snapshot_{_temp}Myr.png", close=True, kind="cartesian", ) # TODO plot of whole system # -------------------- # figure # plot.plot_snapshot_vxvv( # system.cluster.particles, # timestamp=f"{time.value_in(amu.Myr):06.1f} Myr", # savefig="figures/final_star_cluster.png", # close=True, kind="cartesian", # ) # logger.write( # ("![final star cluster]" "(../figures/final_star_cluster.png)") # ) # -------------------- # analysis table df = make_summary_table( "output/snaps/cluster", potential=system.galaxy.gravity.pot, save=True, overwrite=True, ) try: make_summary_table( "output/snaps/unbound", potential=system.galaxy.gravity.pot, save=True, overwrite=True, ) except Exception as e: print(e) pass if _PLOT: # mtot plot.plot_all_mtot(df) plt.savefig("figures/mtot.png") plt.close() # rtidal plot.plot_all_rtidal(df) plt.savefig("figures/rtidal.png") plt.close() # rgc plot.plot_all_rgc(df) plt.savefig("figures/rgc.png") plt.close() # movies # plot.make_xy_movie_from_snapshots( # "output", # "figures/Eri_no_PBH_xy_movie.mp4", # fps=20, # title=f"movie: xy ({__date__})", # # comment='', # overwrite=True, # ) # plt.close() # plot.make_rlim_movie_from_snapshots( # "output", # "figures/Eri_no_PBH_rlim_movie.mp4", # fps=20, # title=f"movie: xy ({__date__})", # # comment='', # overwrite=True, # ) # plt.close() return
# /def ############################################################################## # END # /def