Hanging Chain#
In this example, a mode predictive controller (MPC) is used to stabilize a system of weights connected by springs. The rightmost weight is fixed in place at the origin, whereas the velocity of the leftmost weight can be controlled by an actuator. The six weights in the middle move under the influence of gravity and the forces of the springs between them.
The goal of the controller is to stabilize the system (i.e. drive the velocity of all weights to zero) with the rightmost weight at position \((1, 0)\). Additionally, a non-convex cubic constraint on the weights’ position is imposed, shown in green on the figure below.
1## @example mpc/python/hanging-chain/hanging-chain-mpc.py
2# This example shows how to call the alpaqa solver from Python code, applied
3# to a challenging model predictive control problem.
4
5# %% Hanging chain MPC example
6
7import casadi as cs
8import numpy as np
9import os
10from os.path import join, dirname
11import sys
12
13sys.path.append(dirname(__file__))
14from hanging_chain_dynamics import HangingChain
15
16# %% Build the model
17
18Ts = 0.05
19N = 6 # number of balls
20dim = 2 # dimension
21
22model = HangingChain(N, dim)
23f_d = model.dynamics(Ts)
24y_null, u_null = model.initial_state()
25
26param = [0.03, 1.6, 0.033 / N] # Concrete parameters m, D, L
27
28# %% Apply an initial input to disturb the system
29
30N_dist = 3
31u_dist = [-0.5, 0.5, 0.5] if dim == 3 else [-0.5, 0.5]
32y_dist = model.simulate(N_dist, y_null, u_dist, param)
33y_dist = np.hstack((np.array([y_null]).T, y_dist))
34
35# %% Simulate the system without a controller
36
37N_sim = 180
38y_sim = model.simulate(N_sim, y_dist[:, -1], u_null, param)
39y_sim = np.hstack((y_dist, y_sim))
40
41# %% Define MPC cost and constraints
42
43N_horiz = 12
44
45L_cost = model.generate_cost_fun() # stage cost
46y_init = cs.SX.sym("y_init", *y_null.shape) # initial state
47U = cs.SX.sym("U", dim * N_horiz) # control signals over horizon
48constr_param = cs.SX.sym("c", 3) # Coefficients of cubic constraint function
49mpc_param = cs.vertcat(y_init, model.params, constr_param) # all parameters
50U_mat = model.input_to_matrix(U) # Input as dim by N_horiz matrix
51
52# Cost
53mpc_sim = model.simulate(N_horiz, y_init, U_mat, model.params)
54mpc_cost = 0
55for n in range(N_horiz): # Apply the stage cost function to each stage
56 y_n = mpc_sim[:, n]
57 u_n = U_mat[:, n]
58 mpc_cost += L_cost(y_n, u_n)
59mpc_cost_fun = cs.Function('f_mpc', [U, mpc_param], [mpc_cost])
60
61# Constraints
62g_constr = lambda c, x: c[0] * x**3 + c[1] * x**2 + c[2] * x # Cubic constr
63constr = []
64for n in range(N_horiz): # For each stage,
65 y_n = mpc_sim[:, n]
66 for i in range(N): # for each ball in the stage,
67 yx_n = y_n[dim * i] # constrain the x, y position of the ball
68 yy_n = y_n[dim * i + dim - 1]
69 constr += [yy_n - g_constr(constr_param, yx_n)]
70 constr += [y_n[-1] - g_constr(constr_param, y_n[-dim])] # Ball N+1
71mpc_constr_fun = cs.Function("g", [U, mpc_param], [cs.vertcat(*constr)])
72
73# Fill in the constraint coefficients c(x-a)³ + d(x - a) + b
74a, b, c, d = 0.6, -1.4, 5, 2.2
75constr_coeff = [c, -3 * a * c, 3 * a * a * c + d]
76constr_lb = b - c * a**3 - d * a
77
78# %% NLP formulation
79import panocpy as pa
80
81prob = pa.generate_and_compile_casadi_problem(mpc_cost_fun, mpc_constr_fun)
82prob.C.lowerbound = -1 * np.ones((dim * N_horiz, ))
83prob.C.upperbound = +1 * np.ones((dim * N_horiz, ))
84prob.D.lowerbound = constr_lb * np.ones((len(constr), ))
85
86# %% NLP solver
87from datetime import timedelta
88
89solver = pa.ALMSolver(
90 alm_params={
91 'ε': 1e-4,
92 'δ': 1e-4,
93 'Σ_0': 1e5,
94 'max_time': timedelta(seconds=0.5),
95 },
96 inner_solver=pa.StructuredPANOCLBFGSSolver(
97 panoc_params={
98 'stop_crit': pa.ProjGradNorm2,
99 'max_time': timedelta(seconds=0.2),
100 'hessian_step_size_heuristic': 15,
101 },
102 lbfgs_params={'memory': N_horiz},
103 ),
104)
105
106# %% MPC controller
107
108
109class MPCController:
110 tot_it = 0
111 failures = 0
112 U = np.zeros((N_horiz * dim, ))
113 λ = np.zeros(((N + 1) * N_horiz, ))
114
115 def __init__(self, model, problem):
116 self.model = model
117 self.problem = problem
118
119 def __call__(self, y_n):
120 y_n = np.array(y_n).ravel()
121 # Set the current state as the initial state
122 self.problem.param[:y_n.shape[0]] = y_n
123 # Solve the optimal control problem
124 # (warm start using the previous solution and Lagrange multipliers)
125 self.U, self.λ, stats = solver(self.problem, self.U, self.λ)
126 # Print some solver statistics
127 print(stats['status'], stats['outer_iterations'],
128 stats['inner']['iterations'], stats['elapsed_time'],
129 stats['inner_convergence_failures'])
130 self.tot_it += stats['inner']['iterations']
131 self.failures += stats['status'] != pa.SolverStatus.Converged
132 # Print the Lagrange multipliers, shows that constraints are active
133 print(np.linalg.norm(self.λ))
134 # Return the optimal control signal for the first time step
135 return self.model.input_to_matrix(self.U)[:, 0]
136
137
138# %% Simulate the system using the MPC controller
139
140y_n = np.array(y_dist[:, -1]).ravel() # initial state for controller
141n_state = y_n.shape[0]
142prob.param = np.concatenate((y_n, param, constr_coeff))
143
144y_mpc = np.empty((n_state, N_sim))
145controller = MPCController(model, prob)
146for n in range(N_sim):
147 # Solve the optimal control problem
148 u_n = controller(y_n)
149 # Apply the first optimal control signal to the system and simulate for
150 # one time step, then update the state
151 y_n = model.simulate(1, y_n, u_n, param).T
152 y_mpc[:, n] = y_n
153y_mpc = np.hstack((y_dist, y_mpc))
154
155print(controller.tot_it, controller.failures)
156
157# %% Visualize
158
159import matplotlib.pyplot as plt
160import matplotlib as mpl
161from matplotlib import patheffects
162
163mpl.rcParams['animation.frame_format'] = 'svg'
164
165fig, ax = plt.subplots()
166x, y, z = model.state_to_pos(y_null)
167line, = ax.plot(x, y, '-o', label='Without MPC')
168line_ctrl, = ax.plot(x, y, '-o', label='With MPC')
169plt.legend()
170plt.ylim([-2.5, 1])
171plt.xlim([-0.25, 1.25])
172
173x = np.linspace(-0.25, 1.25, 256)
174y = np.linspace(-2.5, 1, 256)
175X, Y = np.meshgrid(x, y)
176Z = g_constr(constr_coeff, X) + constr_lb - Y
177
178fx = [patheffects.withTickedStroke(spacing=7, linewidth=0.8)]
179cgc = plt.contour(X, Y, Z, [0], colors='tab:green', linewidths=0.8)
180plt.setp(cgc.collections, path_effects=fx)
181
182
183class Animation:
184 points = []
185
186 def __call__(self, i):
187 x, y, z = model.state_to_pos(y_sim[:, i])
188 for p in self.points:
189 p.remove()
190 self.points = []
191 line.set_xdata(x)
192 line.set_ydata(y)
193 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
194 if np.sum(viol):
195 self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
196 x, y, z = model.state_to_pos(y_mpc[:, i])
197 line_ctrl.set_xdata(x)
198 line_ctrl.set_ydata(y)
199 viol = y - g_constr(constr_coeff, x) + 1e-5 < constr_lb
200 if np.sum(viol):
201 self.points += ax.plot(x[viol], y[viol], 'rx', markersize=12)
202 return [line, line_ctrl] + self.points
203
204
205ani = mpl.animation.FuncAnimation(fig,
206 Animation(),
207 interval=1000 * Ts,
208 blit=True,
209 repeat=True,
210 frames=1 + N_dist + N_sim)
211
212# Export the animation
213out = join(dirname(__file__), '..', '..', '..', '..', 'sphinx', 'source',
214 'sphinxstatic', 'hanging-chain.html')
215os.makedirs(dirname(out), exist_ok=True)
216with open(out, "w") as f:
217 f.write('<center>')
218 f.write(ani.to_jshtml())
219 f.write('</center>')
220
221# Show the animation
222plt.show()
1from typing import Union
2import casadi as cs
3import numpy as np
4from casadi import vertcat as vc
5
6
7class HangingChain:
8 def __init__(self, N: int, dim: int):
9 self.N = N
10 self.dim = dim
11
12 self.y1 = cs.SX.sym("y1", dim * N, 1) # state: balls 1→N positions
13 self.y2 = cs.SX.sym("y2", dim * N, 1) # state: balls 1→N velocities
14 self.y3 = cs.SX.sym("y3", dim, 1) # state: ball 1+N position
15 self.u = cs.SX.sym("u", dim, 1) # input: ball 1+N velocity
16 self.y = vc(self.y1, self.y2, self.y3) # full state vector
17
18 self.m = cs.SX.sym("m") # mass
19 self.D = cs.SX.sym("D") # spring constant
20 self.L = cs.SX.sym("L") # spring length
21
22 self.params = vc(self.m, self.D, self.L)
23
24 self.g = np.array([0, 0, -9.81] if dim == 3 else [0, -9.81]) # gravity
25 self.x0 = np.zeros((dim, )) # ball 0 position
26 self.x_end = np.eye(1, dim, 0).ravel() # ball N+1 reference position
27
28 def dynamics(self, Ts=0.05):
29 y, y1, y2, y3, u = self.y, self.y1, self.y2, self.y3, self.u
30 dist = lambda xa, xb: cs.norm_2(xa - xb)
31 N, d = self.N, self.dim
32 p = self.params
33
34 # Continuous-time dynamics y' = f(y, u; p)
35
36 f1 = [y2]
37 f2 = []
38 for i in range(N):
39 xi = y1[d * i:d * i + d]
40 xip1 = y1[d * i + d:d * i + d * 2] if i < N - 1 else y3
41 Fiip1 = self.D * (1 - self.L / dist(xip1, xi)) * (xip1 - xi)
42 xim1 = y1[d * i - d:d * i] if i > 0 else self.x0
43 Fim1i = self.D * (1 - self.L / dist(xi, xim1)) * (xi - xim1)
44 fi = (Fiip1 - Fim1i) / self.m + self.g
45 f2 += [fi]
46 f3 = [u]
47
48 f_expr = vc(*f1, *f2, *f3)
49 self.f = cs.Function("f", [y, u, p], [f_expr], ["y", "u", "p"], ["y'"])
50
51 # Discretize dynamics y[k+1] = f_d(y[k], u[k]; p)
52
53 opt = {"tf": Ts, "simplify": True, "number_of_finite_elements": 4}
54 intg = cs.integrator("intg", "rk", {
55 "x": y,
56 "p": vc(u, p),
57 "ode": f_expr
58 }, opt)
59
60 f_d_expr = intg(x0=y, p=vc(u, p))["xf"]
61 self.f_d = cs.Function("f_d", [y, u, p], [f_d_expr], ["y", "u", "p"],
62 ["y+"])
63
64 return self.f_d
65
66 def state_to_pos(self, y):
67 N, d = self.N, self.dim
68 rav = lambda x: np.array(x).ravel()
69 xdim = lambda y, i: np.concatenate(
70 ([0], rav(y[i:d * N:d]), rav(y[-d + i])))
71 if d == 3:
72 return (xdim(y, 0), xdim(y, 1), xdim(y, 2))
73 else:
74 return (xdim(y, 0), xdim(y, 1), np.zeros((N + 1, )))
75
76 def input_to_matrix(self, u):
77 """
78 Reshape the input signal from a vector into a dim × N_horiz matrix (note
79 that CasADi matrices are stored column-wise and NumPy arrays row-wise)
80 """
81 if isinstance(u, np.ndarray):
82 return u.reshape((self.dim, u.shape[0] // self.dim), order='F')
83 else:
84 return u.reshape((self.dim, u.shape[0] // self.dim))
85
86 def simulate(self, N_sim: int, y_0: np.ndarray, u: Union[np.ndarray, list,
87 cs.SX.sym],
88 p: Union[np.ndarray, list, cs.SX.sym]):
89 if isinstance(u, list):
90 u = np.array(u)
91 if isinstance(u, np.ndarray):
92 if u.ndim == 1 or (u.ndim == 2 and u.shape[1] == 1):
93 if u.shape[0] == self.dim:
94 u = np.tile(u, (N_sim, 1)).T
95 return self.f_d.mapaccum(N_sim)(y_0, u, p)
96
97 def initial_state(self):
98 N, d = self.N, self.dim
99 y1_0 = np.zeros((d * N))
100 y1_0[0::d] = np.arange(1, N + 1) / (N + 1)
101 y2_0 = np.zeros((d * N))
102 y3_0 = np.zeros((d, ))
103 y3_0[0] = 1
104
105 y_null = np.concatenate((y1_0, y2_0, y3_0))
106 u_null = np.zeros((d, ))
107
108 return y_null, u_null
109
110 def generate_cost_fun(self, α=25, β=1, γ=0.01):
111 N, d = self.N, self.dim
112 y1t = cs.SX.sym("y1t", d * N, 1)
113 y2t = cs.SX.sym("y2t", d * N, 1)
114 y3t = cs.SX.sym("y3t", d, 1)
115 ut = cs.SX.sym("ut", d, 1)
116 yt = cs.vertcat(y1t, y2t, y3t)
117
118 L_cost = α * cs.sumsqr(y3t - self.x_end) + γ * cs.sumsqr(ut)
119 for i in range(N):
120 xdi = y2t[d * i:d * i + d]
121 L_cost += β * cs.sumsqr(xdi)
122 return cs.Function("L_cost", [yt, ut], [L_cost])