Hanging Chain#

In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.

The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.

  1## @example mpc/python/hanging-chain/hanging-chain-mpc.py
  2# This example shows how to call the alpaqa solver from Python code, applied
  3# to a challenging model predictive control problem.
  4
  5# %% Hanging chain MPC example
  6
  7import casadi as cs
  8import numpy as np
  9import os
 10from os.path import join, dirname
 11import sys
 12
 13sys.path.append(dirname(__file__))
 14from hanging_chain_dynamics import HangingChain
 15
 16# %% Build the model
 17
 18Ts = 0.05
 19N = 6  # number of balls
 20dim = 2  # dimension
 21
 22model = HangingChain(N, dim)
 23f_d = model.dynamics(Ts)
 24y_null, u_null = model.initial_state()
 25
 26param = [0.03, 1.6, 0.033 / N]  # Concrete parameters m, D, L
 27
 28# %% Apply an initial input to disturb the system
 29
 30N_dist = 3
 31u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5]
 32y_dist = model.simulate(N_dist, y_null, u_dist, param)
 33y_dist = np.hstack((np.array([y_null]).T, y_dist))
 34
 35# %% Simulate the system without a controller
 36
 37N_sim = 180
 38y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, param)
 39y_sim = np.hstack((y_dist, y_sim))
 40
 41# %% Define MPC cost and constraints
 42
 43N_horiz = 12
 44
 45L_cost = model.generate_cost_fun()  # stage cost
 46y_init = cs.SX.sym("y_init", *y_null.shape)  # initial state
 47U = cs.SX.sym("U", dim * N_horiz)  # control signals over horizon
 48constr_param = cs.SX.sym("c", 3)  # Coefficients of cubic constraint function
 49mpc_param = cs.vertcat(y_init, model.params, constr_param)  # all parameters
 50U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix
 51
 52# Cost
 53mpc_sim = model.simulate(N_horiz, y_init, U_mat, model.params)
 54mpc_cost = 0
 55for n in range(N_horiz):  # Apply the stage cost function to each stage
 56    y_n = mpc_sim[:, n]
 57    u_n = U_mat[:, n]
 58    mpc_cost += L_cost(y_n, u_n)
 59mpc_cost_fun = cs.Function('f_mpc', [U, mpc_param], [mpc_cost])
 60
 61# Constraints
 62g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x  # Cubic constr
 63constr = []
 64for n in range(N_horiz):  # For each stage,
 65    y_n = mpc_sim[:, n]
 66    for i in range(N):  # for each ball in the stage,
 67        yx_n = y_n[dim * i]  # constrain the x, y position of the ball
 68        yy_n = y_n[dim * i + dim - 1]
 69        constr += [yy_n - g_constr(constr_param, yx_n)]
 70    constr += [y_n[-1] - g_constr(constr_param, y_n[-dim])]  # Ball N+1
 71mpc_constr_fun = cs.Function("g", [U, mpc_param], [cs.vertcat(*constr)])
 72
 73# Fill in the constraint coefficients c(x-a)³ + d(x - a) + b
 74a, b, c, d = 0.6, -1.4, 5, 2.2
 75constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
 76constr_lb = b - c * a**3 - d * a
 77
 78# %% NLP formulation
 79import panocpy as pa
 80
 81prob = pa.generate_and_compile_casadi_problem(mpc_cost_fun, mpc_constr_fun)
 82prob.C.lowerbound = -1 * np.ones((dim * N_horiz, ))
 83prob.C.upperbound = +1 * np.ones((dim * N_horiz, ))
 84prob.D.lowerbound = constr_lb * np.ones((len(constr), ))
 85
 86# %% NLP solver
 87from datetime import timedelta
 88
 89solver = pa.ALMSolver(
 90    alm_params={
 91        'ε': 1e-4,
 92        'δ': 1e-4,
 93        'Σ_0': 1e5,
 94        'max_time': timedelta(seconds=0.5),
 95    },
 96    inner_solver=pa.StructuredPANOCLBFGSSolver(
 97        panoc_params={
 98            'stop_crit': pa.ProjGradNorm2,
 99            'max_time': timedelta(seconds=0.2),
100            'hessian_step_size_heuristic': 15,
101        },
102        lbfgs_params={'memory': N_horiz},
103    ),
104)
105
106# %% MPC controller
107
108
109class MPCController:
110    tot_it = 0
111    failures = 0
112    U = np.zeros((N_horiz * dim, ))
113    λ = np.zeros(((N + 1) * N_horiz, ))
114
115    def __init__(self, model, problem):
116        self.model = model
117        self.problem = problem
118
119    def __call__(self, y_n):
120        y_n = np.array(y_n).ravel()
121        # Set the current state as the initial state
122        self.problem.param[:y_n.shape[0]] = y_n
123        # Solve the optimal control problem
124        # (warm start using the previous solution and Lagrange multipliers)
125        self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
126        # Print some solver statistics
127        print(stats['status'], stats['outer_iterations'],
128              stats['inner']['iterations'], stats['elapsed_time'],
129              stats['inner_convergence_failures'])
130        self.tot_it += stats['inner']['iterations']
131        self.failures += stats['status'] != pa.SolverStatus.Converged
132        # Print the Lagrange multipliers, shows that constraints are active
133        print(np.linalg.norm(self.λ))
134        # Return the optimal control signal for the first time step
135        return self.model.input_to_matrix(self.U)[:, 0]
136
137
138# %% Simulate the system using the MPC controller
139
140y_n = np.array(y_dist[:, -1]).ravel()  # initial state for controller
141n_state = y_n.shape[0]
142prob.param = np.concatenate((y_n, param, constr_coeff))
143
144y_mpc = np.empty((n_state, N_sim))
145controller = MPCController(model, prob)
146for n in range(N_sim):
147    # Solve the optimal control problem
148    u_n = controller(y_n)
149    # Apply the first optimal control signal to the system and simulate for
150    # one time step, then update the state
151    y_n = model.simulate(1, y_n, u_n, param).T
152    y_mpc[:, n] = y_n
153y_mpc = np.hstack((y_dist, y_mpc))
154
155print(controller.tot_it, controller.failures)
156
157# %% Visualize
158
159import matplotlib.pyplot as plt
160import matplotlib as mpl
161from matplotlib import patheffects
162
163mpl.rcParams['animation.frame_format'] = 'svg'
164
165fig, ax = plt.subplots()
166x, y, z = model.state_to_pos(y_null)
167line, = ax.plot(x, y, '-o', label='Without MPC')
168line_ctrl, = ax.plot(x, y, '-o', label='With MPC')
169plt.legend()
170plt.ylim([-2.5, 1])
171plt.xlim([-0.25, 1.25])
172
173x = np.linspace(-0.25, 1.25, 256)
174y = np.linspace(-2.5, 1, 256)
175X, Y = np.meshgrid(x, y)
176Z = g_constr(constr_coeff, X) + constr_lb - Y
177
178fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
179cgc = plt.contour(X, Y, Z, [0], colors='tab:green', linewidths=0.8)
180plt.setp(cgc.collections, path_effects=fx)
181
182
183class Animation:
184    points = []
185
186    def __call__(self, i):
187        x, y, z = model.state_to_pos(y_sim[:, i])
188        for p in self.points:
189            p.remove()
190        self.points = []
191        line.set_xdata(x)
192        line.set_ydata(y)
193        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
194        if np.sum(viol):
195            self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
196        x, y, z = model.state_to_pos(y_mpc[:, i])
197        line_ctrl.set_xdata(x)
198        line_ctrl.set_ydata(y)
199        viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
200        if np.sum(viol):
201            self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
202        return [line, line_ctrl] + self.points
203
204
205ani = mpl.animation.FuncAnimation(fig,
206                                  Animation(),
207                                  interval=1000 * Ts,
208                                  blit=True,
209                                  repeat=True,
210                                  frames=1 + N_dist + N_sim)
211
212# Export the animation
213out = join(dirname(__file__), '..', '..', '..', '..', 'sphinx', 'source',
214           'sphinxstatic', 'hanging-chain.html')
215os.makedirs(dirname(out), exist_ok=True)
216with open(out, "w") as f:
217    f.write('<center>')
218    f.write(ani.to_jshtml())
219    f.write('</center>')
220
221# Show the animation
222plt.show()
  1from typing import Union
  2import casadi as cs
  3import numpy as np
  4from casadi import vertcat as vc
  5
  6
  7class HangingChain:
  8    def __init__(self, N: int, dim: int):
  9        self.N = N
 10        self.dim = dim
 11
 12        self.y1 = cs.SX.sym("y1", dim * N, 1)  # state: balls 1→N positions
 13        self.y2 = cs.SX.sym("y2", dim * N, 1)  # state: balls 1→N velocities
 14        self.y3 = cs.SX.sym("y3", dim, 1)  # state: ball  1+N position
 15        self.u = cs.SX.sym("u", dim, 1)  # input: ball  1+N velocity
 16        self.y = vc(self.y1, self.y2, self.y3)  # full state vector
 17
 18        self.m = cs.SX.sym("m")  # mass
 19        self.D = cs.SX.sym("D")  # spring constant
 20        self.L = cs.SX.sym("L")  # spring length
 21
 22        self.params = vc(self.m, self.D, self.L)
 23
 24        self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81])  # gravity
 25        self.x0 = np.zeros((dim, ))  # ball 0 position
 26        self.x_end = np.eye(1, dim, 0).ravel()  # ball N+1 reference position
 27
 28    def dynamics(self, Ts=0.05):
 29        y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
 30        dist = lambda xa, xb: cs.norm_2(xa - xb)
 31        N, d = self.N, self.dim
 32        p = self.params
 33
 34        # Continuous-time dynamics y' = f(y, u; p)
 35
 36        f1 = [y2]
 37        f2 = []
 38        for i in range(N):
 39            xi = y1[d * i:d * i + d]
 40            xip1 = y1[d * i + d:d * i + d * 2] if i < N - 1 else y3
 41            Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
 42            xim1 = y1[d * i - d:d * i] if i > 0 else self.x0
 43            Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
 44            fi = (Fiip1 - Fim1i) / self.m + self.g
 45            f2 += [fi]
 46        f3 = [u]
 47
 48        f_expr = vc(*f1, *f2, *f3)
 49        self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
 50
 51        # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
 52
 53        opt = {"tf": Ts, "simplify": True, "number_of_finite_elements": 4}
 54        intg = cs.integrator("intg", "rk", {
 55            "x": y,
 56            "p": vc(u, p),
 57            "ode": f_expr
 58        }, opt)
 59
 60        f_d_expr = intg(x0=y, p=vc(u, p))["xf"]
 61        self.f_d = cs.Function("f_d", [y, u, p], [f_d_expr], ["y", "u", "p"],
 62                               ["y+"])
 63
 64        return self.f_d
 65
 66    def state_to_pos(self, y):
 67        N, d = self.N, self.dim
 68        rav = lambda x: np.array(x).ravel()
 69        xdim = lambda y, i: np.concatenate(
 70            ([0], rav(y[i:d * N:d]), rav(y[-d + i])))
 71        if d == 3:
 72            return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
 73        else:
 74            return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1, )))
 75
 76    def input_to_matrix(self, u):
 77        """
 78        Reshape the input signal from a vector into a dim × N_horiz matrix (note
 79        that CasADi matrices are stored column-wise and NumPy arrays row-wise)
 80        """
 81        if isinstance(u, np.ndarray):
 82            return u.reshape((self.dim, u.shape[0] // self.dim), order='F')
 83        else:
 84            return u.reshape((self.dim, u.shape[0] // self.dim))
 85
 86    def simulate(self, N_sim: int, y_0: np.ndarray, u: Union[np.ndarray, list,
 87                                                             cs.SX.sym],
 88                 p: Union[np.ndarray, list, cs.SX.sym]):
 89        if isinstance(u, list):
 90            u = np.array(u)
 91        if isinstance(u, np.ndarray):
 92            if u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1):
 93                if u.shape[0] == self.dim:
 94                    u = np.tile(u, (N_sim, 1)).T
 95        return self.f_d.mapaccum(N_sim)(y_0, u, p)
 96
 97    def initial_state(self):
 98        N, d = self.N, self.dim
 99        y1_0 = np.zeros((d * N))
100        y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
101        y2_0 = np.zeros((d * N))
102        y3_0 = np.zeros((d, ))
103        y3_0[0] = 1
104
105        y_null = np.concatenate((y1_0, y2_0, y3_0))
106        u_null = np.zeros((d, ))
107
108        return y_null, u_null
109
110    def generate_cost_fun(self, α=25, β=1, γ=0.01):
111        N, d = self.N, self.dim
112        y1t = cs.SX.sym("y1t", d * N, 1)
113        y2t = cs.SX.sym("y2t", d * N, 1)
114        y3t = cs.SX.sym("y3t", d, 1)
115        ut = cs.SX.sym("ut", d, 1)
116        yt = cs.vertcat(y1t, y2t, y3t)
117
118        L_cost = α * cs.sumsqr(y3t - self.x_end) + γ * cs.sumsqr(ut)
119        for i in range(N):
120            xdi = y2t[d * i:d * i + d]
121            L_cost += β * cs.sumsqr(xdi)
122        return cs.Function("L_cost", [yt, ut], [L_cost])