To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 0fffaddc authored by stehess's avatar stehess
Browse files

Initial commit

parents
# Data files and directories common in repo root
*.avi
__pycache__/
# # Jupyter Notebook
*.ipynb_checkpoints
# # VS Code
.vscode
# Weights
*.h5
Evaluation/mapped/hamilton_FLSD/*
Evaluation/mapped/hamilton_SLSD/*
Evaluation/mapped/wanda_FLSD/*
Evaluation/mapped/wanda_SLSD/*
Evaluation/warped/hamilton_FLSD/*
Evaluation/warped/hamilton_SLSD/*
Evaluation/warped/wanda_FLSD/*
Evaluation/warped/wanda_SLSD/*
Evaluation/postprocessing/hamilton_FLSD/*
Evaluation/postprocessing/hamilton_SLSD/*
Evaluation/postprocessing/wanda_FLSD/*
Evaluation/postprocessing/wanda_SLSD/*
Evaluation/paper
logs/
text_files/eval_v1
text_files/eval_v2
text_files/eval_v3
text_files/eval_v4
text_files/eval_v5
text_files/recovery_eval
# # dataset for cnn
# /infusion_cnn/dataset/train/*.JPG
# /infusion_cnn/dataset/val/*.JPG
device/dataset/
# # logs
# /logs/*
\ No newline at end of file
import cv2
import numpy as np
from scipy.spatial import distance as dist
from math import *
from Helper.helper import get_ratios
def check_shape(pts_to_check, R):
# Check if the shape of the point is within range -+20 %
r1, r2 = get_ratios(pts_to_check)
if 0.8*R < r1 < 1.2*R and 0.8*R < r2 < 1.2*R:
return True, r1,r2
else:
return False, r1,r2
def check_intersection_location(intersections, pts_for_process, frame, EPSILON):
# Check if the intersections are within the masked area
# Create circle around old point with radius epsilon and check if within the circle
radius = sqrt(2)*EPSILON
fc = frame.copy()
indices = []
for k,(pt_int, pt_old) in enumerate(zip(intersections, pts_for_process)):
center_x = pt_old[0]
center_y = pt_old[1]
x = pt_int[0]
y = pt_int[1]
cv2.circle(fc,(center_x, center_y), int(radius), (255,255,255), 5)
is_inside = (x - center_x)**2 + (y - center_y)**2 <= radius**2
if not is_inside:
indices.append(k)
return indices, fc
def check_for_outliers(xmvmts, ymvmts):
# Check if any movement along x or y is an outlier using quartiles
def outliers_iqr(ys):
# get upper and lower quartile
quartile_1, quartile_3 = np.percentile(ys, [25, 75])
# get inter quartile range
iqr = quartile_3 - quartile_1
# extend range
lower_bound = quartile_1 - (iqr * 1.5)
upper_bound = quartile_3 + (iqr * 1.5)
# check if within range
idx = np.where((ys > upper_bound) | (ys < lower_bound))[0]
return idx
outliers = []
outliers.append(outliers_iqr(xmvmts))
outliers.append(outliers_iqr(ymvmts))
idxx = None
idxy = None
# Check for outliers in x and y
# If same outlier index return one index otherwise return two indices
if len(outliers[0]) > 0:
idxx = outliers[0][0]
if len(outliers[1]) > 0:
idxy = outliers[1][0]
if idxx is not None and idxy is not None:
return np.unique(np.array([idxx,idxy]))
elif idxx is None and idxy is not None:
return np.array([idxy])
elif idxy is None and idxx is not None:
return np.array([idxx])
elif idxx is None and idxy is None:
return np.array([])
def check_for_huge_mvmt(xmvmts,ymvmts):
# Check if any point moved more than 50px
return any(abs(mov) > 50 for mov in xmvmts) or any(abs(mov) > 50 for mov in ymvmts)
def check_all_sides_have_lines(lines_array):
# Check if any side has no lines
return all(len(l) > 0 for l in lines_array)
def check_error(err):
# Check if any optical flow error is above threshold (8)
return any(e[0] > 8 for e in err)
def monitor_mvmt(new_pts, SUB_ROI):
# Get the movement of all the points compared to previous frame
x_mvmts = []
y_mvmts = []
mvmts = []
for ptN, ptS in zip(new_pts, SUB_ROI):
oa = np.array(ptN)
os = np.array(ptS)
mov = oa-os
mvmts.append(mov)
x_mvmts.append(mov[0])
y_mvmts.append(mov[1])
return x_mvmts, y_mvmts, mvmts
from scipy.spatial import distance as dist
import cv2
import os
import numpy as np
def chunks(l, n):
# For item i in a range that is a length of l,
for i in range(0, len(l), n):
# Create an index range for l of n items:
yield l[i:i+n]
def sortpts_clockwise(A,idx=None):
if idx is None:
# Sort A based on Y(col-2) coordinates
sortedAc2 = A[np.argsort(A[:,1]),:]
# Get top two and bottom two points
top2 = sortedAc2[0:2,:]
bottom2 = sortedAc2[2:,:]
# Sort top2 points to have the first row as the top-left one
sortedtop2c1 = top2[np.argsort(top2[:,0]),:]
top_left = sortedtop2c1[0,:]
# Use top left point as pivot & calculate sq-euclidean dist against
# bottom2 points & thus get bottom-right, bottom-left sequentially
sqdists = dist.cdist(top_left[None], bottom2, 'sqeuclidean')
rest2 = bottom2[np.argsort(np.max(sqdists,0))[::-1],:]
# Concatenate all these points for the final output
return np.concatenate((sortedtop2c1,rest2),axis =0)
elif idx == 0:
# Sort A based on X(col-2) coordinates
sortedAc1 = A[np.argsort(A[:,0]),:]
# most left is bottom left point
bl = sortedAc1[0]
# others are on right side
right=sortedAc1[1:]
# sort by Y coordinate
sortedAc2 = right[np.argsort(right[:,1]),:]
tr = sortedAc2[0]
br = sortedAc2[1]
return np.array([tr,br,bl],dtype='int32')
elif idx == 1:
# Sort A based on X(col-2) coordinates
sortedAc1 = A[np.argsort(A[:,0]),:]
# most right is bottom right point
br = sortedAc1[-1]
# others are on right side
left=sortedAc1[0:2]
# sort by Y coordinate
sortedAc2 = left[np.argsort(left[:,1]),:]
tl = sortedAc2[0]
bl = sortedAc2[1]
return np.array([tl,br,bl],dtype='int32')
elif idx == 2:
# Sort A based on X(col-2) coordinates
sortedAc1 = A[np.argsort(A[:,0]),:]
# most right is bottom right point
tr = sortedAc1[-1]
# others are on right side
left=sortedAc1[0:2]
# sort by Y coordinate
sortedAc2 = left[np.argsort(left[:,1]),:]
tl = sortedAc2[0]
bl = sortedAc2[1]
return np.array([tl,tr,bl],dtype='int32')
elif idx == 3:
# Sort A based on X(col-2) coordinates
sortedAc1 = A[np.argsort(A[:,0]),:]
# most right is bottom right point
tl = sortedAc1[0]
# others are on right side
right=sortedAc1[1:]
# sort by Y coordinate
sortedAc2 = right[np.argsort(right[:,1]),:]
tr = sortedAc2[0]
br = sortedAc2[1]
return np.array([tl,tr,br],dtype='int32')
# Print iterations progress
def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█'):
"""
Call in a loop to create terminal progress bar
@params:
iteration - Required : current iteration (Int)
total - Required : total iterations (Int)
prefix - Optional : prefix string (Str)
suffix - Optional : suffix string (Str)
decimals - Optional : positive number of decimals in percent complete (Int)
length - Optional : character length of bar (Int)
fill - Optional : bar fill character (Str)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filledLength = int(length * iteration // total)
bar = fill * filledLength + '-' * (length - filledLength)
print('\r%s |%s| %s%% %s' % (prefix, bar, percent, suffix), end = '\r')
# Print New Line on Complete
if iteration == total:
print()
def get_layout(device, ROOT_DIR):
return cv2.imread(os.path.join(ROOT_DIR, 'assets/%s/layout/layout.png' %device))
def get_ratios(pts):
h1 = dist.euclidean(pts[1], pts[2])
h2 = dist.euclidean(pts[3], pts[0])
w1 = dist.euclidean(pts[0], pts[1])
w2 = dist.euclidean(pts[2], pts[3])
return h1/w1, h2/w2
import logging, coloredlogs
def init_logger(fileName):
logger = logging.getLogger()
coloredlogs.install(level=logging.INFO,
fmt='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%Y-%m-%d %H:%I:%S')
# create file handler which logs even debug messages
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler('./logs/LSD/%s.log' %fileName)
fh.setFormatter(formatter)
logger.addHandler(fh)
\ No newline at end of file
import cv2
import numpy as np
from Helper.helper import sortpts_clockwise, chunks
import PIL
from collections import defaultdict
import json
import csv
import os
import pandas
import re
from math import *
from scipy.spatial import distance as dist
def get_gaze_pts(fileName):
# get gaze points
df_gaze_pts = pandas.read_csv('%s' %fileName, header=None,sep=';',names=['frame','pts'],low_memory=False)
gaze = []
for f,pt in zip(df_gaze_pts['frame'],df_gaze_pts['pts']):
p = {
'frame': int(f),
'pts': []
}
li = re.findall('\d+', pt)
tmp = list(chunks(li, 2))
for x in tmp:
p['pts'].append([int(x[0]),int(x[1])])
gaze.append(p)
return gaze
def get_mask(mask_coords, edged, frame, inverse=False):
# Get the mask
roi = np.array(mask_coords,dtype=np.int32)
mask = np.zeros(edged.shape, dtype="uint8")
channel_count = frame.shape[2]
ignore_mask_color = (255,)*channel_count
cv2.fillPoly(mask, np.array([roi],dtype=np.int32), ignore_mask_color)
# if inner mask reverse the mask
if inverse:
mask = 255-mask
res = cv2.bitwise_and(edged, mask)
return res
def get_search_area(pts_for_process, frame, is_cnn=False, EPSILON=10, EPSILON_W=15):
# Build the search area based on EPSILONS
fc = frame.copy()
rect = cv2.minAreaRect(np.array(pts_for_process,dtype='int32'))
box = cv2.boxPoints(rect)
box = np.int0(box)
cv2.drawContours(fc,[np.array(pts_for_process,dtype='int32')],0,(0,0,255),2)
for pt in pts_for_process:
cv2.circle(fc,(int(pt[0]), int(pt[1])), 5, 255, -1)
if is_cnn:
sorted_coords = sortpts_clockwise(np.array(box, dtype='int32'))
else:
sorted_coords = sortpts_clockwise(np.array(pts_for_process, dtype='int32'))
a = (sorted_coords[0][0]-EPSILON_W,sorted_coords[0][1]-EPSILON)
b = (sorted_coords[1][0]+EPSILON_W,sorted_coords[1][1]-EPSILON)
c = (sorted_coords[2][0]+EPSILON_W,sorted_coords[2][1]+EPSILON)
d = (sorted_coords[3][0]-EPSILON_W,sorted_coords[3][1]+EPSILON)
e = (sorted_coords[0][0]+EPSILON_W,sorted_coords[0][1]+EPSILON)
f = (sorted_coords[1][0]-EPSILON_W,sorted_coords[1][1]+EPSILON)
g = (sorted_coords[2][0]-EPSILON_W,sorted_coords[2][1]-EPSILON)
h = (sorted_coords[3][0]+EPSILON_W,sorted_coords[3][1]-EPSILON)
mask_array = [a,b,c,d,e,f,g,h]
fc2 = fc.copy()
for pt in mask_array:
cv2.circle(fc2,pt, 5, 255, -1)
return mask_array[0:4], mask_array[4:8], fc, fc2
def get_Canny(img, backlight):
# Canny edge detector with different values based on backlight flag
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
if backlight:
edges = cv2.Canny(gray, 580,600)
else:
edges = cv2.Canny(gray, 10,100)
return edges
def get_enhanced_image(img):
# Enhance image if backlight flag is set
# Higher brightness, contrast and sharpness
image = PIL.Image.fromarray(img.astype('uint8'), 'RGB')
brightness = PIL.ImageEnhance.Brightness(image).enhance(5.0)
contrast = PIL.ImageEnhance.Contrast(brightness).enhance(35.0)
sharpness = PIL.ImageEnhance.Sharpness(contrast).enhance(10.0)
bw = PIL.ImageEnhance.Color(sharpness).enhance(0.0)
pixel = np.array(bw)
return pixel
def get_avg_good_mvmts(xmvmts,ymvmts, idx):
# get the average x and y movement of the good points (not outliers)
good_xmvmts = [x for i,x in enumerate(xmvmts) if i!=idx[0]]
good_ymvmts = [x for i,x in enumerate(ymvmts) if i!=idx[0]]
avg_x = np.average(good_xmvmts)
avg_y = np.average(good_ymvmts)
avg_mov = np.array((avg_x,avg_y))
return avg_mov
def get_vectors(lines):
# Vectorize the lines
def get_vector(line):
return np.array((line[0][2],line[0][3]))-np.array((line[0][0],line[0][1]))
vectors = []
for line in lines[0]:
vectors.append(get_vector(line))
return vectors
def get_angles(vectors):
# Get the angles compared to horizontal axis
def unit_vector(vector):
""" Returns the unit vector of the vector. """
return vector / np.linalg.norm(vector)
def angle_between(v1, v2):
""" Returns the angle in radians between vectors 'v1' and 'v2'::
"""
v1_u = unit_vector(v1)
v2_u = unit_vector(v2)
# return angle in degrees
return (np.arccos(np.clip(np.dot(v2_u, v1_u), -1.0, 1.0)) * 180 / pi)
angles = []
# Reference vector
v0 = np.array((10,0))
for v in vectors:
angle = angle_between(v0,v)
if 180-angle < 50:
angles.append(180-angle)
else:
angles.append(angle)
return angles
def get_segmented_by_angle_kmeans(angles, lines, k=2):
# Define criteria = ( type, max_iter = 10 , epsilon = 1.0 )
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
# Set flags (Just to avoid line break in the code)
flags = cv2.KMEANS_RANDOM_CENTERS
# Apply KMeans
__,labels,centers = cv2.kmeans(np.array(angles,dtype='float32'),k,None,criteria,10,flags)
labels = labels.reshape(-1) # transpose to row vec
# Check if centers are clearly apart (more then 70°) otherwise no segmentation and return None
if dist.euclidean(centers[0], centers[1]) < 70:
return None
else:
# segment lines based on their kmeans label
segmented = defaultdict(list)
for i, line in zip(range(len(lines[0])), lines[0]):
segmented[labels[i]].append(line)
segmented = list(segmented.values())
return segmented
def get_center_pts(segmented):
def get_center_pt(line):
x1 = line[0][0]
y1 = line[0][1]
x2 = line[0][2]
y2 = line[0][3]
meanx = int((x1 + x2)/2)
meany = int((y1 + y2)/2)
return (meanx,meany)
center_pts_1 = []
center_pts_2 = []
for l, sub in enumerate(segmented):
for line in sub:
if l == 0:
center_pts_1.append(get_center_pt(line))
else:
center_pts_2.append(get_center_pt(line))
center_pts = [center_pts_1,center_pts_2]
return center_pts
def get_segmented_by_coordinates_kmeans(center_pts, segmented, k=2):
try:
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
line_seg = []
for i, pts in enumerate(center_pts):
__,label,__=cv2.kmeans(np.array(pts,dtype='float32'),2,None,criteria,10,cv2.KMEANS_RANDOM_CENTERS)
label = label.reshape(-1) # transpose to row vec
# segment lines based on their kmeans label
seg = defaultdict(list)
for i, pt in zip(range(len(segmented[i])), segmented[i]):
seg[label[i]].append(pt)
seg = list(seg.values())
line_seg.append(seg)
return line_seg
except:
return None
def get_sorted_lines(line_seg, frame):
fc = frame.copy()
top_lines = []
right_lines = []
bottom_lines = []
left_lines = []
i = 0
for sub in line_seg:
i += 1
for ssub in sub:
i += 1
for line in ssub:
if i == 2:
color = (255,0,0)
left_lines.append(line)
elif i == 3:
color = (0,255,0)
right_lines.append(line)
elif i == 5:
color = (0,0,255)
bottom_lines.append(line)
elif i == 6:
color = (0,255,255)
top_lines.append(line)
cv2.line(fc,(int(line[0][0]),int(line[0][1])),(int(line[0][2]),int(line[0][3])),color,3)
return top_lines, right_lines, bottom_lines, left_lines, fc
def get_main_line(lines):
def get_length(line):
return dist.euclidean((line[0][0],line[0][1]),(line[0][2],line[0][3]))
lengths = []
for line in lines:
lengths.append(get_length(line))
max_l = max(lengths)
max_idx = lengths.index(max(lengths))
norm_lengths = []
for l in lengths:
norm_lengths.append(l/max_l)
return lines[max_idx][0], lengths, norm_lengths
def get_intersections(main_lines):
def line_intersection(line1, line2):
xdiff = (line1[0] - line1[2], line2[0] - line2[2])
ydiff = (line1[1] - line1[3], line2[1] - line2[3])
def det(a, b):
return a[0] * b[1] - a[1] * b[0]
div = det(xdiff, ydiff)
if div == 0:
return (0,0)
else:
d = (det((line1[0],line1[1]),(line1[2],line1[3])), det((line2[0],line2[1]),(line2[2],line2[3])))
x = det(d, xdiff) / div
y = det(d, ydiff) / div
return int(x), int(y)
intersections = []
intersections.append(line_intersection(main_lines[0], main_lines[1]))
intersections.append(line_intersection(main_lines[1], main_lines[2]))
intersections.append(line_intersection(main_lines[2], main_lines[3]))
intersections.append(line_intersection(main_lines[3], main_lines[0]))
return intersections
def get_w_avg_line(lines, normed_l):
X_start = []
X_end = []
Y_start = []
Y_end = []
for line in lines:
X_start.append(line[0][0])
X_end.append(line[0][2])
Y_start.append(line[0][1])
Y_end.append(line[0][3])
weighted_avg_x_start= int(np.average(X_start,weights=normed_l))
weighted_avg_x_end= int(np.average(X_end,weights=normed_l))
weighted_avg_y_start= int(np.average(Y_start,weights=normed_l))
weighted_avg_y_end= int(np.average(Y_end,weights=normed_l))
w_avg_line = [weighted_avg_x_start,weighted_avg_y_start,weighted_avg_x_end,weighted_avg_y_end]
return w_avg_line
def get_major_lines(lines_array, lengths, normed_lengths):
GOOD_LINES = []
GOOD_NORMS = []
for i,(sub_l,sub_len, sub_n) in enumerate(zip(lines_array, lengths, normed_lengths)):
good_lines = []
good_norms = []
for line,le, norm in zip(sub_l,sub_len, sub_n):
if norm > 0.5:
good_lines.append(line)
good_norms.append(norm)
GOOD_LINES.append(good_lines)
GOOD_NORMS.append(good_norms)
return GOOD_LINES, GOOD_NORMS
def get_weighted_avg_lines(lines_array, lengths, normed_lengths):
good_lines, good_norms = get_major_lines(lines_array, lengths, normed_lengths)
w_avg_lines = []
for line_arr, normed_l in zip(good_lines, good_norms):
w_avg_lines.append(get_w_avg_line(line_arr, normed_l))
return w_avg_lines
def get_main_lines(lines_array):
main_lines = []
normed_lengths = []
lengths = []
for line_arr in lines_array:
main_line, leng, normed_l = get_main_line(line_arr)
main_lines.append(main_line)
normed_lengths.append(normed_l)
lengths.append(leng)
return main_lines, lengths, normed_lengths
def get_lines(image):