Source code for pymgpipe.nmpc

import os
import tqdm
import pandas as pd
import optlang
import time
from collections import namedtuple
from pathlib import Path
from .fva import FVA_TYPE, fva
from .utils import load_dataframe, load_model, set_objective, Constants
from .io import suppress_stdout
from .logger import logger
import cobra
import optlang

[docs] def compute_nmpcs( samples, out_dir="./", out_file="nmpcs.csv", objective_out_file="community_objectives.csv", fluxes_out_file="all_fluxes.csv", reactions=None, regex=None, ex_only=True, solver="gurobi", threads=int(os.cpu_count() / 2), parallel=True, diet_fecal_compartments=True, force=False, threshold=1e-5, write_to_file=True, fva_type=FVA_TYPE.REGULAR, objective_percent=100, scaling=0, mem_aff="none", schedule="dynamic", signed=False ): """Compute NMPCs as well as associated reaction metrics on specified list (or directory) of samples This function is computes NMPCs at the level of the optlang LP problem to leverage speed of low-level representation. Available FVA types are `regular` and `fast`. `Fast` FVA is significantly faster, but requires both a CPLEX license and full install of the VFFVA package. Additionally, all problems need to be in .mps format if using `fast` FVA type. For more information, see VFFVA package- https://github.com/marouenbg/VFFVA. NMPCs are calculated as the max fluxes secreted by the fecal compartment (positive values only) summed with the min flux uptaken by the diet compartment (negative values only). In cases where models are not build with diet/fecal compartments, NMPCs are simply calculated as the sum of the max and min fluxes. Args: samples (list | str): List of samples or directory containing samples out_dir (str): Directory to output results out_file (str): Name of file containing final NMPCs objective_out_file (str): Name of file containing community objectives fluxes_out_file (str): Name of file containing fluxes reactions (list): List of reactions to run NMPCs on regex (str): Regex match for list of reactions to run NMPCs on diet_fecal_compartments (bool): Whether or not models are built with diet/fecal compartmentalization ex_only (bool): Compute NMPCs on exchange reactions only fva_type (str): FVA type used to compute NMPCs, allowed values are `fast` and `regular` solver (str): LP solver used to compute NMPCs, allowed values are `gurobi` and `cplex` obj_optimality (float): Percent of optimal objective value constrained during NMPC computation threshold (float): Fluxes below threshold will be set to 0 write_to_file (bool): Write results to file Notes: If computation is cut short prematurely, this function will pick up where it left off based on which samples are already present in `out_file`. """ start = time.time() out_dir = out_dir + "/" if out_dir[-1] != "/" else out_dir Path(out_dir).mkdir(exist_ok=True) out_file = out_dir + out_file objective_out_file = out_dir + objective_out_file fluxes_out_file = out_dir + fluxes_out_file nmpcs = ( pd.DataFrame() if force or not write_to_file else load_dataframe(out_file, return_empty=True) ) all_fluxes = ( pd.DataFrame() if force or not write_to_file else load_dataframe(fluxes_out_file, return_empty=True) ) obj_values = ( pd.DataFrame() if force or not write_to_file else load_dataframe(objective_out_file, return_empty=True) ) obj_values["communityBiomass"] = ( None if obj_values.empty else obj_values[obj_values.columns[0]] ) if isinstance(samples, str) and os.path.isdir(samples): models = [ os.path.dirname(samples) + "/" + m for m in os.listdir(os.path.dirname(samples)) ] elif isinstance(samples, list): models = samples else: models = [samples] models = [ f for f in models if force or ( f.split("/")[-1].split(".")[0] not in list(nmpcs.columns) if isinstance(f, str) else f.name not in list(nmpcs.columns) ) ] print("Computing NMPCs on %s models using %s..." % (len(models), str(fva_type))) for m in tqdm.tqdm(models, total=len(models)): m_name = m.split("/")[-1].split(".")[0] if isinstance(m, str) else m.name try: res = fva( m, solver=solver, fva_type=fva_type, reactions=reactions, regex=regex, ex_only=ex_only, threads=threads, parallel=parallel, write_to_file=False, threshold=threshold, objective_percent=objective_percent, scaling=scaling, mem_aff=mem_aff, schedule=schedule, ) except Exception as e: logger.warning(f"Cannot solve {m_name} model!\n{e}") continue if res is None: return res["sample_id"] = m_name all_fluxes = pd.concat([all_fluxes, res], axis=0) if diet_fecal_compartments: metabs = [ m for m in res.index.str.split("[").str[0].drop_duplicates() if not m.startswith("Diet") ] df = {} for metab in metabs: fe = res.loc[metab + "[fe]"]["max"] d = res.loc["Diet_" + metab + "[d]"]["min"] df[metab.split("EX_")[1]] = d + fe nmpc = pd.DataFrame({m_name: df}) else: nmpc = res["min"] + res["max"] nmpc.name = m_name nmpcs = pd.concat([nmpcs, nmpc], axis=1).fillna(0) if not signed: nmpcs = abs(nmpcs) if write_to_file: nmpcs.to_csv(out_file) obj_values.to_csv(objective_out_file) all_fluxes.to_csv(fluxes_out_file) res = namedtuple("res", "nmpc objectives fluxes") print("-------------------------------------------------------") print("Finished computing NMPCs!") print("Process took %s minutes to run..." % round((time.time() - start) / 60, 3)) return res(nmpcs, obj_values, all_fluxes)