Commit d1fceec6 authored by widmerl

improved vision

parent ee6fd905
'''
================================================================================
Finding marker balls, labeling them and calculating distance and angle estimates
================================================================================
Using HSV colour masks, this script finds the four biggest green balls, detects
their positions and measures the distance to the camera (dist) as well as the
two angles that define the position of the camera in the frame of the Marker
(zero is perpendicular to the Marker plane): phi_ud for up/down and phi_lr for
left/right. The output values are filtered by a Kalman filter (the filter
initialisation below is still commented out).
Constants:
WIDTH     width of the video frame in pixels
BLURR     kernel size of the Gaussian blur filter; an odd number between 5 and 30
ACT_SIZE  distance between the centres of the markers in mm
FOV       field of view of the camera in degrees
Marker structure:
ACT_SIZE at least 100 mm
diameter at least 30 mm
G means coloured in green
    ACT_SIZE
  |----------|
   ___    ___
  / G \  / G \
  \___/  \___/   -
                 |
                 | ACT_SIZE
   ___    ___    |
  / G \  / G \   |
  \___/  \___/   -
  |---|
 diameter
Copyright 2019 Lars Widmer
'''
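# Editor's sketch of the ranging geometry (an approximation that assumes an
# undistorted lens): a marker pair ACT_SIZE apart that subtends an angle of
# t_meas degrees lies at range l = ACT_SIZE / sin(t_meas * pi / 180). For
# example, with ACT_SIZE = 100 mm and t_meas = 2 degrees this gives
# l = 100 / sin(0.0349), roughly 2865 mm.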
# import the necessary packages
from collections import deque
from imutils.video import VideoStream
import numpy as np
import math
import argparse
import cv2
import imutils
import time
import matplotlib.pyplot as plt
from operator import itemgetter
from pykalman import KalmanFilter
WIDTH = 600
BLURR = 11
ACT_SIZE = 100
FOV = 70
greenLower = (50, 50, 40)
greenUpper = (80, 255, 255)
blueLower = (90,40,40)
blueUpper = (110,255,255)
magentaLower = (130,80,30)
magentaUpper = (150,255,255)
upper = greenUpper
lower = greenLower
def angle(p1, p2):
    # angular separation (in degrees) of two pixel coordinates
    return (math.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2) / WIDTH) * FOV
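# Editor's note: with WIDTH = 600 and FOV = 70, two centres 300 px apart map to
# (300 / 600) * 70 = 35 degrees; this linear pixel-to-degree mapping ignores
# lens distortion.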
#initialize font
fontface = cv2.FONT_HERSHEY_SIMPLEX
fontscale = 0.5
fontcolor = (255, 255, 255)
# #initialize Kalman Filter
# kf = KalmanFilter(
# transition_matrix
# observation_matrix
# initial_transition_covariance
# initial_observation_covariance
# transition_offsets
# observation_offset
# initial_state_mean
# initial_state_covariance
# random_state
# )
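# A minimal sketch of how the distance estimate could be smoothed with pykalman,
# assuming a 1-D constant-value model with default covariances (an editor's
# assumption, not the author's tuned parameters):
# kf = KalmanFilter(
#     transition_matrices=[[1.0]],   # distance assumed roughly constant per frame
#     observation_matrices=[[1.0]],  # the distance is observed directly
#     initial_state_mean=[1000.0],   # hypothetical prior distance in mm
# )
# state_mean, state_cov = [1000.0], [[1.0]]
# then, inside the loop, after each new distance measurement l:
# state_mean, state_cov = kf.filter_update(state_mean, state_cov, observation=[l])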
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video",
                help="path to the (optional) video file")
ap.add_argument("-b", "--buffer", type=int, default=64,
                help="max buffer size")
args = vars(ap.parse_args())
# define the lower and upper boundaries of the "green"
# ball in the HSV color space, then initialize the
# list of tracked points
# NOTE: greenLower/greenUpper are redefined here; `lower` and `upper` above
# still hold the first set of green bounds
greenLower = (29, 86, 6)
greenUpper = (64, 255, 255)
redLower1 = (0, 86, 6)
redUpper1 = (8, 255, 255)
# NOTE: hue 90-100 lies inside the blue range defined above; the red
# wrap-around band in OpenCV is usually around 170-180
redLower2 = (90, 86, 6)
redUpper2 = (100, 255, 255)
points = []
t_meas_past = []
t_meas_kalman = []
fig, ax = plt.subplots()
pts = deque(maxlen=args["buffer"])
# if a video path was not supplied, grab the reference
# to the webcam
if not args.get("video", False):
    vs = VideoStream(src=0).start()
# otherwise, grab a reference to the video file
else:
    vs = cv2.VideoCapture(args["video"])
# allow the camera or video file to warm up
time.sleep(0.2)
# keep looping
while True:
    # grab the current frame
    frame = vs.read()
    # handle the frame from VideoCapture or VideoStream
    frame = frame[1] if args.get("video", False) else frame
    # if we are viewing a video and we did not grab a frame,
    # then we have reached the end of the video
    if frame is None:
        break
    # resize the frame, blur it, and convert it to the HSV
    # color space
    frame = imutils.resize(frame, width=600)
    blurred = cv2.GaussianBlur(frame, (11, 11), 0)
    hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)
    # construct a mask for the color "green", then perform
    # a series of erosions and dilations to remove any small
    # blobs left in the mask
    maskGreen = cv2.inRange(hsv, greenLower, greenUpper)
    maskGreen = cv2.erode(maskGreen, None, iterations=2)
    maskGreen = cv2.dilate(maskGreen, None, iterations=2)
    maskRed1 = cv2.inRange(hsv, redLower1, redUpper1)
    maskRed2 = cv2.inRange(hsv, redLower2, redUpper2)
    maskRed = cv2.bitwise_or(maskRed1, maskRed2)
    maskRed = cv2.erode(maskRed, None, iterations=2)
    maskRed = cv2.dilate(maskRed, None, iterations=2)
    # find contours in the masks and initialize the current
    # (x, y) centers of the balls
    cntsGreen = cv2.findContours(maskGreen.copy(), cv2.RETR_EXTERNAL,
                                 cv2.CHAIN_APPROX_SIMPLE)
    cntsGreen = imutils.grab_contours(cntsGreen)
    cntsRed = cv2.findContours(maskRed.copy(), cv2.RETR_EXTERNAL,
                               cv2.CHAIN_APPROX_SIMPLE)
    cntsRed = imutils.grab_contours(cntsRed)
    centerR = None
    centerG = None
    # only proceed if at least one red contour was found
    if len(cntsRed) > 0:
        # find the largest contour in the mask, then use
        # it to compute the minimum enclosing circle and
        # centroid
        cR = max(cntsRed, key=cv2.contourArea)
        ((xR, yR), radiusR) = cv2.minEnclosingCircle(cR)
        print('Red position: ', int(xR), ',', int(yR), 'radius: ', int(radiusR))
        MR = cv2.moments(cR)
        centerR = (int(MR["m10"] / MR["m00"]), int(MR["m01"] / MR["m00"]))
        # only proceed if the radius meets a minimum size
        if radiusR > 10:
            # draw the circle and centroid on the frame
            cv2.circle(frame, (int(xR), int(yR)), int(radiusR), (0, 255, 255), 2)
            cv2.circle(frame, centerR, 5, (0, 0, 255), -1)
    # only proceed if at least one green contour was found
    if len(cntsGreen) > 0:
        # find the largest contour in the mask, then use
        # it to compute the minimum enclosing circle and
        # centroid
        cG = max(cntsGreen, key=cv2.contourArea)
        ((xG, yG), radiusG) = cv2.minEnclosingCircle(cG)
        print('Green position: ', int(xG), ',', int(yG), 'radius: ', int(radiusG))
        MG = cv2.moments(cG)
        centerG = (int(MG["m10"] / MG["m00"]), int(MG["m01"] / MG["m00"]))
        # only proceed if the radius meets a minimum size
        if radiusG > 2:
            # draw the circle and centroid on the frame
            cv2.circle(frame, (int(xG), int(yG)), int(radiusG), (0, 255, 255), 2)
            cv2.circle(frame, centerG, 5, (0, 0, 255), -1)
    # clear the points list
    points = []
    # grab a second frame for the marker detection
    frame = vs.read()
    # handle the frame from VideoCapture or VideoStream
    frame = frame[1] if args.get("video", False) else frame
    # if we are viewing a video and we did not grab a frame,
    # then we have reached the end of the video
    if frame is None:
        break
    # resize the frame, blur it, and convert it to the HSV
    # color space
    frame = imutils.resize(frame, width=WIDTH)
    blurred = cv2.GaussianBlur(frame, (BLURR, BLURR), 0)
    cv2.imshow("Blurred", blurred)
    hsv = cv2.cvtColor(blurred, cv2.COLOR_BGR2HSV)
    # construct a mask for the color "green", then perform
    # a series of erosions and dilations to remove any small
    # blobs left in the mask
    mask = cv2.inRange(hsv, lower, upper)
    cv2.imshow("Masked", mask)
    mask = cv2.erode(mask, None, iterations=2)
    cv2.imshow("Eroded", mask)
    mask = cv2.dilate(mask, None, iterations=2)
    cv2.imshow("Dilated", mask)
    # find contours in the mask and initialize the current
    # (x, y) center of the ball
    cnts = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
                            cv2.CHAIN_APPROX_SIMPLE)
    cnts = imutils.grab_contours(cnts)
    center = None
    # only proceed if at least one contour was found
    if len(cnts) > 0:
        # keep the four largest contours, then compute the
        # minimum enclosing circle and centroid of each
        cnts = sorted(cnts, key=cv2.contourArea)
        # print(cnts)
        cnts = cnts[-4:]
        for c in cnts:
            ((x, y), radius) = cv2.minEnclosingCircle(c)
            # print('Green position: ', int(x), ',', int(y), 'radius: ', int(radius))
            M = cv2.moments(c)
            center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
            cv2.circle(frame, (int(x), int(y)), int(radius), (0, 255, 255), 2)
            cv2.circle(frame, center, 5, (0, 0, 255), -1)
            # add to the points list
            points.append((x, y))
    # calculate a new estimate if all four points are visible
    if len(points) == 4:
        # split the four centres into upper/lower pairs, then sort left/right
        points.sort(key=itemgetter(1))
        down = sorted(points[-2:], key=itemgetter(0))
        up = sorted(points[:2], key=itemgetter(0))
        up_left = up[0]
        up_right = up[1]
        down_left = down[0]
        down_right = down[1]
        cv2.putText(frame, 'up_left', (int(up_left[0]), int(up_left[1])), fontface, fontscale, fontcolor)
        cv2.putText(frame, 'up_right', (int(up_right[0]), int(up_right[1])), fontface, fontscale, fontcolor)
        cv2.putText(frame, 'down_left', (int(down_left[0]), int(down_left[1])), fontface, fontscale, fontcolor)
        cv2.putText(frame, 'down_right', (int(down_right[0]), int(down_right[1])), fontface, fontscale, fontcolor)
        # angles subtended by the top, bottom, left and right marker pairs
        t_meas = angle(up_left, up_right)
        b_meas = angle(down_left, down_right)
        l_meas = angle(up_left, down_left)
        r_meas = angle(up_right, down_right)
        t_meas_past.append(t_meas)
        # range estimate from each pair (0.01745 converts degrees to radians)
        l_t = ACT_SIZE / math.sin(t_meas * 0.01745)
        l_b = ACT_SIZE / math.sin(b_meas * 0.01745)
        l_l = ACT_SIZE / math.sin(l_meas * 0.01745)
        l_r = ACT_SIZE / math.sin(r_meas * 0.01745)
        l = (l_t + l_b + l_r + l_l) / 4
        phi_ud = math.atan((l_t - l_b) / ACT_SIZE)
        phi_lr = math.atan((l_l - l_r) / ACT_SIZE)
        output = 'dist: ' + str(int(l)) + ' phi_ud: ' + "{:5.1f}".format(phi_ud * 57.296) + ' phi_lr: ' + "{:5.1f}".format(phi_lr * 57.296)
        cv2.putText(frame, output, (0, 20), fontface, fontscale, fontcolor)
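        # Editor's note on the pose estimate: the difference between the top
        # and bottom ranges over the vertical baseline ACT_SIZE gives the
        # up/down angle, phi_ud = atan((l_t - l_b) / ACT_SIZE), and the
        # left/right ranges give phi_lr the same way; 57.296 converts radians
        # to degrees for display.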
        # t_meas_kalman.append(...)  # TODO: append the Kalman-filtered estimate
    # print(points)
    ## # update the points queue
    ## pts.appendleft(center)
    ## # loop over the set of tracked points
    ## for i in range(1, len(pts)):
    ##     # if either of the tracked points are None, ignore
    ##     # them
    ##     if pts[i - 1] is None or pts[i] is None:
    ##         continue
    ##
    ##     # otherwise, compute the thickness of the line and
    ##     # draw the connecting lines
    ##     thickness = int(np.sqrt(args["buffer"] / float(i + 1)) * 2.5)
    ##     cv2.line(frame, pts[i - 1], pts[i], (0, 0, 255), thickness)
    # show the frame to our screen
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the 'q' key is pressed, stop the loop
    if key == ord("q"):
        break
# if we are not using a video file, stop the camera video stream
if not args.get("video", False):
    vs.stop()
# otherwise, release the camera
else:
    vs.release()
# plot the history of the measured top angle
plt.plot(t_meas_past)
plt.ylabel('t_measured')
plt.show()
time.sleep(0.1)
# wait for 'q' before closing all windows
while True:
    key = cv2.waitKey(1) & 0xFF
    if key == ord("q"):
        break
cv2.destroyAllWindows()
'''
==============================================
Online State Estimation with the Kalman Filter
==============================================
This example shows how :class:`KalmanFilter` can be used to estimate hidden
states in an online setting.
While the Kalman Smoother is able to estimate the hidden state of a target at
any time step using *all* measurements, the Kalman Filter only uses
measurements up to and including the current time step. This is done using a
set of recursive formulae that only require the mean and covariance matrix
output by the Kalman Filter at the previous time step, meaning that we may
apply the Kalman Filter in an online manner.
The drawn figure shows two sets of lines; the first represents the true, hidden
state of the target, while the second represents the estimates output by the
Kalman Filter.
'''
import numpy as np
import pylab as pl
from pykalman.datasets import load_robot
from pykalman import KalmanFilter
# Initialize the Kalman Filter
data = load_robot()
kf = KalmanFilter(
    data.transition_matrix,
    data.observation_matrix,
    data.initial_transition_covariance,
    data.initial_observation_covariance,
    data.transition_offsets,
    data.observation_offset,
    data.initial_state_mean,
    data.initial_state_covariance,
    random_state=0
)
# Estimate mean and covariance of hidden state distribution iteratively. This
# is equivalent to
#
# >>> (filtered_state_means, filtered_state_covariances) = kf.filter(data.observations)
n_timesteps = data.observations.shape[0]
n_dim_state = data.transition_matrix.shape[0]
filtered_state_means = np.zeros((n_timesteps, n_dim_state))
filtered_state_covariances = np.zeros((n_timesteps, n_dim_state, n_dim_state))
for t in range(n_timesteps - 1):
    if t == 0:
        filtered_state_means[t] = data.initial_state_mean
        filtered_state_covariances[t] = data.initial_state_covariance
    filtered_state_means[t + 1], filtered_state_covariances[t + 1] = (
        kf.filter_update(
            filtered_state_means[t],
            filtered_state_covariances[t],
            data.observations[t + 1],
            transition_offset=data.transition_offsets[t],
        )
    )
# draw estimates
pl.figure()
lines_true = pl.plot(data.states, color='b')
lines_filt = pl.plot(filtered_state_means, color='r')
pl.legend((lines_true[0], lines_filt[0]), ('true', 'filtered'))
pl.show()
'''
=============================
EM for Linear-Gaussian Models
=============================
This example shows how one may use the EM algorithm to estimate model
parameters with a Kalman Filter.
The EM algorithm is a meta-algorithm for learning parameters in probabilistic
models. The algorithm works by first fixing the parameters and finding a closed
form distribution over the unobserved variables, then finds new parameters that
maximize the expected likelihood of the observed variables (where the
expectation is taken over the unobserved ones). Due to convexity arguments, we
are guaranteed that each iteration of the algorithm will increase the
likelihood of the observed data and that it will eventually reach a local
optimum.
The EM algorithm is applied to the Linear-Gaussian system (that is, the model
assumed by the Kalman Filter) by first using the Kalman Smoother to calculate
the distribution over all unobserved variables (in this case, the hidden target
states), then closed-form update equations are used to update the model
parameters.
The first figure plotted contains 4 sets of lines. The first, labeled `true`,
represents the true, unobserved state of the system. The second, labeled
`blind`, represents the predicted state of the system if no measurements are
incorporated. The third, labeled `filtered`, are the state estimates given
measurements up to and including the current time step. Finally, the fourth,
labeled `smoothed`, are the state estimates using all observations for all time
steps. The latter three estimates use parameters learned via 10 iterations of
the EM algorithm.
The second figure contains a single line representing the likelihood of the
observed data as a function of the EM Algorithm iteration.
'''
import numpy as np
import pylab as pl
from pykalman.datasets import load_robot
from pykalman import KalmanFilter
# Load data and initialize Kalman Filter
data = load_robot()
kf = KalmanFilter(
    data.transition_matrix,
    data.observation_matrix,
    data.initial_transition_covariance,
    data.initial_observation_covariance,
    data.transition_offsets,
    data.observation_offset,
    data.initial_state_mean,
    data.initial_state_covariance,
    em_vars=[
        'transition_matrices', 'observation_matrices',
        'transition_covariance', 'observation_covariance',
        'observation_offsets', 'initial_state_mean',
        'initial_state_covariance'
    ]
)
# Learn good values for parameters named in `em_vars` using the EM algorithm
loglikelihoods = np.zeros(10)
for i in range(len(loglikelihoods)):
    kf = kf.em(X=data.observations, n_iter=1)
    loglikelihoods[i] = kf.loglikelihood(data.observations)
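# Editor's note: running em() one iteration at a time is equivalent to a single
# kf.em(X=data.observations, n_iter=10) call, but it lets us record the log
# likelihood after each iteration for the second figure.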
# Estimate the state without using any observations. This will let us see how
# good we could do if we ran blind.
n_dim_state = data.transition_matrix.shape[0]
n_timesteps = data.observations.shape[0]
blind_state_estimates = np.zeros((n_timesteps, n_dim_state))
for t in range(n_timesteps - 1):
    if t == 0:
        blind_state_estimates[t] = kf.initial_state_mean
    blind_state_estimates[t + 1] = (
        np.dot(kf.transition_matrices, blind_state_estimates[t])
        + kf.transition_offsets[t]
    )
# Estimate the hidden states using observations up to and including
# time t for t in [0...n_timesteps-1]. This method outputs the mean and
# covariance characterizing the Multivariate Normal distribution for
# P(x_t | z_{1:t})
filtered_state_estimates = kf.filter(data.observations)[0]
# Estimate the hidden states using all observations. These estimates
# will be 'smoother' (and are to be preferred) to those produced by
# simply filtering as they are made with later observations in mind.
# Probabilistically, this method produces the mean and covariance
# characterizing,
# P(x_t | z_{1:n_timesteps})
smoothed_state_estimates = kf.smooth(data.observations)[0]
# Draw the true, blind, filtered, and smoothed state estimates for all 5
# dimensions.
pl.figure(figsize=(16, 6))
lines_true = pl.plot(data.states, linestyle='-', color='b')
lines_blind = pl.plot(blind_state_estimates, linestyle=':', color='m')
lines_filt = pl.plot(filtered_state_estimates, linestyle='--', color='g')
lines_smooth = pl.plot(smoothed_state_estimates, linestyle='-.', color='r')
pl.legend(
    (lines_true[0], lines_blind[0], lines_filt[0], lines_smooth[0]),
    ('true', 'blind', 'filtered', 'smoothed')
)
pl.xlabel('time')
pl.ylabel('state')
pl.xlim(xmax=500)
# Draw log likelihood of observations as a function of EM iteration number.
# Notice how it is increasing (this is guaranteed by the EM algorithm)
pl.figure()
pl.plot(loglikelihoods)
pl.xlabel('em iteration number')
pl.ylabel('log likelihood')
pl.show()
'''
====================================================
Applying the Kalman Filter with Missing Observations
====================================================
This example shows how one may apply :class:`KalmanFilter` when some
measurements are missing.
While the Kalman Filter and Kalman Smoother are typically presented assuming a
measurement exists for every time step, this is not always the case in reality.
:class:`KalmanFilter` is implemented to recognize masked portions of numpy
arrays as missing measurements.
The figure drawn illustrates the trajectory of each dimension of the true
state, the estimated state using all measurements, and the estimated state
using every fifth measurement.
'''
import numpy as np
import pylab as pl
from pykalman import KalmanFilter
# specify parameters
random_state = np.random.RandomState(0)
transition_matrix = [[1, 0.1], [0, 1]]
transition_offset = [-0.1, 0.1]
observation_matrix = np.eye(2) + random_state.randn(2, 2) * 0.1
observation_offset = [1.0, -1.0]
initial_state_mean = [5, -5]
n_timesteps = 50
# sample from model
kf = KalmanFilter(
    transition_matrices=transition_matrix,
    observation_matrices=observation_matrix,
    transition_offsets=transition_offset,
    observation_offsets=observation_offset,
    initial_state_mean=initial_state_mean,
    random_state=0
)
states, observations_all = kf.sample(
    n_timesteps, initial_state=initial_state_mean
)
# mask all but every fifth observation as missing
observations_missing = np.ma.array(
    observations_all,
    mask=np.zeros(observations_all.shape)
)
for t in range(n_timesteps):
    if t % 5 != 0:
        observations_missing[t] = np.ma.masked
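# Editor's note: pykalman recognizes masked rows of a numpy masked array as
# missing measurements; for those time steps the filter skips the measurement
# update and the estimate coasts on the transition model alone.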
# estimate state with filtering and smoothing
smoothed_states_all = kf.smooth(observations_all)[0]
smoothed_states_missing = kf.smooth(observations_missing)[0]