479 lines
16 KiB
Python
479 lines
16 KiB
Python
"""
|
|
Functional tests for the managing users.
|
|
|
|
:copyright: Copyright 2013-2017, Dorian Pula <dorian.pula@amber-penguin-software.ca>
|
|
:license: AGPL v3+
|
|
"""
|
|
|
|
import collections
|
|
import http
|
|
from unittest import mock
|
|
|
|
import pytest_bdd as bdd
|
|
import requests
|
|
from pytest_bdd import parsers
|
|
|
|
from rookeries.users import models
|
|
from tests import utils
|
|
|
|
bdd.scenarios('user_creation.feature')
|
|
bdd.scenarios('user_access.feature')
|
|
bdd.scenarios('user_modification.feature')
|
|
bdd.scenarios('user_deletion.feature')
|
|
|
|
TEST_USER_PASSWORDS = 'password-testing'
|
|
MODIFIED_PREFIX = 'Modified'
|
|
NEW_PREFIX = 'New'
|
|
|
|
|
|
TokenUserBundle = collections.namedtuple('TokenUserBundle', ['user', 'token'])
|
|
ResponseUserBundle = collections.namedtuple('ResponseUserBundle', ['response', 'user'])
|
|
|
|
|
|
@bdd.given(parsers.parse('I am an {user_role} user'))
|
|
@bdd.given(parsers.parse('I am a {user_role} user'))
|
|
def requester(user_role, api_base_uri, db_engine):
|
|
|
|
user = utils.generate_test_user(role=user_role)
|
|
utils.save_test_user_in_db(db_engine, user)
|
|
access_token = requests.post(
|
|
url=f'{api_base_uri}/auth',
|
|
json={
|
|
'username': user.username,
|
|
'password': user.password,
|
|
}
|
|
).json()['access_token']
|
|
|
|
return TokenUserBundle(user=user, token=access_token)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to create a new {user_role} user'), target_fixture='user_response')
|
|
def create_new_user_response(user_role, requester, api_base_uri):
|
|
|
|
test_user = utils.generate_test_user(role=user_role)
|
|
user_creation_request = {
|
|
'username': test_user.username,
|
|
'password': test_user.password,
|
|
'role': user_role,
|
|
'profile': {
|
|
'fullName': f'{NEW_PREFIX} {user_role.capitalize()}',
|
|
'email': test_user.email,
|
|
}
|
|
}
|
|
|
|
response = requests.post(
|
|
url=f'{api_base_uri}/api/users',
|
|
json=user_creation_request,
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
return ResponseUserBundle(response, test_user)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to create user with the same username'), target_fixture='user_response')
|
|
def recreate_existing_user_response(user_response, requester, api_base_uri):
|
|
test_user = user_response.user
|
|
user_creation_request = {
|
|
'username': test_user.username,
|
|
'password': test_user.password,
|
|
'role': test_user.role,
|
|
'profile': {
|
|
'fullName': f'{NEW_PREFIX} {test_user.role.capitalize()}',
|
|
'email': test_user.email,
|
|
}
|
|
}
|
|
|
|
response = requests.post(
|
|
url=f'{api_base_uri}/api/users',
|
|
json=user_creation_request,
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
return ResponseUserBundle(response, test_user)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to modify my own user'), target_fixture='user_response')
|
|
def modify_my_user_response(requester, api_base_uri):
|
|
|
|
user_creation_request = {
|
|
'username': requester.user.username,
|
|
'password': TEST_USER_PASSWORDS,
|
|
'role': requester.user.role,
|
|
'profile': {
|
|
'fullName': f'{MODIFIED_PREFIX} {requester.user.role.capitalize()}',
|
|
'email': requester.user.email,
|
|
}
|
|
}
|
|
|
|
return requests.put(
|
|
url=f'{api_base_uri}/api/users/{requester.user.username}',
|
|
json=user_creation_request,
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to modify an existing {user_role} user'), target_fixture='user_response')
|
|
def modify_other_user_response(user_role, requester, api_base_uri, db_engine):
|
|
|
|
test_user = utils.generate_test_user(role=user_role)
|
|
utils.save_test_user_in_db(db_engine, test_user)
|
|
|
|
user_creation_request = {
|
|
'username': test_user.username,
|
|
'password': test_user.password,
|
|
'role': user_role,
|
|
'profile': {
|
|
'fullName': f'{MODIFIED_PREFIX} {user_role.capitalize()}',
|
|
'email': test_user.email,
|
|
}
|
|
}
|
|
|
|
response = requests.put(
|
|
url=f'{api_base_uri}/api/users/{test_user.username}',
|
|
json=user_creation_request,
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
return ResponseUserBundle(response, test_user)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to modify a non-existent user'), target_fixture='user_response')
|
|
def modify_non_existent_user_response(requester, api_base_uri):
|
|
|
|
username = 'non-existent'
|
|
user_creation_request = {
|
|
'username': username,
|
|
'password': TEST_USER_PASSWORDS,
|
|
'role': models.UserRole.subscriber.name,
|
|
'profile': {
|
|
'fullName': f'{MODIFIED_PREFIX} Subscriber',
|
|
'email': 'i-still-dont-exist@rookeries.org',
|
|
}
|
|
}
|
|
|
|
return requests.put(
|
|
url=f'{api_base_uri}/api/users/{username}',
|
|
json=user_creation_request,
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to create an user with a non-json request'), target_fixture='user_response')
|
|
def invalid_non_json_create_new_user_response(requester, api_base_uri):
|
|
return requests.post(
|
|
url=f'{api_base_uri}/api/users',
|
|
data='',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to create an user with an empty request'), target_fixture='user_response')
|
|
def invalid_empty_create_new_user_response(requester, api_base_uri):
|
|
return requests.post(
|
|
url=f'{api_base_uri}/api/users',
|
|
json={},
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to modify an user with a non-json request'), target_fixture='user_response')
|
|
def invalid_non_json_modify_user_response(requester, api_base_uri):
|
|
return requests.put(
|
|
url=f'{api_base_uri}/api/users/test_user',
|
|
data='',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to modify an user with an empty request'), target_fixture='user_response')
|
|
def invalid_empty_modify_user_response(requester, api_base_uri):
|
|
return requests.put(
|
|
url=f'{api_base_uri}/api/users/test_user',
|
|
json={},
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to get my own user'), target_fixture='user_response')
|
|
def get_my_user_response(requester, api_base_uri):
|
|
return requests.get(
|
|
url=f'{api_base_uri}/api/users/{requester.user.username}',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to get a non-existent user'), target_fixture='user_response')
|
|
def get_non_existent_user_response(requester, api_base_uri):
|
|
return requests.get(
|
|
url=f'{api_base_uri}/api/users/non-existent',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to get an existing {user_role} user'), target_fixture='user_response')
|
|
def get_other_user_response(user_role, requester, api_base_uri, db_engine):
|
|
test_user = utils.generate_test_user(role=user_role)
|
|
utils.save_test_user_in_db(db_engine, test_user)
|
|
response = requests.get(
|
|
url=f'{api_base_uri}/api/users/{test_user.username}',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
return ResponseUserBundle(response, test_user)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to delete an existing {user_role} user'), target_fixture='user_response')
|
|
def delete_other_user_response(user_role, requester, api_base_uri, db_engine):
|
|
test_user = utils.generate_test_user(role=user_role)
|
|
utils.save_test_user_in_db(db_engine, test_user)
|
|
response = requests.delete(
|
|
url=f'{api_base_uri}/api/users/{test_user.username}',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
return ResponseUserBundle(response, test_user)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to delete the deleted user again'), target_fixture='user_response')
|
|
def delete_deleted_user_response(user_response, requester, api_base_uri):
|
|
test_user = user_response.user
|
|
return requests.delete(
|
|
url=f'{api_base_uri}/api/users/{test_user.username}',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to delete a non-existent user'), target_fixture='user_response')
|
|
def delete_non_existent_user_response(requester, api_base_uri):
|
|
return requests.delete(
|
|
url=f'{api_base_uri}/api/users/non-existent',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.given(parsers.parse('I try to delete my own user'), target_fixture='user_response')
|
|
def delete_my_user_response(requester, api_base_uri):
|
|
return requests.delete(
|
|
url=f'{api_base_uri}/api/users/{requester.user.username}',
|
|
headers={
|
|
'Authorization': f'JWT {requester.token}',
|
|
},
|
|
)
|
|
|
|
|
|
@bdd.then(parsers.parse('I get a new {user_role} user'))
|
|
def assert_create_user_response(user_role, user_response, api_base_uri):
|
|
created_user = user_response.user
|
|
|
|
expected_user_creation_response = {
|
|
'username': created_user.username,
|
|
'role': user_role,
|
|
'profile': {
|
|
'aliasName': None,
|
|
'fullName': f'{NEW_PREFIX} {user_role.capitalize()}',
|
|
'email': created_user.email,
|
|
},
|
|
'urls': {
|
|
'self': f'{api_base_uri}/api/users/{created_user.username}'
|
|
}
|
|
}
|
|
|
|
actual_response = utils.extract_response(user_response)
|
|
assert actual_response.status_code == http.HTTPStatus.CREATED
|
|
assert actual_response.json() == expected_user_creation_response
|
|
|
|
|
|
@bdd.then(parsers.parse('I get my own user'))
|
|
def assert_my_user_profile(requester, user_response, api_base_uri):
|
|
|
|
expected_user_creation_response = {
|
|
'username': requester.user.username,
|
|
'role': requester.user.role,
|
|
'profile': {
|
|
'aliasName': None,
|
|
'fullName': mock.ANY,
|
|
'email': requester.user.email,
|
|
},
|
|
'urls': {
|
|
'self': f'{api_base_uri}/api/users/{requester.user.username}'
|
|
}
|
|
}
|
|
|
|
actual_response = utils.extract_response(user_response)
|
|
assert actual_response.status_code == http.HTTPStatus.OK
|
|
assert actual_response.json() == expected_user_creation_response
|
|
assert MODIFIED_PREFIX not in actual_response.json()['profile']['fullName']
|
|
|
|
|
|
@bdd.then(parsers.parse('I get an existing {user_role} user'))
|
|
def assert_user_profile(user_role, user_response, api_base_uri):
|
|
|
|
test_user = user_response.user
|
|
|
|
expected_user_creation_response = {
|
|
'username': test_user.username,
|
|
'role': user_role,
|
|
'profile': {
|
|
'aliasName': None,
|
|
'fullName': f'Test {user_role.capitalize()}',
|
|
'email': test_user.email,
|
|
},
|
|
'urls': {
|
|
'self': f'{api_base_uri}/api/users/{test_user.username}'
|
|
}
|
|
}
|
|
|
|
actual_response = utils.extract_response(user_response)
|
|
assert actual_response.status_code == http.HTTPStatus.OK
|
|
assert actual_response.json() == expected_user_creation_response
|
|
|
|
|
|
@bdd.then(parsers.parse('updates to my own user are preserved'))
|
|
def assert_my_modified_user_profile(requester, user_response, api_base_uri):
|
|
|
|
expected_user_creation_response = {
|
|
'username': requester.user.username,
|
|
'role': requester.user.role,
|
|
'profile': {
|
|
'aliasName': None,
|
|
'fullName': f'{MODIFIED_PREFIX} {requester.user.role.capitalize()}',
|
|
'email': requester.user.email,
|
|
},
|
|
'urls': {
|
|
'self': f'{api_base_uri}/api/users/{requester.user.username}'
|
|
}
|
|
}
|
|
|
|
actual_response = utils.extract_response(user_response)
|
|
assert actual_response.status_code == http.HTTPStatus.CREATED
|
|
assert actual_response.json() == expected_user_creation_response
|
|
|
|
|
|
@bdd.then(parsers.parse('updates to the {user_role} user are preserved'))
|
|
def assert_modified_user_profile(user_role, user_response, api_base_uri):
|
|
|
|
expected_user = user_response.user
|
|
expected_user_response = {
|
|
'username': expected_user.username,
|
|
'role': user_role,
|
|
'profile': {
|
|
'aliasName': None,
|
|
'fullName': f'{MODIFIED_PREFIX} {user_role.capitalize()}',
|
|
'email': expected_user.email,
|
|
},
|
|
'urls': {
|
|
'self': f'{api_base_uri}/api/users/{expected_user.username}'
|
|
}
|
|
}
|
|
|
|
actual_response = utils.extract_response(user_response)
|
|
assert actual_response.status_code == http.HTTPStatus.CREATED
|
|
assert actual_response.json() == expected_user_response
|
|
|
|
|
|
@bdd.then(parsers.parse('I get an unauthorized response'))
|
|
def assert_unauthorized_response(user_response):
|
|
actual_response = utils.extract_response(user_response)
|
|
expected_response_json = {
|
|
'error': {
|
|
'status_code': http.HTTPStatus.UNAUTHORIZED.value,
|
|
'message': 'Not authorized to access this resource.',
|
|
'resource': actual_response.request.url,
|
|
}
|
|
}
|
|
|
|
assert actual_response.status_code == http.HTTPStatus.UNAUTHORIZED
|
|
assert actual_response.json() == expected_response_json
|
|
|
|
|
|
@bdd.then(parsers.parse('I get a forbidden response'))
|
|
def assert_forbidden_response(user_response):
|
|
actual_response = utils.extract_response(user_response)
|
|
expected_response_json = {
|
|
'error': {
|
|
'status_code': http.HTTPStatus.FORBIDDEN.value,
|
|
'message': 'Can not delete your own user.',
|
|
'resource': actual_response.request.url,
|
|
}
|
|
}
|
|
|
|
assert actual_response.status_code == http.HTTPStatus.FORBIDDEN
|
|
assert actual_response.json() == expected_response_json
|
|
|
|
|
|
@bdd.then(parsers.parse('I get a user can not be found message'))
|
|
def assert_resource_not_found_response(user_response):
|
|
actual_response = utils.extract_response(user_response)
|
|
expected_response_json = {
|
|
'error': {
|
|
'status_code': http.HTTPStatus.NOT_FOUND.value,
|
|
'message': 'Resource not found.',
|
|
'resource': actual_response.request.url,
|
|
},
|
|
}
|
|
|
|
assert actual_response.status_code == http.HTTPStatus.NOT_FOUND
|
|
assert actual_response.json() == expected_response_json
|
|
|
|
|
|
@bdd.then(parsers.parse('I get a conflict response about existing user'))
|
|
def assert_conflicting_resource_found_response(user_response):
|
|
actual_response = utils.extract_response(user_response)
|
|
expected_response_json = {
|
|
'error': {
|
|
'status_code': http.HTTPStatus.CONFLICT.value,
|
|
'message': 'Existing resource already found. PUT to the resource to update it.',
|
|
'resource': actual_response.request.url,
|
|
},
|
|
}
|
|
|
|
assert actual_response.status_code == http.HTTPStatus.CONFLICT
|
|
assert actual_response.json() == expected_response_json
|
|
|
|
|
|
@bdd.then(parsers.parse('I get a bad request response'))
|
|
def assert_bad_request_response(user_response):
|
|
actual_response = utils.extract_response(user_response)
|
|
expected_response_json = {
|
|
'error': {
|
|
'status_code': http.HTTPStatus.BAD_REQUEST.value,
|
|
'message': mock.ANY,
|
|
}
|
|
}
|
|
|
|
assert actual_response.json() == expected_response_json
|
|
assert actual_response.status_code == http.HTTPStatus.BAD_REQUEST
|
|
|
|
|
|
@bdd.then(parsers.parse('I get a response indicating the deletion was successful'))
|
|
def assert_resource_deleted_response(user_response):
|
|
actual_response = utils.extract_response(user_response)
|
|
assert actual_response.status_code == http.HTTPStatus.NO_CONTENT
|
|
assert not actual_response.content
|
|
assert len(actual_response.content) == 0
|