Hanging Chain#
In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.
The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.
1# %% Hanging chain MPC example
2
3import casadi as cs
4import numpy as np
5from os.path import dirname
6import sys
7
8sys.path.append(dirname(__file__))
9from hanging_chain_dynamics import HangingChain
10
11# %% Build the model
12
13Ts = 0.05 # Time step [s]
14N = 6 # Number of balls
15dim = 2 # Dimension (2D or 3D)
16
17model = HangingChain(N, dim, Ts)
18y_null, u_null = model.initial_state() # Initial states and control inputs
19
20model_param = [0.03, 1.6, 0.033 / N] # Concrete parameters m, D, L
21
22# %% Apply an initial control input to disturb the system
23
24N_dist = 3 # Number of time steps to apply the disturbance for
25u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5] # Disturbance input
26y_dist = model.simulate(N_dist, y_null, u_dist, model_param) # Model states
27y_dist = np.hstack((np.array([y_null]).T, y_dist)) # (including initial state)
28
29# %% Simulate the system without a controller
30
31N_sim = 180 # Number of time steps to simulate for
32y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, model_param) # States
33y_sim = np.hstack((y_dist, y_sim)) # (including disturbed and initial states)
34
35# %% Define MPC cost and constraints
36
37N_horiz = 12 # MPC horizon length (number of time steps)
38
39y_init = cs.MX.sym("y_init", *y_null.shape) # Initial state
40model_params = cs.MX.sym("params", *model.params.shape) # Parameters
41num_var = dim * N_horiz
42U = cs.MX.sym("U", num_var) # Control signals over horizon
43U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix
44constr_param = cs.MX.sym("c", 3) # Coefficients of cubic constraint function
45mpc_param = cs.vertcat(y_init, model_params, constr_param) # All parameters
46
47# Cost
48
49# Stage costs for states and input
50stage_y_cost, stage_u_cost = model.generate_cost_funcs()
51# Simulate the model with the input over the horizon
52mpc_sim = model.simulate(N_horiz, y_init, U_mat, model_params)
53# Accumulate the cost of the outputs and inputs
54mpc_y_cost = cs.sum2(stage_y_cost.map(N_horiz)(mpc_sim))
55mpc_u_cost = cs.sum2(stage_u_cost.map(N_horiz)(U_mat))
56mpc_cost = mpc_y_cost + mpc_u_cost
57
58# Constraints
59
60# Cubic constraint function for a single ball in one dimension
61g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x
62# Constraint function for one stage (N balls)
63y_c = cs.MX.sym("y_c", y_dist.shape[0])
64constr = []
65for i in range(N): # for each ball in the stage except the last,
66 yx_n = y_c[dim * i] # constrain the x, y position of the ball
67 yy_n = y_c[dim * i + dim - 1]
68 constr += [yy_n - g_constr(constr_param, yx_n)]
69constr += [y_c[-1] - g_constr(constr_param, y_c[-dim])] # Ball N+1
70constr_fun = cs.Function("c", [y_c, constr_param], [cs.vertcat(*constr)])
71# Constraint function for all stages in the horizon
72mpc_constr = cs.vec(constr_fun.map(N_horiz)(mpc_sim, constr_param))
73num_constr = (N + 1) * N_horiz
74# Fill in the constraint coefficients c(x-a)³ + d(x-a) + b
75a, b, c, d = 0.6, -1.4, 5, 2.2
76constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
77constr_lb = b - c * a**3 - d * a
78# Box constraints on actuator:
79C = -1 * np.ones(num_var), +1 * np.ones(num_var) # lower bound, upper bound
80# Constant term of the cubic state constraints as a one-sided box:
81D = constr_lb * np.ones(num_constr), +np.inf * np.ones(num_constr)
82
83# Initial parameter value
84
85y_n = np.array(y_dist[:, -1]).ravel() # Initial state of the chain
86n_state = y_n.shape[0]
87param_0 = np.concatenate((y_n, model_param, constr_coeff))
88
89# %% NLP formulation
90
91import alpaqa
92
93# Generate C code for the cost and constraint functions, compile them, and load
94# them as an alpaqa problem description:
95problem = (
96 alpaqa.minimize(mpc_cost, U) # objective and variables f(x; p)
97 .subject_to_box(C) # box constraints on variables x ∊ C
98 .subject_to(mpc_constr, D) # general constraints g(x; p) ∊ D
99 .with_param(mpc_param) # parameter to be changed later p
100 .with_param_value(param_0) # initial parameter value
101 .with_name(f"hanging_chain_{N_horiz}") # name used in generated files
102).compile(sym=cs.MX.sym)
103
104# %% NLP solver
105
106from datetime import timedelta
107
108# Configure an alpaqa solver:
109solver = alpaqa.ALMSolver(
110 alm_params={
111 "tolerance": 1e-3,
112 "dual_tolerance": 1e-3,
113 "initial_penalty": 1e4,
114 "max_iter": 100,
115 "max_time": timedelta(seconds=0.2),
116 },
117 inner_solver=alpaqa.PANOCSolver(
118 panoc_params={
119 "stop_crit": alpaqa.FPRNorm,
120 "max_time": timedelta(seconds=0.02),
121 },
122 lbfgs_params={"memory": N_horiz},
123 ),
124)
125
126# %% MPC controller
127
128
129# Wrap the solver in a class that solves the optimal control problem at each
130# time step, implementing warm starting:
131class MPCController:
132 def __init__(self, model: HangingChain, problem: alpaqa.CasADiProblem):
133 self.model = model
134 self.problem = problem
135 self.tot_it = 0
136 self.tot_time = timedelta()
137 self.max_time = timedelta()
138 self.failures = 0
139 self.U = np.zeros(problem.n)
140 self.λ = np.zeros(problem.m)
141
142 def __call__(self, y_n):
143 y_n = np.array(y_n).ravel()
144 # Set the current state as the initial state
145 self.problem.param[: y_n.shape[0]] = y_n
146 # Shift over the previous solution and Lagrange multipliers
147 self.U = np.concatenate((self.U[dim:], self.U[-dim:]))
148 self.λ = np.concatenate((self.λ[N + 1 :], self.λ[-N - 1 :]))
149 # Solve the optimal control problem
150 # (warm start using the shifted previous solution and multipliers)
151 self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
152 # Print some solver statistics
153 print(
154 f'{stats["status"]} outer={stats["outer_iterations"]} '
155 f'inner={stats["inner"]["iterations"]} time={stats["elapsed_time"]} '
156 f'failures={stats["inner_convergence_failures"]}'
157 )
158 self.tot_it += stats["inner"]["iterations"]
159 self.failures += stats["status"] != alpaqa.SolverStatus.Converged
160 self.tot_time += stats["elapsed_time"]
161 self.max_time = max(self.max_time, stats["elapsed_time"])
162 # Print the Lagrange multipliers, shows that constraints are active
163 print(np.linalg.norm(self.λ))
164 # Return the optimal control signal for the first time step
165 return self.model.input_to_matrix(self.U)[:, 0]
166
167
168# %% Simulate the system using the MPC controller
169
170problem.param = param_0
171
172y_mpc = np.empty((n_state, N_sim))
173controller = MPCController(model, problem)
174for n in range(N_sim):
175 # Solve the optimal control problem:
176 u_n = controller(y_n)
177 # Apply the first optimal control input to the system and simulate for
178 # one time step, then update the state:
179 y_n = model.simulate(1, y_n, u_n, model_param).T
180 y_mpc[:, n] = y_n
181y_mpc = np.hstack((y_dist, y_mpc))
182
183print(f"{controller.tot_it} inner iterations, {controller.failures} failures")
184print(
185 f"time: {controller.tot_time} (total), {controller.max_time} (max), "
186 f"{controller.tot_time / N_sim} (avg)"
187)
188
189# %% Visualize the results
190
191import matplotlib.pyplot as plt
192import matplotlib as mpl
193from matplotlib import animation, patheffects
194
195mpl.rcParams["animation.frame_format"] = "svg"
196
197# Plot the chains
198fig, ax = plt.subplots()
199x, y, z = model.state_to_pos(y_null)
200(line,) = ax.plot(x, y, "-o", label="Without MPC")
201(line_ctrl,) = ax.plot(x, y, "-o", label="With MPC")
202plt.legend()
203plt.ylim([-2.5, 1])
204plt.xlim([-0.25, 1.25])
205
206# Plot the state constraints
207x = np.linspace(-0.25, 1.25, 256)
208y = np.linspace(-2.5, 1, 256)
209X, Y = np.meshgrid(x, y)
210Z = g_constr(constr_coeff, X) + constr_lb - Y
211fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
212cgc = plt.contour(X, Y, Z, [0], colors="tab:green", linewidths=0.8)
213plt.setp(cgc.collections, path_effects=fx)
214
215
216class Animation:
217 points = []
218
219 def __call__(self, i):
220 x, y, z = model.state_to_pos(y_sim[:, i])
221 y = z if dim == 3 else y
222 for p in self.points:
223 p.remove()
224 self.points = []
225 line.set_xdata(x)
226 line.set_ydata(y)
227 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
228 if np.sum(viol):
229 self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
230 x, y, z = model.state_to_pos(y_mpc[:, i])
231 y = z if dim == 3 else y
232 line_ctrl.set_xdata(x)
233 line_ctrl.set_ydata(y)
234 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
235 if np.sum(viol):
236 self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
237 return [line, line_ctrl] + self.points
238
239
240ani = animation.FuncAnimation(
241 fig,
242 Animation(),
243 interval=1000 * Ts,
244 blit=True,
245 repeat=True,
246 frames=1 + N_dist + N_sim,
247)
248
249# Show the animation
250plt.show()
hanging_chain_dynamics.py
1from typing import Union
2import casadi as cs
3import numpy as np
4from casadi import vertcat as vc
5
6
7class HangingChain:
8
9 def __init__(self, N: int, dim: int, Ts: float = 0.05):
10 self.N = N
11 self.dim = dim
12
13 self.y1 = cs.SX.sym("y1", dim * N, 1) # state: balls 1→N positions
14 self.y2 = cs.SX.sym("y2", dim * N, 1) # state: balls 1→N velocities
15 self.y3 = cs.SX.sym("y3", dim, 1) # state: ball 1+N position
16 self.u = cs.SX.sym("u", dim, 1) # input: ball 1+N velocity
17 self.y = vc(self.y1, self.y2, self.y3) # full state vector
18
19 self.m = cs.SX.sym("m") # mass
20 self.D = cs.SX.sym("D") # spring constant
21 self.L = cs.SX.sym("L") # spring length
22 self.params = vc(self.m, self.D, self.L)
23
24 self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81]) # gravity
25 self.x0 = np.zeros((dim, )) # ball 0 position
26 self.x_end = np.eye(1, dim, 0).ravel() # ball N+1 reference position
27
28 self._build_dynamics(Ts)
29
30 def _build_dynamics(self, Ts):
31 y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
32 dist = lambda xa, xb: cs.norm_2(xa - xb)
33 N, d = self.N, self.dim
34 p = self.params
35
36 # Continuous-time dynamics y' = f(y, u; p)
37
38 f1 = [y2]
39 f2 = []
40 for i in range(N):
41 xi = y1[d * i:d * i + d]
42 xip1 = y1[d * i + d:d * i + d * 2] if i < N - 1 else y3
43 Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
44 xim1 = y1[d * i - d:d * i] if i > 0 else self.x0
45 Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
46 fi = (Fiip1 - Fim1i) / self.m + self.g
47 f2 += [fi]
48 f3 = [u]
49
50 f_expr = vc(*f1, *f2, *f3)
51 self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
52
53 # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
54
55 # 4th order Runge-Kutta integrator
56 k1 = self.f(y, u, p)
57 k2 = self.f(y + Ts * k1 / 2, u, p)
58 k3 = self.f(y + Ts * k2 / 2, u, p)
59 k4 = self.f(y + Ts * k3, u, p)
60
61 # Discrete-time dynamics
62 f_d_expr = y + (Ts / 6) * (k1 + 2 * k2 + 2 * k3 + k4)
63 self.f_d = cs.Function("f", [y, u, p], [f_d_expr])
64
65 return self.f_d
66
67 def state_to_pos(self, y):
68 N, d = self.N, self.dim
69 rav = lambda x: np.array(x).ravel()
70 xdim = lambda y, i: np.concatenate(
71 ([0], rav(y[i:d * N:d]), rav(y[-d + i])))
72 if d == 3:
73 return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
74 else:
75 return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1, )))
76
77 def input_to_matrix(self, u):
78 """
79 Reshape the input signal from a vector into a dim × N_horiz matrix (note
80 that CasADi matrices are stored column-wise and NumPy arrays row-wise)
81 """
82 if isinstance(u, np.ndarray):
83 return u.reshape((self.dim, u.shape[0] // self.dim), order='F')
84 else:
85 return u.reshape((self.dim, u.shape[0] // self.dim))
86
87 def simulate(self, N_sim: int, y_0: np.ndarray,
88 u: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym],
89 p: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym]):
90 if isinstance(u, list):
91 u = np.array(u)
92 if isinstance(u, np.ndarray):
93 if u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1):
94 if u.shape[0] == self.dim:
95 u = np.tile(u, (N_sim, 1)).T
96 return self.f_d.mapaccum(N_sim)(y_0, u, p)
97
98 def initial_state(self):
99 N, d = self.N, self.dim
100 y1_0 = np.zeros((d * N))
101 y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
102 y2_0 = np.zeros((d * N))
103 y3_0 = np.zeros((d, ))
104 y3_0[0] = 1
105
106 y_null = np.concatenate((y1_0, y2_0, y3_0))
107 u_null = np.zeros((d, ))
108
109 return y_null, u_null
110
111 def generate_cost_funcs(self, α=25, β=1, γ=0.01):
112 N, d = self.N, self.dim
113 y1t = cs.SX.sym("y1t", d * N, 1)
114 y2t = cs.SX.sym("y2t", d * N, 1)
115 y3t = cs.SX.sym("y3t", d, 1)
116 ut = cs.SX.sym("ut", d, 1)
117 yt = cs.vertcat(y1t, y2t, y3t)
118
119 L_cost_x = α * cs.sumsqr(y3t - self.x_end)
120 for i in range(N):
121 xdi = y2t[d * i:d * i + d]
122 L_cost_x += β * cs.sumsqr(xdi)
123 L_cost_u = γ * cs.sumsqr(ut)
124 return cs.Function("L_cost_x", [yt], [L_cost_x]), \
125 cs.Function("L_cost_u", [ut], [L_cost_u])