Source code for alpaqa.casadi_generator

import casadi as cs
import numpy as np
from os.path import splitext
from typing import Tuple, Optional, Literal, get_args, Callable, Dict

SECOND_ORDER_SPEC = Literal["no", "full", "prod", "L", "L_prod", "psi", "psi_prod"]


def _prepare_casadi_problem(
    f: cs.Function,
    g: Optional[cs.Function],
    second_order: SECOND_ORDER_SPEC = "no",
    sym: Callable = cs.SX.sym,
) -> Dict[str, cs.Function]:
    """Convert the objective and constraint functions, their gradients,
    Lagrangians, etc. into CasADi functions."""

    assert second_order in get_args(SECOND_ORDER_SPEC)

    assert f.n_in() in [1, 2]
    assert f.n_out() == 1
    assert f.size1_out(0) == 1
    assert f.size2_out(0) == 1
    n = f.size1_in(0)
    assert f.size2_in(0) == 1
    with_param = f.n_in() == 2
    if with_param:
        assert f.size2_in(1) == 1
    if g is not None:
        assert f.n_in() == g.n_in()
        assert f.size1_in(0) == g.size1_in(0)
        if with_param:
            assert f.size1_in(1) == g.size1_in(1)
            assert g.size2_in(1) == 1
        assert g.n_out() <= 1
        m = g.size1_out(0) if g.n_out() == 1 else 0
        if g.n_out() == 1:
            assert g.size2_out(0) == 1
    else:
        m = 0
    x = sym("x", n)
    p = sym("p", f.size1_in(1) if with_param else 0)
    xp = (x, p) if with_param else (x,)
    xp_def = (x, p)
    xp_names = (f.name_in(0), f.name_in(1)) if with_param else (f.name_in(0), "p")
    x = xp[0]
    y = sym("y", m)
    s = sym("s")
    v = sym("v", n)
    Σ = sym("Σ", m)
    zl = sym("zl", m)
    zu = sym("zu", m)

    if m > 0:
        sL = s * f(*xp) + cs.dot(y, g(*xp))
        L = f(*xp) + cs.dot(y, g(*xp))
        ζ = g(*xp) + (y / Σ)
         = cs.fmax(zl, cs.fmin(ζ, zu))
        d = ζ - 
        ŷ = Σ * d
         = s * f(*xp) + 0.5 * cs.dot(ŷ, d)
        ψ = f(*xp) + 0.5 * cs.dot(ŷ, d)
    else:
        sL = s * f(*xp)
         = s * f(*xp)
        L = f(*xp)
        ψ = f(*xp)

    functions = {
        "f": cs.Function(
            "f",
            [*xp_def],
            [f(*xp)],
            [*xp_names],
            ["f"],
        ),
        "f_grad_f": cs.Function(
            "f_grad_f",
            [*xp_def],
            [f(*xp), cs.gradient(f(*xp), x)],
            [*xp_names],
            ["f", "grad_f"],
        ),
        "g": cs.Function(
            "g",
            [*xp_def],
            [g(*xp)] if m > 0 else [],
            [*xp_names],
            ["g"] if m > 0 else [],
        ),
        "psi_grad_psi": cs.Function(
            "psi_grad_psi",
            [*xp_def, y, Σ, zl, zu],
            [ψ, cs.gradient(ψ, x)],
            [*xp_names, "y", "Σ", "zl", "zu"],
            ["ψ", "grad_ψ"],
        ),
    }
    if False:
        functions["grad_psi"] = cs.Function(
            "grad_psi",
            [*xp_def, y, Σ, zl, zu],
            [cs.gradient(ψ, x)],
            [*xp_names, "y", "Σ", "zl", "zu"],
            ["grad_ψ"],
        )
    if m > 0:
        functions["grad_L"] = cs.Function(
            "grad_L",
            [*xp_def, y],
            [cs.gradient(L, x)],
            [*xp_names, "y"],
            ["grad_L"],
        )
        functions["psi"] = cs.Function(
            "psi",
            [*xp_def, y, Σ, zl, zu],
            [ψ, ŷ],
            [*xp_names, "y", "Σ", "zl", "zu"],
            ["ψ", "ŷ"],
        )

    if second_order in ["full", "L"]:
        functions["jacobian_g"] = cs.Function(
            "jacobian_g",
            [*xp_def],
            [cs.jacobian(g(*xp), x)],
            [*xp_names],
            ["jac_g"],
        )
        HL = cs.hessian(sL, x)[0]
        if not HL.is_dense():
            HL = cs.triu(HL)
        functions["hess_L"] = cs.Function(
            "hess_L",
            [*xp_def, y, s],
            [HL],
            [*xp_names, "y", "s"],
            ["hess_L"],
        )
    if second_order in ["full", "prod", "L_prod"]:
        functions["hess_L_prod"] = cs.Function(
            "hess_L_prod",
            [*xp_def, y, s, v],
            [cs.gradient(cs.jtimes(sL, x, v, False), x)],
            [*xp_names, "y", "s", "v"],
            ["hess_L_prod"],
        )
    if second_order in ["full", "psi"]:
         = cs.hessian(, x)[0]
        if not .is_dense():
             = cs.triu()
        functions["hess_psi"] = cs.Function(
            "hess_psi",
            [*xp_def, y, Σ, s, zl, zu],
            [],
            [*xp_names, "y", "Σ", "s", "zl", "zu"],
            ["hess_psi"],
        )
    if second_order in ["full", "prod", "psi_prod"]:
        functions["hess_psi_prod"] = cs.Function(
            "hess_psi_prod",
            [*xp_def, y, Σ, s, zl, zu, v],
            [cs.gradient(cs.jtimes(, x, v, False), x)],
            [*xp_names, "y", "Σ", "s", "zl", "zu", "v"],
            ["hess_psi_prod"],
        )
    return functions

[docs] def generate_casadi_problem( f: cs.Function, g: Optional[cs.Function], second_order: SECOND_ORDER_SPEC = 'no', name: str = "alpaqa_problem", sym: Callable = cs.SX.sym, ) -> cs.CodeGenerator: """Convert the objective and constraint functions into a CasADi code generator. :param f: Objective function. :param g: Constraint function. :param second_order: Whether to generate functions for evaluating Hessians. :param name: Optional string description of the problem (used for filename). :param sym: Symbolic variable constructor, usually either ``casadi.SX.sym`` (default) or ``casadi.MX.sym``. :return: Code generator that generates the functions and derivatives used by the solvers. """ functions = _prepare_casadi_problem(f, g, second_order, sym) cgname = f"{name}.c" cg = cs.CodeGenerator(cgname) for func in functions.values(): cg.add(func) return cg
def _add_parameter(f: cs.Function, expected_inputs: int) -> Tuple[cs.Function, cs.SX, str]: if f.n_in() == expected_inputs + 1: # Okay, we already have a parameter argument return f, f.sx_in(expected_inputs), f.name_in(expected_inputs) elif f.n_in() == expected_inputs: # We don't have a parameter argument param = cs.SX.sym("p", 0) fx = f(*(f.sx_in(i) for i in range(expected_inputs))) return cs.Function( f.name(), [f.sx_in(i) for i in range(expected_inputs)] + [param], [fx] if f.n_out() == 1 else fx, [f.name_in(i) for i in range(expected_inputs)] + ["p"], [f.name_out(i) for i in range(f.n_out())], ), param, "p" else: raise RuntimeError(f"Incorrect number of inputs for {f.name()} " f"(expected {expected_inputs} inputs with optional " f"additional parameter)")
[docs] def generate_casadi_control_problem( f: cs.Function, l: cs.Function, l_N: cs.Function, h: cs.Function = None, h_N: cs.Function = None, c: cs.Function = None, c_N: cs.Function = None, name: str = "alpaqa_control_problem", ) -> cs.CodeGenerator: """Convert the dynamics and cost functions into a CasADi code generator. :param f: Dynamics. :param name: Optional string description of the problem (used for filename). :return: Code generator that generates the functions and derivatives used by the solvers. """ cgname = f"{name}.c" cg = cs.CodeGenerator(cgname) assert f.n_in() in [2, 3] assert f.n_out() == 1 f, p_var, p_name = _add_parameter(f, 2) # x, u assert f.size2_in(0) == 1 nx = f.size1_in(0) assert f.size2_in(1) == 1 nu = f.size1_in(1) assert f.size2_in(2) == 1 p = p_var.size1() assert f.size1_out(0) == nx assert f.size2_out(0) == 1 x_var = f.sx_in(0) u_var = f.sx_in(1) xu_var = cs.vertcat(x_var, u_var) v_var = cs.SX.sym("v", nx) # dynamics and their derivatives cg.add(cs.Function( "f", [x_var, u_var, p_var], [f(x_var, u_var, p_var)], [f.name_in(i) for i in range(3)], [f.name_out(0)], )) cg.add(cs.Function( "jacobian_f", [x_var, u_var, p_var], [cs.densify(cs.jacobian(f(x_var, u_var, p_var), xu_var))], [f.name_in(i) for i in range(3)], ["jac_" + f.name_out(0)], )) cg.add(cs.Function( "grad_f_prod", [x_var, u_var, p_var, v_var], [cs.jtimes(f(x_var, u_var, p_var), xu_var, v_var, True)], [f.name_in(i) for i in range(3)] + ["v"], ["grad_" + f.name_out(0) + "_prod"], )) # output mapping if h is None: h = cs.Function("h", [x_var, u_var, p_var], [xu_var]) else: assert h.n_in() in [2, 3] h, _, _ = _add_parameter(h, 2) # x, u assert h.size1_in(0) == nx assert h.size2_in(0) == 1 assert h.size1_in(1) == nu assert h.size2_in(1) == 1 assert h.size1_in(2) == p assert h.size2_in(2) == 1 nh = h.size1_out(0) assert h.size2_out(0) == 1 cg.add(cs.Function( "h", [x_var, u_var, p_var], [h(x_var, u_var, p_var)], [h.name_in(i) for i in range(3)], [h.name_out(0)], )) # terminal output mapping if h_N is None: h_N = cs.Function("h_N", [x_var, p_var], [x_var]) else: assert h_N.n_in() in [1, 2] h_N, _, _ = _add_parameter(h_N, 1) # x assert h_N.size1_in(0) == nx assert h_N.size2_in(0) == 1 assert h_N.size1_in(1) == p assert h_N.size2_in(1) == 1 nh_N = h_N.size1_out(0) assert h_N.size2_out(0) == 1 cg.add(cs.Function( "h_N", [x_var, p_var], [h_N(x_var, p_var)], [h_N.name_in(i) for i in range(2)], [h_N.name_out(0)], )) # cost assert l.n_in() in [1, 2] # h l, _, _ = _add_parameter(l, 1) assert l.size1_in(0) == nh assert l.size2_in(0) == 1 assert l.size1_in(1) == p assert l.size2_in(1) == 1 assert l.n_out() == 1 assert l.size1_out(0) == 1 assert l.size2_out(0) == 1 h_var = cs.SX.sym("h", *l.sx_in(0).shape) cg.add(cs.Function( "l", [h_var, p_var], [l(h_var, p_var)], [l.name_in(i) for i in range(2)], [l.name_out(0)], )) cg.add(cs.Function( "qr", [xu_var, h_var, p_var], [cs.jtimes(h(x_var, u_var, p_var), xu_var, cs.gradient(l(h_var, p_var), h_var), True)], ["xu", "h", "p"], # TODO ["qr"], # TODO )) # (JhᵀΛ)ᵀ = ΛJh # JhᵀΛ = cs.jtimes(h(x_var, u_var, p_var), x_var, cs.hessian(l(h_var, p_var), h_var)[0], True) # JhᵀΛJh = cs.jtimes(h(x_var, u_var, p_var), x_var, cs.transpose(JhTΛ), True) Jhx = cs.jacobian(h(x_var, u_var, p_var), x_var) Λ = cs.hessian(l(h_var, p_var), h_var)[0] Q = Jhx.T @ Λ @ Jhx cg.add(cs.Function( "Q", [xu_var, h_var, p_var], [Q], ["xu", "h", "p"], # TODO ["Q"], # TODO )) # (JhᵀΛ)ᵀ = ΛJh # JhᵀΛ = cs.jtimes(h(x_var, u_var, p_var), u_var, cs.hessian(l(h_var, p_var), h_var)[0], True) # JhᵀΛJh = cs.jtimes(h(x_var, u_var, p_var), u_var, cs.transpose(JhTΛ), True) Jhu = cs.jacobian(h(x_var, u_var, p_var), u_var) R = Jhu.T @ Λ @ Jhu cg.add(cs.Function( "R", [xu_var, h_var, p_var], [R], ["xu", "h", "p"], # TODO ["R"], # TODO )) # (JhᵀΛ)ᵀ = ΛJh # JhᵀΛ = cs.jtimes(h(x_var, u_var, p_var), x_var, cs.hessian(l(h_var, p_var), h_var)[0], True) # JhᵀΛJh = cs.jtimes(h(x_var, u_var, p_var), u_var, cs.transpose(JhTΛ), True) S = Jhu.T @ Λ @ Jhx cg.add(cs.Function( "S", [xu_var, h_var, p_var], [S], ["xu", "h", "p"], # TODO ["S"], # TODO )) # terminal cost assert l_N.n_in() in [1, 2] # h l_N, _, _ = _add_parameter(l_N, 1) assert l_N.size1_in(0) == nh_N assert l_N.size2_in(0) == 1 assert l_N.size1_in(1) == p assert l_N.size2_in(1) == 1 assert l_N.n_out() == 1 assert l_N.size1_out(0) == 1 assert l_N.size2_out(0) == 1 hN_var = cs.SX.sym("hN", *l_N.sx_in(0).shape) cg.add(cs.Function( "l_N", [hN_var, p_var], [l_N(hN_var, p_var)], [l_N.name_in(i) for i in range(2)], [l_N.name_out(0)], )) cg.add(cs.Function( "q_N", [x_var, hN_var, p_var], [cs.jtimes(h_N(x_var, p_var), x_var, cs.gradient(l_N(hN_var, p_var), hN_var), True)], ["x", "h", "p"], # TODO ["q_N"], # TODO )) # (JhᵀΛ)ᵀ = ΛJh # JhᵀΛ = cs.jtimes(h_N(x_var, p_var), x_var, cs.hessian(l_N(hN_var, p_var), hN_var)[0], True) # JhᵀΛJh = cs.jtimes(h_N(x_var, p_var), x_var, cs.transpose(JhTΛ), True) JhN = cs.jacobian(h_N(x_var, p_var), x_var) ΛN = cs.hessian(l_N(hN_var, p_var), hN_var)[0] Q_N = JhN.T @ ΛN @ JhN cg.add(cs.Function( "Q_N", [x_var, hN_var, p_var], [Q_N], ["x", "h", "p"], # TODO ["Q_N"], # TODO )) # constraints if c is None: c = cs.Function("c", [x_var, p_var], [cs.vertcat()]) else: assert c.n_in() in [1, 2] c, _, _ = _add_parameter(c, 1) assert c.size1_in(0) == nx assert c.size2_in(0) == 1 assert c.size1_in(1) == p assert c.size2_in(1) == 1 assert c.n_out() == 1 nc = c.size1_out(0) assert c.size2_out(0) in [0, 1] w_var = cs.SX.sym("w", nc) cg.add(cs.Function( "c", [x_var, p_var], [c(x_var, p_var)], [c.name_in(i) for i in range(2)], [c.name_out(0)], )) cg.add(cs.Function( "grad_c_prod", [x_var, p_var, w_var], [cs.jtimes(c(x_var, p_var), x_var, w_var, True) if nc > 0 else cs.DM.zeros(nx)], [c.name_in(i) for i in range(2)] + ["w"], ["grad_" + c.name_out(0) + "_prod"], )) m_var = cs.SX.sym("m", nc) # (JhᵀM)ᵀ = MJh # JhᵀM = cs.jtimes(c(x_var, p_var), x_var, cs.diag(m_var), True) # JhᵀMJh = cs.jtimes(c(x_var, p_var), x_var, cs.transpose(JhᵀM), True) Jc = cs.jacobian(c(x_var, p_var), x_var) JhᵀMJh = Jc.T @ cs.diag(m_var) @ Jc cg.add(cs.Function( "gn_hess_c", [x_var, p_var, m_var], [JhᵀMJh], [c.name_in(i) for i in range(2)] + ["m"], ["gn_hess_" + c.name_out(0)], )) # constraints if c_N is None: c_N = cs.Function("c_N", [x_var, p_var], [cs.vertcat()]) else: assert c_N.n_in() in [1, 2] c_N, _, _ = _add_parameter(c_N, 1) assert c_N.size1_in(0) == nx assert c_N.size2_in(0) == 1 assert c_N.size1_in(1) == p assert c_N.size2_in(1) == 1 assert c_N.n_out() == 1 nc_N = c_N.size1_out(0) assert c_N.size2_out(0) in [0, 1] wN_var = cs.SX.sym("wN", nc_N) cg.add(cs.Function( "c_N", [x_var, p_var], [c_N(x_var, p_var)], [c_N.name_in(i) for i in range(2)], [c_N.name_out(0)], )) cg.add(cs.Function( "grad_c_prod_N", [x_var, p_var, wN_var], [cs.jtimes(c_N(x_var, p_var), x_var, wN_var, True) if nc_N > 0 else cs.DM.zeros(nx)], [c_N.name_in(i) for i in range(2)] + ["w"], ["grad_" + c_N.name_out(0) + "_prod"], )) mN_var = cs.SX.sym("mN", nc_N) JcN = cs.jacobian(c_N(x_var, p_var), x_var) JhᵀMJhN = JcN.T @ cs.diag(mN_var) @ JcN cg.add(cs.Function( "gn_hess_c_N", [x_var, p_var, mN_var], [JhᵀMJhN], [c_N.name_in(i) for i in range(2)] + ["m"], ["gn_hess_" + c_N.name_out(0)], )) return cg
[docs] def write_casadi_problem_data(sofile, C, D, param, l1_reg, penalty_alm_split): if all(i is None for i in (C, D, param, l1_reg, penalty_alm_split)): return C = ([], []) if C is None else C D = ([], []) if D is None else D param = [] if param is None else param l1_reg = [] if l1_reg is None else l1_reg penalty_alm_split = 0 if penalty_alm_split is None else penalty_alm_split with open(f"{splitext(sofile)[0]}.csv", "w") as f: opt = dict(delimiter=",", newline="\n") ravelrow = lambda x: np.reshape(x, (1, -1), order="A") writerow = lambda x: np.savetxt(f, ravelrow(x), **opt) try_lb = lambda x: x.lowerbound if hasattr(x, "lowerbound") else x[0] try_ub = lambda x: x.upperbound if hasattr(x, "upperbound") else x[1] writerow(try_lb(C)) writerow(try_ub(C)) writerow(try_lb(D)) writerow(try_ub(D)) writerow(param) writerow(l1_reg) f.write(str(penalty_alm_split))
[docs] def write_casadi_control_problem_data( sofile, U, D, D_N, x_init, param, penalty_alm_split=0, penalty_alm_split_N=None ): if U is None and D is None and D_N is None and x_init is None and param is None: return U = ([], []) if U is None else U D = ([], []) if D is None else D D_N = ([], []) if D_N is None else D_N x_init = [] if x_init is None else x_init param = [] if param is None else param if penalty_alm_split_N is None: penalty_alm_split_N = penalty_alm_split with open(f"{splitext(sofile)[0]}.csv", "w") as f: opt = dict(delimiter=",", newline="\n") ravelrow = lambda x: np.reshape(x, (1, -1), order="A") writerow = lambda x: np.savetxt(f, ravelrow(x), **opt) try_lb = lambda x: x.lowerbound if hasattr(x, "lowerbound") else x[0] try_ub = lambda x: x.upperbound if hasattr(x, "upperbound") else x[1] writerow(try_lb(U)) writerow(try_ub(U)) writerow(try_lb(D)) writerow(try_ub(D)) writerow(try_lb(D_N)) writerow(try_ub(D_N)) writerow(x_init) writerow(param) f.write(f"{penalty_alm_split} {penalty_alm_split_N}")