# -*- coding: utf-8 -*-
"""
This module contains functions for formulating systems of Ordinary Differential
Equations (ODE-systems) which may be integrated numerically to model temporal
evolution of concentrations in reaction systems.
"""
from __future__ import (absolute_import, division, print_function)
from collections import OrderedDict
from functools import reduce, partial
from itertools import chain
from operator import attrgetter, mul
import math
import warnings
try:
import numpy as np
except ImportError:
np = None
from ..units import (
to_unitless, get_derived_unit, rescale, magnitude, unitless_in_registry,
default_unit_in_registry, default_units as u
)
from ..util.pyutil import deprecated
from ..util._expr import Expr, Symbol
from .rates import RateExpr, MassAction
def _get_derived_unit(reg, key):
try:
return get_derived_unit(reg, key)
except KeyError:
return get_derived_unit(reg, '_'.join(key.split('_')[:-1]))
[docs]def law_of_mass_action_rates(conc, rsys, variables=None):
""" Returns a generator of reaction rate expressions
Rates from the law of mass action (:attr:`Reaction.inact_reac` ignored)
from a :class:`ReactionSystem`.
Parameters
----------
conc : array_like
concentrations (floats or symbolic objects)
rsys : ReactionSystem instance
See :class:`ReactionSystem`
variables : dict (optional)
to override parameters in the rate expressions of the reactions
Examples
--------
>>> from chempy import ReactionSystem, Reaction
>>> line, keys = 'H2O -> H+ + OH- ; 1e-4', 'H2O H+ OH-'
>>> rxn = Reaction.from_string(line, keys)
>>> rsys = ReactionSystem([rxn], keys)
>>> next(law_of_mass_action_rates([55.4, 1e-7, 1e-7], rsys))
0.00554
>>> from chempy.kinetics.rates import Arrhenius, MassAction
>>> rxn.param = MassAction(Arrhenius({'A': 1e10, 'Ea_over_R': 9314}))
>>> print('%.5g' % next(law_of_mass_action_rates([55.4, 1e-7, 1e-7], rsys, {'temperature': 293})))
0.0086693
"""
for idx_r, rxn in enumerate(rsys.rxns):
if isinstance(rxn.param, RateExpr):
if isinstance(rxn.param, MassAction):
yield rxn.param(dict(chain(variables.items(), zip(rsys.substances.keys(), conc))), reaction=rxn)
else:
raise ValueError("Not mass-action rate in reaction %d" % idx_r)
else:
rate = 1
for substance_key, coeff in rxn.reac.items():
s_idx = rsys.as_substance_index(substance_key)
rate *= conc[s_idx]**coeff
yield rate*rxn.param
[docs]def dCdt_list(rsys, rates):
""" Returns a list of the time derivatives of the concentrations
Parameters
----------
rsys: ReactionSystem instance
rates: array_like
rates (to be weighted by stoichiometries) of the reactions
in ``rsys``
Examples
--------
>>> from chempy import ReactionSystem, Reaction
>>> line, keys = 'H2O -> H+ + OH- ; 1e-4', 'H2O H+ OH-'
>>> rsys = ReactionSystem([Reaction.from_string(line, keys)], keys)
>>> dCdt_list(rsys, [0.0054])
[-0.0054, 0.0054, 0.0054]
"""
f = [0]*rsys.ns
net_stoichs = rsys.net_stoichs()
for idx_s in range(rsys.ns):
for idx_r in range(rsys.nr):
f[idx_s] += net_stoichs[idx_r, idx_s]*rates[idx_r]
return f
[docs]def get_odesys(rsys, include_params=True, substitutions=None, SymbolicSys=None, unit_registry=None,
output_conc_unit=None, output_time_unit=None, cstr=False, constants=None, **kwargs):
""" Creates a :class:`pyneqsys.SymbolicSys` from a :class:`ReactionSystem`
The parameters passed to RateExpr will contain the key ``'time'`` corresponding to the
independent variable of the IVP.
Parameters
----------
rsys : ReactionSystem
Each reaction of ``rsys`` will have their :meth:`Reaction.rate_expr()` invoked.
Note that if :attr:`Reaction.param` is not a :class:`RateExpr` (or convertible to
one through :meth:`as_RateExpr`) it will be used to construct a :class:`MassAction`
instance.
include_params : bool (default: True)
Whether rate constants should be included into the rate expressions or
left as free parameters in the :class:`pyneqsys.SymbolicSys` instance.
substitutions : dict, optional
Variable substitutions used by rate expressions (in respective Reaction.param).
values are allowed to be values of instances of :class:`Expr`.
SymbolicSys : class (optional)
Default : :class:`pyneqsys.SymbolicSys`.
unit_registry: dict (optional)
See :func:`chempy.units.get_derived_units`.
output_conc_unit : unit (Optional)
output_time_unit : unit (Optional)
cstr : bool
Generate expressions for continuously stirred tank reactor.
constants : module
e.g. ``chempy.units.default_constants``, parameter keys not found in
substitutions will be looked for as an attribute of ``constants`` when provided.
\\*\\*kwargs :
Keyword arguemnts passed on to `SymbolicSys`.
Returns
-------
pyodesys.symbolic.SymbolicSys
extra : dict, with keys:
- param_keys : list of str instances
- unique : OrderedDict mapping str to value (possibly None)
- p_units : list of units
- max_euler_step_cb : callable or None
- linear_dependencies : None or factory of solver callback
- rate_exprs_cb : callable
- cstr_fr_fc : None or (feed-ratio-key, subtance-key-to-feed-conc-key-map)
Examples
--------
>>> from chempy import Equilibrium, ReactionSystem
>>> eq = Equilibrium({'Fe+3', 'SCN-'}, {'FeSCN+2'}, 10**2)
>>> substances = 'Fe+3 SCN- FeSCN+2'.split()
>>> rsys = ReactionSystem(eq.as_reactions(kf=3.0), substances)
>>> odesys, extra = get_odesys(rsys)
>>> init_conc = {'Fe+3': 1.0, 'SCN-': .3, 'FeSCN+2': 0}
>>> tout, Cout, info = odesys.integrate(5, init_conc)
>>> Cout[-1, :].round(4)
array([0.7042, 0.0042, 0.2958])
"""
if SymbolicSys is None:
from pyodesys.symbolic import SymbolicSys
r_exprs = [rxn.rate_expr() for rxn in rsys.rxns]
_ori_pk = set.union(*(ratex.all_parameter_keys() for ratex in r_exprs))
_ori_uk = set.union(*(ratex.all_unique_keys() for ratex in r_exprs))
_subst_pk = set()
_active_subst = OrderedDict()
_passive_subst = OrderedDict()
substitutions = substitutions or {}
unique = OrderedDict()
unique_units = {}
cstr_fr_fc = (
'feedratio',
OrderedDict([(sk, 'fc_'+sk) for sk in rsys.substances])
) if cstr is True else cstr
if cstr_fr_fc:
_ori_pk.add(cstr_fr_fc[0])
for k in cstr_fr_fc[1].values():
_ori_pk.add(k)
def _reg_unique_unit(k, arg_dim, idx):
if unit_registry is None:
return
unique_units[k] = reduce(mul, [1]+[unit_registry[dim]**v for dim, v in arg_dim[idx].items()])
def _get_arg_dim(expr, rxn):
if unit_registry is None:
return None
else:
return expr.args_dimensionality(reaction=rxn)
def _reg_unique(expr, rxn=None):
if not isinstance(expr, Expr):
raise NotImplementedError("Currently only Expr sub classes are supported.")
if isinstance(expr, MassAction):
if expr.args is None:
uk, = expr.unique_keys
if uk not in substitutions:
unique[uk] = None
_reg_unique_unit(uk, _get_arg_dim(expr, rxn), 0)
return
else:
arg, = expr.args
if isinstance(arg, Symbol):
uk, = arg.unique_keys
if uk not in substitutions:
unique[uk] = None
_reg_unique_unit(uk, _get_arg_dim(expr, rxn), 0)
return
if expr.args is None:
for idx, k in enumerate(expr.unique_keys):
if k not in substitutions:
unique[k] = None
_reg_unique_unit(k, _get_arg_dim(expr, rxn), idx)
else:
for idx, arg in enumerate(expr.args):
if isinstance(arg, Expr):
_reg_unique(arg, rxn=rxn)
elif expr.unique_keys is not None and idx < len(expr.unique_keys):
uk = expr.unique_keys[idx]
if uk not in substitutions:
unique[uk] = arg
_reg_unique_unit(uk, _get_arg_dim(expr, rxn), idx)
for sk, sv in substitutions.items():
if sk not in _ori_pk and sk not in _ori_uk:
raise ValueError("Substitution: '%s' does not appear in any rate expressions." % sk)
if isinstance(sv, Expr):
_subst_pk.update(sv.parameter_keys)
_active_subst[sk] = sv
if not include_params:
_reg_unique(sv)
else:
# if unit_registry is None:
if unit_registry is not None:
sv = unitless_in_registry(sv, unit_registry)
_passive_subst[sk] = sv
all_pk = []
for pk in filter(lambda x: x not in substitutions and x != 'time',
_ori_pk.union(_subst_pk)):
if hasattr(constants, pk):
const = getattr(constants, pk)
if unit_registry is None:
const = magnitude(const)
else:
const = unitless_in_registry(const, unit_registry)
_passive_subst[pk] = const
else:
all_pk.append(pk)
if not include_params:
for rxn, ratex in zip(rsys.rxns, r_exprs):
_reg_unique(ratex, rxn)
all_pk_with_unique = list(chain(all_pk, filter(lambda k: k not in all_pk, unique.keys())))
if include_params:
param_names_for_odesys = all_pk
else:
param_names_for_odesys = all_pk_with_unique
if unit_registry is None:
p_units = None
else:
# We need to make rsys_params unitless and create
# a pre- & post-processor for SymbolicSys
pk_units = [_get_derived_unit(unit_registry, k) for k in all_pk]
p_units = pk_units if include_params else (pk_units + [unique_units[k] for k in unique])
new_r_exprs = []
for ratex in r_exprs:
_pu, _new_ratex = ratex.dedimensionalisation(unit_registry)
new_r_exprs.append(_new_ratex)
r_exprs = new_r_exprs
time_unit = get_derived_unit(unit_registry, 'time')
conc_unit = get_derived_unit(unit_registry, 'concentration')
def post_processor(x, y, p):
time = x*time_unit
if output_time_unit is not None:
time = rescale(time, output_time_unit)
conc = y*conc_unit
if output_conc_unit is not None:
conc = rescale(conc, output_conc_unit)
return time, conc, np.array([elem*p_unit for elem, p_unit in zip(p.T, p_units)], dtype=object).T
kwargs['to_arrays_callbacks'] = (
lambda x: to_unitless(x, time_unit),
lambda y: to_unitless(y, conc_unit),
lambda p: np.array([to_unitless(px, p_unit) for px, p_unit in zip(
p.T if hasattr(p, 'T') else p, p_units)]).T
)
kwargs['post_processors'] = kwargs.get('post_processors', []) + [post_processor]
def dydt(t, y, p, backend=math):
variables = dict(chain(y.items(), p.items()))
if 'time' in variables:
raise ValueError("Key 'time' is reserved.")
variables['time'] = t
for k, act in _active_subst.items():
if unit_registry is not None and act.args:
_, act = act.dedimensionalisation(unit_registry)
variables[k] = act(variables, backend=backend)
variables.update(_passive_subst)
return rsys.rates(variables, backend=backend, ratexs=r_exprs, cstr_fr_fc=cstr_fr_fc)
def reaction_rates(t, y, p, backend=math):
variables = dict(chain(y.items(), p.items()))
if 'time' in variables:
raise ValueError("Key 'time' is reserved.")
variables['time'] = t
for k, act in _active_subst.items():
if unit_registry is not None and act.args:
_, act = act.dedimensionalisation(unit_registry)
variables[k] = act(variables, backend=backend)
variables.update(_passive_subst)
return [ratex(variables, backend=backend, reaction=rxn) for
rxn, ratex in zip(rsys.rxns, r_exprs)]
names = [s.name for s in rsys.substances.values()]
latex_names = [None if s.latex_name is None else ('\\mathrm{' + s.latex_name + '}')
for s in rsys.substances.values()]
compo_vecs, compo_names = rsys.composition_balance_vectors()
odesys = SymbolicSys.from_callback(
dydt, dep_by_name=True, par_by_name=True, names=names,
latex_names=latex_names, param_names=param_names_for_odesys,
linear_invariants=None if len(compo_vecs) == 0 else compo_vecs,
linear_invariant_names=None if len(compo_names) == 0 else list(map(str, compo_names)),
**kwargs)
symbolic_ratexs = reaction_rates(
odesys.indep, dict(zip(odesys.names, odesys.dep)),
dict(zip(odesys.param_names, odesys.params)), backend=odesys.be)
rate_exprs_cb = odesys._callback_factory(symbolic_ratexs)
if rsys.check_balance(strict=True):
# Composition available, we can provide callback for calculating
# maximum allowed Euler forward step at start of integration.
def max_euler_step_cb(x, y, p=()):
_x, _y, _p = odesys.pre_process(*odesys.to_arrays(x, y, p))
upper_bounds = rsys.upper_conc_bounds(_y)
fvec = odesys.f_cb(_x[0], _y, _p)
h = []
for idx, fcomp in enumerate(fvec):
if fcomp == 0:
h.append(float('inf'))
elif fcomp > 0:
h.append((upper_bounds[idx] - _y[idx])/fcomp)
else: # fcomp < 0
h.append(-_y[idx]/fcomp)
min_h = min(h)
return min(min_h, 1)
def linear_dependencies(preferred=None):
if preferred is not None:
if len(preferred) == 0:
raise ValueError("No preferred substance keys provided")
if len(preferred) >= len(rsys.substances):
raise ValueError("Cannot remove all concentrations from linear dependencies")
for k in preferred:
if k not in rsys.substances:
raise ValueError("Unknown substance key: %s" % k)
def analytic_solver(x0, y0, p0, be):
if preferred is None:
_preferred = None
else:
_preferred = list(preferred)
A = be.Matrix(compo_vecs)
rA, pivots = A.rref()
analytic_exprs = OrderedDict()
for ri, ci1st in enumerate(pivots):
for idx in range(ci1st, odesys.ny):
key = odesys.names[idx]
if rA[ri, idx] == 0:
continue
if _preferred is None or key in _preferred:
terms = [rA[ri, di]*(odesys.dep[di] - y0[odesys.dep[di]])
for di in range(ci1st, odesys.ny) if di != idx]
analytic_exprs[odesys[key]] = y0[odesys.dep[idx]] - sum(terms)/rA[ri, idx]
if _preferred is not None:
_preferred.remove(key)
break
for k in reversed(list(analytic_exprs.keys())):
analytic_exprs[k] = analytic_exprs[k].subs(analytic_exprs)
if _preferred is not None and len(_preferred) > 0:
raise ValueError("Failed to obtain analytic expression for: %s" % ', '.join(_preferred))
return analytic_exprs
return analytic_solver
else:
max_euler_step_cb = None
linear_dependencies = None
return odesys, {
'param_keys': all_pk,
'unique': unique,
'p_units': p_units,
'max_euler_step_cb': max_euler_step_cb,
'linear_dependencies': linear_dependencies,
'rate_exprs_cb': rate_exprs_cb,
'cstr_fr_fc': cstr_fr_fc,
'unit_registry': unit_registry
}
[docs]@deprecated(last_supported_version='0.5.3', will_be_missing_in='0.8.0',
use_instead='pyodesys.chained_parameter_variation')
def chained_parameter_variation(odesys, durations, init_conc, varied_params, default_params, integrate_kwargs=None):
""" Integrate an ODE-system for a serie of durations with some parameters changed in-between
Parameters
----------
odesys : :class:`pyodesys.ODESys` instance
durations : iterable of floats
init_conc : dict or array_like
varied_params : dict mapping parameter name to array_like
Each array_like need to be of same length as durations.
default_params : dict or array_like
Default values for the parameters of the ODE system.
integrate_kwargs : dict
Keyword arguments passed on to :meth:`pyodesys.ODESys.integrate`.
"""
for k, v in varied_params.items():
if len(v) != len(durations):
raise ValueError("Mismathced lengths of durations and varied_params")
integrate_kwargs = integrate_kwargs or {}
touts = []
couts = []
infos = {}
c0 = init_conc.copy()
for idx, duration in enumerate(durations):
params = default_params.copy()
for k, v in varied_params.items():
params[k] = v[idx]
tout, cout, info = odesys.integrate(duration, c0, params, **integrate_kwargs)
c0 = cout[-1, :]
idx0 = 0 if idx == 0 else 1
t_global = 0 if idx == 0 else touts[-1][-1]
touts.append(tout[idx0:] + t_global)
couts.append(cout[idx0:, ...])
for k, v in info.items():
if k.startswith('internal'):
continue
if k in infos:
infos[k] += (v,)
else:
infos[k] = (v,)
return np.concatenate(touts), np.concatenate(couts), infos
def _create_odesys(rsys, substance_symbols=None, parameter_symbols=None, pretty_replace=lambda x: x,
backend=None, SymbolicSys=None, time_symbol=None, unit_registry=None, rates_kw=None,
parameter_expressions=None, symbolic_kw=None):
""" This will be a simpler version of get_odesys without the unit handling code.
The motivation is to reduce complexity (the code of get_odesys is long with multiple closures).
This will also rely on SymPy explicitly and the user will be expected to deal with SymPy
expressions.
Only when this function has the same capabilities as get_odesys will it become and public API
(along with a deprecation of get_odesys).
Parameters
----------
rsys : ReactionSystem instance
substance_symbols : OrderedDict
If ``None``: ``rsys.substances`` will be used.
parameter_symbols : OrderedDict
backend : str or module
Symbolic backend (e.g. sympy). The package ``sym`` is used as a wrapper.
SymbolicSys: class
See ``pyodesys`` for API.
time_symbol : Symbol
unit_registry : object
e.g. ``chempy.units.SI_base_registry``
rates_kw : dict
Keyword arguments passed to the ``rates`` method of rsys.
parameter_expressions : dict
Optional overrides.
symbolic_kw : dict
Keyword arguments passed on to SymbolicSys.
Returns
-------
SymbolicSys (subclass of ``pyodesys.ODESys``)
dict :
- ``'symbols'``: dict mapping str to symbol.
- ``'validate'``: callable acppeting a dictionary mapping str to quantities
"""
if backend is None:
from sym import Backend
backend = Backend(backend)
if SymbolicSys is None:
from pyodesys.symbolic import SymbolicSys
if substance_symbols is None:
substance_symbols = OrderedDict([(key, backend.Symbol(key)) for key in rsys.substances])
if isinstance(substance_symbols, OrderedDict):
if list(substance_symbols) != list(rsys.substances):
raise ValueError("substance_symbols needs to have same (oredered) keys as rsys.substances")
if parameter_symbols is None:
keys = []
for rxnpar in map(attrgetter('param'), rsys.rxns):
if isinstance(rxnpar, str):
if rxnpar in (parameter_expressions or {}):
for pk in parameter_expressions[rxnpar].all_parameter_keys():
keys.append(pk)
else:
keys.append(rxnpar)
elif isinstance(rxnpar, Expr):
keys.extend(rxnpar.all_unique_keys())
for pk in rxnpar.all_parameter_keys():
if pk not in keys:
keys.append(pk)
else:
raise NotImplementedError("Unknown")
if rates_kw and 'cstr_fr_fc' in rates_kw:
flowrate_volume, feed_conc = rates_kw['cstr_fr_fc']
keys.append(flowrate_volume)
keys.extend(feed_conc.values())
assert all(sk in rsys.substances for sk in feed_conc)
if len(keys) != len(set(keys)):
raise ValueError("Duplicates in keys")
parameter_symbols = OrderedDict([(key, backend.Symbol(key)) for key in keys])
if not isinstance(parameter_symbols, OrderedDict):
raise ValueError("parameter_symbols needs to be an OrderedDict")
symbols = OrderedDict(chain(substance_symbols.items(), parameter_symbols.items()))
symbols['time'] = time_symbol or backend.Symbol('t')
if any(symbols['time'] == v for k, v in symbols.items() if k != 'time'):
raise ValueError("time_symbol already in use (name clash?)")
varbls = dict(symbols, **parameter_symbols)
varbls.update(parameter_expressions or {})
rates = rsys.rates(varbls, **(rates_kw or {}))
compo_vecs, compo_names = rsys.composition_balance_vectors()
odesys = SymbolicSys(
zip([substance_symbols[key] for key in rsys.substances], [rates[key] for key in rsys.substances]),
symbols['time'],
parameter_symbols.values(),
names=list(rsys.substances.keys()),
latex_names=[s.latex_name for s in rsys.substances.values()],
param_names=parameter_symbols.keys(),
latex_param_names=[pretty_replace(n) for n in parameter_symbols.keys()],
linear_invariants=compo_vecs,
linear_invariant_names=list(map(str, compo_names)),
backend=backend,
dep_by_name=True,
par_by_name=True,
**(symbolic_kw or {})
)
validate = partial(_validate, rsys=rsys, symbols=symbols, odesys=odesys, backend=backend)
return odesys, {
'symbols': symbols,
'validate': validate,
'unit_aware_solve': _mk_unit_aware_solve(odesys, unit_registry, validate=validate) if unit_registry else None
}
def _mk_dedim(unit_registry):
unit_time = get_derived_unit(unit_registry, 'time')
unit_conc = get_derived_unit(unit_registry, 'concentration')
def dedim_tcp(t, c, p, param_unit=lambda k, v: default_unit_in_registry(v, unit_registry)):
_t = to_unitless(t, unit_time)
_c = to_unitless(c, unit_conc)
_p, pu = {}, {}
for k, v in p.items():
pu[k] = param_unit(k, v)
_p[k] = to_unitless(v, pu[k])
return (_t, _c, _p), dict(unit_time=unit_time, unit_conc=unit_conc, param_units=pu)
return locals()
def _mk_unit_aware_solve(odesys, unit_registry, validate):
dedim_ctx = _mk_dedim(unit_registry)
def solve(t, c, p, **kwargs):
for name in odesys.names:
c[name] # to e.g. populate defaultdict
validate(dict(c, **p))
tcp, dedim_extra = dedim_ctx['dedim_tcp'](t, c, p)
result = odesys.integrate(*tcp, **kwargs)
result.xout = result.xout * dedim_extra['unit_time']
result.yout = result.yout * dedim_extra['unit_conc'] # assumes only concentrations in y
return result, dedim_extra
return solve
def _exact(v):
if hasattr(v, '_uncertainty'):
return v.magnitude*v.units
else:
return v
def _validate(conditions, rsys, symbols, odesys, backend=None, transform=None, ignore=('time',),
check_conditions_no_extra=False):
""" For use with create_odesys
Parameters
----------
conditions : OrderedDict
Parameters, values with units from ``chempy.units``.
rsys : ReactionSystem
symbols : dict
Mapping variable name to symbols.
backend : module
Module for symbolic mathematics. (defaults to SymPy)
transform : callable for rewriting expressions
check_conditions_no_extra : bool
When True, conditions may not contain keys not referenced in any expression.
Raises
------
``KeyError`` if a key in conditions is not in odesys.names or odesys.param_names
"""
if backend is None:
from sym import Backend
backend = Backend(backend)
if transform is None:
if backend.__name__ != 'sympy':
warnings.warn("Backend != SymPy, provide your own transform function.")
def transform(arg):
expr = backend.logcombine(arg, force=True)
v, w = map(backend.Wild, 'v w'.split())
return expr.replace(backend.log(w**v), v*backend.log(w))
rates = {}
seen = set()
for k, v in rsys.rates(symbols).items():
expr = transform(v)
if expr == 0:
rate = 0 * u.molar/u.second
else:
rate = None
for term in (expr.args if hasattr(expr, 'is_Add') and expr.is_Add else (expr,)):
args = sorted(expr.free_symbols, key=lambda e: e.name)
values = [conditions[s.name] for s in args]
result = backend.lambdify(args, term)(*map(_exact, values))
to_unitless(result, u.molar/u.second) # raises an exception upon unit error
if rate is None:
rate = result
else:
rate += result
rates[k] = rate
seen |= set([s.name for s in expr.free_symbols])
if check_conditions_no_extra:
for k in conditions:
if k not in odesys.param_names and k not in odesys.names and k not in ignore:
raise KeyError("Unknown param: %s" % k)
return {'not_seen': (set(rsys.substances) | set(conditions)) - seen, 'rates': rates}