Hanging Chain#

In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.

The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.

  1# %% Hanging chain MPC example
  2
  3import casadi as cs
  4import numpy as np
  5from os.path import dirname
  6import sys
  7
  8sys.path.append(dirname(__file__))
  9from hanging_chain_dynamics import HangingChain
 10
 11# %% Build the model
 12
 13Ts = 0.05  # Time step [s]
 14N = 6  # Number of balls
 15dim = 2  # Dimension (2D or 3D)
 16
 17model = HangingChain(N, dim, Ts)
 18y_null, u_null = model.initial_state()  # Initial states and control inputs
 19
 20model_param = [0.03, 1.6, 0.033 / N]  # Concrete parameters m, D, L
 21
 22# %% Apply an initial control input to disturb the system
 23
 24N_dist = 3  # Number of time steps to apply the disturbance for
 25u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5]  # Disturbance input
 26y_dist = model.simulate(N_dist, y_null, u_dist, model_param)  # Model states
 27y_dist = np.hstack((np.array([y_null]).T, y_dist))  # (including initial state)
 28
 29# %% Simulate the system without a controller
 30
 31N_sim = 180  # Number of time steps to simulate for
 32y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, model_param)  # States
 33y_sim = np.hstack((y_dist, y_sim))  # (including disturbed and initial states)
 34
 35# %% Define MPC cost and constraints
 36
 37N_horiz = 12  # MPC horizon length (number of time steps)
 38
 39y_init = cs.MX.sym("y_init", *y_null.shape)  # Initial state
 40model_params = cs.MX.sym("params", *model.params.shape)  # Parameters
 41num_var = dim * N_horiz
 42U = cs.MX.sym("U", num_var)  # Control signals over horizon
 43U_mat = model.input_to_matrix(U)  # Input as dim by N_horiz matrix
 44constr_param = cs.MX.sym("c", 3)  # Coefficients of cubic constraint function
 45mpc_param = cs.vertcat(y_init, model_params, constr_param)  # All parameters
 46
 47# Cost
 48
 49# Stage costs for states and input
 50stage_y_cost, stage_u_cost = model.generate_cost_funcs()
 51# Simulate the model with the input over the horizon
 52mpc_sim = model.simulate(N_horiz, y_init, U_mat, model_params)
 53# Accumulate the cost of the outputs and inputs
 54mpc_y_cost = cs.sum2(stage_y_cost.map(N_horiz)(mpc_sim))
 55mpc_u_cost = cs.sum2(stage_u_cost.map(N_horiz)(U_mat))
 56mpc_cost = mpc_y_cost + mpc_u_cost
 57
 58# Constraints
 59
 60# Cubic constraint function for a single ball in one dimension
 61g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x
 62# Constraint function for one stage (N balls)
 63y_c = cs.MX.sym("y_c", y_dist.shape[0])
 64constr = []
 65for i in range(N):  # for each ball in the stage except the last,
 66    yx_n = y_c[dim * i]  # constrain the x, y position of the ball
 67    yy_n = y_c[dim * i + dim - 1]
 68    constr += [yy_n - g_constr(constr_param, yx_n)]
 69constr += [y_c[-1] - g_constr(constr_param, y_c[-dim])]  # Ball N+1
 70constr_fun = cs.Function("c", [y_c, constr_param], [cs.vertcat(*constr)])
 71# Constraint function for all stages in the horizon
 72mpc_constr = cs.vec(constr_fun.map(N_horiz)(mpc_sim, constr_param))
 73num_constr = (N + 1) * N_horiz
 74# Fill in the constraint coefficients c(x-a)³ + d(x-a) + b
 75a, b, c, d = 0.6, -1.4, 5, 2.2
 76constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
 77constr_lb = b - c * a**3 - d * a
 78# Box constraints on actuator:
 79C = -1 * np.ones(num_var), +1 * np.ones(num_var)  # lower bound, upper bound
 80# Constant term of the cubic state constraints as a one-sided box:
 81D = constr_lb * np.ones(num_constr), +np.inf * np.ones(num_constr)
 82
 83# Initial parameter value
 84
 85y_n = np.array(y_dist[:, -1]).ravel()  # Initial state of the chain
 86n_state = y_n.shape[0]
 87param_0 = np.concatenate((y_n, model_param, constr_coeff))
 88
 89# %% NLP formulation
 90
 91import alpaqa
 92
 93# Generate C code for the cost and constraint functions, compile them, and load
 94# them as an alpaqa problem description:
 95problem = (
 96    alpaqa.minimize(mpc_cost, U)  # objective and variables         f(x; p)
 97    .subject_to_box(C)  #           box constraints on variables    x ∊ C
 98    .subject_to(mpc_constr, D)  #   general constraints             g(x; p) ∊ D
 99    .with_param(mpc_param)  #       parameter to be changed later   p
100    .with_param_value(param_0)  #   initial parameter value
101    .with_name(f"hanging_chain_{N_horiz}")  #  name used in generated files
102).compile(sym=cs.MX.sym)
103
104# %% NLP solver
105
106from datetime import timedelta
107
108# Configure an alpaqa solver:
109solver = alpaqa.ALMSolver(
110    alm_params={
111        "tolerance": 1e-3,
112        "dual_tolerance": 1e-3,
113        "initial_penalty": 1e4,
114        "max_iter": 100,
115        "max_time": timedelta(seconds=0.2),
116    },
117    inner_solver=alpaqa.PANOCSolver(
118        panoc_params={
119            "stop_crit": alpaqa.FPRNorm,
120            "max_time": timedelta(seconds=0.02),
121        },
122        lbfgs_params={"memory": N_horiz},
123    ),
124)
125
126# %% MPC controller
127
128
129# Wrap the solver in a class that solves the optimal control problem at each
130# time step, implementing warm starting:
131class MPCController:
132    def __init__(self, model: HangingChain, problem: alpaqa.CasADiProblem):
133        self.model = model
134        self.problem = problem
135        self.tot_it = 0
136        self.tot_time = timedelta()
137        self.max_time = timedelta()
138        self.failures = 0
139        self.U = np.zeros(problem.n)
140        self.λ = np.zeros(problem.m)
141
142    def __call__(self, y_n):
143        y_n = np.array(y_n).ravel()
144        # Set the current state as the initial state
145        self.problem.param[: y_n.shape[0]] = y_n
146        # Shift over the previous solution and Lagrange multipliers
147        self.U = np.concatenate((self.U[dim:], self.U[-dim:]))
148        self.λ = np.concatenate((self.λ[N + 1 :], self.λ[-N - 1 :]))
149        # Solve the optimal control problem
150        # (warm start using the shifted previous solution and multipliers)
151        self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
152        # Print some solver statistics
153        print(
154            f'{stats["status"]} outer={stats["outer_iterations"]} '
155            f'inner={stats["inner"]["iterations"]} time={stats["elapsed_time"]} '
156            f'failures={stats["inner_convergence_failures"]}'
157        )
158        self.tot_it += stats["inner"]["iterations"]
159        self.failures += stats["status"] != alpaqa.SolverStatus.Converged
160        self.tot_time += stats["elapsed_time"]
161        self.max_time = max(self.max_time, stats["elapsed_time"])
162        # Print the Lagrange multipliers, shows that constraints are active
163        print(np.linalg.norm(self.λ))
164        # Return the optimal control signal for the first time step
165        return self.model.input_to_matrix(self.U)[:, 0]
166
167
168# %% Simulate the system using the MPC controller
169
170problem.param = param_0
171
172y_mpc = np.empty((n_state, N_sim))
173controller = MPCController(model, problem)
174for n in range(N_sim):
175    # Solve the optimal control problem:
176    u_n = controller(y_n)
177    # Apply the first optimal control input to the system and simulate for
178    # one time step, then update the state:
179    y_n = model.simulate(1, y_n, u_n, model_param).T
180    y_mpc[:, n] = y_n
181y_mpc = np.hstack((y_dist, y_mpc))
182
183print(f"{controller.tot_it} inner iterations, {controller.failures} failures")
184print(
185    f"time: {controller.tot_time} (total), {controller.max_time} (max), "
186    f"{controller.tot_time / N_sim} (avg)"
187)
188
189# %% Visualize the results
190
191import matplotlib.pyplot as plt
192import matplotlib as mpl
193from matplotlib import animation, patheffects
194
195mpl.rcParams["animation.frame_format"] = "svg"
196
197# Plot the chains
198fig, ax = plt.subplots()
199x, y, z = model.state_to_pos(y_null)
200(line,) = ax.plot(x, y, "-o", label="Without MPC")
201(line_ctrl,) = ax.plot(x, y, "-o", label="With MPC")
202plt.legend()
203plt.ylim([-2.5, 1])
204plt.xlim([-0.25, 1.25])
205
206# Plot the state constraints
207x = np.linspace(-0.25, 1.25, 256)
208y = np.linspace(-2.5, 1, 256)
209X, Y = np.meshgrid(x, y)
210Z = g_constr(constr_coeff, X) + constr_lb - Y
211fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
212cgc = plt.contour(X, Y, Z, [0], colors="tab:green", linewidths=0.8)
213plt.setp(cgc.collections, path_effects=fx)
214
215
216class Animation:
217    points = []
218
219    def __call__(self, i):
220        x, y, z = model.state_to_pos(y_sim[:, i])
221        y = z if dim == 3 else y
222        for p in self.points:
223            p.remove()
224        self.points = []
225        line.set_xdata(x)
226        line.set_ydata(y)
227        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
228        if np.sum(viol):
229            self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
230        x, y, z = model.state_to_pos(y_mpc[:, i])
231        y = z if dim == 3 else y
232        line_ctrl.set_xdata(x)
233        line_ctrl.set_ydata(y)
234        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
235        if np.sum(viol):
236            self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
237        return [line, line_ctrl] + self.points
238
239
240ani = animation.FuncAnimation(
241    fig,
242    Animation(),
243    interval=1000 * Ts,
244    blit=True,
245    repeat=True,
246    frames=1 + N_dist + N_sim,
247)
248
249# Show the animation
250plt.show()

hanging_chain_dynamics.py

  1from typing import Union
  2import casadi as cs
  3import numpy as np
  4from casadi import vertcat as vc
  5
  6
  7class HangingChain:
  8
  9    def __init__(self, N: int, dim: int, Ts: float = 0.05):
 10        self.N = N
 11        self.dim = dim
 12
 13        self.y1 = cs.SX.sym("y1", dim * N, 1)  # state: balls 1→N positions
 14        self.y2 = cs.SX.sym("y2", dim * N, 1)  # state: balls 1→N velocities
 15        self.y3 = cs.SX.sym("y3", dim, 1)  # state: ball  1+N position
 16        self.u = cs.SX.sym("u", dim, 1)  # input: ball  1+N velocity
 17        self.y = vc(self.y1, self.y2, self.y3)  # full state vector
 18
 19        self.m = cs.SX.sym("m")  # mass
 20        self.D = cs.SX.sym("D")  # spring constant
 21        self.L = cs.SX.sym("L")  # spring length
 22        self.params = vc(self.m, self.D, self.L)
 23
 24        self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81])  # gravity
 25        self.x0 = np.zeros((dim, ))  # ball 0 position
 26        self.x_end = np.eye(1, dim, 0).ravel()  # ball N+1 reference position
 27
 28        self._build_dynamics(Ts)
 29
 30    def _build_dynamics(self, Ts):
 31        y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
 32        dist = lambda xa, xb: cs.norm_2(xa - xb)
 33        N, d = self.N, self.dim
 34        p = self.params
 35
 36        # Continuous-time dynamics y' = f(y, u; p)
 37
 38        f1 = [y2]
 39        f2 = []
 40        for i in range(N):
 41            xi = y1[d * i:d * i + d]
 42            xip1 = y1[d * i + d:d * i + d * 2] if i < N - 1 else y3
 43            Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
 44            xim1 = y1[d * i - d:d * i] if i > 0 else self.x0
 45            Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
 46            fi = (Fiip1 - Fim1i) / self.m + self.g
 47            f2 += [fi]
 48        f3 = [u]
 49
 50        f_expr = vc(*f1, *f2, *f3)
 51        self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
 52
 53        # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
 54
 55        # 4th order Runge-Kutta integrator
 56        k1 = self.f(y, u, p)
 57        k2 = self.f(y + Ts * k1 / 2, u, p)
 58        k3 = self.f(y + Ts * k2 / 2, u, p)
 59        k4 = self.f(y + Ts * k3, u, p)
 60
 61        # Discrete-time dynamics
 62        f_d_expr = y + (Ts / 6) * (k1 + 2 * k2 + 2 * k3 + k4)
 63        self.f_d = cs.Function("f", [y, u, p], [f_d_expr])
 64
 65        return self.f_d
 66
 67    def state_to_pos(self, y):
 68        N, d = self.N, self.dim
 69        rav = lambda x: np.array(x).ravel()
 70        xdim = lambda y, i: np.concatenate(
 71            ([0], rav(y[i:d * N:d]), rav(y[-d + i])))
 72        if d == 3:
 73            return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
 74        else:
 75            return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1, )))
 76
 77    def input_to_matrix(self, u):
 78        """
 79        Reshape the input signal from a vector into a dim × N_horiz matrix (note
 80        that CasADi matrices are stored column-wise and NumPy arrays row-wise)
 81        """
 82        if isinstance(u, np.ndarray):
 83            return u.reshape((self.dim, u.shape[0] // self.dim), order='F')
 84        else:
 85            return u.reshape((self.dim, u.shape[0] // self.dim))
 86
 87    def simulate(self, N_sim: int, y_0: np.ndarray,
 88                 u: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym],
 89                 p: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym]):
 90        if isinstance(u, list):
 91            u = np.array(u)
 92        if isinstance(u, np.ndarray):
 93            if u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1):
 94                if u.shape[0] == self.dim:
 95                    u = np.tile(u, (N_sim, 1)).T
 96        return self.f_d.mapaccum(N_sim)(y_0, u, p)
 97
 98    def initial_state(self):
 99        N, d = self.N, self.dim
100        y1_0 = np.zeros((d * N))
101        y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
102        y2_0 = np.zeros((d * N))
103        y3_0 = np.zeros((d, ))
104        y3_0[0] = 1
105
106        y_null = np.concatenate((y1_0, y2_0, y3_0))
107        u_null = np.zeros((d, ))
108
109        return y_null, u_null
110
111    def generate_cost_funcs(self, α=25, β=1, γ=0.01):
112        N, d = self.N, self.dim
113        y1t = cs.SX.sym("y1t", d * N, 1)
114        y2t = cs.SX.sym("y2t", d * N, 1)
115        y3t = cs.SX.sym("y3t", d, 1)
116        ut = cs.SX.sym("ut", d, 1)
117        yt = cs.vertcat(y1t, y2t, y3t)
118
119        L_cost_x = α * cs.sumsqr(y3t - self.x_end)
120        for i in range(N):
121            xdi = y2t[d * i:d * i + d]
122            L_cost_x += β * cs.sumsqr(xdi)
123        L_cost_u = γ * cs.sumsqr(ut)
124        return cs.Function("L_cost_x", [yt], [L_cost_x]), \
125            cs.Function("L_cost_u", [ut], [L_cost_u])