Hanging Chain

In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.

The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.

  1# %% Hanging chain MPC example
  2
  3import sys
  4from os.path import dirname
  5
  6import casadi as cs
  7import numpy as np
  8
  9sys.path.append(dirname(__file__))
 10from hanging_chain_dynamics import HangingChain
 11
 12# %% Build the model
 13
 14Ts = 0.05  # Time step [s]
 15N = 6  # Number of balls
 16dim = 2  # Dimension (2D or 3D)
 17
 18model = HangingChain(N, dim, Ts)
 19y_null, u_null = model.initial_state()  # Initial states and control inputs
 20
 21model_param = [0.03, 1.6, 0.033 / N]  # Concrete parameters m, D, L
 22
 23# %% Apply an initial control input to disturb the system
 24
 25N_dist = 3  # Number of time steps to apply the disturbance for
 26u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5]  # Disturbance input
 27y_dist = model.simulate(N_dist, y_null, u_dist, model_param)  # Model states
 28y_dist = np.hstack((np.array([y_null]).T, y_dist))  # (including initial state)
 29
 30# %% Simulate the system without a controller
 31
 32N_sim = 180  # Number of time steps to simulate for
 33y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, model_param)  # States
 34y_sim = np.hstack((y_dist, y_sim))  # (including disturbed and initial states)
 35
 36# %% Define MPC cost and constraints
 37
 38N_horiz = 12  # MPC horizon length (number of time steps)
 39
 40y_init = cs.MX.sym("y_init", *y_null.shape)  # Initial state
 41model_params = cs.MX.sym("params", *model.params.shape)  # Parameters
 42num_var = dim * N_horiz
 43U = cs.MX.sym("U", num_var)  # Control signals over horizon
 44U_mat = model.input_to_matrix(U)  # Input as dim by N_horiz matrix
 45constr_param = cs.MX.sym("c", 3)  # Coefficients of cubic constraint function
 46mpc_param = cs.vertcat(y_init, model_params, constr_param)  # All parameters
 47
 48# Cost
 49
 50# Stage costs for states and input
 51stage_y_cost, stage_u_cost = model.generate_cost_funcs()
 52# Simulate the model with the input over the horizon
 53mpc_sim = model.simulate(N_horiz, y_init, U_mat, model_params)
 54# Accumulate the cost of the outputs and inputs
 55mpc_y_cost = cs.sum2(stage_y_cost.map(N_horiz)(mpc_sim))
 56mpc_u_cost = cs.sum2(stage_u_cost.map(N_horiz)(U_mat))
 57mpc_cost = mpc_y_cost + mpc_u_cost
 58
 59# Constraints
 60
 61# Cubic constraint function for a single ball in one dimension
 62g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x
 63# Constraint function for one stage (N balls)
 64y_c = cs.MX.sym("y_c", y_dist.shape[0])
 65constr = []
 66for i in range(N):  # for each ball in the stage except the last,
 67    yx_n = y_c[dim * i]  # constrain the x, y position of the ball
 68    yy_n = y_c[dim * i + dim - 1]
 69    constr += [yy_n - g_constr(constr_param, yx_n)]
 70constr += [y_c[-1] - g_constr(constr_param, y_c[-dim])]  # Ball N+1
 71constr_fun = cs.Function("c", [y_c, constr_param], [cs.vertcat(*constr)])
 72# Constraint function for all stages in the horizon
 73mpc_constr = cs.vec(constr_fun.map(N_horiz)(mpc_sim, constr_param))
 74num_constr = (N + 1) * N_horiz
 75# Fill in the constraint coefficients c(x-a)³ + d(x-a) + b
 76a, b, c, d = 0.6, -1.4, 5, 2.2
 77constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
 78constr_lb = b - c * a**3 - d * a
 79# Box constraints on actuator:
 80C = -1 * np.ones(num_var), +1 * np.ones(num_var)  # lower bound, upper bound
 81# Constant term of the cubic state constraints as a one-sided box:
 82D = constr_lb * np.ones(num_constr), +np.inf * np.ones(num_constr)
 83
 84# Initial parameter value
 85
 86y_n = np.array(y_dist[:, -1]).ravel()  # Initial state of the chain
 87n_state = y_n.shape[0]
 88param_0 = np.concatenate((y_n, model_param, constr_coeff))
 89
 90# %% NLP formulation
 91
 92import alpaqa
 93
 94# Generate C code for the cost and constraint functions, compile them, and load
 95# them as an alpaqa problem description:
 96problem = (
 97    alpaqa.minimize(mpc_cost, U)  # objective and variables         f(x; p)
 98    .subject_to_box(C)  #           box constraints on variables    x ∊ C
 99    .subject_to(mpc_constr, D)  #   general constraints             g(x; p) ∊ D
100    .with_param(mpc_param)  #       parameter to be changed later   p
101    .with_param_value(param_0)  #   initial parameter value
102    .with_name(f"hanging_chain_{N_horiz}")  #  name used in generated files
103).compile(sym=cs.MX.sym)
104
105# %% NLP solver
106
107from datetime import timedelta
108
109# Configure an alpaqa solver:
110solver = alpaqa.ALMSolver(
111    alm_params={
112        "tolerance": 1e-3,
113        "dual_tolerance": 1e-3,
114        "initial_penalty": 1e4,
115        "max_iter": 100,
116        "max_time": timedelta(seconds=0.2),
117    },
118    inner_solver=alpaqa.PANOCSolver(
119        panoc_params={
120            "stop_crit": alpaqa.FPRNorm,
121            "max_time": timedelta(seconds=0.02),
122        },
123        lbfgs_params={"memory": N_horiz},
124    ),
125)
126
127# %% MPC controller
128
129
130# Wrap the solver in a class that solves the optimal control problem at each
131# time step, implementing warm starting:
132class MPCController:
133    def __init__(self, model: HangingChain, problem: alpaqa.CasADiProblem):
134        self.model = model
135        self.problem = problem
136        self.tot_it = 0
137        self.tot_time = timedelta()
138        self.max_time = timedelta()
139        self.failures = 0
140        self.U = np.zeros(problem.num_variables)
141        self.λ = np.zeros(problem.num_constraints)
142
143    def __call__(self, y_n):
144        y_n = np.array(y_n).ravel()
145        # Set the current state as the initial state
146        self.problem.param[: y_n.shape[0]] = y_n
147        # Shift over the previous solution and Lagrange multipliers
148        self.U = np.concatenate((self.U[dim:], self.U[-dim:]))
149        self.λ = np.concatenate((self.λ[N + 1 :], self.λ[-N - 1 :]))
150        # Solve the optimal control problem
151        # (warm start using the shifted previous solution and multipliers)
152        self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
153        # Print some solver statistics
154        print(
155            f'{stats["status"]} outer={stats["outer_iterations"]} '
156            f'inner={stats["inner"]["iterations"]} time={stats["elapsed_time"]} '
157            f'failures={stats["inner_convergence_failures"]}'
158        )
159        self.tot_it += stats["inner"]["iterations"]
160        self.failures += stats["status"] != alpaqa.SolverStatus.Converged
161        self.tot_time += stats["elapsed_time"]
162        self.max_time = max(self.max_time, stats["elapsed_time"])
163        # Print the Lagrange multipliers, shows that constraints are active
164        print(np.linalg.norm(self.λ))
165        # Return the optimal control signal for the first time step
166        return self.model.input_to_matrix(self.U)[:, 0]
167
168
169# %% Simulate the system using the MPC controller
170
171problem.param = param_0
172
173y_mpc = np.empty((n_state, N_sim))
174controller = MPCController(model, problem)
175for n in range(N_sim):
176    # Solve the optimal control problem:
177    u_n = controller(y_n)
178    # Apply the first optimal control input to the system and simulate for
179    # one time step, then update the state:
180    y_n = model.simulate(1, y_n, u_n, model_param).T
181    y_mpc[:, n] = y_n
182y_mpc = np.hstack((y_dist, y_mpc))
183
184print(f"{controller.tot_it} inner iterations, {controller.failures} failures")
185print(
186    f"time: {controller.tot_time} (total), {controller.max_time} (max), "
187    f"{controller.tot_time / N_sim} (avg)"
188)
189
190# %% Visualize the results
191
192import matplotlib as mpl
193import matplotlib.pyplot as plt
194from matplotlib import animation, patheffects
195
196mpl.rcParams["animation.frame_format"] = "svg"
197
198# Plot the chains
199fig, ax = plt.subplots()
200x, y, z = model.state_to_pos(y_null)
201(line,) = ax.plot(x, y, "-o", label="Without MPC")
202(line_ctrl,) = ax.plot(x, y, "-o", label="With MPC")
203plt.legend()
204plt.ylim([-2.5, 1])
205plt.xlim([-0.25, 1.25])
206
207# Plot the state constraints
208x = np.linspace(-0.25, 1.25, 256)
209y = np.linspace(-2.5, 1, 256)
210X, Y = np.meshgrid(x, y)
211Z = g_constr(constr_coeff, X) + constr_lb - Y
212fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
213cgc = plt.contour(X, Y, Z, [0], colors="tab:green", linewidths=0.8)
214cgc.set_path_effects(fx)
215
216
217class Animation:
218    def __init__(self) -> None:
219        self.points = []
220
221    def __call__(self, i):
222        x, y, z = model.state_to_pos(y_sim[:, i])
223        y = z if dim == 3 else y
224        for p in self.points:
225            p.remove()
226        self.points = []
227        line.set_xdata(x)
228        line.set_ydata(y)
229        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
230        if np.sum(viol):
231            self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
232        x, y, z = model.state_to_pos(y_mpc[:, i])
233        y = z if dim == 3 else y
234        line_ctrl.set_xdata(x)
235        line_ctrl.set_ydata(y)
236        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
237        if np.sum(viol):
238            self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
239        return [line, line_ctrl, *self.points]
240
241
242ani = animation.FuncAnimation(
243    fig,
244    Animation(),
245    interval=1000 * Ts,
246    blit=True,
247    repeat=True,
248    frames=1 + N_dist + N_sim,
249)
250
251# Show the animation
252plt.show()

hanging_chain_dynamics.py

  1from typing import Union
  2
  3import casadi as cs
  4import numpy as np
  5from casadi import vertcat as vc
  6
  7
  8class HangingChain:
  9
 10    def __init__(self, N: int, dim: int, Ts: float = 0.05):
 11        self.N = N
 12        self.dim = dim
 13        self.nx = 2 * dim * N + dim
 14
 15        self.y1 = cs.SX.sym("y1", dim * N, 1)  # state: balls 1→N positions
 16        self.y2 = cs.SX.sym("y2", dim * N, 1)  # state: balls 1→N velocities
 17        self.y3 = cs.SX.sym("y3", dim, 1)  # state: ball  1+N position
 18        self.u = cs.SX.sym("u", dim, 1)  # input: ball  1+N velocity
 19        self.y = vc(self.y1, self.y2, self.y3)  # full state vector
 20
 21        self.m = cs.SX.sym("m")  # mass
 22        self.D = cs.SX.sym("D")  # spring constant
 23        self.L = cs.SX.sym("L")  # spring length
 24        self.params = vc(self.m, self.D, self.L)
 25
 26        self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81])  # gravity
 27        self.x0 = np.zeros((dim,))  # ball 0 position
 28        self.x_end = np.eye(1, dim, 0).ravel()  # ball N+1 reference position
 29
 30        self._build_dynamics(Ts)
 31
 32    def _build_dynamics(self, Ts):
 33        y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
 34        dist = lambda xa, xb: cs.norm_2(xa - xb)
 35        N, d = self.N, self.dim
 36        p = self.params
 37
 38        # Continuous-time dynamics y' = f(y, u; p)
 39
 40        f1 = [y2]
 41        f2 = []
 42        for i in range(N):
 43            xi = y1[d * i : d * i + d]
 44            xip1 = y1[d * i + d : d * i + d * 2] if i < N - 1 else y3
 45            Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
 46            xim1 = y1[d * i - d : d * i] if i > 0 else self.x0
 47            Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
 48            fi = (Fiip1 - Fim1i) / self.m + self.g
 49            f2 += [fi]
 50        f3 = [u]
 51
 52        f_expr = vc(*f1, *f2, *f3)
 53        self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
 54
 55        # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
 56
 57        # 4th order Runge-Kutta integrator
 58        k1 = self.f(y, u, p)
 59        k2 = self.f(y + Ts * k1 / 2, u, p)
 60        k3 = self.f(y + Ts * k2 / 2, u, p)
 61        k4 = self.f(y + Ts * k3, u, p)
 62
 63        # Discrete-time dynamics
 64        f_d_expr = y + (Ts / 6) * (k1 + 2 * k2 + 2 * k3 + k4)
 65        self.f_d = cs.Function("f", [y, u, p], [f_d_expr])
 66
 67        return self.f_d
 68
 69    def state_to_pos(self, y):
 70        N, d = self.N, self.dim
 71        rav = lambda x: np.array(x).ravel()
 72        xdim = lambda y, i: np.concatenate(([0], rav(y[i : d * N : d]), rav(y[-d + i])))
 73        if d == 3:
 74            return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
 75        else:
 76            return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1,)))
 77
 78    def input_to_matrix(self, u):
 79        """
 80        Reshape the input signal from a vector into a dim × N_horiz matrix (note
 81        that CasADi matrices are stored column-wise and NumPy arrays row-wise)
 82        """
 83        if isinstance(u, np.ndarray):
 84            return u.reshape((self.dim, u.shape[0] // self.dim), order="F")
 85        else:
 86            return u.reshape((self.dim, u.shape[0] // self.dim))
 87
 88    def state_to_matrix(self, y):
 89        """
 90        Reshape the state sequence from a vector into a n × (N_horiz+1) matrix (note
 91        that CasADi matrices are stored column-wise and NumPy arrays row-wise)
 92        """
 93        if isinstance(y, np.ndarray):
 94            return y.reshape((self.nx, y.shape[0] // self.nx), order="F")
 95        else:
 96            return y.reshape((self.nx, y.shape[0] // self.nx))
 97
 98    def simulate(
 99        self,
100        N_sim: int,
101        y_0: np.ndarray,
102        u: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym],
103        p: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym],
104    ):
105        if isinstance(u, list):
106            u = np.array(u)
107        if (
108            isinstance(u, np.ndarray)
109            and (u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1))
110            and u.shape[0] == self.dim
111        ):
112            u = np.tile(u, (N_sim, 1)).T
113        return self.f_d.mapaccum(N_sim)(y_0, u, p)
114
115    def initial_state(self):
116        N, d = self.N, self.dim
117        y1_0 = np.zeros(d * N)
118        y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
119        y2_0 = np.zeros(d * N)
120        y3_0 = np.zeros((d,))
121        y3_0[0] = 1
122
123        y_null = np.concatenate((y1_0, y2_0, y3_0))
124        u_null = np.zeros((d,))
125
126        assert self.nx == len(y_null)
127
128        return y_null, u_null
129
130    def generate_cost_funcs(self, α=25, β=1, γ=0.01):
131        N, d = self.N, self.dim
132        y1t = cs.SX.sym("y1t", d * N, 1)
133        y2t = cs.SX.sym("y2t", d * N, 1)
134        y3t = cs.SX.sym("y3t", d, 1)
135        ut = cs.SX.sym("ut", d, 1)
136        yt = cs.vertcat(y1t, y2t, y3t)
137
138        L_cost_x = α * cs.sumsqr(y3t - self.x_end)
139        for i in range(N):
140            xdi = y2t[d * i : d * i + d]
141            L_cost_x += β * cs.sumsqr(xdi)
142        L_cost_u = γ * cs.sumsqr(ut)
143        return cs.Function("L_cost_x", [yt], [L_cost_x]), cs.Function(
144            "L_cost_u", [ut], [L_cost_u]
145        )