Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
96131 views
1
# -*- coding: utf-8 -*-
2
3
"""Copyright 2015 Roger R Labbe Jr.
4
5
FilterPy library.
6
http://github.com/rlabbe/filterpy
7
8
Documentation at:
9
https://filterpy.readthedocs.org
10
11
Supporting book at:
12
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
13
14
This is licensed under an MIT license. See the readme.MD file
15
for more information.
16
"""
17
18
from __future__ import (absolute_import, division, print_function,
19
unicode_literals)
20
import numpy as np
21
import scipy.linalg as linalg
22
from numpy import dot, zeros, eye, isscalar
23
from filterpy.common import setter, setter_1d, setter_scalar, dot3
24
25
26
27
class KalmanFilter(object):
28
""" Implements a Kalman filter. You are responsible for setting the
29
various state variables to reasonable values; the defaults will
30
not give you a functional filter.
31
32
You will have to set the following attributes after constructing this
33
object for the filter to perform properly. Please note that there are
34
various checks in place to ensure that you have made everything the
35
'correct' size. However, it is possible to provide incorrectly sized
36
arrays such that the linear algebra can not perform an operation.
37
It can also fail silently - you can end up with matrices of a size that
38
allows the linear algebra to work, but are the wrong shape for the problem
39
you are trying to solve.
40
41
**Attributes**
42
43
x : numpy.array(dim_x, 1)
44
state estimate vector
45
46
P : numpy.array(dim_x, dim_x)
47
covariance estimate matrix
48
49
R : numpy.array(dim_z, dim_z)
50
measurement noise matrix
51
52
Q : numpy.array(dim_x, dim_x)
53
process noise matrix
54
55
F : numpy.array()
56
State Transition matrix
57
58
H : numpy.array(dim_x, dim_x)
59
60
61
You may read the following attributes.
62
63
**Readable Attributes**
64
65
y : numpy.array
66
Residual of the update step.
67
68
K : numpy.array(dim_x, dim_z)
69
Kalman gain of the update step
70
71
S : numpy.array
72
Systen uncertaintly projected to measurement space
73
74
"""
75
76
77
78
def __init__(self, dim_x, dim_z, dim_u=0):
79
""" Create a Kalman filter. You are responsible for setting the
80
various state variables to reasonable values; the defaults below will
81
not give you a functional filter.
82
83
**Parameters**
84
85
dim_x : int
86
Number of state variables for the Kalman filter. For example, if
87
you are tracking the position and velocity of an object in two
88
dimensions, dim_x would be 4.
89
90
This is used to set the default size of P, Q, and u
91
92
dim_z : int
93
Number of of measurement inputs. For example, if the sensor
94
provides you with position in (x,y), dim_z would be 2.
95
96
dim_u : int (optional)
97
size of the control input, if it is being used.
98
Default value of 0 indicates it is not used.
99
"""
100
101
assert dim_x > 0
102
assert dim_z > 0
103
assert dim_u >= 0
104
105
self.dim_x = dim_x
106
self.dim_z = dim_z
107
self.dim_u = dim_u
108
109
self._x = zeros((dim_x,1)) # state
110
self._P = eye(dim_x) # uncertainty covariance
111
self._Q = eye(dim_x) # process uncertainty
112
self._B = 0 # control transition matrix
113
self._F = 0 # state transition matrix
114
self.H = 0 # Measurement function
115
self.R = eye(dim_z) # state uncertainty
116
self._alpha_sq = 1. # fading memory control
117
118
# gain and residual are computed during the innovation step. We
119
# save them so that in case you want to inspect them for various
120
# purposes
121
self._K = 0 # kalman gain
122
self._y = zeros((dim_z, 1))
123
self._S = 0 # system uncertainty in measurement space
124
125
# identity matrix. Do not alter this.
126
self._I = np.eye(dim_x)
127
128
129
def update(self, z, R=None, H=None):
130
"""
131
Add a new measurement (z) to the kalman filter. If z is None, nothing
132
is changed.
133
134
**Parameters**
135
136
z : np.array
137
measurement for this update.
138
139
R : np.array, scalar, or None
140
Optionally provide R to override the measurement noise for this
141
one call, otherwise self.R will be used.
142
"""
143
144
if z is None:
145
return
146
147
if R is None:
148
R = self.R
149
elif isscalar(R):
150
R = eye(self.dim_z) * R
151
152
# rename for readability and a tiny extra bit of speed
153
if H is None:
154
H = self.H
155
P = self._P
156
x = self._x
157
158
# y = z - Hx
159
# error (residual) between measurement and prediction
160
self._y = z - dot(H, x)
161
162
# S = HPH' + R
163
# project system uncertainty into measurement space
164
S = dot3(H, P, H.T) + R
165
166
# K = PH'inv(S)
167
# map system uncertainty into kalman gain
168
K = dot3(P, H.T, linalg.inv(S))
169
170
# x = x + Ky
171
# predict new x with residual scaled by the kalman gain
172
self._x = x + dot(K, self._y)
173
174
# P = (I-KH)P(I-KH)' + KRK'
175
I_KH = self._I - dot(K, H)
176
self._P = dot3(I_KH, P, I_KH.T) + dot3(K, R, K.T)
177
178
self._S = S
179
self._K = K
180
181
182
def test_matrix_dimensions(self):
183
""" Performs a series of asserts to check that the size of everything
184
is what it should be. This can help you debug problems in your design.
185
186
This is only a test; you do not need to use it while filtering.
187
However, to use you will want to perform at least one predict() and
188
one update() before calling; some bad designs will cause the shapes
189
of x and P to change in a silent and bad way. For example, if you
190
pass in a badly dimensioned z into update that can cause x to be
191
misshapen."""
192
193
assert self._x.shape == (self.dim_x, 1), \
194
"Shape of x must be ({},{}), but is {}".format(
195
self.dim_x, 1, self._x.shape)
196
197
assert self._P.shape == (self.dim_x, self.dim_x), \
198
"Shape of P must be ({},{}), but is {}".format(
199
self.dim_x, self.dim_x, self._P.shape)
200
201
assert self._Q.shape == (self.dim_x, self.dim_x), \
202
"Shape of P must be ({},{}), but is {}".format(
203
self.dim_x, self.dim_x, self._P.shape)
204
205
206
def predict(self, u=0):
207
""" Predict next position.
208
209
**Parameters**
210
211
u : np.array
212
Optional control vector. If non-zero, it is multiplied by B
213
to create the control input into the system.
214
"""
215
216
# x = Fx + Bu
217
self._x = dot(self._F, self.x) + dot(self._B, u)
218
219
# P = FPF' + Q
220
self._P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q
221
222
223
def batch_filter(self, zs, Rs=None, update_first=False):
224
""" Batch processes a sequences of measurements.
225
226
**Parameters**
227
228
zs : list-like
229
list of measurements at each time step `self.dt` Missing
230
measurements must be represented by 'None'.
231
232
Rs : list-like, optional
233
optional list of values to use for the measurement error
234
covariance; a value of None in any position will cause the filter
235
to use `self.R` for that time step.
236
237
update_first : bool, optional,
238
controls whether the order of operations is update followed by
239
predict, or predict followed by update. Default is predict->update.
240
241
**Returns**
242
243
244
means: np.array((n,dim_x,1))
245
array of the state for each time step after the update. Each entry
246
is an np.array. In other words `means[k,:]` is the state at step
247
`k`.
248
249
covariance: np.array((n,dim_x,dim_x))
250
array of the covariances for each time step after the update.
251
In other words `covariance[k,:,:]` is the covariance at step `k`.
252
253
means_predictions: np.array((n,dim_x,1))
254
array of the state for each time step after the predictions. Each
255
entry is an np.array. In other words `means[k,:]` is the state at
256
step `k`.
257
258
covariance_predictions: np.array((n,dim_x,dim_x))
259
array of the covariances for each time step after the prediction.
260
In other words `covariance[k,:,:]` is the covariance at step `k`.
261
"""
262
263
try:
264
z = zs[0]
265
except:
266
assert not isscalar(zs), 'zs must be list-like'
267
268
if self.dim_z == 1:
269
assert isscalar(z) or (z.ndim==1 and len(z) == 1), \
270
'zs must be a list of scalars or 1D, 1 element arrays'
271
272
else:
273
assert len(z) == self.dim_z, 'each element in zs must be a'
274
'1D array of length {}'.format(self.dim_z)
275
276
n = np.size(zs,0)
277
if Rs is None:
278
Rs = [None]*n
279
280
# mean estimates from Kalman Filter
281
if self.x.ndim == 1:
282
means = zeros((n, self.dim_x))
283
means_p = zeros((n, self.dim_x))
284
else:
285
means = zeros((n, self.dim_x, 1))
286
means_p = zeros((n, self.dim_x, 1))
287
288
# state covariances from Kalman Filter
289
covariances = zeros((n, self.dim_x, self.dim_x))
290
covariances_p = zeros((n, self.dim_x, self.dim_x))
291
292
if update_first:
293
for i, (z, r) in enumerate(zip(zs, Rs)):
294
self.update(z, r)
295
means[i,:] = self._x
296
covariances[i,:,:] = self._P
297
298
self.predict()
299
means_p[i,:] = self._x
300
covariances_p[i,:,:] = self._P
301
else:
302
for i, (z, r) in enumerate(zip(zs, Rs)):
303
self.predict()
304
means_p[i,:] = self._x
305
covariances_p[i,:,:] = self._P
306
307
self.update(z, r)
308
means[i,:] = self._x
309
covariances[i,:,:] = self._P
310
311
return (means, covariances, means_p, covariances_p)
312
313
314
315
def rts_smoother(self, Xs, Ps, Qs=None):
316
""" Runs the Rauch-Tung-Striebal Kalman smoother on a set of
317
means and covariances computed by a Kalman filter. The usual input
318
would come from the output of `KalmanFilter.batch_filter()`.
319
320
**Parameters**
321
322
Xs : numpy.array
323
array of the means (state variable x) of the output of a Kalman
324
filter.
325
326
Ps : numpy.array
327
array of the covariances of the output of a kalman filter.
328
329
Q : list-like collection of numpy.array, optional
330
Process noise of the Kalman filter at each time step. Optional,
331
if not provided the filter's self.Q will be used
332
333
334
**Returns**
335
336
'x' : numpy.ndarray
337
smoothed means
338
339
'P' : numpy.ndarray
340
smoothed state covariances
341
342
'K' : numpy.ndarray
343
smoother gain at each step
344
345
346
**Example**::
347
348
zs = [t + random.randn()*4 for t in range (40)]
349
350
(mu, cov, _, _) = kalman.batch_filter(zs)
351
(x, P, K) = rts_smoother(mu, cov, fk.F, fk.Q)
352
353
"""
354
355
assert len(Xs) == len(Ps)
356
shape = Xs.shape
357
n = shape[0]
358
dim_x = shape[1]
359
360
F = self._F
361
if not Qs:
362
Qs = [self.Q] * n
363
364
# smoother gain
365
K = zeros((n,dim_x,dim_x))
366
367
x, P = Xs.copy(), Ps.copy()
368
369
for k in range(n-2,-1,-1):
370
P_pred = dot3(F, P[k], F.T) + Qs[k]
371
372
K[k] = dot3(P[k], F.T, linalg.inv(P_pred))
373
x[k] += dot (K[k], x[k+1] - dot(F, x[k]))
374
P[k] += dot3 (K[k], P[k+1] - P_pred, K[k].T)
375
376
return (x, P, K)
377
378
379
def get_prediction(self, u=0):
380
""" Predicts the next state of the filter and returns it. Does not
381
alter the state of the filter.
382
383
**Parameters**
384
385
u : np.array
386
optional control input
387
388
**Returns**
389
390
(x, P)
391
State vector and covariance array of the prediction.
392
"""
393
394
x = dot(self._F, self._x) + dot(self._B, u)
395
P = self._alpha_sq * dot3(self._F, self._P, self._F.T) + self._Q
396
return (x, P)
397
398
399
def residual_of(self, z):
400
""" returns the residual for the given measurement (z). Does not alter
401
the state of the filter.
402
"""
403
return z - dot(self.H, self._x)
404
405
406
def measurement_of_state(self, x):
407
""" Helper function that converts a state into a measurement.
408
409
**Parameters**
410
411
x : np.array
412
kalman state vector
413
414
**Returns**
415
416
z : np.array
417
measurement corresponding to the given state
418
"""
419
420
return dot(self.H, x)
421
422
423
@property
424
def alpha(self):
425
""" Fading memory setting. 1.0 gives the normal Kalman filter, and
426
values slightly larger than 1.0 (such as 1.02) give a fading
427
memory effect - previous measurements have less influence on the
428
filter's estimates. This formulation of the Fading memory filter
429
(there are many) is due to Dan Simon [1].
430
431
** References **
432
433
[1] Dan Simon. "Optimal State Estimation." John Wiley & Sons.
434
p. 208-212. (2006)
435
"""
436
437
return self._alpha_sq**.5
438
439
440
@alpha.setter
441
def alpha(self, value):
442
assert np.isscalar(value)
443
assert value > 0
444
445
self._alpha_sq = value**2
446
447
@property
448
def Q(self):
449
""" Process uncertainty"""
450
return self._Q
451
452
453
@Q.setter
454
def Q(self, value):
455
self._Q = setter_scalar(value, self.dim_x)
456
457
@property
458
def P(self):
459
""" covariance matrix"""
460
return self._P
461
462
463
@P.setter
464
def P(self, value):
465
self._P = setter_scalar(value, self.dim_x)
466
467
468
@property
469
def F(self):
470
""" state transition matrix"""
471
return self._F
472
473
474
@F.setter
475
def F(self, value):
476
self._F = setter(value, self.dim_x, self.dim_x)
477
478
@property
479
def B(self):
480
""" control transition matrix"""
481
return self._B
482
483
484
@B.setter
485
def B(self, value):
486
""" control transition matrix"""
487
if np.isscalar(value):
488
self._B = value
489
else:
490
self._B = setter (value, self.dim_x, self.dim_u)
491
492
493
@property
494
def x(self):
495
""" filter state vector."""
496
return self._x
497
498
499
@x.setter
500
def x(self, value):
501
self._x = setter_1d(value, self.dim_x)
502
503
504
@property
505
def K(self):
506
""" Kalman gain """
507
return self._K
508
509
510
@property
511
def y(self):
512
""" measurement residual (innovation) """
513
return self._y
514
515
@property
516
def S(self):
517
""" system uncertainty in measurement space """
518
return self._S
519
520