# app/blueprints/auth/routes.py (COMPLETE IMPLEMENTATION)
"""Authentication routes: registration, login, profile read/update,
password change and account deactivation.

All handlers answer with a JSON body and an HTTP status code.
"""

from datetime import datetime, date, timedelta

from flask import request, jsonify, current_app
from flask_jwt_extended import jwt_required, get_jwt_identity, create_access_token
from sqlalchemy.exc import IntegrityError

from . import auth_bp
from ...models import User
from ...extensions import db
from ...utils.validators import validate_abn, validate_phone_number, validate_australian_business_number
from ...utils.gamification import award_points

# Shared by /register and /change-password so both enforce the same rule.
MIN_PASSWORD_LENGTH = 6
# Points granted per updated profile field when gamification is enabled.
POINTS_PER_UPDATED_FIELD = 5
# Wire format for every date accepted by these endpoints.
DATE_FORMAT = '%Y-%m-%d'


def _json_body():
    """Return the request's JSON object, or None if it is absent or unusable.

    ``silent=True`` makes a malformed or non-JSON body yield None instead of
    raising, so the handlers answer with their own 400 rather than letting
    their blanket ``except Exception`` turn a client error into a 500.
    Non-object JSON (arrays, scalars) is rejected the same way because the
    handlers rely on ``dict.get``.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _clean(value):
    """Return *value* as a whitespace-stripped string; None becomes ''."""
    return '' if value is None else str(value).strip()


def _parse_date(value):
    """Parse a ``YYYY-MM-DD`` string into a ``date``.

    Raises:
        ValueError: if *value* is not a string or does not match the format.
    """
    if not isinstance(value, str):
        # strptime would raise TypeError here; normalise to ValueError so
        # callers only need one except clause.
        raise ValueError("date must be a YYYY-MM-DD string")
    return datetime.strptime(value, DATE_FORMAT).date()


def _gamification_enabled():
    """Return True when the 'gamification_leaderboards' feature flag is set.

    A missing or empty FEATURES config entry counts as disabled.
    """
    features = current_app.config.get('FEATURES') or {}
    return bool(features.get('gamification_leaderboards'))


@auth_bp.route('/register', methods=['POST'])
def register():
    """Complete user registration with Australian compliance validation.

    Validates the required fields, email shape, password length, role,
    phone number and ABN, rejects duplicate email/ABN, creates the user and
    returns an access token together with a summary of the new account.

    Returns:
        201 with the token and user summary on success; 400 on validation
        or duplicate errors; 500 on any unexpected failure.
    """
    try:
        data = _json_body()
        if not data:
            return jsonify({"error": "No data provided"}), 400

        # Required fields validation
        required_fields = {
            'email': 'Email address',
            'password': 'Password',
            'first_name': 'First name',
            'last_name': 'Last name',
            'role': 'Role (worker or contractor)',
            'phone_number': 'Phone number',
            'location': 'Location',
            'abn_number': 'ABN (Australian Business Number)'
        }
        validation_errors = [
            f"{description} is required"
            for field, description in required_fields.items()
            if not data.get(field) or not _clean(data.get(field))
        ]
        if validation_errors:
            return jsonify({"error": "Validation failed", "details": validation_errors}), 400

        # Email format validation
        email = _clean(data['email']).lower()
        if '@' not in email or '.' not in email:
            return jsonify({"error": "Invalid email format"}), 400

        # Password strength validation (never stripped: spaces are significant)
        password = data['password']
        if not isinstance(password, str) or len(password) < MIN_PASSWORD_LENGTH:
            return jsonify({"error": f"Password must be at least {MIN_PASSWORD_LENGTH} characters long"}), 400

        # Role validation
        role = _clean(data['role']).lower()
        if role not in ('worker', 'contractor'):
            return jsonify({"error": "Role must be 'worker' or 'contractor'"}), 400

        # Phone number validation
        phone = _clean(data['phone_number'])
        if not validate_phone_number(phone):
            return jsonify({"error": "Invalid Australian phone number format (e.g., 0412345678)"}), 400

        # ABN validation using official Australian method
        abn = _clean(data['abn_number']).replace(' ', '')
        if not validate_australian_business_number(abn):
            return jsonify({"error": "Invalid ABN. Please provide a valid 11-digit Australian Business Number"}), 400

        # Check for existing email
        if User.query.filter_by(email=email).first():
            return jsonify({"error": "Email address already registered"}), 400

        # Check for existing ABN
        if User.query.filter_by(abn_number=abn).first():
            return jsonify({"error": "ABN already registered with another account"}), 400

        # Business name validation for contractors
        business_name = _clean(data.get('business_name'))
        if role == 'contractor' and not business_name:
            return jsonify({"error": "Business name is required for contractors"}), 400

        # Primary trade validation for workers
        primary_trade = _clean(data.get('primary_trade'))
        if role == 'worker' and not primary_trade:
            return jsonify({"error": "Primary trade is required for workers"}), 400

        # Create new user
        user = User(
            email=email,
            first_name=_clean(data['first_name']),
            last_name=_clean(data['last_name']),
            role=role,
            phone_number=phone,
            location=_clean(data['location']),
            abn_number=abn,
            business_name=business_name if business_name else None,
            primary_trade=primary_trade if primary_trade else None,
            gst_registered=data.get('gst_registered', False),
            public_liability_insurance=data.get('public_liability_insurance', False),
            public_liability_amount=data.get('public_liability_amount'),
            workers_comp_insurance=data.get('workers_comp_insurance', False),
            privacy_consent=data.get('privacy_consent', False),
            terms_accepted=data.get('terms_accepted', False),
            terms_accepted_date=datetime.utcnow() if data.get('terms_accepted') else None
        )

        # Set password hash
        user.set_password(password)

        # Insurance expiry date handling
        if data.get('insurance_expiry_date'):
            try:
                user.insurance_expiry_date = _parse_date(data['insurance_expiry_date'])
            except ValueError:
                return jsonify({"error": "Invalid insurance expiry date format (use YYYY-MM-DD)"}), 400

        # Save to database
        db.session.add(user)
        db.session.commit()

        # Award welcome points for gamification.
        # NOTE(review): the user row is already committed at this point, so a
        # failure in award_points yields a 500 for an account that exists —
        # confirm whether this should be best-effort or a single transaction.
        if _gamification_enabled():
            award_points(user.id, 'registration', 50, 'Welcome to RateRight!')
            db.session.commit()

        # Create access token
        access_token = create_access_token(identity=user.id)

        # Check compliance status
        is_compliant, compliance_issues = user.is_compliance_valid()

        return jsonify({
            "message": "Registration successful! Welcome to RateRight Australian Construction Marketplace",
            "access_token": access_token,
            "user": {
                "id": user.id,
                "email": user.email,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "role": user.role,
                "business_name": user.business_name,
                "primary_trade": user.primary_trade,
                "abn_number": user.abn_number,
                "gst_registered": user.gst_registered,
                "total_points": user.total_points,
                "current_level": user.current_level,
                "compliance": {
                    "is_compliant": is_compliant,
                    # Limit to first 3 issues
                    "issues": compliance_issues[:3] if compliance_issues else []
                }
            }
        }), 201

    except IntegrityError as e:
        # A concurrent request can slip past the duplicate checks above; the
        # violated constraint is identified from the error text.
        db.session.rollback()
        detail = str(e)
        if 'email' in detail:
            return jsonify({"error": "Email address already registered"}), 400
        if 'abn_number' in detail:
            return jsonify({"error": "ABN already registered"}), 400
        return jsonify({"error": "Registration failed due to duplicate data"}), 400
    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Registration error: %s", e)
        return jsonify({"error": "Registration failed. Please try again."}), 500


@auth_bp.route('/login', methods=['POST'])
def login():
    """User login with comprehensive validation.

    Checks the credentials and the account's active flag, records the login
    time, and returns an access token with the user's details, statistics,
    compliance summary and (when enabled) gamification data.

    Returns:
        200 on success; 400 when the body or credentials are missing; 401 for
        bad credentials or a deactivated account; 500 on unexpected failure.
    """
    try:
        data = _json_body()
        if not data:
            return jsonify({"error": "No data provided"}), 400

        # Validation
        email = _clean(data.get('email')).lower()
        password = data.get('password', '')
        if not email or not password:
            return jsonify({"error": "Email and password are required"}), 400

        # Find user; the same message is used for unknown email and wrong
        # password so the response does not reveal which one failed.
        user = User.query.filter_by(email=email).first()
        if not user:
            return jsonify({"error": "Invalid email or password"}), 401

        # Check password
        if not user.check_password(password):
            return jsonify({"error": "Invalid email or password"}), 401

        # Check if account is active
        if not user.is_active:
            return jsonify({"error": "Account is deactivated. Please contact support."}), 401

        # Update last login
        user.last_login = datetime.utcnow()
        db.session.commit()

        # Create access token
        access_token = create_access_token(identity=user.id)

        # Check compliance status
        is_compliant, compliance_issues = user.is_compliance_valid()

        # Get user statistics
        user_stats = {
            "jobs_completed": user.jobs_completed,
            "average_rating": float(user.average_rating) if user.average_rating else 0.0,
            "total_reviews": user.total_reviews,
            "response_rate": float(user.response_rate) if user.response_rate else 0.0
        }

        # Add gamification data if enabled
        gamification_data = {}
        if _gamification_enabled():
            gamification_data = {
                "total_points": user.total_points,
                "current_level": user.current_level,
                "seasonal_league": user.seasonal_league,
                "points_to_next_level": user.points_to_next_level()
            }

        return jsonify({
            "message": "Login successful",
            "access_token": access_token,
            "user": {
                "id": user.id,
                "email": user.email,
                "first_name": user.first_name,
                "last_name": user.last_name,
                "role": user.role,
                "business_name": user.business_name,
                "primary_trade": user.primary_trade,
                "location": user.location,
                "phone_number": user.phone_number,
                "abn_number": user.abn_number,
                "gst_registered": user.gst_registered,
                "account_status": user.account_status,
                "last_login": user.last_login.isoformat() if user.last_login else None,
                "date_created": user.date_created.isoformat() if user.date_created else None,
                "stats": user_stats,
                "gamification": gamification_data,
                "compliance": {
                    "is_compliant": is_compliant,
                    "issues_count": len(compliance_issues),
                    "critical_issues": compliance_issues[:2] if compliance_issues else []
                }
            }
        }), 200

    except Exception as e:
        # This handler writes last_login, so discard any half-finished
        # transaction like the other mutating routes do.
        db.session.rollback()
        current_app.logger.exception("Login error: %s", e)
        return jsonify({"error": "Login failed. Please try again."}), 500


@auth_bp.route('/profile', methods=['GET'])
@jwt_required()
def get_profile():
    """Get comprehensive user profile.

    Looks up the user named by the JWT identity and returns account details,
    statistics, insurance status, compliance status and (when enabled)
    gamification data.

    Returns:
        200 with ``{"user": {...}}``; 404 if the user no longer exists;
        500 on unexpected failure.
    """
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404

        # Check compliance
        is_compliant, compliance_issues = user.is_compliance_valid()

        # Insurance status; "expired" is None when no expiry date is stored.
        expiry = user.insurance_expiry_date
        insurance_status = {
            "public_liability": {
                "covered": user.public_liability_insurance,
                "amount": float(user.public_liability_amount) if user.public_liability_amount else None,
                "expiry_date": expiry.isoformat() if expiry else None,
                "expired": expiry < date.today() if expiry else None
            },
            "workers_comp": user.workers_comp_insurance
        }

        # Profile data
        profile_data = {
            "id": user.id,
            "email": user.email,
            "first_name": user.first_name,
            "last_name": user.last_name,
            "role": user.role,
            "phone_number": user.phone_number,
            "location": user.location,
            "business_name": user.business_name,
            "primary_trade": user.primary_trade,
            "abn_number": user.abn_number,
            "gst_registered": user.gst_registered,
            "account_status": user.account_status,
            "is_active": user.is_active,
            "date_created": user.date_created.isoformat() if user.date_created else None,
            "last_login": user.last_login.isoformat() if user.last_login else None,
            "stats": {
                "jobs_completed": user.jobs_completed,
                "average_rating": float(user.average_rating) if user.average_rating else 0.0,
                "total_reviews": user.total_reviews,
                "response_rate": float(user.response_rate) if user.response_rate else 0.0
            },
            "insurance": insurance_status,
            "compliance": {
                "is_compliant": is_compliant,
                "issues": compliance_issues,
                "privacy_consent": user.privacy_consent,
                "terms_accepted": user.terms_accepted,
                "terms_accepted_date": user.terms_accepted_date.isoformat() if user.terms_accepted_date else None
            }
        }

        # Add gamification data if enabled
        if _gamification_enabled():
            profile_data["gamification"] = {
                "total_points": user.total_points,
                "current_level": user.current_level,
                "seasonal_league": user.seasonal_league,
                "points_to_next_level": user.points_to_next_level(),
                "level_progress_percentage": min(100, ((user.total_points % 500) / 500) * 100)
            }

        return jsonify({"user": profile_data}), 200

    except Exception as e:
        current_app.logger.exception("Profile fetch error: %s", e)
        return jsonify({"error": "Failed to fetch profile"}), 500


@auth_bp.route('/profile', methods=['PUT'])
@jwt_required()
def update_profile():
    """Update user profile with validation.

    Only keys present in the JSON body are touched. The first invalid value
    aborts the request with a 400 and nothing is committed. When
    gamification is enabled, points are awarded per updated field.

    Returns:
        200 with the list of updated fields (or "No changes made");
        400 on invalid input; 404 if the user no longer exists;
        500 on unexpected failure.
    """
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404

        data = _json_body()
        if not data:
            return jsonify({"error": "No data provided"}), 400

        # Track what was updated
        updated_fields = []

        # Updatable fields with validation
        for field, label in (('first_name', 'First name'), ('last_name', 'Last name')):
            if field in data:
                text = _clean(data[field])
                if not text:
                    return jsonify({"error": f"{label} cannot be empty"}), 400
                setattr(user, field, text)
                updated_fields.append(field)

        if 'phone_number' in data:
            phone = _clean(data['phone_number'])
            if not validate_phone_number(phone):
                return jsonify({"error": "Invalid phone number format"}), 400
            user.phone_number = phone
            updated_fields.append('phone_number')

        if 'location' in data:
            location = _clean(data['location'])
            if not location:
                return jsonify({"error": "Location cannot be empty"}), 400
            user.location = location
            updated_fields.append('location')

        # Optional text fields: a falsy value clears the field.
        if 'business_name' in data:
            user.business_name = _clean(data['business_name']) if data['business_name'] else None
            updated_fields.append('business_name')

        if 'primary_trade' in data:
            user.primary_trade = _clean(data['primary_trade']) if data['primary_trade'] else None
            updated_fields.append('primary_trade')

        if 'gst_registered' in data:
            user.gst_registered = bool(data['gst_registered'])
            updated_fields.append('gst_registered')

        # Insurance updates
        if 'public_liability_insurance' in data:
            user.public_liability_insurance = bool(data['public_liability_insurance'])
            updated_fields.append('public_liability_insurance')

        if 'public_liability_amount' in data:
            amount = data['public_liability_amount']
            if amount is not None:
                try:
                    user.public_liability_amount = float(amount)
                except (ValueError, TypeError):
                    return jsonify({"error": "Invalid insurance amount"}), 400
            else:
                user.public_liability_amount = None
            updated_fields.append('public_liability_amount')

        if 'workers_comp_insurance' in data:
            user.workers_comp_insurance = bool(data['workers_comp_insurance'])
            updated_fields.append('workers_comp_insurance')

        if 'insurance_expiry_date' in data:
            if data['insurance_expiry_date']:
                try:
                    user.insurance_expiry_date = _parse_date(data['insurance_expiry_date'])
                except ValueError:
                    return jsonify({"error": "Invalid date format (use YYYY-MM-DD)"}), 400
            else:
                user.insurance_expiry_date = None
            updated_fields.append('insurance_expiry_date')

        # White card information
        if 'white_card_number' in data:
            user.white_card_number = _clean(data['white_card_number']) if data['white_card_number'] else None
            updated_fields.append('white_card_number')

        if 'white_card_expiry' in data:
            if data['white_card_expiry']:
                try:
                    user.white_card_expiry = _parse_date(data['white_card_expiry'])
                except ValueError:
                    return jsonify({"error": "Invalid white card expiry date format (use YYYY-MM-DD)"}), 400
            else:
                user.white_card_expiry = None
            updated_fields.append('white_card_expiry')

        # Privacy and terms
        if 'privacy_consent' in data:
            user.privacy_consent = bool(data['privacy_consent'])
            updated_fields.append('privacy_consent')

        if 'terms_accepted' in data:
            # Acceptance is one-way here: it is only recorded the first time
            # and is never reset to False by this endpoint.
            if bool(data['terms_accepted']) and not user.terms_accepted:
                user.terms_accepted = True
                user.terms_accepted_date = datetime.utcnow()
                updated_fields.append('terms_accepted')

        if not updated_fields:
            return jsonify({"message": "No changes made"}), 200

        # Save changes
        db.session.commit()

        # Award points for profile completion
        gamified = _gamification_enabled()
        completion_bonus = len(updated_fields) * POINTS_PER_UPDATED_FIELD
        if gamified:
            award_points(user.id, 'profile_update', completion_bonus,
                         f"Profile updated: {', '.join(updated_fields)}")
            db.session.commit()

        return jsonify({
            "message": "Profile updated successfully",
            "updated_fields": updated_fields,
            "points_awarded": completion_bonus if gamified else 0
        }), 200

    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Profile update error: %s", e)
        return jsonify({"error": "Profile update failed"}), 500


@auth_bp.route('/change-password', methods=['PUT'])
@jwt_required()
def change_password():
    """Change user password.

    Requires the current password, and a new password that meets the minimum
    length and differs from the current one.

    Returns:
        200 on success; 400 on missing/invalid input or a wrong current
        password; 404 if the user no longer exists; 500 on unexpected failure.
    """
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404

        data = _json_body()
        if not data:
            return jsonify({"error": "No data provided"}), 400

        current_password = data.get('current_password')
        new_password = data.get('new_password')
        if not current_password or not new_password:
            return jsonify({"error": "Current password and new password are required"}), 400

        # Verify current password
        if not user.check_password(current_password):
            return jsonify({"error": "Current password is incorrect"}), 400

        # Validate new password
        if not isinstance(new_password, str) or len(new_password) < MIN_PASSWORD_LENGTH:
            return jsonify({"error": f"New password must be at least {MIN_PASSWORD_LENGTH} characters long"}), 400

        if new_password == current_password:
            return jsonify({"error": "New password must be different from current password"}), 400

        # Update password
        user.set_password(new_password)
        db.session.commit()

        return jsonify({"message": "Password changed successfully"}), 200

    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Password change error: %s", e)
        return jsonify({"error": "Password change failed"}), 500


@auth_bp.route('/deactivate', methods=['PUT'])
@jwt_required()
def deactivate_account():
    """Deactivate user account.

    Requires the account password as confirmation, then clears ``is_active``
    and sets ``account_status`` to ``'deactivated'``.

    Returns:
        200 on success; 400 when the password is missing or wrong; 404 if the
        user no longer exists; 500 on unexpected failure.
    """
    try:
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        if not user:
            return jsonify({"error": "User not found"}), 404

        data = _json_body()
        password = data.get('password') if data else None
        if not password:
            return jsonify({"error": "Password confirmation required"}), 400

        # Verify password
        if not user.check_password(password):
            return jsonify({"error": "Incorrect password"}), 400

        # Deactivate account
        user.is_active = False
        user.account_status = 'deactivated'
        db.session.commit()

        return jsonify({"message": "Account deactivated successfully"}), 200

    except Exception as e:
        db.session.rollback()
        current_app.logger.exception("Account deactivation error: %s", e)
        return jsonify({"error": "Account deactivation failed"}), 500


# test_auth_complete.py (create this file)
# Everything below belongs in that separate script, not in routes.py.
"""Test complete authentication system"""


def test_complete_auth():
    """Test all authentication endpoints.

    Runs registration, login, profile retrieval, profile update and a few
    negative cases through the Flask test client, printing progress.

    Returns:
        True when the registration, login, profile and update steps all
        succeed; False as soon as one of them fails or an exception is raised.
        Failures in the validation checks (step 5) are only printed.
    """
    print("🔐 Testing Complete Authentication System")
    print("=" * 50)

    try:
        from app import create_app
        from app.extensions import db

        app = create_app()
        client = app.test_client()

        with app.app_context():
            db.create_all()

            # Test user registration
            print("1. Testing User Registration...")
            registration_data = {
                "email": "testworker@rateright.com.au",
                "password": "securepass123",
                "first_name": "Test",
                "last_name": "Worker",
                "role": "worker",
                "phone_number": "0412345678",
                "location": "Sydney, NSW",
                "abn_number": "53004085616",  # Valid ABN format
                "primary_trade": "Electrician",
                "gst_registered": True,
                "public_liability_insurance": True,
                "public_liability_amount": 2000000,
                "privacy_consent": True,
                "terms_accepted": True
            }

            response = client.post('/api/auth/register',
                                   json=registration_data,
                                   content_type='application/json')

            if response.status_code == 201:
                print("✅ Registration successful")
                registration_result = response.get_json()
                access_token = registration_result['access_token']
                user_id = registration_result['user']['id']
                print(f" User ID: {user_id}")
                print(f" Points: {registration_result['user']['total_points']}")
            else:
                print(f"❌ Registration failed: {response.status_code}")
                print(f" Error: {response.get_json()}")
                return False

            # Test user login
            print("\n2. Testing User Login...")
            login_data = {
                "email": "testworker@rateright.com.au",
                "password": "securepass123"
            }

            response = client.post('/api/auth/login',
                                   json=login_data,
                                   content_type='application/json')

            if response.status_code == 200:
                print("✅ Login successful")
                login_result = response.get_json()
                access_token = login_result['access_token']
                print(f" JWT Token: {access_token[:50]}...")
                print(f" User Role: {login_result['user']['role']}")
                print(f" Compliance: {login_result['user']['compliance']['is_compliant']}")
            else:
                print(f"❌ Login failed: {response.status_code}")
                return False

            # Test profile retrieval
            print("\n3. Testing Profile Retrieval...")
            headers = {'Authorization': f'Bearer {access_token}'}
            response = client.get('/api/auth/profile', headers=headers)

            if response.status_code == 200:
                print("✅ Profile retrieval successful")
                profile = response.get_json()['user']
                print(f" Business: {profile.get('business_name', 'N/A')}")
                print(f" Trade: {profile.get('primary_trade')}")
                print(f" Insurance: ${profile['insurance']['public_liability']['amount']:,}")
                print(f" Gamification Level: {profile.get('gamification', {}).get('current_level', 'N/A')}")
            else:
                print(f"❌ Profile retrieval failed: {response.status_code}")
                return False

            # Test profile update
            print("\n4. Testing Profile Update...")
            update_data = {
                "business_name": "Updated Electrical Services Pty Ltd",
                "public_liability_amount": 5000000,
                "white_card_number": "WC123456789"
            }

            response = client.put('/api/auth/profile',
                                  json=update_data,
                                  headers=headers,
                                  content_type='application/json')

            if response.status_code == 200:
                print("✅ Profile update successful")
                update_result = response.get_json()
                print(f" Updated fields: {update_result['updated_fields']}")
                print(f" Points awarded: {update_result['points_awarded']}")
            else:
                print(f"❌ Profile update failed: {response.status_code}")
                return False

            # Test validation errors
            print("\n5. Testing Validation...")

            # Invalid email registration
            invalid_data = registration_data.copy()
            invalid_data['email'] = 'invalid-email'
            response = client.post('/api/auth/register', json=invalid_data)
            if response.status_code == 400:
                print("✅ Email validation working")
            else:
                print("❌ Email validation failed")

            # Invalid ABN registration
            invalid_data['email'] = 'newemail@test.com'
            invalid_data['abn_number'] = '123'  # Too short
            response = client.post('/api/auth/register', json=invalid_data)
            if response.status_code == 400:
                print("✅ ABN validation working")
            else:
                print("❌ ABN validation failed")

            # Invalid login
            response = client.post('/api/auth/login',
                                   json={"email": "wrong@email.com", "password": "wrongpass"})
            if response.status_code == 401:
                print("✅ Login security working")
            else:
                print("❌ Login security failed")

            print("\n🎉 AUTHENTICATION SYSTEM: FULLY FUNCTIONAL!")
            print("=" * 50)
            print("✅ User Registration with Australian compliance")
            print("✅ JWT Authentication system")
            print("✅ Profile management")
            print("✅ Input validation and security")
            print("✅ Gamification integration")
            print("✅ Insurance and ABN verification")

            return True

    except Exception as e:
        print(f"❌ Authentication test failed: {e}")
        import traceback
        traceback.print_exc()
        return False


if __name__ == "__main__":
    test_complete_auth()