Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
96129 views
1
# -*- coding: utf-8 -*-
2
"""Copyright 2015 Roger R Labbe Jr.
3
4
FilterPy library.
5
http://github.com/rlabbe/filterpy
6
7
Documentation at:
8
https://filterpy.readthedocs.org
9
10
Supporting book at:
11
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
12
13
This is licensed under an MIT license. See the readme.MD file
14
for more information.
15
"""
16
17
from __future__ import (absolute_import, division, print_function,
18
unicode_literals)
19
20
from filterpy.common import dot3
21
from filterpy.kalman import unscented_transform
22
import numpy as np
23
from numpy import eye, zeros, dot, isscalar, outer
24
from scipy.linalg import inv, cholesky
25
26
27
class UnscentedKalmanFilter(object):
28
# pylint: disable=too-many-instance-attributes
29
# pylint: disable=C0103
30
r""" Implements the Scaled Unscented Kalman filter (UKF) as defined by
31
Simon Julier in [1], using the formulation provided by Wan and Merle
32
in [2]. This filter scales the sigma points to avoid strong nonlinearities.
33
34
35
You will have to set the following attributes after constructing this
36
object for the filter to perform properly.
37
38
**Attributes**
39
40
x : numpy.array(dim_x)
41
state estimate vector
42
43
P : numpy.array(dim_x, dim_x)
44
covariance estimate matrix
45
46
R : numpy.array(dim_z, dim_z)
47
measurement noise matrix
48
49
Q : numpy.array(dim_x, dim_x)
50
process noise matrix
51
52
53
You may read the following attributes.
54
55
**Readable Attributes**
56
57
xp : numpy.array(dim_x)
58
predicted state (result of predict())
59
60
Pp : numpy.array(dim_x, dim_x)
61
predicted covariance matrix (result of predict())
62
63
64
**References**
65
66
.. [1] Julier, Simon J. "The scaled unscented transformation,"
67
American Control Converence, 2002, pp 4555-4559, vol 6.
68
69
Online copy:
70
https://www.cs.unc.edu/~welch/kalman/media/pdf/ACC02-IEEE1357.PDF
71
72
73
.. [2] E. A. Wan and R. Van der Merwe, “The unscented Kalman filter for
74
nonlinear estimation,” in Proc. Symp. Adaptive Syst. Signal
75
Process., Commun. Contr., Lake Louise, AB, Canada, Oct. 2000.
76
77
Online Copy:
78
https://www.seas.harvard.edu/courses/cs281/papers/unscented.pdf
79
"""
80
81
def __init__(self, dim_x, dim_z, dt, hx, fx, points,
82
sqrt_fn=None, x_mean_fn=None, z_mean_fn=None,
83
residual_x=None,
84
residual_z=None):
85
r""" Create a Kalman filter. You are responsible for setting the
86
various state variables to reasonable values; the defaults below will
87
not give you a functional filter.
88
89
**Parameters**
90
91
dim_x : int
92
Number of state variables for the filter. For example, if
93
you are tracking the position and velocity of an object in two
94
dimensions, dim_x would be 4.
95
96
97
dim_z : int
98
Number of of measurement inputs. For example, if the sensor
99
provides you with position in (x,y), dim_z would be 2.
100
101
dt : float
102
Time between steps in seconds.
103
104
hx : function(x)
105
Measurement function. Converts state vector x into a measurement
106
vector of shape (dim_z).
107
108
fx : function(x,dt)
109
function that returns the state x transformed by the
110
state transistion function. dt is the time step in seconds.
111
112
points : class
113
Class which computes the sigma points and weights for a UKF
114
algorithm. You can vary the UKF implementation by changing this
115
class. For example, MerweScaledSigmaPoints implements the alpha,
116
beta, kappa parameterization of Van der Merwe, and
117
JulierSigmaPoints implements Julier's original kappa
118
parameterization. See either of those for the required
119
signature of this class if you want to implement your own.
120
121
sqrt_fn : callable(ndarray), default = scipy.linalg.cholesky
122
Defines how we compute the square root of a matrix, which has
123
no unique answer. Cholesky is the default choice due to its
124
speed. Typically your alternative choice will be
125
scipy.linalg.sqrtm. Different choices affect how the sigma points
126
are arranged relative to the eigenvectors of the covariance matrix.
127
Usually this will not matter to you; if so the default cholesky()
128
yields maximal performance. As of van der Merwe's dissertation of
129
2004 [6] this was not a well reseached area so I have no advice
130
to give you.
131
132
If your method returns a triangular matrix it must be upper
133
triangular. Do not use numpy.linalg.cholesky - for historical
134
reasons it returns a lower triangular matrix. The SciPy version
135
does the right thing.
136
137
x_mean_fn : callable (sigma_points, weights), optional
138
Function that computes the mean of the provided sigma points
139
and weights. Use this if your state variable contains nonlinear
140
values such as angles which cannot be summed.
141
142
.. code-block:: Python
143
144
def state_mean(sigmas, Wm):
145
x = np.zeros(3)
146
sum_sin, sum_cos = 0., 0.
147
148
for i in range(len(sigmas)):
149
s = sigmas[i]
150
x[0] += s[0] * Wm[i]
151
x[1] += s[1] * Wm[i]
152
sum_sin += sin(s[2])*Wm[i]
153
sum_cos += cos(s[2])*Wm[i]
154
x[2] = atan2(sum_sin, sum_cos)
155
return x
156
157
z_mean_fn : callable (sigma_points, weights), optional
158
Same as x_mean_fn, except it is called for sigma points which
159
form the measurements after being passed through hx().
160
161
residual_x : callable (x, y), optional
162
residual_z : callable (x, y), optional
163
Function that computes the residual (difference) between x and y.
164
You will have to supply this if your state variable cannot support
165
subtraction, such as angles (359-1 degreees is 2, not 358). x and y
166
are state vectors, not scalars. One is for the state variable,
167
the other is for the measurement state.
168
169
.. code-block:: Python
170
171
def residual(a, b):
172
y = a[0] - b[0]
173
if y > np.pi:
174
y -= 2*np.pi
175
if y < -np.pi:
176
y = 2*np.pi
177
return y
178
179
**References**
180
181
.. [3] S. Julier, J. Uhlmann, and H. Durrant-Whyte. "A new method for
182
the nonlinear transformation of means and covariances in filters
183
and estimators," IEEE Transactions on Automatic Control, 45(3),
184
pp. 477-482 (March 2000).
185
186
.. [4] E. A. Wan and R. Van der Merwe, “The Unscented Kalman filter for
187
Nonlinear Estimation,” in Proc. Symp. Adaptive Syst. Signal
188
Process., Commun. Contr., Lake Louise, AB, Canada, Oct. 2000.
189
190
https://www.seas.harvard.edu/courses/cs281/papers/unscented.pdf
191
192
.. [5] Wan, Merle "The Unscented Kalman Filter," chapter in *Kalman
193
Filtering and Neural Networks*, John Wiley & Sons, Inc., 2001.
194
195
.. [6] R. Van der Merwe "Sigma-Point Kalman Filters for Probabilitic
196
Inference in Dynamic State-Space Models" (Doctoral dissertation)
197
"""
198
199
self.Q = eye(dim_x)
200
self.R = eye(dim_z)
201
self.x = zeros(dim_x)
202
self.P = eye(dim_x)
203
self._dim_x = dim_x
204
self._dim_z = dim_z
205
self._dt = dt
206
self._num_sigmas = 2*dim_x + 1
207
self.hx = hx
208
self.fx = fx
209
self.points_fn = points
210
self.x_mean = x_mean_fn
211
self.z_mean = z_mean_fn
212
213
if sqrt_fn is None:
214
self.msqrt = cholesky
215
else:
216
self.msqrt = sqrt_fn
217
218
# weights for the means and covariances.
219
self.Wm, self.Wc = self.points_fn.weights()
220
221
if residual_x is None:
222
self.residual_x = np.subtract
223
else:
224
self.residual_x = residual_x
225
226
if residual_z is None:
227
self.residual_z = np.subtract
228
else:
229
self.residual_z = residual_z
230
231
# sigma points transformed through f(x) and h(x)
232
# variables for efficiency so we don't recreate every update
233
self.sigmas_f = zeros((2*self._dim_x+1, self._dim_x))
234
self.sigmas_h = zeros((self._num_sigmas, self._dim_z))
235
236
237
238
def predict(self, dt=None, UT=None, fx_args=()):
239
r""" Performs the predict step of the UKF. On return, self.x and
240
self.P contain the predicted state (x) and covariance (P). '
241
242
Important: this MUST be called before update() is called for the first
243
time.
244
245
**Parameters**
246
247
dt : double, optional
248
If specified, the time step to be used for this prediction.
249
self._dt is used if this is not provided.
250
251
UT : function(sigmas, Wm, Wc, noise_cov), optional
252
Optional function to compute the unscented transform for the sigma
253
points passed through hx. Typically the default function will
254
work - you can use x_mean_fn and z_mean_fn to alter the behavior
255
of the unscented transform.
256
257
fx_args : tuple, optional, default (,)
258
optional arguments to be passed into fx() after the required state
259
variable.
260
261
"""
262
if dt is None:
263
dt = self._dt
264
265
if not isinstance(fx_args, tuple):
266
fx_args = (fx_args,)
267
268
if UT is None:
269
UT = unscented_transform
270
271
# calculate sigma points for given mean and covariance
272
sigmas = self.points_fn.sigma_points(self.x, self.P)
273
274
for i in range(self._num_sigmas):
275
self.sigmas_f[i] = self.fx(sigmas[i], dt, *fx_args)
276
277
self.x, self.P = UT(self.sigmas_f, self.Wm, self.Wc, self.Q,
278
self.x_mean, self.residual_x)
279
280
281
def update(self, z, R=None, UT=None, hx_args=()):
282
""" Update the UKF with the given measurements. On return,
283
self.x and self.P contain the new mean and covariance of the filter.
284
285
**Parameters**
286
287
z : numpy.array of shape (dim_z)
288
measurement vector
289
290
R : numpy.array((dim_z, dim_z)), optional
291
Measurement noise. If provided, overrides self.R for
292
this function call.
293
294
UT : function(sigmas, Wm, Wc, noise_cov), optional
295
Optional function to compute the unscented transform for the sigma
296
points passed through hx. Typically the default function will
297
work - you can use x_mean_fn and z_mean_fn to alter the behavior
298
of the unscented transform.
299
300
hx_args : tuple, optional, default (,)
301
arguments to be passed into Hx function after the required state
302
variable.
303
304
residual_x : function (x, x2), optional
305
Optional function that computes the residual (difference) between
306
the two state vectors. If you do not provide this, then the
307
built in minus operator will be used. You will normally want to use
308
the built in unless your residual computation is nonlinear (for
309
example, if they are angles)
310
311
residual_h : function (z, z2), optional
312
Optional function that computes the residual (difference) between
313
the two measurement vectors. If you do not provide this, then the
314
built in minus operator will be used.
315
"""
316
317
if z is None:
318
return
319
320
if not isinstance(hx_args, tuple):
321
hx_args = (hx_args,)
322
323
if UT is None:
324
UT = unscented_transform
325
326
if R is None:
327
R = self.R
328
elif isscalar(R):
329
R = eye(self._dim_z) * R
330
331
for i in range(self._num_sigmas):
332
self.sigmas_h[i] = self.hx(self.sigmas_f[i], *hx_args)
333
334
# mean and covariance of prediction passed through unscented transform
335
zp, Pz = UT(self.sigmas_h, self.Wm, self.Wc, R, self.z_mean, self.residual_z)
336
337
# compute cross variance of the state and the measurements
338
Pxz = zeros((self._dim_x, self._dim_z))
339
for i in range(self._num_sigmas):
340
dx = self.residual_x(self.sigmas_f[i], self.x)
341
dz = self.residual_z(self.sigmas_h[i], zp)
342
Pxz += self.Wc[i] * outer(dx, dz)
343
344
K = dot(Pxz, inv(Pz)) # Kalman gain
345
y = self.residual_z(z, zp) #residual
346
self.x = self.x + dot(K, y)
347
self.P = self.P - dot3(K, Pz, K.T)
348
349
350
def batch_filter(self, zs, Rs=None, residual=None, UT=None):
351
""" Performs the UKF filter over the list of measurement in `zs`.
352
353
354
**Parameters**
355
356
zs : list-like
357
list of measurements at each time step `self._dt` Missing
358
measurements must be represented by 'None'.
359
360
Rs : list-like, optional
361
optional list of values to use for the measurement error
362
covariance; a value of None in any position will cause the filter
363
to use `self.R` for that time step.
364
365
residual : function (z, z2), optional
366
Optional function that computes the residual (difference) between
367
the two measurement vectors. If you do not provide this, then the
368
built in minus operator will be used. You will normally want to use
369
the built in unless your residual computation is nonlinear (for
370
example, if they are angles)
371
372
UT : function(sigmas, Wm, Wc, noise_cov), optional
373
Optional function to compute the unscented transform for the sigma
374
points passed through hx. Typically the default function will
375
work - you can use x_mean_fn and z_mean_fn to alter the behavior
376
of the unscented transform.
377
378
**Returns**
379
380
means: ndarray((n,dim_x,1))
381
array of the state for each time step after the update. Each entry
382
is an np.array. In other words `means[k,:]` is the state at step
383
`k`.
384
385
covariance: ndarray((n,dim_x,dim_x))
386
array of the covariances for each time step after the update.
387
In other words `covariance[k,:,:]` is the covariance at step `k`.
388
389
"""
390
391
try:
392
z = zs[0]
393
except:
394
assert not isscalar(zs), 'zs must be list-like'
395
396
if self._dim_z == 1:
397
assert isscalar(z) or (z.ndim==1 and len(z) == 1), \
398
'zs must be a list of scalars or 1D, 1 element arrays'
399
400
else:
401
assert len(z) == self._dim_z, 'each element in zs must be a' \
402
'1D array of length {}'.format(self._dim_z)
403
404
405
if residual is None:
406
residual = np.subtract
407
408
z_n = np.size(zs, 0)
409
if Rs is None:
410
Rs = [None] * z_n
411
412
# mean estimates from Kalman Filter
413
if self.x.ndim == 1:
414
means = zeros((z_n, self._dim_x))
415
else:
416
means = zeros((z_n, self._dim_x, 1))
417
418
# state covariances from Kalman Filter
419
covariances = zeros((z_n, self._dim_x, self._dim_x))
420
421
for i, (z, r) in enumerate(zip(zs, Rs)):
422
self.predict()
423
self.update(z, r)
424
means[i,:] = self.x
425
covariances[i,:,:] = self.P
426
427
return (means, covariances)
428
429
430
def rts_smoother(self, Xs, Ps, Qs=None, dt=None):
431
""" Runs the Rauch-Tung-Striebal Kalman smoother on a set of
432
means and covariances computed by the UKF. The usual input
433
would come from the output of `batch_filter()`.
434
435
**Parameters**
436
437
Xs : numpy.array
438
array of the means (state variable x) of the output of a Kalman
439
filter.
440
441
Ps : numpy.array
442
array of the covariances of the output of a kalman filter.
443
444
Qs: list-like collection of numpy.array, optional
445
Process noise of the Kalman filter at each time step. Optional,
446
if not provided the filter's self.Q will be used
447
448
dt : optional, float or array-like of float
449
If provided, specifies the time step of each step of the filter.
450
If float, then the same time step is used for all steps. If
451
an array, then each element k contains the time at step k.
452
Units are seconds.
453
454
**Returns**
455
456
x : numpy.ndarray
457
smoothed means
458
459
P : numpy.ndarray
460
smoothed state covariances
461
462
K : numpy.ndarray
463
smoother gain at each step
464
465
466
**Example**
467
468
.. code-block:: Python
469
470
zs = [t + random.randn()*4 for t in range (40)]
471
472
(mu, cov, _, _) = kalman.batch_filter(zs)
473
(x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q)
474
475
"""
476
assert len(Xs) == len(Ps)
477
n, dim_x = Xs.shape
478
479
if dt is None:
480
dt = [self._dt] * n
481
elif isscalar(dt):
482
dt = [dt] * n
483
484
if Qs is None:
485
Qs = [self.Q] * n
486
487
# smoother gain
488
Ks = zeros((n,dim_x,dim_x))
489
490
num_sigmas = 2*dim_x + 1
491
492
xs, ps = Xs.copy(), Ps.copy()
493
sigmas_f = zeros((num_sigmas, dim_x))
494
495
496
for k in range(n-2,-1,-1):
497
# create sigma points from state estimate, pass through state func
498
sigmas = self.points_fn.sigma_points(xs[k], ps[k])
499
for i in range(num_sigmas):
500
sigmas_f[i] = self.fx(sigmas[i], dt[k])
501
502
# compute backwards prior state and covariance
503
xb = dot(self.Wm, sigmas_f)
504
Pb = 0
505
x = Xs[k]
506
for i in range(num_sigmas):
507
y = sigmas_f[i] - x
508
Pb += self.Wm[i] * outer(y, y)
509
Pb += Qs[k]
510
511
# compute cross variance
512
Pxb = 0
513
for i in range(num_sigmas):
514
z = sigmas[i] - Xs[k]
515
y = sigmas_f[i] - xb
516
Pxb += self.Wm[i] * outer(z, y)
517
518
# compute gain
519
K = dot(Pxb, inv(Pb))
520
521
# update the smoothed estimates
522
xs[k] += dot (K, xs[k+1] - xb)
523
ps[k] += dot3(K, ps[k+1] - Pb, K.T)
524
Ks[k] = K
525
526
return (xs, ps, Ks)
527
528