Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Download
96131 views
1
# -*- coding: utf-8 -*-
2
3
"""Copyright 2015 Roger R Labbe Jr.
4
5
FilterPy library.
6
http://github.com/rlabbe/filterpy
7
8
Documentation at:
9
https://filterpy.readthedocs.org
10
11
Supporting book at:
12
https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python
13
14
This is licensed under an MIT license. See the readme.MD file
15
for more information.
16
6"""
17
18
from __future__ import (absolute_import, division, print_function,
19
unicode_literals)
20
21
22
import matplotlib.pyplot as plt
23
import numpy.random as random
24
from numpy.random import randn
25
import numpy as np
26
from filterpy.kalman import UnscentedKalmanFilter as UKF
27
from filterpy.kalman import (unscented_transform, MerweScaledSigmaPoints,
28
JulierSigmaPoints)
29
from filterpy.common import stats, Q_discrete_white_noise
30
from math import cos, sin
31
32
33
DO_PLOT = False
34
35
36
def test_sigma_plot():
37
""" Test to make sure sigma's correctly mirror the shape and orientation
38
of the covariance array."""
39
40
x = np.array([[1, 2]])
41
P = np.array([[2, 1.2],
42
[1.2, 2]])
43
kappa = .1
44
45
# if kappa is larger, than points shoudld be closer together
46
47
sp0 = JulierSigmaPoints(n=2, kappa=kappa)
48
sp1 = JulierSigmaPoints(n=2, kappa=kappa*1000)
49
50
w0, _ = sp0.weights()
51
w1, _ = sp1.weights()
52
53
Xi0 = sp0.sigma_points (x, P)
54
Xi1 = sp1.sigma_points (x, P)
55
56
assert max(Xi1[:,0]) > max(Xi0[:,0])
57
assert max(Xi1[:,1]) > max(Xi0[:,1])
58
59
if DO_PLOT:
60
plt.figure()
61
for i in range(Xi0.shape[0]):
62
plt.scatter((Xi0[i,0]-x[0, 0])*w0[i] + x[0, 0],
63
(Xi0[i,1]-x[0, 1])*w0[i] + x[0, 1],
64
color='blue')
65
66
for i in range(Xi1.shape[0]):
67
plt.scatter((Xi1[i, 0]-x[0, 0]) * w1[i] + x[0,0],
68
(Xi1[i, 1]-x[0, 1]) * w1[i] + x[0,1],
69
color='green')
70
71
stats.plot_covariance_ellipse([1, 2], P)
72
73
74
def test_julier_weights():
75
for n in range(1,15):
76
for k in np.linspace(0,5,0.1):
77
Wm = UKF.weights(n, k)
78
79
assert abs(sum(Wm) - 1) < 1.e-12
80
81
82
def test_scaled_weights():
83
for n in range(1,5):
84
for alpha in np.linspace(0.99, 1.01, 100):
85
for beta in range(0,2):
86
for kappa in range(0,2):
87
sp = MerweScaledSigmaPoints(n, alpha, 0, 3-n)
88
Wm, Wc = sp.weights()
89
assert abs(sum(Wm) - 1) < 1.e-1
90
assert abs(sum(Wc) - 1) < 1.e-1
91
92
93
def test_sigma_points_1D():
94
""" tests passing 1D data into sigma_points"""
95
96
kappa = 0.
97
sp = JulierSigmaPoints(1, kappa)
98
99
#ukf = UKF(dim_x=1, dim_z=1, dt=0.1, hx=None, fx=None, kappa=kappa)
100
101
Wm, Wc = sp.weights()
102
assert np.allclose(Wm, Wc, 1e-12)
103
assert len(Wm) == 3
104
105
mean = 5
106
cov = 9
107
108
Xi = sp.sigma_points (mean, cov)
109
xm, ucov = unscented_transform(Xi,Wm, Wc, 0)
110
111
# sum of weights*sigma points should be the original mean
112
m = 0.0
113
for x, w in zip(Xi, Wm):
114
m += x*w
115
116
assert abs(m-mean) < 1.e-12
117
assert abs(xm[0] - mean) < 1.e-12
118
assert abs(ucov[0,0]-cov) < 1.e-12
119
120
assert Xi.shape == (3,1)
121
122
123
124
class RadarSim(object):
125
def __init__(self, dt):
126
self.x = 0
127
self.dt = dt
128
129
def get_range(self):
130
vel = 100 + 5*randn()
131
alt = 1000 + 10*randn()
132
self.x += vel*self.dt
133
134
v = self.x * 0.05*randn()
135
rng = (self.x**2 + alt**2)**.5 + v
136
return rng
137
138
139
def test_radar():
140
def fx(x, dt):
141
A = np.eye(3) + dt * np.array ([[0, 1, 0],
142
[0, 0, 0],
143
[0, 0, 0]])
144
return A.dot(x)
145
146
def hx(x):
147
return np.sqrt (x[0]**2 + x[2]**2)
148
149
dt = 0.05
150
151
sp = JulierSigmaPoints(n=3, kappa=0.)
152
kf = UKF(3, 1, dt, fx=fx, hx=hx, points=sp)
153
154
kf.Q *= 0.01
155
kf.R = 10
156
kf.x = np.array([0., 90., 1100.])
157
kf.P *= 100.
158
radar = RadarSim(dt)
159
160
t = np.arange(0,20+dt, dt)
161
162
n = len(t)
163
164
xs = np.zeros((n,3))
165
166
random.seed(200)
167
rs = []
168
#xs = []
169
for i in range(len(t)):
170
r = radar.get_range()
171
#r = GetRadar(dt)
172
kf.predict()
173
kf.update(z=[r])
174
175
xs[i,:] = kf.x
176
rs.append(r)
177
178
if DO_PLOT:
179
print(xs[:,0].shape)
180
181
plt.figure()
182
plt.subplot(311)
183
plt.plot(t, xs[:,0])
184
plt.subplot(312)
185
plt.plot(t, xs[:,1])
186
plt.subplot(313)
187
188
plt.plot(t, xs[:,2])
189
190
191
def test_linear_2d():
192
""" should work like a linear KF if problem is linear """
193
194
195
def fx(x, dt):
196
F = np.array([[1, dt, 0, 0],
197
[0, 1, 0, 0],
198
[0, 0, 1, dt],
199
[0, 0, 0, 1]], dtype=float)
200
201
return np.dot(F, x)
202
203
def hx(x):
204
return np.array([x[0], x[2]])
205
206
207
dt = 0.1
208
points = MerweScaledSigmaPoints(4, .1, 2., -1)
209
kf = UKF(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points)
210
211
212
kf.x = np.array([-1., 1., -1., 1])
213
kf.P*=0.0001
214
#kf.R *=0
215
#kf.Q
216
217
zs = []
218
for i in range(20):
219
z = np.array([i+randn()*0.1, i+randn()*0.1])
220
zs.append(z)
221
222
223
224
Ms, Ps = kf.batch_filter(zs)
225
smooth_x, _, _ = kf.rts_smoother(Ms, Ps, dt=dt)
226
227
if DO_PLOT:
228
zs = np.asarray(zs)
229
230
#plt.plot(zs[:,0])
231
plt.plot(Ms[:,0])
232
plt.plot(smooth_x[:,0], smooth_x[:,2])
233
234
print(smooth_x)
235
236
237
def test_batch_missing_data():
238
""" batch filter should accept missing data with None in the measurements """
239
240
241
def fx(x, dt):
242
F = np.array([[1, dt, 0, 0],
243
[0, 1, 0, 0],
244
[0, 0, 1, dt],
245
[0, 0, 0, 1]], dtype=float)
246
247
return np.dot(F, x)
248
249
def hx(x):
250
return np.array([x[0], x[2]])
251
252
253
dt = 0.1
254
points = MerweScaledSigmaPoints(4, .1, 2., -1)
255
kf = UKF(dim_x=4, dim_z=2, dt=dt, fx=fx, hx=hx, points=points)
256
257
258
kf.x = np.array([-1., 1., -1., 1])
259
kf.P*=0.0001
260
261
zs = []
262
for i in range(20):
263
z = np.array([i+randn()*0.1, i+randn()*0.1])
264
zs.append(z)
265
266
zs[2] = None
267
Rs = [1]*len(zs)
268
Rs[2] = None
269
Ms, Ps = kf.batch_filter(zs)
270
271
272
def test_rts():
273
def fx(x, dt):
274
A = np.eye(3) + dt * np.array ([[0, 1, 0],
275
[0, 0, 0],
276
[0, 0, 0]])
277
f = np.dot(A, x)
278
return f
279
280
def hx(x):
281
return np.sqrt (x[0]**2 + x[2]**2)
282
283
dt = 0.05
284
285
sp = JulierSigmaPoints(n=3, kappa=1.)
286
kf = UKF(3, 1, dt, fx=fx, hx=hx, points=sp)
287
288
kf.Q *= 0.01
289
kf.R = 10
290
kf.x = np.array([0., 90., 1100.])
291
kf.P *= 100.
292
radar = RadarSim(dt)
293
294
t = np.arange(0,20+dt, dt)
295
296
n = len(t)
297
298
xs = np.zeros((n,3))
299
300
random.seed(200)
301
rs = []
302
#xs = []
303
for i in range(len(t)):
304
r = radar.get_range()
305
#r = GetRadar(dt)
306
kf.predict()
307
kf.update(z=[r])
308
309
xs[i,:] = kf.x
310
rs.append(r)
311
312
313
kf.x = np.array([0., 90., 1100.])
314
kf.P = np.eye(3)*100
315
M, P = kf.batch_filter(rs)
316
assert np.array_equal(M, xs), "Batch filter generated different output"
317
318
Qs = [kf.Q]*len(t)
319
M2, P2, K = kf.rts_smoother(Xs=M, Ps=P, Qs=Qs)
320
321
322
if DO_PLOT:
323
print(xs[:,0].shape)
324
325
plt.figure()
326
plt.subplot(311)
327
plt.plot(t, xs[:,0])
328
plt.plot(t, M2[:,0], c='g')
329
plt.subplot(312)
330
plt.plot(t, xs[:,1])
331
plt.plot(t, M2[:,1], c='g')
332
plt.subplot(313)
333
334
plt.plot(t, xs[:,2])
335
plt.plot(t, M2[:,2], c='g')
336
337
338
def test_fixed_lag():
339
def fx(x, dt):
340
A = np.eye(3) + dt * np.array ([[0, 1, 0],
341
[0, 0, 0],
342
[0, 0, 0]])
343
f = np.dot(A, x)
344
return f
345
346
def hx(x):
347
return np.sqrt (x[0]**2 + x[2]**2)
348
349
dt = 0.05
350
351
sp = JulierSigmaPoints(n=3, kappa=0)
352
353
kf = UKF(3, 1, dt, fx=fx, hx=hx, points=sp)
354
355
kf.Q *= 0.01
356
kf.R = 10
357
kf.x = np.array([0., 90., 1100.])
358
kf.P *= 1.
359
radar = RadarSim(dt)
360
361
t = np.arange(0,20+dt, dt)
362
363
n = len(t)
364
365
xs = np.zeros((n,3))
366
367
random.seed(200)
368
rs = []
369
#xs = []
370
371
M = []
372
P = []
373
N =10
374
flxs = []
375
for i in range(len(t)):
376
r = radar.get_range()
377
#r = GetRadar(dt)
378
kf.predict()
379
kf.update(z=[r])
380
381
xs[i,:] = kf.x
382
flxs.append(kf.x)
383
rs.append(r)
384
M.append(kf.x)
385
P.append(kf.P)
386
print(i)
387
#print(i, np.asarray(flxs)[:,0])
388
if i == 20 and len(M) >= N:
389
try:
390
M2, P2, K = kf.rts_smoother(Xs=np.asarray(M)[-N:], Ps=np.asarray(P)[-N:])
391
flxs[-N:] = M2
392
#flxs[-N:] = [20]*N
393
except:
394
print('except', i)
395
#P[-N:] = P2
396
397
398
kf.x = np.array([0., 90., 1100.])
399
kf.P = np.eye(3)*100
400
M, P = kf.batch_filter(rs)
401
402
Qs = [kf.Q]*len(t)
403
M2, P2, K = kf.rts_smoother(Xs=M, Ps=P, Qs=Qs)
404
405
406
flxs = np.asarray(flxs)
407
print(xs[:,0].shape)
408
409
plt.figure()
410
plt.subplot(311)
411
plt.plot(t, xs[:,0])
412
plt.plot(t, flxs[:,0], c='r')
413
plt.plot(t, M2[:,0], c='g')
414
plt.subplot(312)
415
plt.plot(t, xs[:,1])
416
plt.plot(t, flxs[:,1], c='r')
417
plt.plot(t, M2[:,1], c='g')
418
419
plt.subplot(313)
420
plt.plot(t, xs[:,2])
421
plt.plot(t, flxs[:,2], c='r')
422
plt.plot(t, M2[:,2], c='g')
423
424
425
426
def test_circle():
427
from filterpy.kalman import KalmanFilter
428
from math import radians
429
def hx(x):
430
radius = x[0]
431
angle = x[1]
432
x = cos(radians(angle)) * radius
433
y = sin(radians(angle)) * radius
434
return np.array([x, y])
435
436
def fx(x, dt):
437
return np.array([x[0], x[1]+x[2], x[2]])
438
439
std_noise = .1
440
441
sp = JulierSigmaPoints(n=3, kappa=0.)
442
f = UKF(dim_x=3, dim_z=2, dt=.01, hx=hx, fx=fx, points=sp)
443
f.x = np.array([50., 90., 0])
444
f.P *= 100
445
f.R = np.eye(2)*(std_noise**2)
446
f.Q = np.eye(3)*.001
447
f.Q[0,0]=0
448
f.Q[2,2]=0
449
450
kf = KalmanFilter(dim_x=6, dim_z=2)
451
kf.x = np.array([50., 0., 0, 0, .0, 0.])
452
453
F = np.array([[1., 1., .5, 0., 0., 0.],
454
[0., 1., 1., 0., 0., 0.],
455
[0., 0., 1., 0., 0., 0.],
456
[0., 0., 0., 1., 1., .5],
457
[0., 0., 0., 0., 1., 1.],
458
[0., 0., 0., 0., 0., 1.]])
459
460
kf.F = F
461
kf.P*= 100
462
kf.H = np.array([[1,0,0,0,0,0],
463
[0,0,0,1,0,0]])
464
465
466
kf.R = f.R
467
kf.Q[0:3, 0:3] = Q_discrete_white_noise(3, 1., .00001)
468
kf.Q[3:6, 3:6] = Q_discrete_white_noise(3, 1., .00001)
469
470
measurements = []
471
results = []
472
473
zs = []
474
kfxs = []
475
for t in range (0,12000):
476
a = t / 30 + 90
477
x = cos(radians(a)) * 50.+ randn() * std_noise
478
y = sin(radians(a)) * 50. + randn() * std_noise
479
# create measurement = t plus white noise
480
z = np.array([x,y])
481
zs.append(z)
482
483
f.predict()
484
f.update(z)
485
486
kf.predict()
487
kf.update(z)
488
489
# save data
490
results.append (hx(f.x))
491
kfxs.append(kf.x)
492
#print(f.x)
493
494
results = np.asarray(results)
495
zs = np.asarray(zs)
496
kfxs = np.asarray(kfxs)
497
498
print(results)
499
if DO_PLOT:
500
plt.plot(zs[:,0], zs[:,1], c='r', label='z')
501
plt.plot(results[:,0], results[:,1], c='k', label='UKF')
502
plt.plot(kfxs[:,0], kfxs[:,3], c='g', label='KF')
503
plt.legend(loc='best')
504
plt.axis('equal')
505
506
507
508
def kf_circle():
509
from filterpy.kalman import KalmanFilter
510
from math import radians
511
import math
512
def hx(x):
513
radius = x[0]
514
angle = x[1]
515
x = cos(radians(angle)) * radius
516
y = sin(radians(angle)) * radius
517
return np.array([x, y])
518
519
def fx(x, dt):
520
return np.array([x[0], x[1]+x[2], x[2]])
521
522
523
def hx_inv(x, y):
524
angle = math.atan2(y,x)
525
radius = math.sqrt(x*x + y*y)
526
return np.array([radius, angle])
527
528
529
std_noise = .1
530
531
532
kf = KalmanFilter(dim_x=3, dim_z=2)
533
kf.x = np.array([50., 0., 0.])
534
535
F = np.array([[1., 0, 0.],
536
[0., 1., 1.,],
537
[0., 0., 1.,]])
538
539
kf.F = F
540
kf.P*= 100
541
kf.H = np.array([[1,0,0],
542
[0,1,0]])
543
544
kf.R = np.eye(2)*(std_noise**2)
545
#kf.Q[0:3, 0:3] = Q_discrete_white_noise(3, 1., .00001)
546
547
548
549
zs = []
550
kfxs = []
551
for t in range (0,2000):
552
a = t / 30 + 90
553
x = cos(radians(a)) * 50.+ randn() * std_noise
554
y = sin(radians(a)) * 50. + randn() * std_noise
555
556
z = hx_inv(x,y)
557
zs.append(z)
558
559
kf.predict()
560
kf.update(z)
561
562
# save data
563
kfxs.append(kf.x)
564
565
zs = np.asarray(zs)
566
kfxs = np.asarray(kfxs)
567
568
569
if DO_PLOT:
570
plt.plot(zs[:,0], zs[:,1], c='r', label='z')
571
plt.plot(kfxs[:,0], kfxs[:,1], c='g', label='KF')
572
plt.legend(loc='best')
573
plt.axis('equal')
574
575
576
577
578
def two_radar():
579
580
# code is not complete - I was using to test RTS smoother. very similar
581
# to two_radary.py in book.
582
583
import numpy as np
584
import matplotlib.pyplot as plt
585
586
from numpy import array
587
from numpy.linalg import norm
588
from numpy.random import randn
589
from math import atan2, radians
590
591
from filterpy.common import Q_discrete_white_noise
592
593
class RadarStation(object):
594
595
def __init__(self, pos, range_std, bearing_std):
596
self.pos = asarray(pos)
597
598
self.range_std = range_std
599
self.bearing_std = bearing_std
600
601
602
def reading_of(self, ac_pos):
603
""" Returns range and bearing to aircraft as tuple. bearing is in
604
radians.
605
"""
606
607
diff = np.subtract(self.pos, ac_pos)
608
rng = norm(diff)
609
brg = atan2(diff[1], diff[0])
610
return rng, brg
611
612
613
def noisy_reading(self, ac_pos):
614
rng, brg = self.reading_of(ac_pos)
615
rng += randn() * self.range_std
616
brg += randn() * self.bearing_std
617
return rng, brg
618
619
620
621
622
class ACSim(object):
623
624
def __init__(self, pos, vel, vel_std):
625
self.pos = asarray(pos, dtype=float)
626
self.vel = asarray(vel, dtype=float)
627
self.vel_std = vel_std
628
629
630
def update(self):
631
vel = self.vel + (randn() * self.vel_std)
632
self.pos += vel
633
634
return self.pos
635
636
dt = 1.
637
638
639
def hx(x):
640
r1, b1 = hx.R1.reading_of((x[0], x[2]))
641
r2, b2 = hx.R2.reading_of((x[0], x[2]))
642
643
return array([r1, b1, r2, b2])
644
pass
645
646
647
648
def fx(x, dt):
649
x_est = x.copy()
650
x_est[0] += x[1]*dt
651
x_est[2] += x[3]*dt
652
return x_est
653
654
655
656
vx, vy = 0.1, 0.1
657
658
f = UnscentedKalmanFilter(dim_x=4, dim_z=4, dt=dt, hx=hx, fx=fx, kappa=0)
659
aircraft = ACSim ((100,100), (vx*dt,vy*dt), 0.00000002)
660
661
662
range_std = 0.001 # 1 meter
663
bearing_std = 1/1000 # 1mrad
664
665
R1 = RadarStation ((0,0), range_std, bearing_std)
666
R2 = RadarStation ((200,0), range_std, bearing_std)
667
668
hx.R1 = R1
669
hx.R2 = R2
670
671
f.x = array([100, vx, 100, vy])
672
673
f.R = np.diag([range_std**2, bearing_std**2, range_std**2, bearing_std**2])
674
q = Q_discrete_white_noise(2, var=0.0002, dt=dt)
675
#q = np.array([[0,0],[0,0.0002]])
676
f.Q[0:2, 0:2] = q
677
f.Q[2:4, 2:4] = q
678
f.P = np.diag([.1, 0.01, .1, 0.01])
679
680
681
track = []
682
zs = []
683
684
685
for i in range(int(300/dt)):
686
687
pos = aircraft.update()
688
689
r1, b1 = R1.noisy_reading(pos)
690
r2, b2 = R2.noisy_reading(pos)
691
692
z = np.array([r1, b1, r2, b2])
693
zs.append(z)
694
track.append(pos.copy())
695
696
zs = asarray(zs)
697
698
699
xs, Ps, Pxz, pM, pP = f.batch_filter(zs)
700
ms, _, _ = f.rts_smoother(xs, Ps)
701
702
track = asarray(track)
703
time = np.arange(0,len(xs)*dt, dt)
704
705
plt.figure()
706
plt.subplot(411)
707
plt.plot(time, track[:,0])
708
plt.plot(time, xs[:,0])
709
plt.legend(loc=4)
710
plt.xlabel('time (sec)')
711
plt.ylabel('x position (m)')
712
plt.tight_layout()
713
714
715
716
plt.subplot(412)
717
plt.plot(time, track[:,1])
718
plt.plot(time, xs[:,2])
719
plt.legend(loc=4)
720
plt.xlabel('time (sec)')
721
plt.ylabel('y position (m)')
722
plt.tight_layout()
723
724
725
plt.subplot(413)
726
plt.plot(time, xs[:,1])
727
plt.plot(time, ms[:,1])
728
plt.legend(loc=4)
729
plt.ylim([0, 0.2])
730
plt.xlabel('time (sec)')
731
plt.ylabel('x velocity (m/s)')
732
plt.tight_layout()
733
734
plt.subplot(414)
735
plt.plot(time, xs[:,3])
736
plt.plot(time, ms[:,3])
737
plt.ylabel('y velocity (m/s)')
738
plt.legend(loc=4)
739
plt.xlabel('time (sec)')
740
plt.tight_layout()
741
plt.show()
742
743
744
if __name__ == "__main__":
745
746
DO_PLOT = True
747
748
749
test_batch_missing_data()
750
751
#test_linear_2d()
752
#test_sigma_points_1D()
753
#test_fixed_lag()
754
#DO_PLOT = True
755
#test_rts()
756
#kf_circle()
757
#test_circle()
758
759
760
'''test_1D_sigma_points()
761
#plot_sigma_test ()
762
763
x = np.array([[1,2]])
764
P = np.array([[2, 1.2],
765
[1.2, 2]])\
766
767
768
kappa = .1
769
770
xi,w = sigma_points (x,P,kappa)
771
xm, cov = unscented_transform(xi, w)'''
772
#test_radar()
773
#test_sigma_plot()
774
#test_julier_weights()
775
#test_scaled_weights()
776
777
#print('xi=\n',Xi)
778
"""
779
xm, cov = unscented_transform(Xi, W)
780
print(xm)
781
print(cov)"""
782
# sigma_points ([5,2],9*np.eye(2), 2)
783
784
785