Add a mostly working setup to create working users.
Allow for authentication endpoint to work better.
This commit is contained in:
parent
fffe5c58b1
commit
62c1fc374d
|
@ -16,7 +16,10 @@ from werkzeug import exceptions
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
HTTP_ERRORS_WITH_PATHS = [http.HTTPStatus.NOT_FOUND, http.HTTPStatus.UNAUTHORIZED]
|
HTTP_ERRORS_WITH_PATHS = [
|
||||||
|
http.HTTPStatus.NOT_FOUND,
|
||||||
|
http.HTTPStatus.UNAUTHORIZED,
|
||||||
|
]
|
||||||
HTTP_ERRORS_WITHOUT_PATHS = [
|
HTTP_ERRORS_WITHOUT_PATHS = [
|
||||||
http_code
|
http_code
|
||||||
for http_code in exceptions.default_exceptions
|
for http_code in exceptions.default_exceptions
|
||||||
|
@ -39,6 +42,13 @@ class ConflictingResourceError(Exception):
|
||||||
self.message = message
|
self.message = message
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidCredentialsError(Exception):
|
||||||
|
def __init__(self, username):
|
||||||
|
self.message = 'Invalid credentials provided.'
|
||||||
|
self.username = username
|
||||||
|
self.code = http.HTTPStatus.UNAUTHORIZED
|
||||||
|
|
||||||
|
|
||||||
def get_message_for_status(http_code):
|
def get_message_for_status(http_code):
|
||||||
try:
|
try:
|
||||||
http_status = http.HTTPStatus(http_code)
|
http_status = http.HTTPStatus(http_code)
|
||||||
|
@ -70,6 +80,17 @@ def error_message_from_http_code(error):
|
||||||
return flask.jsonify(error_message), error.code
|
return flask.jsonify(error_message), error.code
|
||||||
|
|
||||||
|
|
||||||
|
def error_message_from_invalid_credentials(error):
|
||||||
|
logger.error(f'Invalid credentials provided for "{error.username}"')
|
||||||
|
error_message = {
|
||||||
|
'error': {
|
||||||
|
'status_code': error.code,
|
||||||
|
'message': error.message,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return flask.jsonify(error_message), error.code
|
||||||
|
|
||||||
|
|
||||||
def error_message_from_invalid_op_error(error: InvalidOperationError):
|
def error_message_from_invalid_op_error(error: InvalidOperationError):
|
||||||
logger.error(f'Invalid operation for "{flask.request.url}" because "{error.message}"')
|
logger.error(f'Invalid operation for "{flask.request.url}" because "{error.message}"')
|
||||||
error_message = {
|
error_message = {
|
||||||
|
@ -120,12 +141,26 @@ def error_message_from_json_schema_validation_error(error: jsonschema.Validation
|
||||||
|
|
||||||
def attach_error_handlers_to_api_app(app):
|
def attach_error_handlers_to_api_app(app):
|
||||||
for http_code in HTTP_ERRORS_WITH_PATHS:
|
for http_code in HTTP_ERRORS_WITH_PATHS:
|
||||||
app.register_error_handler(http_code, error_message_from_http_code_with_resource)
|
app.register_error_handler(
|
||||||
|
http_code, error_message_from_http_code_with_resource)
|
||||||
|
|
||||||
for http_code in HTTP_ERRORS_WITHOUT_PATHS:
|
for http_code in HTTP_ERRORS_WITHOUT_PATHS:
|
||||||
app.register_error_handler(http_code, error_message_from_http_code)
|
app.register_error_handler(http_code, error_message_from_http_code)
|
||||||
|
|
||||||
app.register_error_handler(InvalidOperationError, error_message_from_invalid_op_error)
|
app.register_error_handler(
|
||||||
app.register_error_handler(ConflictingResourceError, error_message_from_conflicting_resource_error)
|
InvalidOperationError,
|
||||||
app.register_error_handler(jsonschema.ValidationError, error_message_from_json_schema_validation_error)
|
error_message_from_invalid_op_error
|
||||||
|
)
|
||||||
|
app.register_error_handler(
|
||||||
|
ConflictingResourceError,
|
||||||
|
error_message_from_conflicting_resource_error
|
||||||
|
)
|
||||||
|
app.register_error_handler(
|
||||||
|
jsonschema.ValidationError,
|
||||||
|
error_message_from_json_schema_validation_error
|
||||||
|
)
|
||||||
|
app.register_error_handler(
|
||||||
|
InvalidCredentialsError,
|
||||||
|
error_message_from_invalid_credentials
|
||||||
|
)
|
||||||
app.register_error_handler(Exception, error_message_from_exception)
|
app.register_error_handler(Exception, error_message_from_exception)
|
||||||
|
|
|
@ -18,6 +18,7 @@ class Page(db.DatabaseModel):
|
||||||
# TODO: Break out into content blocks.
|
# TODO: Break out into content blocks.
|
||||||
# TODO: Add in content type, created, updated dates and author
|
# TODO: Add in content type, created, updated dates and author
|
||||||
# TODO: Author is based on a user.
|
# TODO: Author is based on a user.
|
||||||
|
# TODO: Add a language as in markdown / html
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
|
|
@ -11,6 +11,7 @@ import logging
|
||||||
import flask
|
import flask
|
||||||
from werkzeug import security
|
from werkzeug import security
|
||||||
|
|
||||||
|
from rookeries import database, errors
|
||||||
from rookeries.users import models
|
from rookeries.users import models
|
||||||
from rookeries.vendors import flask_jwt
|
from rookeries.vendors import flask_jwt
|
||||||
|
|
||||||
|
@ -26,12 +27,21 @@ logger = logging.getLogger(__name__)
|
||||||
@jwt.authentication_handler
|
@jwt.authentication_handler
|
||||||
def valid_admin_user_credentials(username, password):
|
def valid_admin_user_credentials(username, password):
|
||||||
"""Checks if the supplied password matches the stored password hash."""
|
"""Checks if the supplied password matches the stored password hash."""
|
||||||
user = models.User.query.filter_by(username=username).first()
|
# TODO: Attach DB to Flask app context.
|
||||||
if not user:
|
db = database.CouchDB.configure_from_env()
|
||||||
return
|
found_user = db.find_first_document({
|
||||||
|
'type': models.User.doc_type(),
|
||||||
|
'username': username,
|
||||||
|
})
|
||||||
|
|
||||||
|
if not found_user:
|
||||||
|
raise errors.InvalidCredentialsError(username)
|
||||||
|
|
||||||
|
user = models.User(**found_user)
|
||||||
if security.check_password_hash(user.password, password):
|
if security.check_password_hash(user.password, password):
|
||||||
return user.to_json()
|
return user.to_json()
|
||||||
|
else:
|
||||||
|
raise errors.InvalidCredentialsError(username)
|
||||||
|
|
||||||
|
|
||||||
@jwt.identity_handler
|
@jwt.identity_handler
|
||||||
|
|
|
@ -5,84 +5,48 @@ Models for users.
|
||||||
:license: AGPL v3+
|
:license: AGPL v3+
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import enum
|
import copy
|
||||||
|
|
||||||
from werkzeug import security
|
from werkzeug import security
|
||||||
|
|
||||||
|
from rookeries import database as db
|
||||||
class UserRole(enum.Enum):
|
|
||||||
admin = 1
|
|
||||||
editor = 2
|
|
||||||
subscriber = 3
|
|
||||||
|
|
||||||
|
|
||||||
class User(object):
|
class User(db.DatabaseModel):
|
||||||
__tablename__ = 'user'
|
|
||||||
|
|
||||||
# id = db.Column(db.Integer, primary_key=True)
|
def __init__(self, **kwargs):
|
||||||
# username = db.Column(db.String(128), unique=True)
|
super().__init__(**kwargs)
|
||||||
# password = db.Column(db.String, nullable=False)
|
self.username = kwargs.get('username')
|
||||||
# role = db.Column(db.Enum(UserRole))
|
self.full_name = kwargs.get('fullName')
|
||||||
# profile = db.relationship('UserProfile', uselist=False, back_populates='user')
|
# TODO: Include a way to check parts of a profile
|
||||||
|
self.profile = kwargs.get('profile', {})
|
||||||
def __init__(self, username, role=UserRole.subscriber):
|
# TODO: Know when to hash and when to not...
|
||||||
self.username = username
|
password = kwargs.get('password')
|
||||||
self.role = role
|
if password:
|
||||||
|
# TODO: Also determine when to hash to avoid multiple hashing.
|
||||||
def __repr__(self) -> str:
|
self.password = security.generate_password_hash(password)
|
||||||
return f'<User {self.username}>'
|
else:
|
||||||
|
self.password = None
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_json(json_dict: dict):
|
def doc_type():
|
||||||
username = json_dict['username']
|
return 'user'
|
||||||
user = User(username=username)
|
|
||||||
|
|
||||||
password = json_dict['password']
|
def _json_repr(self) -> dict:
|
||||||
user.password = security.generate_password_hash(password)
|
# TODO: Consider creating some sort of json template and mapper.
|
||||||
|
|
||||||
try:
|
|
||||||
user_role = UserRole[json_dict.get('role', UserRole.subscriber.name)]
|
|
||||||
except ValueError:
|
|
||||||
user_role = UserRole.subscriber
|
|
||||||
user.role = user_role
|
|
||||||
|
|
||||||
user_profile = UserProfile.from_json(json_dict.get('profile', {}))
|
|
||||||
user_profile.user = user
|
|
||||||
user.profile = user_profile
|
|
||||||
return user
|
|
||||||
|
|
||||||
def to_json(self) -> dict:
|
|
||||||
return {
|
return {
|
||||||
'id': self.id,
|
|
||||||
'username': self.username,
|
'username': self.username,
|
||||||
'role': self.role.name,
|
|
||||||
'profile': self.profile.to_json(),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class UserProfile(object):
|
|
||||||
__tablename__ = 'user_profile'
|
|
||||||
|
|
||||||
# id = db.Column(db.Integer, primary_key=True)
|
|
||||||
# user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
|
|
||||||
# user = db.relationship('User')
|
|
||||||
# alias_name = db.Column(db.String)
|
|
||||||
# full_name = db.Column(db.String)
|
|
||||||
# email = db.Column(db_types.EmailType)
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
return f'<UserProfile {self.user.username}>'
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json(json_dict: dict):
|
|
||||||
user_profile = UserProfile()
|
|
||||||
user_profile.alias_name = json_dict.get('aliasName')
|
|
||||||
user_profile.full_name = json_dict.get('fullName')
|
|
||||||
user_profile.email = json_dict.get('email')
|
|
||||||
return user_profile
|
|
||||||
|
|
||||||
def to_json(self) -> dict:
|
|
||||||
return {
|
|
||||||
'fullName': self.full_name,
|
'fullName': self.full_name,
|
||||||
'aliasName': self.alias_name,
|
'profile': {
|
||||||
'email': self.email,
|
'email': self.profile.get('email'),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def to_api_json(self):
|
||||||
|
# TODO: Figure out how to get a user self link....
|
||||||
|
return copy.deepcopy(self._json_repr())
|
||||||
|
|
||||||
|
def to_db_json(self):
|
||||||
|
db_json = copy.deepcopy(self._json_repr())
|
||||||
|
db_json['password'] = self.password
|
||||||
|
return self.attach_db_metadata(db_json)
|
||||||
|
|
|
@ -11,8 +11,7 @@ import logging
|
||||||
import flask
|
import flask
|
||||||
import jsonschema
|
import jsonschema
|
||||||
|
|
||||||
from rookeries import errors
|
from rookeries import database, errors
|
||||||
from rookeries.database import db
|
|
||||||
from rookeries.main import rookeries_app
|
from rookeries.main import rookeries_app
|
||||||
from rookeries.users import models, schema
|
from rookeries.users import models, schema
|
||||||
from rookeries.vendors import flask_jwt
|
from rookeries.vendors import flask_jwt
|
||||||
|
@ -31,16 +30,19 @@ def replace_id_with_self_link(user_json):
|
||||||
@rookeries_app.route('/api/users/<username>', methods=['GET'])
|
@rookeries_app.route('/api/users/<username>', methods=['GET'])
|
||||||
@flask_jwt.jwt_required()
|
@flask_jwt.jwt_required()
|
||||||
def get_user(username):
|
def get_user(username):
|
||||||
user = models.User.query.filter_by(username=username).first_or_404()
|
|
||||||
|
|
||||||
current_user = flask_jwt.current_identity
|
# TODO: Attach DB to Flask app context.
|
||||||
requesting_user_role = models.UserRole[current_user['role']]
|
db = database.CouchDB.configure_from_env()
|
||||||
|
found_user = db.find_first_document({
|
||||||
|
'type': models.User.doc_type(),
|
||||||
|
'username': username,
|
||||||
|
})
|
||||||
|
|
||||||
if requesting_user_role != models.UserRole.admin and user.username != current_user['username']:
|
if not found_user:
|
||||||
flask.abort(http.HTTPStatus.UNAUTHORIZED)
|
flask.abort(http.HTTPStatus.NOT_FOUND)
|
||||||
|
|
||||||
user_response = replace_id_with_self_link(user.to_json())
|
user = models.User(**found_user)
|
||||||
return flask.jsonify(user_response)
|
return flask.jsonify(user.to_api_json())
|
||||||
|
|
||||||
|
|
||||||
@rookeries_app.route('/api/users/', methods=['POST'])
|
@rookeries_app.route('/api/users/', methods=['POST'])
|
||||||
|
@ -53,20 +55,11 @@ def create_user():
|
||||||
incoming_request = flask.request.get_json()
|
incoming_request = flask.request.get_json()
|
||||||
jsonschema.validate(incoming_request, schema.USER_CREATION_MODIFICATION_SCHEMA)
|
jsonschema.validate(incoming_request, schema.USER_CREATION_MODIFICATION_SCHEMA)
|
||||||
|
|
||||||
# Check if user allowed to create a user.
|
|
||||||
current_user = flask_jwt.current_identity
|
|
||||||
requesting_user_role = models.UserRole[current_user['role']]
|
|
||||||
|
|
||||||
if requesting_user_role != models.UserRole.admin:
|
|
||||||
flask.abort(http.HTTPStatus.UNAUTHORIZED)
|
|
||||||
|
|
||||||
# Creates a user from the json if no user exists with the given username.
|
# Creates a user from the json if no user exists with the given username.
|
||||||
if models.User.query.filter_by(username=incoming_request['username']).first():
|
if models.User.query.filter_by(username=incoming_request['username']).first():
|
||||||
raise errors.ConflictingResourceError('Existing resource already found. PUT to the resource to update it.')
|
raise errors.ConflictingResourceError('Existing resource already found. PUT to the resource to update it.')
|
||||||
|
|
||||||
user = models.User.from_json(incoming_request)
|
user = models.User.from_json(incoming_request)
|
||||||
db.session.add(user)
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
user_response = replace_id_with_self_link(user.to_json())
|
user_response = replace_id_with_self_link(user.to_json())
|
||||||
return flask.jsonify(user_response), http.HTTPStatus.CREATED
|
return flask.jsonify(user_response), http.HTTPStatus.CREATED
|
||||||
|
@ -84,9 +77,8 @@ def update_user(username):
|
||||||
|
|
||||||
# Check if user allowed to create a user.
|
# Check if user allowed to create a user.
|
||||||
current_user = flask_jwt.current_identity
|
current_user = flask_jwt.current_identity
|
||||||
requesting_user_role = models.UserRole[current_user['role']]
|
|
||||||
|
|
||||||
if requesting_user_role != models.UserRole.admin and current_user['username'] != username:
|
if current_user['username'] != username:
|
||||||
flask.abort(http.HTTPStatus.UNAUTHORIZED)
|
flask.abort(http.HTTPStatus.UNAUTHORIZED)
|
||||||
|
|
||||||
# Modifies a user from the json.
|
# Modifies a user from the json.
|
||||||
|
@ -97,8 +89,6 @@ def update_user(username):
|
||||||
existing_user.profile.alias_name = updated_user.profile.alias_name
|
existing_user.profile.alias_name = updated_user.profile.alias_name
|
||||||
existing_user.profile.email = updated_user.profile.email
|
existing_user.profile.email = updated_user.profile.email
|
||||||
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
user_response = replace_id_with_self_link(updated_user.to_json())
|
user_response = replace_id_with_self_link(updated_user.to_json())
|
||||||
return flask.jsonify(user_response), http.HTTPStatus.CREATED
|
return flask.jsonify(user_response), http.HTTPStatus.CREATED
|
||||||
|
|
||||||
|
@ -110,16 +100,9 @@ def update_user(username):
|
||||||
@flask_jwt.jwt_required()
|
@flask_jwt.jwt_required()
|
||||||
def delete_user(username):
|
def delete_user(username):
|
||||||
user = models.User.query.filter_by(username=username).first_or_404()
|
user = models.User.query.filter_by(username=username).first_or_404()
|
||||||
|
|
||||||
current_user = flask_jwt.current_identity
|
current_user = flask_jwt.current_identity
|
||||||
requesting_user_role = models.UserRole[current_user['role']]
|
|
||||||
|
|
||||||
if requesting_user_role != models.UserRole.admin:
|
|
||||||
flask.abort(http.HTTPStatus.UNAUTHORIZED)
|
|
||||||
|
|
||||||
if user.username == current_user['username']:
|
if user.username == current_user['username']:
|
||||||
raise errors.InvalidOperationError(message='Can not delete your own user.')
|
raise errors.InvalidOperationError(message='Can not delete your own user.')
|
||||||
|
|
||||||
db.session.delete(user)
|
|
||||||
db.session.commit()
|
|
||||||
return '', http.HTTPStatus.NO_CONTENT
|
return '', http.HTTPStatus.NO_CONTENT
|
||||||
|
|
|
@ -18,13 +18,12 @@ from tests import utils
|
||||||
@bdd.given(parsers.parse('I am a valid user'))
|
@bdd.given(parsers.parse('I am a valid user'))
|
||||||
def auth_token_headers(api_base_uri, db):
|
def auth_token_headers(api_base_uri, db):
|
||||||
|
|
||||||
user = utils.generate_test_user()
|
user, password = utils.generate_test_user(db)
|
||||||
utils.save_test_user_in_db(db, user)
|
|
||||||
token = requests.post(
|
token = requests.post(
|
||||||
url=f'{api_base_uri}/auth',
|
url=f'{api_base_uri}/auth',
|
||||||
json={
|
json={
|
||||||
'username': user.username,
|
'username': user.username,
|
||||||
'password': user.password,
|
'password': password,
|
||||||
}
|
}
|
||||||
).json()['access_token']
|
).json()['access_token']
|
||||||
|
|
||||||
|
|
|
@ -6,8 +6,7 @@ Scenario: A visitor can get an existing site
|
||||||
And I get the site
|
And I get the site
|
||||||
Then I get a valid site with a menu and landing page
|
Then I get a valid site with a menu and landing page
|
||||||
|
|
||||||
# TODO: Enable once authentication endpoints ready.
|
Scenario: A valid user can get an existing site
|
||||||
#Scenario: A valid user can get an existing site
|
Given I am a valid user
|
||||||
# Given I am a valid user
|
And I get the site
|
||||||
# And I get the site
|
Then I get a valid site with a menu and landing page
|
||||||
# Then I get a valid site with a menu and landing page
|
|
||||||
|
|
|
@ -53,5 +53,3 @@ def test_calling_delete_on_missing_api_endpoint_returns_405(api_base_uri):
|
||||||
missing_api_url = f'{api_base_uri}/api/missing/resource'
|
missing_api_url = f'{api_base_uri}/api/missing/resource'
|
||||||
actual_response = requests.delete(missing_api_url)
|
actual_response = requests.delete(missing_api_url)
|
||||||
assert_method_not_allowed_response(actual_response)
|
assert_method_not_allowed_response(actual_response)
|
||||||
|
|
||||||
# TODO: Add tests for missing React pages rendering.
|
|
||||||
|
|
|
@ -17,7 +17,6 @@ from tests import utils
|
||||||
|
|
||||||
# TODO: Rework site management and testing.
|
# TODO: Rework site management and testing.
|
||||||
bdd.scenarios('site_access.feature')
|
bdd.scenarios('site_access.feature')
|
||||||
# TODO: Add scenario to bootstrap site, when is not available.
|
|
||||||
# TODO: Rework modification to only allow changes to layout and children...
|
# TODO: Rework modification to only allow changes to layout and children...
|
||||||
# bdd.scenarios('site_modification.feature')
|
# bdd.scenarios('site_modification.feature')
|
||||||
|
|
||||||
|
|
|
@ -16,28 +16,26 @@ from pytest_bdd import parsers
|
||||||
|
|
||||||
from tests import utils
|
from tests import utils
|
||||||
|
|
||||||
# TODO: Re-enable
|
bdd.scenarios('user_authentication.feature')
|
||||||
# bdd.scenarios('user_authentication.feature')
|
|
||||||
|
|
||||||
Credentials = collections.namedtuple('Credentials', ['username', 'password'])
|
Credentials = collections.namedtuple('Credentials', ['username', 'password'])
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
def valid_test_user(db):
|
def valid_user(db):
|
||||||
test_user = utils.generate_test_user()
|
test_user, password = utils.generate_test_user(db)
|
||||||
utils.save_test_user_in_db(db, test_user)
|
return Credentials(test_user.username, password)
|
||||||
return Credentials(test_user.username, test_user.password)
|
|
||||||
|
|
||||||
|
|
||||||
@bdd.given(parsers.parse('I am a {user_type} user with {login_type} credentials'))
|
@bdd.given(parsers.parse('I am a {user_type} user with {login_type} credentials'))
|
||||||
def user_credentials(user_type, login_type, valid_test_user):
|
def user_credentials(user_type, login_type, valid_user):
|
||||||
if not user_type == 'valid':
|
if not user_type == 'valid':
|
||||||
return Credentials('invalid_user', 'invalid_password')
|
return Credentials('invalid_user', 'invalid_password')
|
||||||
|
|
||||||
if not login_type == 'valid':
|
if not login_type == 'valid':
|
||||||
return Credentials(valid_test_user.username, 'invalid_password')
|
return Credentials(valid_user.username, 'invalid_password')
|
||||||
|
|
||||||
return valid_test_user
|
return valid_user
|
||||||
|
|
||||||
|
|
||||||
@bdd.given('I post my credentials against the auth endpoint')
|
@bdd.given('I post my credentials against the auth endpoint')
|
||||||
|
@ -57,15 +55,16 @@ def assert_valid_user_json_web_token(user_credentials: Credentials, auth_endpoin
|
||||||
decoded_jwt = jwt.decode(jwt_token, verify=False)
|
decoded_jwt = jwt.decode(jwt_token, verify=False)
|
||||||
assert 'identity' in decoded_jwt
|
assert 'identity' in decoded_jwt
|
||||||
|
|
||||||
assert auth_endpoint_response.json().get('username') == user_credentials.username
|
assert auth_endpoint_response.json().get('username') == user_credentials.user.username
|
||||||
|
|
||||||
|
|
||||||
@bdd.then('I get an unauthorized response')
|
@bdd.then('I get an unauthorized response')
|
||||||
def assert_unauthorized_response(auth_endpoint_response: requests.Response):
|
def assert_unauthorized_response(auth_endpoint_response: requests.Response):
|
||||||
expected_json = {
|
expected_json = {
|
||||||
'status_code': http.HTTPStatus.UNAUTHORIZED.value,
|
'error': {
|
||||||
'error': 'Bad Request',
|
'message': 'Invalid credentials provided.',
|
||||||
'description': 'Invalid credentials',
|
'status_code': http.HTTPStatus.UNAUTHORIZED.value,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
assert auth_endpoint_response.status_code == http.HTTPStatus.UNAUTHORIZED
|
assert auth_endpoint_response.status_code == http.HTTPStatus.UNAUTHORIZED
|
||||||
|
|
|
@ -1,19 +1,16 @@
|
||||||
Feature: User Authentication
|
Feature: User Authentication
|
||||||
The user endpoint allows user to authenticate against the system
|
The user endpoint allows user to authenticate against the system
|
||||||
|
|
||||||
|
|
||||||
Scenario: Valid user can authenticate against site with valid credentials
|
Scenario: Valid user can authenticate against site with valid credentials
|
||||||
Given I am a valid user with valid credentials
|
Given I am a valid user with valid credentials
|
||||||
And I post my credentials against the auth endpoint
|
And I post my credentials against the auth endpoint
|
||||||
Then I get a valid JWT
|
Then I get a valid JWT
|
||||||
|
|
||||||
|
|
||||||
Scenario: Valid user cannot authenticate against site with invalid credentials
|
Scenario: Valid user cannot authenticate against site with invalid credentials
|
||||||
Given I am a valid user with invalid credentials
|
Given I am a valid user with invalid credentials
|
||||||
And I post my credentials against the auth endpoint
|
And I post my credentials against the auth endpoint
|
||||||
Then I get an unauthorized response
|
Then I get an unauthorized response
|
||||||
|
|
||||||
|
|
||||||
Scenario: Invalid user cannot authenticate against site with credentials
|
Scenario: Invalid user cannot authenticate against site with credentials
|
||||||
Given I am a invalid user with some credentials
|
Given I am a invalid user with some credentials
|
||||||
And I post my credentials against the auth endpoint
|
And I post my credentials against the auth endpoint
|
||||||
|
|
|
@ -18,8 +18,8 @@ from rookeries.users import models as user_models
|
||||||
|
|
||||||
RANDOM_SUFFIX_CHARACTERS = list(string.ascii_lowercase + string.digits)
|
RANDOM_SUFFIX_CHARACTERS = list(string.ascii_lowercase + string.digits)
|
||||||
|
|
||||||
TestUserInfo = collections.namedtuple('TestUserInfo', ['username', 'password', 'role', 'email'])
|
ResponseSampleBundle = collections.namedtuple(
|
||||||
ResponseSampleBundle = collections.namedtuple('ResponseSampleBundle', ['response', 'sample'])
|
'ResponseSampleBundle', ['response', 'sample'])
|
||||||
|
|
||||||
|
|
||||||
def generate_test_password():
|
def generate_test_password():
|
||||||
|
@ -34,44 +34,35 @@ def generate_random_suffix():
|
||||||
return ''.join(random.choices(RANDOM_SUFFIX_CHARACTERS, k=10))
|
return ''.join(random.choices(RANDOM_SUFFIX_CHARACTERS, k=10))
|
||||||
|
|
||||||
|
|
||||||
def generate_test_user(user_prefix='', role=user_models.UserRole.subscriber.name) -> TestUserInfo:
|
def generate_test_user(db, user_prefix='test'):
|
||||||
"""
|
"""
|
||||||
Creates a new test user.
|
Creates a new test user.
|
||||||
|
|
||||||
:param user_prefix: The prefix for the username of the test user.
|
:param user_prefix: The prefix for the username of the test user.
|
||||||
:param role: The role of the user. Defaults to subscriber.
|
|
||||||
:return: A tuple with information about the test user.
|
:return: A tuple with information about the test user.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
username = f'{user_prefix}-{generate_random_suffix()}' if user_prefix else f'test-{role}-{generate_random_suffix()}'
|
username = f'{user_prefix}-{generate_random_suffix()}'
|
||||||
email = f'{username}@test.rookeries.org'
|
email = f'{username}@test.rookeries.org'
|
||||||
password = generate_test_password()
|
password = generate_test_password()
|
||||||
return TestUserInfo(username=username, password=password, role=role, email=email)
|
user = user_models.User(
|
||||||
|
username=username,
|
||||||
|
password=password,
|
||||||
|
# TODO: Come up with better random names for tests
|
||||||
|
fullName=f'{generate_random_suffix()} {generate_random_suffix()}',
|
||||||
|
profile={
|
||||||
|
'email': email
|
||||||
|
}
|
||||||
|
)
|
||||||
|
db.create_document(doc=user.to_db_json())
|
||||||
|
user_in_db = user_models.User(
|
||||||
|
**db.find_first_document({
|
||||||
|
'type': user_models.User.doc_type(),
|
||||||
|
'username': username,
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
return user_in_db, password
|
||||||
def save_test_user_in_db(db_engine, test_user: TestUserInfo):
|
|
||||||
"""
|
|
||||||
Saves a given test user into the database.
|
|
||||||
|
|
||||||
:param db_engine: The DB engine to spawn a session from.
|
|
||||||
:param test_user: The test user to save.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# TODO: Deal with the issue of test users.
|
|
||||||
# session = orm.sessionmaker(bind=db_engine, autocommit=True)()
|
|
||||||
# session.begin()
|
|
||||||
user = user_models.User(test_user.username)
|
|
||||||
user.password = hash_password(test_user.password)
|
|
||||||
user.role = test_user.role
|
|
||||||
|
|
||||||
user_profile = user_models.UserProfile()
|
|
||||||
user_profile.full_name = f'Test {test_user.role.capitalize()}'
|
|
||||||
user_profile.email = test_user.email
|
|
||||||
user.profile = user_profile
|
|
||||||
|
|
||||||
# session.add(user)
|
|
||||||
# session.commit()
|
|
||||||
# session.close()
|
|
||||||
|
|
||||||
|
|
||||||
def generate_test_page(db, slug='', title='', content=''):
|
def generate_test_page(db, slug='', title='', content=''):
|
||||||
|
|
Loading…
Reference in New Issue