Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
96129 views
1
# -*- coding: utf-8 -*-
2
3
"""Copyright 2015 Roger R Labbe Jr.
4
5
FilterPy library.
6
http://github.com/rlabbe/filterpy
7
8
Documentation at:
9
https://filterpy.readthedocs.org
10
11
Supporting book at:
12
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
13
14
This is licensed under an MIT license. See the readme.MD file
15
for more information.
16
"""
17
18
19
from __future__ import (absolute_import, division, print_function,
20
unicode_literals)
21
import numpy as np
22
from scipy.linalg import inv
23
from numpy import dot, zeros, eye, outer
24
from numpy.random import multivariate_normal
25
from filterpy.common import dot3
26
27
28
class EnsembleKalmanFilter(object):
29
""" This implements the ensemble Kalman filter (EnKF). The EnKF uses
30
an ensemble of hundreds to thousands of state vectors that are randomly
31
sampled around the estimate, and adds perturbations at each update and
32
predict step. It is useful for extremely large systems such as found
33
in hydrophysics. As such, this class is admittedly a toy as it is far
34
too slow with large N.
35
36
There are many versions of this sort of this filter. This formulation is
37
due to Crassidis and Junkins [1]. It works with both linear and nonlinear
38
systems.
39
40
**References**
41
42
- [1] John L Crassidis and John L. Junkins. "Optimal Estimation of
43
Dynamic Systems. CRC Press, second edition. 2012. pp, 257-9.
44
"""
45
46
def __init__(self, x, P, dim_z, dt, N, hx, fx):
47
""" Create a Kalman filter. You are responsible for setting the
48
various state variables to reasonable values; the defaults below will
49
not give you a functional filter.
50
51
**Parameters**
52
53
x : np.array(dim_z)
54
state mean
55
56
P : np.array((dim_x, dim_x))
57
covariance of the state
58
59
dim_z : int
60
Number of of measurement inputs. For example, if the sensor
61
provides you with position in (x,y), dim_z would be 2.
62
63
dt : float
64
time step in seconds
65
66
N : int
67
number of sigma points (ensembles). Must be greater than 1.
68
69
hx : function hx(x)
70
Measurement function. May be linear or nonlinear - converts state
71
x into a measurement. Return must be an np.array of the same
72
dimensionality as the measurement vector.
73
74
fx : function fx(x, dt)
75
State transition function. May be linear or nonlinear. Projects
76
state x into the next time period. Returns the projected state x.
77
78
**Example**
79
80
.. code::
81
82
def hx(x):
83
return np.array([x[0]])
84
85
F = np.array([[1., 1.],
86
[0., 1.]])
87
def fx(x, dt):
88
return np.dot(F, x)
89
90
x = np.array([0., 1.])
91
P = np.eye(2) * 100.
92
dt = 0.1
93
f = EnKF(x=x, P=P, dim_z=1, dt=dt, N=8,
94
hx=hx, fx=fx)
95
96
std_noise = 3.
97
f.R *= std_noise**2
98
f.Q = Q_discrete_white_noise(2, dt, .01)
99
100
while True:
101
z = read_sensor()
102
f.predict()
103
f.update(np.asarray([z]))
104
105
"""
106
107
assert dim_z > 0
108
109
self.dim_x = len(x)
110
self.dim_z = dim_z
111
self.dt = dt
112
self.N = N
113
self.hx = hx
114
self.fx = fx
115
116
self.Q = eye(self.dim_x) # process uncertainty
117
self.R = eye(self.dim_z) # state uncertainty
118
self.mean = [0]*self.dim_x
119
self.initialize(x, P)
120
121
122
def initialize(self, x, P):
123
""" Initializes the filter with the specified mean and
124
covariance. Only need to call this if you are using the filter
125
to filter more than one set of data; this is called by __init__
126
127
**Parameters**
128
129
x : np.array(dim_z)
130
state mean
131
132
P : np.array((dim_x, dim_x))
133
covariance of the state
134
"""
135
assert x.ndim == 1
136
self.sigmas = multivariate_normal(mean=x, cov=P, size=self.N)
137
138
self.x = x
139
self.P = P
140
141
142
def update(self, z, R=None):
143
"""
144
Add a new measurement (z) to the kalman filter. If z is None, nothing
145
is changed.
146
147
**Parameters**
148
149
z : np.array
150
measurement for this update.
151
152
R : np.array, scalar, or None
153
Optionally provide R to override the measurement noise for this
154
one call, otherwise self.R will be used.
155
"""
156
157
if z is None:
158
return
159
160
if R is None:
161
R = self.R
162
if np.isscalar(R):
163
R = eye(self.dim_z) * R
164
165
N = self.N
166
dim_z = len(z)
167
sigmas_h = zeros((N, dim_z))
168
169
# transform sigma points into measurement space
170
for i in range(N):
171
sigmas_h[i] = self.hx(self.sigmas[i])
172
173
z_mean = np.mean(sigmas_h, axis=0)
174
175
P_zz = 0
176
for sigma in sigmas_h:
177
s = sigma - z_mean
178
P_zz += outer(s, s)
179
P_zz = P_zz / (N-1) + R
180
181
P_xz = 0
182
for i in range(N):
183
P_xz += outer(self.sigmas[i] - self.x, sigmas_h[i] - z_mean)
184
P_xz /= N-1
185
186
K = dot(P_xz, inv(P_zz))
187
188
e_r = multivariate_normal([0]*dim_z, R, N)
189
for i in range(N):
190
self.sigmas[i] += dot(K, z + e_r[i] - sigmas_h[i])
191
192
self.x = np.mean(self.sigmas, axis=0)
193
self.P = self.P - dot3(K, P_zz, K.T)
194
195
196
def predict(self):
197
""" Predict next position. """
198
199
N = self.N
200
for i, s in enumerate(self.sigmas):
201
self.sigmas[i] = self.fx(s, self.dt)
202
203
e = multivariate_normal(self.mean, self.Q, N)
204
self.sigmas += e
205
#self.x = np.mean(self.sigmas , axis=0)
206
207
P = 0
208
for s in self.sigmas:
209
sx = s - self.x
210
P += outer(sx, sx)
211
212
self.P = P / (N-1)
213
214