Hanging Chain¶
In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.
The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.
1# %% Hanging chain MPC example
2
3import sys
4from os.path import dirname
5
6import casadi as cs
7import numpy as np
8
9sys.path.append(dirname(__file__))
10from hanging_chain_dynamics import HangingChain
11
12# %% Build the model
13
14Ts = 0.05 # Time step [s]
15N = 6 # Number of balls
16dim = 2 # Dimension (2D or 3D)
17
18model = HangingChain(N, dim, Ts)
19y_null, u_null = model.initial_state() # Initial states and control inputs
20
21model_param = [0.03, 1.6, 0.033 / N] # Concrete parameters m, D, L
22
23# %% Apply an initial control input to disturb the system
24
25N_dist = 3 # Number of time steps to apply the disturbance for
26u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5] # Disturbance input
27y_dist = model.simulate(N_dist, y_null, u_dist, model_param) # Model states
28y_dist = np.hstack((np.array([y_null]).T, y_dist)) # (including initial state)
29
30# %% Simulate the system without a controller
31
32N_sim = 180 # Number of time steps to simulate for
33y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, model_param) # States
34y_sim = np.hstack((y_dist, y_sim)) # (including disturbed and initial states)
35
36# %% Define MPC cost and constraints
37
38N_horiz = 12 # MPC horizon length (number of time steps)
39
40y_init = cs.MX.sym("y_init", *y_null.shape) # Initial state
41model_params = cs.MX.sym("params", *model.params.shape) # Parameters
42num_var = dim * N_horiz
43U = cs.MX.sym("U", num_var) # Control signals over horizon
44U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix
45constr_param = cs.MX.sym("c", 3) # Coefficients of cubic constraint function
46mpc_param = cs.vertcat(y_init, model_params, constr_param) # All parameters
47
48# Cost
49
50# Stage costs for states and input
51stage_y_cost, stage_u_cost = model.generate_cost_funcs()
52# Simulate the model with the input over the horizon
53mpc_sim = model.simulate(N_horiz, y_init, U_mat, model_params)
54# Accumulate the cost of the outputs and inputs
55mpc_y_cost = cs.sum2(stage_y_cost.map(N_horiz)(mpc_sim))
56mpc_u_cost = cs.sum2(stage_u_cost.map(N_horiz)(U_mat))
57mpc_cost = mpc_y_cost + mpc_u_cost
58
59# Constraints
60
61# Cubic constraint function for a single ball in one dimension
62g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x
63# Constraint function for one stage (N balls)
64y_c = cs.MX.sym("y_c", y_dist.shape[0])
65constr = []
66for i in range(N): # for each ball in the stage except the last,
67 yx_n = y_c[dim * i] # constrain the x, y position of the ball
68 yy_n = y_c[dim * i + dim - 1]
69 constr += [yy_n - g_constr(constr_param, yx_n)]
70constr += [y_c[-1] - g_constr(constr_param, y_c[-dim])] # Ball N+1
71constr_fun = cs.Function("c", [y_c, constr_param], [cs.vertcat(*constr)])
72# Constraint function for all stages in the horizon
73mpc_constr = cs.vec(constr_fun.map(N_horiz)(mpc_sim, constr_param))
74num_constr = (N + 1) * N_horiz
75# Fill in the constraint coefficients c(x-a)³ + d(x-a) + b
76a, b, c, d = 0.6, -1.4, 5, 2.2
77constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
78constr_lb = b - c * a**3 - d * a
79# Box constraints on actuator:
80C = -1 * np.ones(num_var), +1 * np.ones(num_var) # lower bound, upper bound
81# Constant term of the cubic state constraints as a one-sided box:
82D = constr_lb * np.ones(num_constr), +np.inf * np.ones(num_constr)
83
84# Initial parameter value
85
86y_n = np.array(y_dist[:, -1]).ravel() # Initial state of the chain
87n_state = y_n.shape[0]
88param_0 = np.concatenate((y_n, model_param, constr_coeff))
89
90# %% NLP formulation
91
92import alpaqa
93
94# Generate C code for the cost and constraint functions, compile them, and load
95# them as an alpaqa problem description:
96problem = (
97 alpaqa.minimize(mpc_cost, U) # objective and variables f(x; p)
98 .subject_to_box(C) # box constraints on variables x ∊ C
99 .subject_to(mpc_constr, D) # general constraints g(x; p) ∊ D
100 .with_param(mpc_param) # parameter to be changed later p
101 .with_param_value(param_0) # initial parameter value
102 .with_name(f"hanging_chain_{N_horiz}") # name used in generated files
103).compile(sym=cs.MX.sym)
104
105# %% NLP solver
106
107from datetime import timedelta
108
109# Configure an alpaqa solver:
110solver = alpaqa.ALMSolver(
111 alm_params={
112 "tolerance": 1e-3,
113 "dual_tolerance": 1e-3,
114 "initial_penalty": 1e4,
115 "max_iter": 100,
116 "max_time": timedelta(seconds=0.2),
117 },
118 inner_solver=alpaqa.PANOCSolver(
119 panoc_params={
120 "stop_crit": alpaqa.FPRNorm,
121 "max_time": timedelta(seconds=0.02),
122 },
123 lbfgs_params={"memory": N_horiz},
124 ),
125)
126
127# %% MPC controller
128
129
130# Wrap the solver in a class that solves the optimal control problem at each
131# time step, implementing warm starting:
132class MPCController:
133 def __init__(self, model: HangingChain, problem: alpaqa.CasADiProblem):
134 self.model = model
135 self.problem = problem
136 self.tot_it = 0
137 self.tot_time = timedelta()
138 self.max_time = timedelta()
139 self.failures = 0
140 self.U = np.zeros(problem.num_variables)
141 self.λ = np.zeros(problem.num_constraints)
142
143 def __call__(self, y_n):
144 y_n = np.array(y_n).ravel()
145 # Set the current state as the initial state
146 self.problem.param[: y_n.shape[0]] = y_n
147 # Shift over the previous solution and Lagrange multipliers
148 self.U = np.concatenate((self.U[dim:], self.U[-dim:]))
149 self.λ = np.concatenate((self.λ[N + 1 :], self.λ[-N - 1 :]))
150 # Solve the optimal control problem
151 # (warm start using the shifted previous solution and multipliers)
152 self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
153 # Print some solver statistics
154 print(
155 f'{stats["status"]} outer={stats["outer_iterations"]} '
156 f'inner={stats["inner"]["iterations"]} time={stats["elapsed_time"]} '
157 f'failures={stats["inner_convergence_failures"]}'
158 )
159 self.tot_it += stats["inner"]["iterations"]
160 self.failures += stats["status"] != alpaqa.SolverStatus.Converged
161 self.tot_time += stats["elapsed_time"]
162 self.max_time = max(self.max_time, stats["elapsed_time"])
163 # Print the Lagrange multipliers, shows that constraints are active
164 print(np.linalg.norm(self.λ))
165 # Return the optimal control signal for the first time step
166 return self.model.input_to_matrix(self.U)[:, 0]
167
168
169# %% Simulate the system using the MPC controller
170
171problem.param = param_0
172
173y_mpc = np.empty((n_state, N_sim))
174controller = MPCController(model, problem)
175for n in range(N_sim):
176 # Solve the optimal control problem:
177 u_n = controller(y_n)
178 # Apply the first optimal control input to the system and simulate for
179 # one time step, then update the state:
180 y_n = model.simulate(1, y_n, u_n, model_param).T
181 y_mpc[:, n] = y_n
182y_mpc = np.hstack((y_dist, y_mpc))
183
184print(f"{controller.tot_it} inner iterations, {controller.failures} failures")
185print(
186 f"time: {controller.tot_time} (total), {controller.max_time} (max), "
187 f"{controller.tot_time / N_sim} (avg)"
188)
189
190# %% Visualize the results
191
192import matplotlib as mpl
193import matplotlib.pyplot as plt
194from matplotlib import animation, patheffects
195
196mpl.rcParams["animation.frame_format"] = "svg"
197
198# Plot the chains
199fig, ax = plt.subplots()
200x, y, z = model.state_to_pos(y_null)
201(line,) = ax.plot(x, y, "-o", label="Without MPC")
202(line_ctrl,) = ax.plot(x, y, "-o", label="With MPC")
203plt.legend()
204plt.ylim([-2.5, 1])
205plt.xlim([-0.25, 1.25])
206
207# Plot the state constraints
208x = np.linspace(-0.25, 1.25, 256)
209y = np.linspace(-2.5, 1, 256)
210X, Y = np.meshgrid(x, y)
211Z = g_constr(constr_coeff, X) + constr_lb - Y
212fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
213cgc = plt.contour(X, Y, Z, [0], colors="tab:green", linewidths=0.8)
214cgc.set_path_effects(fx)
215
216
217class Animation:
218 def __init__(self) -> None:
219 self.points = []
220
221 def __call__(self, i):
222 x, y, z = model.state_to_pos(y_sim[:, i])
223 y = z if dim == 3 else y
224 for p in self.points:
225 p.remove()
226 self.points = []
227 line.set_xdata(x)
228 line.set_ydata(y)
229 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
230 if np.sum(viol):
231 self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
232 x, y, z = model.state_to_pos(y_mpc[:, i])
233 y = z if dim == 3 else y
234 line_ctrl.set_xdata(x)
235 line_ctrl.set_ydata(y)
236 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
237 if np.sum(viol):
238 self.points += ax.plot(x[viol], y[viol], "rx", markersize=12)
239 return [line, line_ctrl, *self.points]
240
241
242ani = animation.FuncAnimation(
243 fig,
244 Animation(),
245 interval=1000 * Ts,
246 blit=True,
247 repeat=True,
248 frames=1 + N_dist + N_sim,
249)
250
251# Show the animation
252plt.show()
hanging_chain_dynamics.py
1from typing import Union
2
3import casadi as cs
4import numpy as np
5from casadi import vertcat as vc
6
7
8class HangingChain:
9
10 def __init__(self, N: int, dim: int, Ts: float = 0.05):
11 self.N = N
12 self.dim = dim
13 self.nx = 2 * dim * N + dim
14
15 self.y1 = cs.SX.sym("y1", dim * N, 1) # state: balls 1→N positions
16 self.y2 = cs.SX.sym("y2", dim * N, 1) # state: balls 1→N velocities
17 self.y3 = cs.SX.sym("y3", dim, 1) # state: ball 1+N position
18 self.u = cs.SX.sym("u", dim, 1) # input: ball 1+N velocity
19 self.y = vc(self.y1, self.y2, self.y3) # full state vector
20
21 self.m = cs.SX.sym("m") # mass
22 self.D = cs.SX.sym("D") # spring constant
23 self.L = cs.SX.sym("L") # spring length
24 self.params = vc(self.m, self.D, self.L)
25
26 self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81]) # gravity
27 self.x0 = np.zeros((dim,)) # ball 0 position
28 self.x_end = np.eye(1, dim, 0).ravel() # ball N+1 reference position
29
30 self._build_dynamics(Ts)
31
32 def _build_dynamics(self, Ts):
33 y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
34 dist = lambda xa, xb: cs.norm_2(xa - xb)
35 N, d = self.N, self.dim
36 p = self.params
37
38 # Continuous-time dynamics y' = f(y, u; p)
39
40 f1 = [y2]
41 f2 = []
42 for i in range(N):
43 xi = y1[d * i : d * i + d]
44 xip1 = y1[d * i + d : d * i + d * 2] if i < N - 1 else y3
45 Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
46 xim1 = y1[d * i - d : d * i] if i > 0 else self.x0
47 Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
48 fi = (Fiip1 - Fim1i) / self.m + self.g
49 f2 += [fi]
50 f3 = [u]
51
52 f_expr = vc(*f1, *f2, *f3)
53 self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
54
55 # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
56
57 # 4th order Runge-Kutta integrator
58 k1 = self.f(y, u, p)
59 k2 = self.f(y + Ts * k1 / 2, u, p)
60 k3 = self.f(y + Ts * k2 / 2, u, p)
61 k4 = self.f(y + Ts * k3, u, p)
62
63 # Discrete-time dynamics
64 f_d_expr = y + (Ts / 6) * (k1 + 2 * k2 + 2 * k3 + k4)
65 self.f_d = cs.Function("f", [y, u, p], [f_d_expr])
66
67 return self.f_d
68
69 def state_to_pos(self, y):
70 N, d = self.N, self.dim
71 rav = lambda x: np.array(x).ravel()
72 xdim = lambda y, i: np.concatenate(([0], rav(y[i : d * N : d]), rav(y[-d + i])))
73 if d == 3:
74 return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
75 else:
76 return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1,)))
77
78 def input_to_matrix(self, u):
79 """
80 Reshape the input signal from a vector into a dim × N_horiz matrix (note
81 that CasADi matrices are stored column-wise and NumPy arrays row-wise)
82 """
83 if isinstance(u, np.ndarray):
84 return u.reshape((self.dim, u.shape[0] // self.dim), order="F")
85 else:
86 return u.reshape((self.dim, u.shape[0] // self.dim))
87
88 def state_to_matrix(self, y):
89 """
90 Reshape the state sequence from a vector into a n × (N_horiz+1) matrix (note
91 that CasADi matrices are stored column-wise and NumPy arrays row-wise)
92 """
93 if isinstance(y, np.ndarray):
94 return y.reshape((self.nx, y.shape[0] // self.nx), order="F")
95 else:
96 return y.reshape((self.nx, y.shape[0] // self.nx))
97
98 def simulate(
99 self,
100 N_sim: int,
101 y_0: np.ndarray,
102 u: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym],
103 p: Union[np.ndarray, list, cs.SX.sym, cs.MX.sym],
104 ):
105 if isinstance(u, list):
106 u = np.array(u)
107 if (
108 isinstance(u, np.ndarray)
109 and (u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1))
110 and u.shape[0] == self.dim
111 ):
112 u = np.tile(u, (N_sim, 1)).T
113 return self.f_d.mapaccum(N_sim)(y_0, u, p)
114
115 def initial_state(self):
116 N, d = self.N, self.dim
117 y1_0 = np.zeros(d * N)
118 y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
119 y2_0 = np.zeros(d * N)
120 y3_0 = np.zeros((d,))
121 y3_0[0] = 1
122
123 y_null = np.concatenate((y1_0, y2_0, y3_0))
124 u_null = np.zeros((d,))
125
126 assert self.nx == len(y_null)
127
128 return y_null, u_null
129
130 def generate_cost_funcs(self, α=25, β=1, γ=0.01):
131 N, d = self.N, self.dim
132 y1t = cs.SX.sym("y1t", d * N, 1)
133 y2t = cs.SX.sym("y2t", d * N, 1)
134 y3t = cs.SX.sym("y3t", d, 1)
135 ut = cs.SX.sym("ut", d, 1)
136 yt = cs.vertcat(y1t, y2t, y3t)
137
138 L_cost_x = α * cs.sumsqr(y3t - self.x_end)
139 for i in range(N):
140 xdi = y2t[d * i : d * i + d]
141 L_cost_x += β * cs.sumsqr(xdi)
142 L_cost_u = γ * cs.sumsqr(ut)
143 return cs.Function("L_cost_x", [yt], [L_cost_x]), cs.Function(
144 "L_cost_u", [ut], [L_cost_u]
145 )