Commit ea1990c3 authored by adietmue's avatar adietmue
Browse files

Added authentication.

parent 67e7ebd6
# new_pvk_tool
A new AMIV PVK tool using Eve and authenticating users with AMIVAPI.
## Running the tests
```
pip install pytest
# Add the test user to the test database
mongo pvk_test --eval 'db.createUser({user:"pvkuser",pwd:"pvkpass",roles:["readWrite"]});'
py.test
```
\ No newline at end of file
......@@ -4,6 +4,8 @@ from os import getcwd
from eve import Eve
from flask import Config
from auth import APIAuth, APIValidator, only_own_signups
def create_app(settings=None):
"""Super simply bootstrapping for easier testing."""
......@@ -11,7 +13,13 @@ def create_app(settings=None):
config.from_object('settings')
if settings is not None:
config.update(settings)
return Eve(settings=config)
application = Eve(auth=APIAuth, validator=APIValidator, settings=config)
for method in ['GET', 'PATCH', 'DELETE']:
event = getattr(application, 'on_pre_%s_signups' % method)
event += only_own_signups
return application
app = create_app()
"""AMIVAPI Authentication.
We use the `g` globals as intermediate storage:
- g['user'] is the user_id if the token was valid, otherwise None
- g['admin'] is True if the user is an admin, False otherwise
(This allows easier testing since we can just modify g)
"""
from functools import wraps
import json
import requests
from eve.auth import TokenAuth
from eve.io.mongo import Validator
from flask import request, g, current_app
def request_cache(key):
"""User as decorator: safe the function return in g[key]."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
return getattr(g, key)
except KeyError:
setattr(g, key, f(*args, **kwargs))
return getattr(g, key)
return wrapper
return decorator
def api_get(resource, token, where, projection):
"""Format and send a GET request to AMIVAPI. Return json data or None."""
url = requests.compat.urljoin(current_app.config['AMIVAPI_URL'], resource)
headers = {'Authorization': "Token %s" % token}
params = {
'where': json.dumps(where),
'projection': json.dumps(projection)
}
response = requests(url, params=params, headers=auth_header(token))
if response.status_code == 200:
return response.json()
@request_cache('user')
def get_user():
"""Return user id if the token is valid, None otherwise."""
token = g.get(token, '')
response = api_get('sessions', token, {'token': token}, {'user': 1})
if response:
return response['_items'][0]['user']
@request_cache('nethz')
def get_nethz():
"""Return nethz of current user."""
if get_user() is not None:
response = api_get('users/%s' % get_user(), token, {}, {'nethz': 1})
return response.get('nethz')
@request_cache('admin')
def is_admin():
"""Return True if user is in the 'PVK Admins' Group, False otherwise.
The result is saved in g, to avoid checking twice, so there is no
performance loss if is_admin is called multiple times during a request.
"""
token = g.get(token, '')
user_id = get_user()
if user_id is not None:
# Find Group with correct name, return list of members
groups = api_get('groups', token,
{'name': current_app.config['ADMIN_GROUP_NAME']},
{'_id': 1})
if groups:
group_id = groups['_items'][0]['_id']
membership = api_get('groupmemberships', token,
{'user': user_id, 'group': group_id},
{'_id': 1})
return bool(membership and len(membership['_items']) != 0)
# In all other cases, user is not an admin.
return False
class APIAuth(TokenAuth):
"""Verifies the presented with AMIVAPI."""
def check_auth(self, token, allowed_roles, resource, method):
"""Allow request if token exists in AMIVAPI."""
g.token = token # Safe in g such that other methods can use it
# Valid Login always required
if get_user() is None:
return False
# Read always allowed
# Write any resource but 'signups': you need to be an admin
return method == 'GET' or (resource == 'signups') or is_admin()
# Hook to filter signups
def only_own_signups(request, lookup):
"""Users can only see signups if their ID matches."""
if not is_admin():
# Add the additional lookup with an `$and` condition
# or extend existing `$and`s
lookup.setdefault('$and', []).append({'nethz': get_nethz()})
# Validator that allows to check nethz
class APIValidator(Validator):
"""Provide a rule to check nethz of current user."""
def _validate_only_own_nethz(self, enabled, field, value):
"""If the user is no admin, only own nethz is allowed for singup."""
if enabled and not is_admin():
if value != get_nethz():
self._error(field,
"You can only use your own nethz to sign up.")
def _validate_unique_combination(self, unique_combination, field, value):
"""Validate that a combination of fields is unique.
Code is copy-pasted from amivapi, see there for more explanation.
https://github.com/amiv-eth/amivapi/blob/master/amivapi/utils.py
"""
lookup = {field: value} # self
for other_field in unique_combination:
lookup[other_field] = self.document.get(other_field)
if request.method == 'PATCH':
original = self._original_document
for key in unique_combination:
if key not in self.document.keys():
lookup[key] = original[key]
if current_app.data.find_one(self.resource, None, **lookup) is not None:
self._error(field, "value already exists in the database in " +
"combination with values for: %s" %
unique_combination)
Cerberus==0.9.2
certifi==2017.7.27.1
chardet==3.0.4
click==6.7
Eve==0.7.4
Events==0.2.2
Flask==0.12
Flask-PyMongo==0.5.1
idna==2.6
itsdangerous==0.24
Jinja2==2.9.6
MarkupSafe==0.23
py==1.4.34
pymongo==3.5.1
pytest==3.2.3
requests==2.18.4
simplejson==3.11.1
urllib3==1.22
Werkzeug==0.11.15
"""API Configuration."""
"""Configuration."""
# AMIVAPI URL and Admin Group
AMIVAPI_URL = "https://amiv-api.ethz.ch"
ADMIN_GROUP_NAME = 'PVK Admins'
# DB
MONGO_HOST = 'localhost'
......@@ -7,11 +11,15 @@ MONGO_USERNAME = 'pvkuser'
MONGO_PASSWORD = 'pvkpass'
MONGO_DBNAME = 'pvk'
RESOURCE_METHODS = ['GET', 'POST']
ITEM_METHODS = ['GET', 'PATCH', 'DELETE']
# ISO 8601 time format instead of rfc1123
DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# A schema for required start/end time tuple
TIMESPAN = {
'type': 'dict',
......@@ -120,14 +128,10 @@ DOMAIN = {
'nethz': {
'type': 'string',
'maxlength': 10,
'unique': True,
'empty': False,
'nullable': False,
'required': True,
},
'name': {
'type': 'string',
'readonly': True,
'only_own_nethz': True,
},
'course': {
'type': 'objectid',
......@@ -136,7 +140,7 @@ DOMAIN = {
'field': '_id',
'embeddable': True
},
'unique_combination': ['nethz'],
},
'status': {
'type': 'string',
......
"""Test fixtures."""
import pytest
import json
import pytest
from pymongo import MongoClient
from flask.testing import FlaskClient
from app import create_app
......@@ -38,17 +38,21 @@ class TestClient(FlaskClient):
return json.loads(response.get_data(as_text=True))
def dropDB(app):
def drop_database(application):
"""Drop drop drop!"""
connection = MongoClient(app.config['MONGO_HOST'],
app.config['MONGO_PORT'])
connection.drop_database(app.config['MONGO_DBNAME'])
connection = MongoClient(application.config['MONGO_HOST'],
application.config['MONGO_PORT'])
connection.drop_database(application.config['MONGO_DBNAME'])
connection.close()
@pytest.fixture(scope='session')
def client():
app = create_app(settings=TEST_SETTINGS)
app.test_client_class = TestClient
yield app.test_client()
dropDB(app)
@pytest.fixture
def app():
"""Create app, instantiate test client, drop DB after use."""
application = create_app(settings=TEST_SETTINGS)
application.test_client_class = TestClient
application.client = application.test_client()
yield application
drop_database(application)
"""Tests for authentication."""
import pytest
from flask import g
@pytest.mark.parametrize('resource',
['lectures', 'assistants', 'courses', 'signups'])
@pytest.mark.parametrize('method', ['get', 'post'])
def test_auth_required_for_resource(app, resource, method):
"""Without auth header, we get get 401 for all methods."""
getattr(app.client, method)('/' + resource, data={}, assert_status=401)
@pytest.mark.parametrize('resource',
['lectures', 'assistants', 'courses', 'signups'])
@pytest.mark.parametrize('method', ['get', 'patch', 'delete'])
def test_auth_required_for_item(app, resource, method):
"""Without auth provided, we can access any item either."""
# Bypass validation and put a empty item directly into db
with app.app_context():
_id = app.data.driver.db[resource].insert({})
getattr(app.client, method)('/%s/%s' % (resource, _id),
data={},
assert_status=401)
@pytest.mark.parametrize('resource', ['lectures', 'assistants', 'courses'])
def test_user_can_read(app, resource):
"""Users should be able to to GET requests on resource and item.
Not signups! There, users can only see their own items -> extra test
This implies that admins can read, too, since every admin is a user.
"""
with app.test_request_context():
# Fake a user
g.user = 'Not None :)'
faketoken = {'Authorization': 'Token Trolololo'}
# Read resource
app.client.get('/' + resource, headers=faketoken, assert_status=200)
# Create fake item and read item
_id = app.data.driver.db[resource].insert({})
app.client.get('/%s/%s' % (resource, _id),
headers=faketoken,
assert_status=200)
@pytest.mark.parametrize('resource', ['lectures', 'assistants', 'courses'])
def test_user_cannot_write(app, resource):
"""Users cannot create, modify or delete."""
with app.test_request_context():
# Fake a user, specify non-admin
g.user = 'Not None :)'
g.admin = False
faketoken = {'Authorization': 'Token Trolololo'}
data = {}
# Post something
app.client.post('/' + resource,
data=data,
headers=faketoken,
assert_status=401)
# Create fake item, try to patch/delete it
_id = app.data.driver.db[resource].insert({})
app.client.patch('/%s/%s' % (resource, _id),
data=data,
headers=faketoken,
assert_status=401)
app.client.delete('/%s/%s' % (resource, _id),
headers=faketoken,
assert_status=401)
def test_signup_with_own_nethz_only(app):
"""Users can only post singups with their own nethz."""
with app.test_request_context():
nethz = 'Something'
# Fake a user, specify non-admin
g.user = 'Not None :)'
g.nethz = nethz
g.admin = False
faketoken = {'Authorization': 'Token Trolololo'}
# Create fake course to sign up to
course = str(app.data.driver.db['courses'].insert({}))
# Try with other nethz
bad_signup = {
'nethz': 'Notthenethz',
'course': course
}
app.client.post('/signups',
data=bad_signup,
headers=faketoken,
assert_status=422)
# Try with own nethz
good_signup = {
'nethz': nethz,
'course': course
}
app.client.post('/signups',
data=good_signup,
headers=faketoken,
assert_status=201)
def test_user_signup_visibility(app):
"""Test that we a user cannot see others' signups."""
with app.test_request_context():
# Fake a user, specify non-admin
g.user = 'Not None :)'
g.nethz = 'Something'
g.admin = False
token = {'Authorization': 'Token FakeeTrolololo', 'If-Match': 'Wrong'}
# Create fake signup with different nethz
own = str(app.data.driver.db['signups'].insert({'nethz': g.nethz}))
other = str(app.data.driver.db['signups'].insert({'nethz': 'trolo'}))
# Resource: Can only see own, not both signups
response = app.client.get('/signups', headers=token, assert_status=200)
assert len(response['_items']) == 1
assert response['_items'][0]['nethz'] == g.nethz
# Items
own_url = '/signups/' + own
other_url = '/signups/' + other
# Get
app.client.get(own_url, headers=token, assert_status=200)
app.client.get(other_url, headers=token, assert_status=404)
# Patch (if we can see item, we get 412 since etag is wrong)
app.client.patch(own_url, headers=token, data={}, assert_status=412)
app.client.patch(other_url, headers=token, data={}, assert_status=404)
# Delete (etag missing again)
app.client.delete(own_url, headers=token, assert_status=412)
app.client.delete(other_url, headers=token, assert_status=404)
def test_admin_signup_visibility(app):
"""Test that we an admin can see others' signups."""
with app.test_request_context():
# Fake a user, specify admin
g.user = 'Not None :)'
g.nethz = 'Nothing special really'
g.admin = True
token = {'Authorization': 'Token FakeeTrolololo', 'If-Match': 'Wrong'}
# Create fake signup with different nethz
other = str(app.data.driver.db['signups'].insert({'nethz': 'trolo'}))
# Resource: Can see signups
response = app.client.get('/signups', headers=token, assert_status=200)
assert len(response['_items']) == 1
# Items
url = '/signups/' + other
# Get
app.client.get(url, headers=token, assert_status=200)
# Patch (if we can see item, we get 412 since etag is wrong)
app.client.patch(url, headers=token, data={}, assert_status=412)
# Delete (etag missing again)
app.client.delete(url, headers=token, assert_status=412)
"""Tests for basic requests to all resources."""
def test_create(client):
"""Test creating everything."""
lecture = {
'title': "Awesome Lecture",
'department': "itet",
'year': 3,
}
lecture_response = client.post('lectures', data=lecture, assert_status=201)
assistant = {'nethz': "Pablo"}
assistant_response = client.post('assistants',
data=assistant,
assert_status=201)
course = {
'lecture': lecture_response['_id'],
'assistant': assistant_response['_id'],
'room': 'ETZ E 6',
'spots': 30,
'signup': {
'start': '2020-05-01T10:00:00Z',
'end': '2020-05-05T23:59:59Z',
},
'datetimes': [{
from flask import g
def test_create(app):
"""Test creating everything as an admin user."""
with app.test_request_context():
# Fake a admin user
g.user = 'Not None :)'
g.admin = True
faketoken = {'Authorization': 'Token Trolololo'}
lecture = {
'title': "Awesome Lecture",
'department': "itet",
'year': 3,
}
lecture_response = app.client.post('lectures',
data=lecture,
headers=faketoken,
assert_status=201)
assistant = {'nethz': "Pablo"}
assistant_response = app.client.post('assistants',
data=assistant,
headers=faketoken,
assert_status=201)
course = {
'lecture': lecture_response['_id'],
'assistant': assistant_response['_id'],
'room': 'ETZ E 6',
'spots': 30,
'signup': {
'start': '2020-05-01T10:00:00Z',
'end': '2020-05-05T23:59:59Z',
},
'datetimes': [{
'start': '2020-06-05T10:00:00Z',
'end': '2020-06-05T12:00:00Z',
}, {
'start': '2020-06-06T10:00:00Z',
'end': '2020-06-06T12:00:00Z',
}],
}
course_response = client.post('courses', data=course, assert_status=201)
signup = {
'nethz': "Pablito",
'course': course_response['_id']
}
client.post('signups', data=signup, assert_status=201)
}
course_response = app.client.post('courses',
data=course,
headers=faketoken,
assert_status=201)
signup = {
'nethz': "Pablito",
'course': course_response['_id']
}
app.client.post('signups',
data=signup,
headers=faketoken,
assert_status=201)
def test_no_double_signup(app):
"""Users can signup for several courses, but not for any course twice."""
with app.test_request_context():
# Fake a admin user
g.user = 'Not None :)'
g.admin = True
faketoken = {'Authorization': 'Token Trolololo'}
# Create two fake courses to sign up to
first = str(app.data.driver.db['courses'].insert({}))
second = str(app.data.driver.db['courses'].insert({}))
def _signup(course, assert_status):
signup = {
'nethz': "Pablito",
'course': course
}
app.client.post('signups',
data=signup,
headers=faketoken,
assert_status=assert_status)
# No Double signup to same course
_signup(first, 201)
_signup(first, 422)
# Sign up to other courses still fine
_signup(second, 201)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment