Commit 6858f239 authored by Alexander Dietmüller's avatar Alexander Dietmüller

Backend: Add validation for time slots, update tests and README

New validation:

- No timeslot overlap
- user do not only require to use their own nethz, they can also only use this
  nethz if they are members

Additionally tests and README updates.
parent f6818b55
......@@ -31,6 +31,9 @@ wheels/
*.manifest
*.spec
# pytest
.pytest_cache/
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
......
......@@ -54,27 +54,19 @@ def request_cache(key):
@request_cache('apiuser')
def get_user():
"""Return user id if the token is valid, None otherwise."""
"""Return user data if the token is valid, None otherwise."""
if g.get('token') is not None:
response = api_get(
'sessions',
where={'token': g.get('token', '')},
projection={'user': 1}
projection={'user': 1},
embedded={'user': 1}
)
if response:
return response['_items'][0]['user']
return None
@request_cache('nethz')
def get_nethz():
"""Return nethz of current user."""
if get_user() is not None:
response = api_get('users/' + get_user())
return response.get('nethz')
return None
@request_cache('admin')
def is_admin():
"""Return True if user is in the 'PVK Admins' Group, False otherwise.
......@@ -82,8 +74,8 @@ def is_admin():
The result is saved in g, to avoid checking twice, so there is no
performance loss if is_admin is called multiple times during a request.
"""
user_id = get_user()
if user_id is not None:
user = get_user()
if user is not None:
# Find Group with correct name, return list of members
groups = api_get(
'groups',
......@@ -95,7 +87,7 @@ def is_admin():
membership = api_get(
'groupmemberships',
where={'user': user_id, 'group': group_id},
where={'user': user['_id'], 'group': group_id},
# This Projection currectly crashes AMIVAPI
# https://github.com/amiv-eth/amivapi/issues/206
# projection={'_id': 1}
......@@ -148,4 +140,5 @@ def only_own_nethz(_, lookup):
if not is_admin():
# Add the additional lookup with an `$and` condition
# or extend existing `$and`s
lookup.setdefault('$and', []).append({'nethz': get_nethz()})
lookup.setdefault('$and', []).append({
'nethz': get_user().get('nethz')})
......@@ -52,6 +52,13 @@ DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BANDWIDTH_SAVER = False
# Disable bulk insert
# They are not compatible with uniqueness constraints
# (see more here: https://github.com/pyeve/eve/issues/691)
# In our case, we need time and room uniqueness nearly everywhere => disabled
BULK_ENABLED = False
# A schema for required start/end time tuple
TIMESPAN_SCHEMA = {
'type': 'dict',
......@@ -67,6 +74,7 @@ TIMESPAN_SCHEMA = {
'required': True,
},
},
'start_before_end': True,
}
......@@ -110,7 +118,7 @@ DOMAIN = {
'empty': False,
'nullable': False,
},
# TODO: Not the same nethz twice
# TODO: Not the same nethz twice (use new nocopies validator)
# TODO: Is nethz enough here?
}
},
......@@ -129,10 +137,12 @@ DOMAIN = {
'embeddable': True
},
'not_patchable': True, # Course is tied to lecture
'required': True,
},
'assistant': {
'type': 'string',
# TODO: Assistant needs to exist for lecture
# TODO: assistant timeslot
},
'signup': TIMESPAN_SCHEMA,
......@@ -140,22 +150,21 @@ DOMAIN = {
'datetimes': {
'type': 'list',
'schema': TIMESPAN_SCHEMA,
# TODO: Timeslots must not overlap
'no_time_overlap': True,
'unique_room_booking': True,
},
'room': {
'type': 'string',
'maxlength': 100,
'unique': True,
'required': True,
'nullable': False,
'empty': False,
# TODO: Room must be empty for time slot
'unique_room_booking': True,
},
'spots': {
'type': 'integer',
'min': 1,
'required': True,
'nullable': False
'nullable': False,
}
},
},
......@@ -184,7 +193,7 @@ DOMAIN = {
},
'unique_combination': ['nethz'],
'required': True,
# TODO: No overlapping courses
'no_course_overlap': 'signups',
},
'status': {
'type': 'string',
......@@ -220,7 +229,7 @@ DOMAIN = {
},
'unique_combination': ['nethz'],
'required': True,
# TODO: No overlapping courses
'no_course_overlap': 'selections',
},
},
},
......@@ -235,9 +244,6 @@ DOMAIN = {
# Also, there is no reason to ever change a payment.
'user_methods': ['GET', 'POST'],
# Bulk inserts don't make sense here, so we disallow them
'bulk_enabled': False,
'schema': {
'signups': {
'type': 'list',
......@@ -248,15 +254,17 @@ DOMAIN = {
'field': '_id',
'embeddable': True
},
'no_waiting': True, # No signups on waiting list
'no_accepted': True, # No signups which have already been paid
# No signups on waiting list
'no_waiting': True,
# No signups which have already been paid
'no_accepted': True,
},
'no_copies': True,
'required': True,
'nullable': False,
},
# Admins may leave this field empty
# However, it still has to be sent, even if it contains an empty string / None
# However, it still has to be sent, even if empty or None
'token': {
'type': 'string',
'unique': True,
......@@ -264,13 +272,13 @@ DOMAIN = {
'nullable': True,
'only_admin_empty': True,
},
'charge_id': { # Set by backend
'charge_id': { # Set by payment backend
'type': 'string',
'unique': True,
'required': False,
'nullable': True,
},
'amount': { # Set by backend
'amount': { # Set by payment backend
'type': 'integer',
'required': False,
'nullable': True,
......
......@@ -9,21 +9,43 @@ TODO: Several validation rules still need to be implemented. See `settings.py`.
"""
from itertools import combinations, chain
from flask import request, current_app
from eve.io.mongo import Validator
from backend.security import is_admin, get_nethz
from backend.security import is_admin, get_user
class APIValidator(Validator):
"""Provide a rule to check nethz of current user."""
def _get_field(self, field):
"""Get other field. Check original document as well if PATCH."""
if request.method != 'PATCH':
return self.document.get(field)
# PATCH: field is only in self.document if modified
return self.document.get(field) or self._original_document.get(field)
# nethz validation
def _validate_only_own_nethz(self, enabled, field, value):
"""If the user is no admin, only own nethz is allowed for singup."""
"""If the user is no admin, only own nethz is allowed for singup.
Furthermore, the user must be a member to use his nethz.
Only admins can sign up someone else.
"""
print(get_user())
if enabled and not is_admin():
if value != get_nethz():
if value != get_user().get('nethz'):
self._error(field,
"You can only use your own nethz to sign up.")
"you can only use your own nethz to sign up")
# If the nethz matches, we have to check if the user is a member
elif get_user().get('membership') == 'none':
self._error(field,
"only members can sign up")
# Various helpers
def _validate_unique_combination(self, unique_combination, field, value):
"""Validate that a combination of fields is unique.
......@@ -33,13 +55,7 @@ class APIValidator(Validator):
"""
lookup = {field: value} # self
for other_field in unique_combination:
lookup[other_field] = self.document.get(other_field)
if request.method == 'PATCH':
original = self._original_document
for key in unique_combination:
if key not in self.document.keys():
lookup[key] = original[key]
lookup[other_field] = self._get_field(other_field)
resource = self.resource
if current_app.data.find_one(resource, None, **lookup) is not None:
......@@ -72,6 +88,83 @@ class APIValidator(Validator):
"which have already been paid")
def _validate_no_copies(self, no_copies, field, value):
"""Ensure that each signup only appears once per payment."""
"""Ensure that each item only appears once in the list."""
if no_copies and len(set(value)) != len(value):
self._error(field, "this field may not contain duplicate signups")
# Time related
def _validate_start_before_end(self, enabled, field, value):
"""Ensure that the start time is before the end time."""
if enabled and(value.get('start') > value.get('end')):
self._error(field, 'start time must be earlier then end time.')
def _validate_unique_room_booking(self, enabled, field, _):
"""A room can not be used at the same time by several courses."""
# Get new room and timespans for current course
room = self._get_field('room')
timespans = self._get_field('datetimes')
# Get _id of current course (needed for path) and filter to ignore it
course_id = self._get_field('_id')
id_filter = {'_id': {'$ne': course_id}} if course_id else {}
if enabled and timespans and room:
# Get all other courses with the same room
course_db = current_app.data.driver.db['courses']
courses = course_db.find({'room': room, **id_filter})
# chain all timespan lists into one list of other timespans
other_timespans = chain.from_iterable(course['datetimes']
for course in courses)
if has_overlap(*timespans, *other_timespans):
self._error(field, "the room '%s' is already occupied by "
"another course at the same time")
def _validate_no_time_overlap(self, enabled, field, value):
"""Multiple timeslots of the same course must not overlap."""
# value will be a list of timeslots
if enabled and has_overlap(*value):
self._error(field, "time slots must not overlap")
def _validate_no_course_overlap(self, resource, field, value):
"""Ensure that a user cannot select/sign up for parallel courses."""
nethz = self._get_field('nethz')
# Get timespan of current course
course_db = current_app.data.driver.db['courses']
timespans = course_db.find_one({'_id': value}).get('datetimes')
if nethz and timespans:
# Find ids from all other courses for this nethz
resource_db = current_app.data.driver.db[resource]
items = resource_db.find({'nethz': nethz})
course_ids = [item['course'] for item in items
if item['course'] != value]
# Get all courses and extract time spans
courses = course_db.find({'_id': {'$in': course_ids}})
# chain all timespan lists into one list of other timespans
other_timespans = chain.from_iterable(course['datetimes']
for course in courses)
if has_overlap(*timespans, *other_timespans):
self._error(field, 'this course has a timing conflict with an '
'already chosen course')
def has_overlap(*timeslots):
"""Return True if any two timeslots overlap, False otherwise."""
def has_one_overlap(first, second):
"""Compare two timeslots."""
# All data is UTC, but some has tzinfo, some doesnt => Normalize
s_1 = first['start'].replace(tzinfo=None)
e_1 = first['end'].replace(tzinfo=None)
s_2 = second['start'].replace(tzinfo=None)
e_2 = second['end'].replace(tzinfo=None)
# Overlap if one course is "(not before) and (not after)" the other
return (e_1 > s_2) and (s_1 < e_2)
# Return if *any* combination of elements overlaps
return any(has_one_overlap(first, second)
for (first, second) in combinations(timeslots, 2))
......@@ -99,11 +99,13 @@ def drop_database(application):
@contextmanager
def user(self, **kwargs):
def user(self, nethz=None, membership=None, **kwargs):
"""Additional context to fake a user."""
with self.test_request_context():
g.apiuser = 'Not None :)'
g.nethz = 'Something'
g.apiuser = {
'nethz': nethz or 'nethz',
'membership': membership or 'regular',
}
g.admin = False
# The test requests will use this header
......
......@@ -19,7 +19,7 @@ So we just put the provided token into g and see if the functions work.
from flask import g
from backend.security import get_user, get_nethz, is_admin
from backend.security import get_user, is_admin
def test_user_found(app, usertoken):
......@@ -27,13 +27,9 @@ def test_user_found(app, usertoken):
with app.app_context():
g.token = usertoken
assert get_user() is not None
def test_nethz_found(app, usertoken):
"""Test if a users nethz can be found with an api token."""
with app.app_context():
g.token = usertoken
assert get_nethz() is not None
# Assert that we get the required user data (nethz and membership)
assert get_user().get('nethz') is not None
assert get_user().get('membership') is not None
def test_user_not_found(app):
......@@ -42,12 +38,6 @@ def test_user_not_found(app):
assert get_user() is None
def test_nethz_not_found(app):
"""Without token, users nethz cannot be found."""
with app.app_context():
assert get_nethz() is None
def test_not_admin(app):
"""Without token, user does no get admin priviledges."""
with app.app_context():
......
......@@ -38,7 +38,7 @@ def base_data(app):
assert_status=201)
signup = {
'nethz': 'Something',
'nethz': 'nethz', # default dummy value for user nethz
'course': response['_id']
}
app.client.post('signups',
......@@ -233,14 +233,3 @@ def test_signup_unique_per_payment(app):
app.client.post('payments',
data=payment,
assert_status=422)
def test_no_batch_payments(app):
"""Test that batch payments are disabled (Eve sends 400 in this case)."""
with app.admin():
batch = [{'signups': [], 'token': 'something'},
{'signups': [], 'token': 'something'}]
app.client.post('payments',
data=batch,
assert_status=400)
"""Tests for basic requests to all resources as admin."""
import pytest
from backend.settings import DOMAIN
def test_create(app):
"""Test creating everything as an admin user."""
......@@ -99,3 +103,14 @@ def test_no_patch(app):
data={'nethz': 'lalala'},
assert_status=422)
assert response["_issues"]["nethz"] == no_patch_error
@pytest.mark.parametrize('resource', DOMAIN.keys())
def test_no_batch_inserts(app, resource):
"""Test that batch payments are disabled (Eve sends 400 in this case)."""
with app.admin():
list_data = [{}, {}]
app.client.post(resource,
data=list_data,
assert_status=400)
......@@ -2,8 +2,9 @@
import pytest
from backend.settings import DOMAIN
ALL_RESOURCES = ['lectures', 'courses', 'signups', 'selections', 'payments']
ALL_RESOURCES = DOMAIN.keys()
ADMIN_RESOURCES = ['lectures', 'courses'] # only admin can write
PERSONAL_RESOURCES = ['signups', 'selections'] # users can only see their own
......@@ -65,56 +66,49 @@ def test_user_cannot_write(app, resource):
assert_status=403)
def test_signup_with_own_nethz_only(app):
"""Users can only post singups with their own nethz."""
@pytest.mark.parametrize('resource', ['signups', 'selections'])
def test_signup_with_own_nethz_only(app, resource):
"""Users can only post signups with their own nethz."""
nethz = 'Something'
with app.user(nethz=nethz):
# Create fake course to sign up to
course = str(app.data.driver.db['courses'].insert({}))
# Try with other nethz
bad_signup = {
bad = {
'nethz': 'Notthenethz',
'course': course
}
app.client.post('/signups',
data=bad_signup,
app.client.post(resource,
data=bad,
assert_status=422)
# Try with own nethz
good_signup = {
good = {
'nethz': nethz,
'course': course
}
app.client.post('/signups',
data=good_signup,
app.client.post(resource,
data=good,
assert_status=201)
def test_selection_own_nethz(app):
"""Users can only select courses for themselves."""
@pytest.mark.parametrize('resource', ['signups', 'selections'])
def test_signup_member_only(app, resource):
"""Only members can sign up."""
nethz = 'Something'
with app.user(nethz=nethz):
# Create fake course to select
with app.user(nethz=nethz, membership='none'):
# Create fake course to sign up to
course = str(app.data.driver.db['courses'].insert({}))
# Try with other nethz
bad_selection = {
'nethz': 'Notthenethz',
'course': course,
}
app.client.post('/selections',
data=bad_selection,
assert_status=422)
# Try with own nethz
good_selection = {
data = {
'nethz': nethz,
'course': course,
'course': course
}
app.client.post('/selections',
data=good_selection,
assert_status=201)
app.client.post(resource,
data=data,
assert_status=422)
@pytest.mark.parametrize('resource', PERSONAL_RESOURCES)
......
......@@ -207,28 +207,6 @@ def test_post_signups_triggers_update(app, course, mock_update):
mock_update.assert_called_with(str(course))
def test_batch_post_signups_triggers_update(app, course, mock_update):
"""Test the the update of spots gets triggered correctly."""
# We need a second course to test everything
other_course = app.data.driver.db['courses'].insert({})
batch = [{
'course': str(course),
'nethz': 'bla'
}, {
'course': str(course),
'nethz': 'bli'
}, {
'course': str(other_course),
'nethz': 'bli'
}]
app.client.post('/signups', data=batch, assert_status=201)
# Same course doesn't get updated twice per request
print(mock_update.mock_calls)
mock_update.assert_has_calls([call(str(course)), call(str(other_course))],
any_order=True)
def test_patch_signup_triggers_update(app, course, mock_update):
"""Test the the update of spots gets triggered correctly."""
fake = app.data.driver.db['signups'].insert({
......
"""Test special validators, e.g. for time overlap."""
# pylint: disable=redefined-outer-name
import pytest
@pytest.fixture
def lecture(app):
"""Create a lecture, return its id."""
with app.admin():
lecture = {
'title': "Time and Space",
'department': "itet",
'year': 2,
'assistants': ['pablo'],
}
return app.client.post('lectures',
data=lecture,
assert_status=201)['_id']
def test_start_time_before_end(app, lecture):
"""Test that any start time must come before end time."""
correct = {
'lecture': lecture,
'assistant': 'pablo',
'spots': 10,
'datetimes': [{
'start': '2019-01-09T10:00:00Z',
'end': '2019-01-09T13:00:00Z',
}],
}
wrong = {
'lecture': lecture,
'assistant': 'pablo',
'spots': 10,
'datetimes': [{
'start': '2019-02-09T13:00:00Z',
'end': '2019-02-09T10:00:00Z',
}],
}
with app.admin():
app.client.post('courses', data=correct, assert_status=201)
app.client.post('courses', data=wrong, assert_status=422)
@pytest.fixture
def courses(app, lecture):
"""Return data for courses that overlap. (And a control course)"""
with app.admin():
same_time = {
'lecture': lecture,
'assistant': 'pablo',
'spots': 10,
'datetimes': [{
'start': '2019-01-09T10:00:00Z',
'end': '2019-01-09T13:00:00Z',
}]
}
first = {'room': 'ETZ E 6'}
first.update(same_time)
first_id = app.client.post('courses',
data=first,
assert_status=201)['_id']
second = {'room': 'ETZ E 8'}
second.update(same_time)
second_id = app.client.post('courses',
data=second,
assert_status=201)['_id']
# Now a third course without overlap