Source code for pymgpipe.fva

import os
import gc
import sys
import numpy as np
import pandas as pd
from multiprocessing import Pool
from functools import partial
from optlang.interface import Objective
from pathlib import Path
from .utils import (
    load_model,
    Constants,
    get_reactions,
    _get_reverse_id,
    solve_model,
    load_dataframe,
)
from .io import suppress_stdout
from .logger import logger
from enum import Enum

[docs] class FVA_TYPE(Enum): REGULAR = 1 FAST = 2
[docs] class VFFVA(object): def __init__(self): self._path = None @property def path(self): return self._path @path.setter def path(self, value): self._path = value
vffva_config = VFFVA()
[docs] def fva( model=None, fva_type=FVA_TYPE.REGULAR, reactions=None, regex=None, ex_only=True, solver="gurobi", threads=int(os.cpu_count() / 2), write_to_file=False, out_file=None, parallel=True, threshold=1e-5, scaling=0, mem_aff="none", schedule="dynamic", objective_percent=None, force=False, ): """Run Flux Variability Analysis (FVA) on target reactions Available FVA types are `regular` and `fast`. `Fast` FVA is significantly faster, but requires both a CPLEX license and full install of the VFFVA package. Additionally, all problems need to be in .mps format if using `fast` FVA type. For more information, see VFFVA package- https://github.com/marouenbg/VFFVA. Args: model (str | optlang.Model): model you want to run FVA on fva_type (str): FVA type used to compute min/max values, allowed values are `fast` and `regular` reactions (list): List of reactions you want to target regex (str): Regex match for list of reactions you want to target ex_only (bool): Run FVA on exchange reactions only solver (str): LP solver used, allowed values are `gurobi` and `cplex` threads (float): How many threads to use to run FVA write_to_file (bool): Setting this to True will save results to `out_file` out_file (str): Name of file you want to save results (should have .csv extension) parallel (bool): If set to True, will use number of threads specified by `threads` parameter scaling (float): VFFVA specific parameter, see VFFVA package- https://github.com/marouenbg/VFFVA. mem_aff (str): VFFVA specific parameter, see VFFVA package- https://github.com/marouenbg/VFFVA. schedule (str): VFFVA specific parameter, see VFFVA package- https://github.com/marouenbg/VFFVA. objective_percent (float): Takes value between 0-100. If not set to None, will compute objective and constrain to specified percentage of maximum value before running FVA force (bool): Will compute FVA and overwrite existing file (if file is found with target reactions) Notes: If computation is cut short prematurely, this function will pick up where it left off based on which reactions are already present in `out_file`. """ gc.enable() if fva_type == FVA_TYPE.FAST: assert isinstance( model, str ), "For fast FVA, `model` needs to be passed in as path to .mps file" assert os.path.isdir(vffva_config.path), 'Could not find VFFVA folder at %s'%vffva_config.path path = model model = load_model(path=model, solver=solver) with suppress_stdout(): model.optimize() if model.status == "infeasible": raise Exception("%s model is infeasible!" % model.name) if reactions is None and regex is None and ex_only is True: regex = Constants.EX_REGEX reactions_to_run = [r.name for r in get_reactions(model, reactions, regex)] out_df = pd.DataFrame() if write_to_file: out_file = out_file if out_file is not None else "%s_fva.csv" % model.name out_df = load_dataframe(out_file, return_empty=True) if not force else pd.DataFrame() metabs_to_skip = list(out_df.index) if len(metabs_to_skip) > 0: print("\nFound existing file, skipping %s metabolites..." % len(metabs_to_skip)) reactions_to_run = [r for r in reactions_to_run if r not in metabs_to_skip] if len(reactions_to_run) == 0: print("---Finished! No reactions left to run---") return if parallel is False: threads = 1 parallel = False else: threads = ( os.cpu_count() if threads == -1 or threads > os.cpu_count() else threads ) threads = min(threads, len(reactions_to_run)) parallel = False if threads <= 1 else parallel split_reactions = np.array_split(reactions_to_run, threads) logger.info( "Starting parallel FVA on %s reactions using %s chunks on %s threads...\n" % (len(reactions_to_run), threads, len(split_reactions)) ) if fva_type == FVA_TYPE.REGULAR: # VFFVA does this automatically if objective_percent is not None: print(f'Constraining objective value to {objective_percent}% of optimal value...') prev_bounds = {} primals = {v:v.primal for v in model.objective.expression.free_symbols} for v, value in primals.items(): prev_bounds[v.name]=(v.lb,v.ub) v.lb = float(value) * (objective_percent/100) v.ub = float(value) print(v) model.update() _func = partial(_optlang_worker, threshold) if parallel: p = Pool(processes=threads, initializer=partial(_pool_init, model)) res = p.imap(_func, split_reactions) else: _pool_init(model) res = map(_func, split_reactions) for result in res: result_df = pd.DataFrame.from_records(result, index="id") out_df = pd.concat([out_df, result_df], axis=0) out_df.sort_index(inplace=True) if write_to_file: out_df.to_csv(out_file) try: p.close() p.join() except Exception: pass # reset if objective_percent is not None: for v, (lower, upper) in prev_bounds.items(): model.variables[v].set_bounds(lower, upper) model.update() elif fva_type == FVA_TYPE.FAST: status = os.system("mpirun --version") if status != 0: raise ValueError( [ "MPI and/or CPLEX nont installed, please follow the install guide" "or use the quick install script" ] ) # Set schedule and chunk size parameters os.environ["OMP_SCHEDUELE"] = schedule + str(len(split_reactions[0])) objective_percent = objective_percent if objective_percent is not None else -1 var_dict = {i: v.name for i, v in enumerate(model.variables)} var_dict_inv = {v: k for k, v in var_dict.items()} ex_indices = [var_dict_inv[r] for r in reactions_to_run] # Set reactions to optimize rxns_file = "rxns_%s.txt" % model.name with open(rxns_file, "w") as f: for num in ex_indices: f.write(str(num) + "\n") assert vffva_config.path is not None, "Please set value of `vffva_config.path` to location of VFFVA executable" try: status = os.system( "mpirun -np " + str(threads) + " --bind-to " + str(mem_aff) + " -x OMP_NUM_THREADS=" + str(1) + f" {vffva_config.path}/lib/veryfastFVA " + path + " " + str(objective_percent) + " " + str(scaling) + " " + rxns_file ) except: raise Exception( "Ran into issue when submitting VFFVA mpirun, please check installation- %s" % status ) # Fetch results resultFile = path[:-4] + "output.csv" if not os.path.exists(resultFile): raise Exception( "Ran into issue when running VFFVA, could not find results file..." ) result_df = pd.read_csv(resultFile, header=0) os.system("rm " + resultFile) os.system("rm " + rxns_file) result_df.rename( {i: var_dict[x] for i, x in enumerate(ex_indices) if i in result_df.index}, axis=0, inplace=True, ) result_df.index.rename("id", inplace=True) result_df.columns = ['min', 'max'] out_df = pd.concat([out_df, result_df], axis=0) else: raise Exception('`fva_type` must be equal to either FVA_TYPE.REGULAR or FVA_TYPE.FAST! Received %s'%fva_type) if threshold is not None: out_df[abs(out_df) < threshold] = 0 out_df.sort_index(inplace=True) out_df.columns = ['min', 'max'] if write_to_file: out_df.to_csv(out_file) return out_df
# works for both cplex and gurobi def _optlang_worker(threshold, metabolites): global global_model result = [] for m in metabolites: net = global_model.variables[m] reverse_id = _get_reverse_id(m) if reverse_id in global_model.variables: net -= global_model.variables[reverse_id] global_model.objective = Objective(net, direction="max") max_sol = solve_model(model=global_model, reactions=[m]).to_dict()[ global_model.name ][m] global_model.objective = Objective(net, direction="min") min_sol = solve_model(model=global_model, reactions=[m]).to_dict()[ global_model.name ][m] result.append({"id": m, "min": min_sol, "max": max_sol}) return result def _pool_init(sample_model): sys.stdout = open(os.devnull, "w") global global_model global_model = sample_model global global_problem global_problem = sample_model.problem