from flask import render_template, redirect, request, session, jsonify, flash, url_for, current_app
from flask_login import login_required, current_user, logout_user, login_user
from datetime import datetime, date
import logging
from app.models import Category
from app.services.notification_service import notification_service
from app.models.notification import NotificationType
logger = logging.getLogger(__name__)
def register_main_routes(app):
"""Register main web application routes"""
@app.route("/workers")
def workers():
    """Render the page listing every user whose role is "worker"."""
    from app.models.user import User

    # One query for all worker accounts; the template receives them as `workers`.
    worker_query = User.query.filter_by(role="worker")
    return render_template("workers/list.html", workers=worker_query.all())
@app.route("/api/workers")
def api_workers():
    """Return all worker accounts as a JSON array of summary objects.

    Each object carries ``id``, ``email``, ``first_name``, ``last_name`` and
    ``created_at`` (ISO-8601 string, or ``null`` when the value is unset).
    """
    from app.models.user import User

    payload = []
    for worker in User.query.filter_by(role="worker").all():
        created = worker.created_at
        payload.append({
            "id": worker.id,
            "email": worker.email,
            "first_name": worker.first_name,
            "last_name": worker.last_name,
            # A missing creation timestamp is serialised as null.
            "created_at": created.isoformat() if created else None,
        })
    return jsonify(payload)
@app.before_request
def debug_authentication_state():
    """Log the authentication state of each non-API request at DEBUG level.

    Runs before every request. Requests whose path starts with ``/api/`` are
    skipped. Output goes through the module logger instead of ``print`` so it
    can be filtered or silenced by logging configuration, and so user emails
    are not written unconditionally to stdout.

    Returns:
        None, so Flask continues with normal request dispatch.
    """
    # Only trace web routes (not API routes).
    if request.path.startswith('/api/'):
        return

    logger.debug("Route: %s", request.path)
    logger.debug("current_user.is_authenticated: %s", current_user.is_authenticated)
    logger.debug("session keys: %s", list(session.keys()))
    if current_user.is_authenticated:
        logger.debug("User automatically authenticated: %s", current_user.email)
@app.route('/')
def index():
    """Return the marketing landing page by reading its HTML file from disk."""
    # The file is served verbatim (no template rendering). The path is
    # relative to the process working directory.
    with open('app/templates/index.html', 'r', encoding='utf-8') as page_file:
        page_html = page_file.read()
    return page_html
@app.route('/test')
def test():
    """Simple smoke-test route that returns a fixed confirmation message."""
    # NOTE(review): the previous string literal was opened with a single quote
    # and spanned several lines, which is a syntax error. The text is kept
    # as-is; the <h1>/<p> markup is a reconstruction of what appears to have
    # been lost -- confirm against the intended page.
    return (
        "<h1>[?] RateRight Test Route Working!</h1>"
        "<p>If you see this, the Flask server is responding correctly.</p>"
    )
@app.route('/applications')
@login_required
def my_applications():
    """Show job applications relevant to the logged-in user.

    Workers see the applications they submitted; contractors see the
    applications received on jobs they posted. Any other role is sent back to
    the dashboard with an error message.
    """
    from app.models import Application
    from app.extensions import db

    role = current_user.role
    newest_first = Application.id.desc()

    if role == 'worker':
        submitted = (
            Application.query
            .filter_by(worker_id=current_user.id)
            .order_by(newest_first)
            .all()
        )
        return render_template('applications/list.html', applications=submitted)

    if role == 'contractor':
        # Applications are linked to the contractor through the job they target.
        from app.models import Job
        received = (
            db.session.query(Application)
            .join(Job)
            .filter(Job.contractor_id == current_user.id)
            .order_by(newest_first)
            .all()
        )
        return render_template('applications/contractor_list.html', applications=received)

    flash('Access denied.', 'error')
    return redirect(url_for('dashboard'))
@app.route('/jobs')
def browse_jobs():
    """Render the job browsing page, with a hint for logged-in contractors."""
    viewer_is_contractor = (
        current_user.is_authenticated and current_user.role == 'contractor'
    )
    if viewer_is_contractor:
        # Contractors can still view the page; this only explains their role.
        flash('As a contractor, you can post jobs and review applications. Workers browse and apply for jobs.', 'info')
    return render_template('jobs/browse.html')
@app.route('/search')
def search():
    """Render the job search page (currently the same template as /jobs)."""
    # Like the /jobs route, no template variables are supplied.
    browse_template = 'jobs/browse.html'
    return render_template(browse_template)
@app.route('/api/capture-lead', methods=['POST'])
def capture_lead():
    """Validate a lead-capture form and forward the lead to Slack.

    Nothing is written to the database. On successful validation the lead and
    some request metadata are passed to ``send_lead_to_slack`` and a JSON
    success payload is returned. On validation failure a JSON error payload
    with the form errors is returned with HTTP 400.

    Returns:
        A JSON response; ``(response, 400)`` when validation fails.
    """
    from app.forms import LeadCaptureForm
    from app.utils.slack import send_lead_to_slack

    # Log field names only: the values include the submitter's email address.
    logger.debug("Lead capture fields received: %s", list(request.form.keys()))

    # This endpoint validates the form WITHOUT CSRF protection.
    class LeadCaptureFormNoCSRF(LeadCaptureForm):
        class Meta:
            csrf = False

    form = LeadCaptureFormNoCSRF()
    if not form.validate_on_submit():
        logger.debug("Lead capture form errors: %s", form.errors)
        return jsonify({
            'success': False,
            'errors': form.errors,
            'message': 'Please check all required fields'
        }), 400

    lead_data = {
        'email': form.email.data,
        'is_beta_tester': form.is_beta_tester.data,
        'wants_updates': form.wants_updates.data,
        'company_size': form.company_size.data,
        'biggest_challenge': form.biggest_challenge.data,
        'construction_focus': form.construction_focus.data
    }
    request_info = {
        'ip': request.remote_addr,
        'referrer': request.referrer or 'Direct',
        'user_agent': request.headers.get('User-Agent', 'Unknown')
    }

    # Slack is the only sink for leads, so a failed delivery must at least be
    # visible in the logs. The response to the visitor is unchanged.
    # NOTE(review): assumes send_lead_to_slack returns a truthy value on
    # success -- confirm against app.utils.slack.
    delivered = send_lead_to_slack(lead_data, request_info)
    if not delivered:
        logger.warning("Lead capture: Slack delivery did not report success")

    if lead_data['is_beta_tester']:
        message = "Awesome! You're on the beta list. We'll reach out soon with early access."
    else:
        message = "Great! We'll notify you when RateRight launches."

    return jsonify({
        'success': True,
        'message': message,
        'is_beta': lead_data['is_beta_tester']
    })
@app.route('/jobs/post', methods=['GET', 'POST'])
def post_job():
    """Show the job posting form (GET) or create a new job (POST).

    Only authenticated contractors may post. Anonymous visitors are sent to
    the login page with the intended destination saved in the session; other
    roles are sent to the dashboard.

    On POST the form is validated; every validation error is flashed and the
    form is shown again. On success the job is committed, matching workers
    are notified on a best-effort basis, and the user is redirected to the
    job details page.
    """
    from decimal import Decimal, InvalidOperation
    from app.models import Job, Category
    from app.extensions import db

    def render_post_form():
        """Render the posting form with the active categories in display order."""
        categories = Category.query.filter_by(is_active=True).order_by(
            Category.sort_order, Category.name
        ).all()
        return render_template('jobs/post.html', categories=categories)

    def parse_amount(raw_value, invalid_message, negative_message, errors):
        """Convert an optional money field to Decimal, recording errors.

        Returns None for an empty or unparseable field. Decimal() signals a
        malformed number with InvalidOperation (not ValueError), and accepts
        'NaN'/'Infinity', so both cases are rejected explicitly here.
        """
        if not raw_value:
            return None
        try:
            amount = Decimal(str(raw_value))
        except (InvalidOperation, ValueError, TypeError):
            errors.append(invalid_message)
            return None
        if not amount.is_finite():
            errors.append(invalid_message)
            return None
        if amount < 0:
            errors.append(negative_message)
        return amount

    if not current_user.is_authenticated:
        flash('Please login to post a job', 'warning')
        # Save the intended destination for after login
        session['next_url'] = url_for('post_job')
        return redirect(url_for('login'))

    if current_user.role != 'contractor':
        flash('Only contractors can post jobs', 'error')
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_post_form()

    try:
        title = request.form.get('title', '').strip()
        description = request.form.get('description', '').strip()
        category_id = request.form.get('category_id', type=int)
        location = request.form.get('location', '').strip()
        whs_requirements = request.form.get('whs_requirements', '').strip()
        # Checkboxes are only present in the form data when ticked.
        white_card_required = 'white_card_required' in request.form
        insurance_required = 'insurance_required' in request.form

        errors = []
        if not title:
            errors.append('Job title is required')
        if not description:
            errors.append('Job description is required')
        if not category_id:
            errors.append('Job category is required')
        if not location:
            errors.append('Job location is required')

        category = None
        if category_id:
            category = Category.query.get(category_id)
            if not category or not category.is_active:
                errors.append('Invalid job category selected')

        budget_min_decimal = parse_amount(
            request.form.get('budget_min'),
            'Invalid minimum budget amount',
            'Minimum budget cannot be negative',
            errors,
        )
        budget_max_decimal = parse_amount(
            request.form.get('budget_max'),
            'Invalid maximum budget amount',
            'Maximum budget cannot be negative',
            errors,
        )
        hourly_rate_decimal = parse_amount(
            request.form.get('hourly_rate'),
            'Invalid hourly rate amount',
            'Hourly rate cannot be negative',
            errors,
        )

        if (
            budget_min_decimal is not None
            and budget_max_decimal is not None
            and budget_min_decimal > budget_max_decimal
        ):
            errors.append('Minimum budget cannot be greater than maximum budget')

        if errors:
            for error in errors:
                flash(error, 'error')
            return render_post_form()

        job = Job(
            title=title,
            description=description,
            contractor_id=current_user.id,
            category_id=category_id,
            location=location,
            budget_min=budget_min_decimal,
            budget_max=budget_max_decimal,
            hourly_rate=hourly_rate_decimal,
            # Fall back to a summary derived from the category when the
            # contractor supplied no WHS text.
            whs_requirements=whs_requirements or f"Category: {category.name}. Risk Level: {category.whs_risk_level}.",
            white_card_required=white_card_required,
            insurance_required=insurance_required,
            status='open'
        )
        db.session.add(job)
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Details go to the log; the user gets a generic message so internal
        # error text is not exposed.
        logger.exception("Error posting job")
        flash('Error posting job. Please try again.', 'error')
        return render_post_form()

    # Notify workers whose location text contains the job location. This is
    # best-effort: a notification failure must not undo a committed job.
    try:
        from app.models.user import User
        nearby_workers = User.query.filter(
            User.role == 'worker',
            User.is_active == True,  # noqa: E712 - SQLAlchemy column comparison
            User.location.ilike(f'%{location}%')  # Simple location matching
        ).all()

        for worker in nearby_workers:
            notification_service.send_notification(
                user_id=worker.id,
                notification_type=NotificationType.JOB_MATCH,
                title="New Job Posted",
                content=f"New {category.name} job: {title} in {location}",
                action_url=url_for('job_details', job_id=job.id, _external=False)
            )
        logger.info("Job posting notifications sent to %d workers", len(nearby_workers))
    except Exception:
        logger.exception("Failed to send job posting notifications")

    flash('Job posted successfully!', 'success')
    return redirect(url_for('job_details', job_id=job.id))
@app.route('/jobs/<int:job_id>')
def job_details(job_id):
    """Show the details page for a single job.

    The URL rule now declares the ``job_id`` variable. Without it Flask cannot
    pass the argument this view requires, and ``url_for('job_details',
    job_id=...)`` would only append a query string.

    Args:
        job_id: Primary key of the job, taken from the URL.

    Returns:
        The rendered details page; responds 404 when the job does not exist.
    """
    from app.models import Job, Application

    job = Job.query.get_or_404(job_id)

    # Workers get a flag telling the template whether they already applied.
    user_applied = False
    if current_user.is_authenticated and current_user.role == 'worker':
        existing_app = Application.query.filter_by(
            job_id=job_id,
            worker_id=current_user.id
        ).first()
        user_applied = existing_app is not None

    return render_template('jobs/details.html', job=job, user_applied=user_applied)
@app.route('/jobs/<int:job_id>/apply', methods=['POST'])
@login_required
def apply_to_job(job_id):
    """Submit the logged-in worker's application to a job.

    The URL rule declares the ``job_id`` variable the view requires. The
    route only accepts POST, so no method check is needed in the body.

    Args:
        job_id: Primary key of the job being applied to, taken from the URL.

    Returns:
        A redirect: to the dashboard on success, otherwise back to the job
        details page with a flashed message. Responds 404 for unknown jobs.
    """
    import math
    from app.models import Application, Job
    from app.extensions import db

    def back_to_job():
        """Redirect to the details page of the job being applied to."""
        return redirect(url_for('job_details', job_id=job_id))

    if current_user.role != 'worker':
        flash('Only workers can apply to jobs.', 'error')
        return back_to_job()

    job = Job.query.get_or_404(job_id)
    if job.status != 'open':
        flash('This job is no longer accepting applications.', 'error')
        return back_to_job()

    # A worker may apply to a given job only once.
    existing_app = Application.query.filter_by(
        job_id=job_id,
        worker_id=current_user.id
    ).first()
    if existing_app:
        flash('You have already applied to this job.', 'warning')
        return back_to_job()

    proposed_rate = request.form.get('proposed_rate')
    cover_letter = request.form.get('cover_letter', '').strip()

    proposed_rate_value = None
    if proposed_rate:
        try:
            proposed_rate_value = float(proposed_rate)
        except (ValueError, TypeError):
            flash('Invalid proposed rate.', 'error')
            return back_to_job()
        # float() accepts 'nan' and 'inf'; neither is a usable rate.
        if not math.isfinite(proposed_rate_value):
            flash('Invalid proposed rate.', 'error')
            return back_to_job()
        if proposed_rate_value < 0:
            flash('Proposed rate cannot be negative.', 'error')
            return back_to_job()

    application = Application(
        job_id=job_id,
        worker_id=current_user.id,
        proposed_rate=proposed_rate_value,
        cover_letter=cover_letter,
        abn_verified=True,  # Auto-verify for now
        insurance_verified=current_user.public_liability_insurance,
        status='pending'
    )

    try:
        db.session.add(application)
        # Keep the denormalised counter on the job in step.
        job.applications_count = (job.applications_count or 0) + 1
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Details go to the log; the user gets a generic message.
        logger.exception("Error submitting application for job %s", job_id)
        flash('Error submitting application. Please try again.', 'error')
        return back_to_job()

    # Tell the contractor about the new application. Best-effort: a failure
    # here must not undo the committed application.
    try:
        notification_service.send_notification(
            user_id=job.contractor_id,
            notification_type=NotificationType.JOB_APPLICATION_UPDATE,
            title="New Application",
            content=f"{current_user.first_name} {current_user.last_name} applied to '{job.title}'",
            action_url=url_for('job_details', job_id=job.id, _external=False)
        )
        logger.info("Application notification sent to contractor %s", job.contractor_id)
    except Exception:
        logger.exception("Failed to send application notification")

    flash('Application submitted successfully!', 'success')
    return redirect(url_for('dashboard'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    Already-authenticated users are redirected to the dashboard. On a
    successful login a Flask-Login session is created and the user is sent to
    the URL saved in ``session['next_url']`` if there is one, otherwise to the
    dashboard. All failure paths re-render the login form with a flashed
    message.
    """
    from app.models import User
    from app.extensions import db

    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_template('auth/login.html')

    try:
        email = (request.form.get('email') or '').strip()
        password = request.form.get('password')

        if not email or not password:
            flash('Email and password are required', 'error')
            return render_template('auth/login.html')

        # Do not log the address itself; it is personal data.
        logger.debug("Login attempt received")

        # Check the database is reachable so the user gets a specific message.
        try:
            from sqlalchemy import text
            db.session.execute(text('SELECT 1'))
        except Exception:
            logger.exception("Database connection error during login")
            flash('Database connection error. Please try again later.', 'error')
            return render_template('auth/login.html')

        try:
            user = User.query.filter_by(email=email).first()
            # Registration stores emails lower-cased, so an address typed
            # with capitals must still match. The exact form is tried first
            # to stay compatible with any rows stored in mixed case.
            if user is None and email != email.lower():
                user = User.query.filter_by(email=email.lower()).first()
        except Exception:
            logger.exception("User query error during login")
            flash('Error retrieving user information. Please try again.', 'error')
            return render_template('auth/login.html')

        if user and user.check_password(password) and user.is_active:
            # The form value is a string; pass an explicit boolean.
            login_user(user, remember=bool(request.form.get('remember')))
            flash('Login successful!', 'success')

            # Resume the destination saved before the login redirect, if any.
            next_url = session.pop('next_url', None)
            if next_url:
                return redirect(next_url)
            return redirect(url_for('dashboard'))

        flash('Invalid email or password', 'error')
    except Exception:
        logger.exception("Login error")
        flash('An error occurred during login. Please try again.', 'error')

    return render_template('auth/login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    Already-authenticated users are redirected to the dashboard. On POST the
    submitted fields are validated; every problem is flashed and the form is
    shown again. On success the user is created, logged in and redirected to
    the dashboard.
    """
    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_template('auth/register.html')

    from app.models.user import User
    from app.extensions import db

    email = request.form.get('email', '').strip().lower()
    password = request.form.get('password')
    confirm_password = request.form.get('confirm_password')
    first_name = request.form.get('first_name', '').strip()
    last_name = request.form.get('last_name', '').strip()
    role = request.form.get('role')
    phone_number = request.form.get('phone_number', '').strip()
    location = request.form.get('location', '').strip()
    # ABNs are commonly typed in groups ("12 345 678 901"); drop whitespace so
    # the stored value is the bare digit string.
    abn_number = ''.join(request.form.get('abn_number', '').split())

    errors = []
    if not email:
        errors.append('Email is required')
    elif User.query.filter_by(email=email).first():
        errors.append('Email already registered')

    if not password or len(password) < 6:
        errors.append('Password must be at least 6 characters')
    elif password != confirm_password:
        errors.append('Passwords do not match')

    if not first_name:
        errors.append('First name is required')
    if not last_name:
        errors.append('Last name is required')
    if role not in ['contractor', 'worker']:
        errors.append('Please select a valid role')
    if not phone_number:
        errors.append('Phone number is required')
    if not location:
        errors.append('Location is required')

    # The message promises an 11-digit ABN, so check the characters as well
    # as the length.
    abn_is_valid = len(abn_number) == 11 and all(
        ch in '0123456789' for ch in abn_number
    )
    if not abn_is_valid:
        errors.append('Valid 11-digit ABN is required')

    if errors:
        for error in errors:
            flash(error, 'error')
        return render_template('auth/register.html')

    try:
        # The username is derived from the part of the email before '@'.
        username = email.split('@')[0]
        user = User(
            username=username,
            email=email,
            first_name=first_name,
            last_name=last_name,
            role=role,
            phone_number=phone_number,
            location=location,
            abn_number=abn_number,
            privacy_consent=True,
            terms_accepted=True,
            terms_accepted_date=datetime.utcnow()
        )
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Details go to the log; the user gets a generic message so database
        # error text is not exposed.
        logger.exception("Registration failed")
        flash('Registration failed. Please try again.', 'error')
        return render_template('auth/register.html')

    login_user(user)
    flash('Registration successful! Welcome to RateRight!', 'success')
    return redirect(url_for('dashboard'))
@app.route('/logout')
def logout():
    """Log the user out, wipe the session and expire the auth cookies."""
    from flask import make_response

    # Drop the Flask-Login state first, then everything else in the session.
    logout_user()
    session.clear()

    logout_page = make_response('''
Logging out...
Logging out...
''')

    # Overwrite both cookies with empty, already-expired values.
    for cookie_name in ('session', 'remember_token'):
        logout_page.set_cookie(cookie_name, '', expires=0)
    return logout_page
@app.route('/dashboard')
def dashboard():
    """Redirect the user to the dashboard that matches their role.

    Anonymous users are sent to the login page. The target endpoint is
    ``dashboard_<role>``. If no such endpoint is registered, the user is sent
    to the home page with an error instead of ``url_for`` raising and
    producing a 500.
    """
    if not current_user.is_authenticated:
        return redirect(url_for('login'))

    endpoint = f'dashboard_{current_user.role}'
    if endpoint not in app.view_functions:
        logger.warning("No dashboard registered for role %r", current_user.role)
        flash('Access denied.', 'error')
        return redirect(url_for('index'))

    return redirect(url_for(endpoint))
@app.route('/dashboard/contractor')
def dashboard_contractor():
    """Render the contractor dashboard.

    Users who are not authenticated contractors are redirected to the login
    page. Each data set (posted jobs, applications, pending applications,
    contracts, pending payouts) is loaded independently; if one fails it
    falls back to an empty value so the rest of the page still renders.

    Returns:
        The rendered dashboard, or an error body with HTTP 500 if rendering
        fails (detailed when the app's DEBUG config is on, generic otherwise).
    """
    import html
    import traceback

    try:
        from app.models import Job, Application, Contract
        from app.extensions import db

        if not current_user.is_authenticated or current_user.role != 'contractor':
            return redirect(url_for('login'))

        # Read once up front: a rollback below expires loaded objects.
        contractor_id = current_user.id

        def recover(what):
            """Log the failure and reset the session for the next query."""
            logger.exception("Error fetching %s", what)
            # A failed statement can leave the transaction unusable; roll back
            # so the remaining queries still get a chance to run.
            db.session.rollback()

        try:
            posted_jobs = Job.query.filter_by(
                contractor_id=contractor_id
            ).order_by(Job.date_posted.desc()).all()
        except Exception:
            recover("posted jobs")
            posted_jobs = []

        try:
            recent_applications = db.session.query(Application).join(Job).filter(
                Job.contractor_id == contractor_id
            ).order_by(Application.date_applied.desc()).all()
        except Exception:
            recover("recent applications")
            recent_applications = []

        try:
            pending_applications = db.session.query(Application).join(Job).filter(
                Job.contractor_id == contractor_id,
                Application.status == 'pending'
            ).all()
        except Exception:
            recover("pending applications")
            pending_applications = []

        try:
            active_contracts = Contract.query.filter(
                Contract.contractor_id == contractor_id,
                Contract.status.in_(['pending_agreement', 'contractor_signed', 'worker_signed', 'active', 'pending_review', 'pending_rating'])
            ).order_by(Contract.created_at.desc()).all()
        except Exception:
            recover("active contracts")
            active_contracts = []

        # Sum the agreed rate of contracts that are active or under review.
        try:
            pending_payouts = sum(
                float(contract.agreed_rate or 0)
                for contract in active_contracts
                if contract.status in ['active', 'pending_review']
            )
        except Exception:
            logger.exception("Error calculating pending payouts")
            pending_payouts = 0

        return render_template('dashboard/contractor.html',
                               posted_jobs=posted_jobs,
                               total_applications=len(recent_applications),
                               recent_applications=recent_applications,
                               pending_applications=pending_applications,
                               active_contracts=active_contracts,
                               pending_payouts=pending_payouts)
    except Exception as e:
        error_details = traceback.format_exc()
        logger.exception("Contractor dashboard error")

        if not app.config.get('DEBUG', False):
            return "Internal server error. Please contact support.", 500

        # current_user is a module-level name, so a locals() lookup never
        # finds it; read the attributes directly with a fallback instead.
        authenticated = getattr(current_user, 'is_authenticated', 'Unknown')
        user_role = getattr(current_user, 'role', 'Unknown')
        user_id = getattr(current_user, 'id', 'Unknown')

        # Escape everything interpolated: tracebacks contain '<' and '>'.
        return f"""<pre>
Dashboard Error
Error: {html.escape(str(e))}
{html.escape(error_details)}
Debug Info:
- User authenticated: {html.escape(str(authenticated))}
- User role: {html.escape(str(user_role))}
- User ID: {html.escape(str(user_id))}
</pre>""", 500
@app.route('/dashboard/worker')
def dashboard_worker():
    """Render the worker dashboard.

    Users who are not authenticated workers are redirected to the login page.
    Applications and contracts are loaded independently; if one query fails
    it falls back to an empty list so the rest of the page still renders.

    Returns:
        The rendered dashboard, or an error body with HTTP 500 if rendering
        fails (detailed when the app's DEBUG config is on, generic otherwise).
    """
    import html
    import traceback

    try:
        from app.models import Application, Contract
        from app.extensions import db

        if not current_user.is_authenticated or current_user.role != 'worker':
            return redirect(url_for('login'))

        # Read once up front: a rollback below expires loaded objects.
        worker_id = current_user.id

        try:
            applications = Application.query.filter_by(
                worker_id=worker_id
            ).order_by(Application.date_applied.desc()).all()
        except Exception:
            logger.exception("Error fetching worker applications")
            # A failed statement can leave the transaction unusable; roll back
            # so the next query still gets a chance to run.
            db.session.rollback()
            applications = []

        try:
            active_contracts = Contract.query.filter(
                Contract.worker_id == worker_id,
                Contract.status.in_(['pending_agreement', 'contractor_signed', 'worker_signed', 'active', 'pending_review', 'pending_rating'])
            ).order_by(Contract.created_at.desc()).all()
        except Exception:
            logger.exception("Error fetching worker contracts")
            db.session.rollback()
            active_contracts = []

        return render_template('dashboard/worker.html',
                               applications=applications,
                               active_contracts=active_contracts)
    except Exception as e:
        error_details = traceback.format_exc()
        logger.exception("Worker dashboard error")

        if not app.config.get('DEBUG', False):
            return "Internal server error. Please contact support.", 500

        # current_user is a module-level name, so a locals() lookup never
        # finds it; read the attributes directly with a fallback instead.
        authenticated = getattr(current_user, 'is_authenticated', 'Unknown')
        user_role = getattr(current_user, 'role', 'Unknown')
        user_id = getattr(current_user, 'id', 'Unknown')

        # Escape everything interpolated: tracebacks contain '<' and '>'.
        return f"""<pre>
Dashboard Error
Error: {html.escape(str(e))}
{html.escape(error_details)}
Debug Info:
- User authenticated: {html.escape(str(authenticated))}
- User role: {html.escape(str(user_role))}
- User ID: {html.escape(str(user_id))}
</pre>""", 500
@app.route('/my-jobs')
@login_required
def my_jobs():
    """List the jobs that belong to the logged-in user.

    Contractors get the jobs they posted; every other role gets the jobs
    attached to contracts where they are the worker. Any failure is logged
    and the user is sent back to the dashboard.
    """
    try:
        from app.models.job import Job
        from app.models.contract import Contract

        if current_user.role != 'contractor':
            # Worker view: collect the job behind each contract, newest
            # contract first, skipping contracts that have no job.
            worker_contracts = (
                Contract.query
                .filter_by(worker_id=current_user.id)
                .order_by(Contract.created_at.desc())
                .all()
            )
            job_list = []
            for worker_contract in worker_contracts:
                if worker_contract.job:
                    job_list.append(worker_contract.job)
            return render_template('jobs/worker_jobs.html', jobs=job_list)

        # Contractor view: jobs the user posted, newest first.
        job_list = (
            Job.query
            .filter_by(contractor_id=current_user.id)
            .order_by(Job.date_posted.desc())
            .all()
        )
        return render_template('jobs/employer_jobs.html', jobs=job_list)
    except Exception as e:
        app.logger.error(f"Error in my_jobs route: {str(e)}")
        # Send the user somewhere usable rather than showing an error page.
        flash('Unable to load jobs. Please try again.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/earnings')
@login_required
def earnings():
    """Redirect to the worker analytics page, or to payouts as a fallback."""
    try:
        return redirect(url_for('analytics_worker'))
    except Exception:
        # url_for raises when the endpoint cannot be built (for example, it
        # is not registered). Catch Exception rather than using a bare
        # except so SystemExit and KeyboardInterrupt are not swallowed.
        logger.debug("analytics_worker endpoint unavailable; redirecting to payouts")
        return redirect(url_for('payouts'))
@app.route('/profile')
def profile():
    """Render the profile page for the logged-in user, by role.

    Contractors get their posted-job and active-contract counts; every other
    role gets the worker profile with a completed-contract count. Anonymous
    visitors are redirected to the login page.
    """
    if not current_user.is_authenticated:
        flash('Please login to access your profile', 'warning')
        return redirect(url_for('login'))

    # Contract is needed for both profile variants.
    from app.models.contract import Contract

    if current_user.role != 'contractor':
        # Worker profile (the default for any non-contractor role).
        finished = Contract.query.filter_by(
            worker_id=current_user.id,
            status='completed'
        )
        return render_template('auth/worker_profile.html',
                               completed_jobs=finished.count())

    from app.models.job import Job

    job_total = Job.query.filter_by(contractor_id=current_user.id).count()
    running_total = Contract.query.filter_by(
        contractor_id=current_user.id,
        status='active'
    ).count()
    return render_template('auth/contractor_profile.html',
                           posted_jobs=job_total,
                           active_contracts=running_total)
@app.route('/health')
def app_health():
    """Report that the web application is up, with the current UTC time."""
    checked_at = datetime.utcnow().isoformat()
    health_report = {
        "status": "healthy",
        "message": "RateRight web application is running",
        "timestamp": checked_at,
    }
    return health_report
# API info is handled in __init__.py
@app.route('/applications/<int:application_id>/respond', methods=['GET', 'POST'])
@login_required
def respond_to_application(application_id):
    """Let a contractor review (GET) and accept or reject (POST) an application.

    The URL rule declares the ``application_id`` variable the view requires.

    Accepting marks the application accepted, marks the job assigned, rejects
    the job's other pending applications and creates a contract in
    ``pending_agreement`` status. Rejecting only updates the application.
    Only applications that are still pending can be acted on, so a repeated
    submit cannot create a second contract.

    Args:
        application_id: Primary key of the application, taken from the URL.

    Returns:
        The review page on GET; otherwise a redirect with a flashed message.
        Responds 404 for unknown applications.
    """
    from datetime import timedelta
    from app.extensions import db
    from app.models import Application, Contract

    if current_user.role != 'contractor':
        flash('Only contractors can respond to applications.', 'error')
        return redirect(url_for('dashboard'))

    application = Application.query.get_or_404(application_id)
    job = application.job

    # The contractor must own the job this application targets.
    if job.contractor_id != current_user.id:
        flash('You can only respond to applications for your own jobs.', 'error')
        return redirect(url_for('dashboard'))

    if request.method == 'GET':
        return render_template('applications/respond.html', application=application)

    action = request.form.get('action')
    if action not in ('accept', 'reject'):
        return redirect(url_for('dashboard_contractor'))

    if application.status != 'pending':
        flash('This application has already been processed.', 'warning')
        return redirect(url_for('dashboard_contractor'))

    if action == 'reject':
        # Read the name before committing so the message does not depend on
        # reloading the row afterwards.
        worker_name = f'{application.worker.first_name} {application.worker.last_name}'
        application.status = 'rejected'
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
            logger.exception("Error rejecting application %s", application_id)
            flash('Error updating application. Please try again.', 'error')
        else:
            # Only report the rejection once it has been saved.
            flash(f'Application from {worker_name} rejected.', 'info')
        return redirect(url_for('dashboard_contractor'))

    # --- accept ---
    application.status = 'accepted'
    job.status = 'assigned'

    # Reject all other pending applications for this job.
    other_applications = Application.query.filter_by(
        job_id=application.job_id,
        status='pending'
    ).filter(Application.id != application_id).all()
    for other_app in other_applications:
        other_app.status = 'rejected'

    # The contract runs from today for an estimated 30 days.
    start_date = date.today()
    end_date = start_date + timedelta(days=30)

    # Prefer the worker's proposed rate; otherwise fall back to the job's
    # maximum budget, then its hourly rate, then 0.
    proposed_rate = application.proposed_rate
    fallback_rate = job.budget_max or job.hourly_rate or 0
    final_rate = proposed_rate if proposed_rate is not None else fallback_rate
    logger.debug(
        "Contract rate: proposed=%s budget_max=%s hourly_rate=%s final=%s",
        proposed_rate, job.budget_max, job.hourly_rate, final_rate,
    )

    # A job with an hourly rate yields an hourly contract; otherwise total.
    rate_type = 'hourly' if job.hourly_rate else 'total'

    contract = Contract(
        job_id=application.job_id,
        contractor_id=job.contractor_id,
        worker_id=application.worker_id,
        agreed_rate=final_rate,
        rate_type=rate_type,
        start_date=start_date,
        end_date=end_date,
        scope_of_work=job.description or "Contract work as per job posting",
        status='pending_agreement'  # both parties still have to sign
    )
    db.session.add(contract)

    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
        # Details go to the log; the user gets a generic message.
        logger.exception("Contract creation error for application %s", application_id)
        flash('Error creating contract. Please try again.', 'error')
        return redirect(url_for('dashboard_contractor'))

    flash(f'Application accepted! Contract #{contract.id} created and ready for review.', 'success')
    return redirect(url_for('contracts_review', contract_id=contract.id))
@app.route('/contracts/<int:contract_id>/review', methods=['GET', 'POST'])
@login_required
def contracts_review(contract_id):
    """Show a contract (GET) or record that a party has reviewed it (POST).

    The URL rule declares the ``contract_id`` variable the view requires.
    Only the contract's contractor or worker may access it. On POST the
    ``action`` form field must match the caller's side of the contract.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rendered review page on GET; a redirect back to this page after
        POST; a redirect to the dashboard when access is denied. Responds
        404 for unknown contracts.
    """
    from app.models import Contract
    from app.extensions import db

    contract = Contract.query.get_or_404(contract_id)

    # Only the two parties to the contract may see or act on it.
    if current_user.id not in [contract.contractor_id, contract.worker_id]:
        flash('You do not have access to this contract.', 'error')
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_template('contracts/review.html', contract=contract)

    action = request.form.get('action')
    if action == 'contractor_review' and current_user.id == contract.contractor_id:
        contract.contractor_reviewed = True
        flash('Contract marked as reviewed by contractor.', 'success')
    elif action == 'worker_review' and current_user.id == contract.worker_id:
        contract.worker_reviewed = True
        flash('Contract marked as reviewed by worker.', 'success')
    else:
        flash('Invalid action or insufficient permissions.', 'error')
        return redirect(url_for('contracts_review', contract_id=contract_id))

    db.session.commit()
    return redirect(url_for('contracts_review', contract_id=contract_id))
@app.route('/contracts/<int:contract_id>/sign', methods=['GET', 'POST'])
@login_required
def contracts_sign(contract_id):
    """Show the signing form (GET) or sign a contract (POST).

    The URL rule declares the ``contract_id`` variable the view requires.

    On POST, an optional drawn signature is saved as a PNG, a signed PDF is
    generated and an upload to Google Drive is attempted. That processing is
    best-effort: if it fails, the contract is still signed and the user is
    warned. The earlier structure skipped signing whenever signature
    processing raised, while telling the user that signing had completed.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The review page with the sign form on GET; a redirect to the review
        page after POST or when the user may not sign. Responds 404 for
        unknown contracts.
    """
    import base64
    import os
    from app.models import Contract
    from app.extensions import db
    from app.services.pdf_service import ContractPDFService

    contract = Contract.query.get_or_404(contract_id)

    if not contract.can_sign(current_user.id):
        flash('You cannot sign this contract at this time.', 'error')
        return redirect(url_for('contracts_review', contract_id=contract_id))

    # Signing implies the signer has reviewed the contract.
    if current_user.id == contract.contractor_id:
        contract.contractor_reviewed = True
    elif current_user.id == contract.worker_id:
        contract.worker_reviewed = True

    if request.method != 'POST':
        return render_template('contracts/review.html', contract=contract, show_sign_form=True)

    # --- Step 1: signature image, PDF and Drive upload (best-effort) ---
    signature_failed = False
    signature_data = request.form.get('signature')
    if signature_data:
        try:
            # The signature is stored as a PNG file, not in the database.
            signatures_dir = 'contracts/signatures'
            os.makedirs(signatures_dir, exist_ok=True)

            # Strip the data-URL prefix if the browser sent one.
            if 'data:image/png;base64,' in signature_data:
                signature_data = signature_data.split(',')[1]

            signature_filename = f"signature_{contract_id}_{current_user.id}.png"
            signature_path = os.path.join(signatures_dir, signature_filename)
            with open(signature_path, 'wb') as f:
                f.write(base64.b64decode(signature_data))

            pdf_service = ContractPDFService()
            pdf_path = pdf_service.generate_contract_pdf({
                'contract_id': contract_id,
                'user_id': current_user.id,
                'signature_file': signature_path,
                'contract': contract
            })
            current_app.logger.info(f"Generated signed contract PDF: {pdf_path}")

            if pdf_path and os.path.exists(pdf_path):
                try:
                    from app.services.drive_oauth_service import DriveOAuthService
                    drive_service = DriveOAuthService()
                    drive_link = drive_service.upload_signed_contract(pdf_path, contract_id)

                    if drive_link:
                        current_app.logger.info(f"Contract {contract_id} successfully uploaded to Google Drive: {drive_link}")
                        flash('Contract PDF uploaded to Google Drive for secure storage.', 'success')
                    else:
                        current_app.logger.warning(f"Drive upload failed for contract {contract_id} - OAuth may need renewal")
                except Exception as drive_error:
                    # A Drive failure is logged only; it affects neither the
                    # signature nor the signing.
                    current_app.logger.error(f"Drive upload error for contract {contract_id}: {str(drive_error)}")
        except Exception as e:
            current_app.logger.error(f"Signature processing error: {str(e)}")
            signature_failed = True

    # --- Step 2: record the signature on the contract ---
    try:
        signed = contract.sign_contract(current_user.id)
        if signed:
            db.session.commit()
    except Exception as e:
        db.session.rollback()
        current_app.logger.error(f"Contract signing error for contract {contract_id}: {str(e)}")
        signed = False

    if not signed:
        flash('Failed to sign contract.', 'error')
    else:
        if signature_failed:
            flash('Signature processing failed, but contract signing completed.', 'warning')
        if contract.status == 'active':
            flash('Contract fully executed! Work can now begin.', 'success')
        else:
            flash('Contract signed successfully. Waiting for other party.', 'success')

    return redirect(url_for('contracts_review', contract_id=contract_id))
@app.route('/contracts/<int:contract_id>/mark-complete', methods=['POST'])
@login_required
def mark_contract_complete(contract_id):
    """Let the worker flag an active contract's work as finished.

    Moves the contract to ``pending_review`` and, when a payment with status
    ``held_escrow`` exists, moves that payment to ``pending_release``.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        A redirect to the contract close-out page in every case.
    """
    from app.models import Contract
    from app.extensions import db

    contract = Contract.query.get_or_404(contract_id)
    closeout_url = url_for('contract_closeout', contract_id=contract_id)

    # Only worker can mark as complete
    if contract.worker_id != current_user.id:
        flash('Only the worker can mark work as complete.', 'error')
        return redirect(closeout_url)

    if contract.status != 'active':
        flash('Contract must be active to mark complete.', 'error')
        return redirect(closeout_url)

    # Update contract status
    contract.status = 'pending_review'

    # Find associated payment if exists
    payment = contract.payments.filter_by(status='held_escrow').first()
    if payment:
        payment.status = 'pending_release'
        payment.release_conditions_met = True

    db.session.commit()
    flash('Work marked as complete - awaiting contractor review.', 'success')
    return redirect(closeout_url)
@app.route('/contracts/<int:contract_id>/approve-completion', methods=['POST'])
@login_required
def approve_contract_completion(contract_id):
    """Contractor approves completed work; completion is gated on ratings.

    The rating service decides whether the contract may be completed. If it
    reports that ratings are still required, the contract is moved to the
    rating stage instead. Otherwise any ``pending_release`` payment is marked
    ``released`` and the contract becomes ``completed``.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        A redirect to the contract close-out page in every case.
    """
    from app.models import Contract
    from app.extensions import db
    from app.services.rating_service import rating_service
    from datetime import datetime

    contract = Contract.query.get_or_404(contract_id)
    closeout_url = url_for('contract_closeout', contract_id=contract_id)

    # Only contractor can approve completion
    if contract.contractor_id != current_user.id:
        flash('Only the contractor can approve completion.', 'error')
        return redirect(closeout_url)

    if contract.status != 'pending_review':
        flash('Contract is not pending completion review.', 'error')
        return redirect(closeout_url)

    # CRITICAL: Check if mandatory ratings are complete before allowing completion
    completion_validation = rating_service.validate_contract_completion_request(
        contract_id, current_user.id
    )
    if not completion_validation['can_complete']:
        # If ratings are required, transition to rating stage instead of completing
        if completion_validation.get('requires_rating'):
            rating_transition = rating_service.transition_contract_to_rating_stage(contract_id)
            if rating_transition['success']:
                flash('Work approved! Both parties must now rate each other before contract completion.', 'warning')
            else:
                flash(f'Error transitioning to rating stage: {rating_transition["error"]}', 'error')
        else:
            flash(completion_validation['error'], 'error')
        return redirect(closeout_url)

    # Ratings are complete - proceed with completion.
    # One timestamp is shared so the release and completion times agree.
    completed_at = datetime.utcnow()

    # Find payment awaiting release
    payment = contract.payments.filter_by(status='pending_release').first()
    if payment:
        payment.status = 'released'
        payment.date_released = completed_at

    # Update contract status to completed (ratings already verified)
    contract.status = 'completed'
    contract.mutual_rating_completed_date = completed_at

    db.session.commit()
    flash('Work approved and payment released successfully! Contract completed with mutual ratings.', 'success')
    return redirect(closeout_url)
@app.route('/contracts/<int:contract_id>/dispute-completion', methods=['POST'])
@login_required
def dispute_contract_completion(contract_id):
    """Contractor disputes a completion that is pending review.

    Sets the contract, and any ``pending_release`` payment, to ``disputed``.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        A redirect to the contract close-out page in every case.
    """
    from app.models import Contract
    from app.extensions import db

    contract = Contract.query.get_or_404(contract_id)
    closeout_url = url_for('contract_closeout', contract_id=contract_id)

    # Only contractor can dispute
    if contract.contractor_id != current_user.id:
        flash('Only the contractor can dispute completion.', 'error')
        return redirect(closeout_url)

    if contract.status != 'pending_review':
        flash('Contract is not pending completion review.', 'error')
        return redirect(closeout_url)

    # A submitted-but-blank field must also fall back to the default reason.
    # NOTE(review): the reason is only shown in the flash message here; it is
    # not stored on the contract in this view - confirm whether it should be.
    dispute_reason = (request.form.get('reason') or '').strip() or 'Work does not meet requirements'

    # Update contract status
    contract.status = 'disputed'

    # Find payment and mark as disputed
    payment = contract.payments.filter_by(status='pending_release').first()
    if payment:
        payment.status = 'disputed'

    db.session.commit()
    flash(f'Completion disputed: {dispute_reason}', 'warning')
    return redirect(closeout_url)
@app.route('/contracts', methods=['GET'])
@login_required
def contracts_list():
    """Show every contract the signed-in user is a party to, newest first.

    Contractors are matched on ``contractor_id``; every other role is matched
    on ``worker_id``.
    """
    from app.models import Contract

    party_column = 'contractor_id' if current_user.role == 'contractor' else 'worker_id'
    user_contracts = (
        Contract.query
        .filter_by(**{party_column: current_user.id})
        .order_by(Contract.created_at.desc())
        .all()
    )
    return render_template('contracts/list.html', contracts=user_contracts)
@app.route('/applications/<int:application_id>/accept', methods=['POST'])
@login_required
def accept_application(application_id):
    """Accept a pending job application and create the matching contract.

    In one transaction: marks the application ``accepted``, sets the job to
    ``assigned``, creates a ``pending_agreement`` contract (start in 7 days,
    30 days long) and rejects the job's other pending applications. After the
    commit a best-effort notification is sent to the worker.

    Args:
        application_id: Primary key of the application, taken from the URL.

    Returns:
        A redirect to the contractor's application list, or to the login
        page when the current user is not a contractor.
    """
    from app.models import Application, Contract
    from app.extensions import db
    from datetime import date, timedelta

    if not current_user.is_authenticated or current_user.role != 'contractor':
        flash('Only contractors can accept applications', 'error')
        return redirect(url_for('login'))

    application = Application.query.get_or_404(application_id)

    # Check if contractor owns this job
    if application.job.contractor_id != current_user.id:
        flash('You can only accept applications for your own jobs', 'error')
        return redirect(url_for('applications_contractor_list'))

    if application.status != 'pending':
        flash('Application has already been processed', 'warning')
        return redirect(url_for('applications_contractor_list'))

    try:
        # Accept the application
        application.status = 'accepted'
        # Close the job
        application.job.status = 'assigned'

        # Create contract automatically
        start_date = date.today() + timedelta(days=7)  # Start in 1 week
        end_date = start_date + timedelta(days=30)  # 30 day contract

        # Prefer the worker's proposed rate; otherwise fall back to the job's
        # budget_max, then hourly_rate, then a fixed default of 5000.00.
        if application.proposed_rate is not None:
            agreed_rate = application.proposed_rate
        else:
            agreed_rate = application.job.budget_max or application.job.hourly_rate or 5000.00

        contract = Contract(
            job_id=application.job_id,
            contractor_id=current_user.id,
            worker_id=application.worker_id,
            agreed_rate=agreed_rate,
            rate_type='total',
            start_date=start_date,
            end_date=end_date,
            scope_of_work=application.job.description,
            payment_terms='completion',
            status='pending_agreement'
        )
        db.session.add(contract)

        # Reject other pending applications for this job
        other_applications = Application.query.filter_by(
            job_id=application.job_id,
            status='pending'
        ).filter(Application.id != application_id).all()
        for other_app in other_applications:
            other_app.status = 'rejected'

        db.session.commit()

        # NOTIFICATION TRIGGER: Contract acceptance notification
        try:
            # Send notification to worker that application was accepted
            notification_service.send_notification(
                user_id=application.worker_id,
                notification_type=NotificationType.CONTRACT_AWARDED,
                title="Application Accepted!",
                content=f"Your application for '{application.job.title}' was accepted. Contract #{contract.id} created.",
                action_url=url_for('contracts_review', contract_id=contract.id, _external=False)
            )
            logger.info("Contract acceptance notification sent to worker %s", application.worker_id)
        except Exception:
            # Don't fail contract creation if notification fails
            logger.exception("Failed to send contract acceptance notification")

        flash(f'Application accepted! Contract #{contract.id} created and ready for review.', 'success')
    except Exception:
        db.session.rollback()
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error accepting application %s", application_id)
        flash('Error accepting application. Please try again later.', 'error')

    return redirect(url_for('applications_contractor_list'))
@app.route('/contracts/<int:contract_id>')
@login_required
def contract_review(contract_id):
    """Show the contract review page to one of the contract's two parties.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rendered review page, or a redirect to the dashboard when the
        current user is neither the contractor nor the worker.
    """
    from app.models import Contract

    contract = Contract.query.get_or_404(contract_id)

    # Check authorization
    if current_user.id not in [contract.contractor_id, contract.worker_id]:
        flash('You are not authorized to view this contract.', 'danger')
        return redirect(url_for('dashboard'))

    return render_template('contracts/review.html', contract=contract)
@app.route('/contracts/<int:contract_id>/closeout')
@login_required
def contract_closeout(contract_id):
    """Show the contract close-out page to one of the contract's two parties.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rendered close-out page, or a redirect to the dashboard when the
        current user is neither the contractor nor the worker.
    """
    from app.models import Contract

    contract = Contract.query.get_or_404(contract_id)

    # Check authorization
    if current_user.id not in [contract.contractor_id, contract.worker_id]:
        flash('You are not authorized to view this contract.', 'danger')
        return redirect(url_for('dashboard'))

    return render_template('contracts/closeout.html', contract=contract)
@app.route('/contracts/<int:contract_id>/rate', methods=['GET', 'POST'])
@login_required
def rate_contract(contract_id):
    """Show and process the rating form for a contract.

    Eligibility is decided by ``rating_service.validate_rating_eligibility``.
    On POST the five scores are parsed as integers, checked against the 1-5
    range and passed to ``rating_service.create_rating``. The reliability and
    professionalism scores default to the overall score when absent.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rating form (GET, or POST with a problem), or a redirect to the
        contract review page (not eligible, or rating saved).
    """
    from app.models.contract import Contract
    from app.services.rating_service import rating_service

    contract = Contract.query.get_or_404(contract_id)

    # Check authorization using rating service
    eligibility = rating_service.validate_rating_eligibility(contract_id, current_user.id)
    if not eligibility['can_rate']:
        flash(eligibility['error'], 'error')
        return redirect(url_for('contracts_review', contract_id=contract_id))

    if request.method == 'POST':
        try:
            # Get form data; a missing or non-numeric field raises and is
            # reported as "Invalid rating values" below.
            overall_rating = int(request.form.get('overall_rating'))
            quality_rating = int(request.form.get('quality_rating'))
            communication_rating = int(request.form.get('communication_rating'))
            reliability_rating = int(request.form.get('reliability_rating', overall_rating))  # Fallback to overall
            professionalism_rating = int(request.form.get('professionalism_rating', overall_rating))  # Fallback to overall
            comment = request.form.get('comment', '').strip()

            # Validate ratings (1-5 scale)
            rating_values = [overall_rating, quality_rating, communication_rating, reliability_rating, professionalism_rating]
            if not all(1 <= rating <= 5 for rating in rating_values):
                flash('All ratings must be between 1 and 5.', 'error')
                return render_template('contracts/rate.html', contract=contract)

            # Prepare rating data for service
            rating_data = {
                'overall_score': overall_rating,
                'quality_score': quality_rating,
                'communication_score': communication_rating,
                'reliability_score': reliability_rating,
                'professionalism_score': professionalism_rating,
                'review_text': comment,
                'is_public': True  # Default to public
            }

            # Use rating service to create rating with all business logic
            result = rating_service.create_rating(contract_id, current_user.id, rating_data)
            if result['success']:
                flash(result['message'], 'success')
                return redirect(url_for('contracts_review', contract_id=contract_id))

            flash(f'Error submitting rating: {result["error"]}', 'error')
            return render_template('contracts/rate.html', contract=contract)
        except (ValueError, TypeError):
            flash('Invalid rating values provided.', 'error')
            return render_template('contracts/rate.html', contract=contract)
        except Exception:
            # Keep the details in the server log; do not echo them to the browser.
            logger.exception("Error submitting rating for contract %s", contract_id)
            flash('Error submitting rating. Please try again later.', 'error')
            return render_template('contracts/rate.html', contract=contract)

    # GET request - show rating form
    return render_template('contracts/rate.html', contract=contract)
@app.route('/payouts')
@login_required
def payouts():
    """Show payment totals for the signed-in worker or contractor.

    Workers see ``net_to_worker`` sums; contractors see ``gross_amount`` sums.
    "Released" money counts payments with status ``released``; "pending"
    money counts statuses ``pending`` and ``held_escrow``. Any other role is
    sent back to the dashboard.
    """
    from app.models.contract import Payment, Contract
    from app.extensions import db

    role = current_user.role
    if role not in ('worker', 'contractor'):
        flash('Access denied.', 'error')
        return redirect(url_for('dashboard'))

    is_worker = role == 'worker'
    if is_worker:
        owner_clause = Contract.worker_id == current_user.id
    else:
        owner_clause = Contract.contractor_id == current_user.id

    payment_rows = (
        db.session.query(Payment)
        .join(Contract)
        .filter(owner_clause)
        .order_by(Payment.date_initiated.desc())
        .all()
    )

    amount_field = 'net_to_worker' if is_worker else 'gross_amount'

    def _total(statuses):
        """Sum the role-specific amount over payments in the given statuses."""
        return sum(getattr(row, amount_field) for row in payment_rows if row.status in statuses)

    settled = _total(['released'])
    outstanding = _total(['pending', 'held_escrow'])

    if is_worker:
        return render_template('payouts/worker.html',
                               payments=payment_rows,
                               total_earned=settled,
                               pending_payouts=outstanding)
    return render_template('payouts/contractor.html',
                           payments=payment_rows,
                           total_paid=settled,
                           pending_payments=outstanding)
# PHASE 1: BLUEPRINT PREFIX FIXES
@app.route('/messages')
@login_required
def messages_wrapper():
    """Render the chat page with an empty conversation list.

    If rendering fails for any reason the user is warned and redirected to
    the dashboard.
    """
    from app.models import User
    from app.extensions import db

    chat_context = {
        'current_user': current_user,
        'conversations': [],
    }
    try:
        # This connects to existing messaging functionality
        response = render_template('messages/chat.html', **chat_context)
    except Exception:
        flash('Messages temporarily unavailable.', 'warning')
        response = redirect(url_for('dashboard'))
    return response
@app.route('/messages/<int:message_id>')
@login_required
def message_view_by_id(message_id):
    """Open the conversation that a single message belongs to.

    Verifies the current user is the message's sender or receiver, marks the
    message as read when the receiver opens it, and redirects to the
    conversation with the other participant. Any failure, including an
    unknown message id, is logged and reported as "Message not found".

    Args:
        message_id: Primary key of the message, taken from the URL.

    Returns:
        A redirect to the conversation view, or to the messages page on
        failure or when access is denied.
    """
    try:
        from app.models.message import Message

        # Find the message
        message = Message.query.get_or_404(message_id)

        # Verify current user is sender or receiver
        if current_user.id not in [message.sender_id, message.receiver_id]:
            flash('You do not have access to this message.', 'error')
            return redirect(url_for('messages_wrapper'))

        # Determine the other user ID (who to have conversation with)
        other_user_id = message.sender_id if current_user.id == message.receiver_id else message.receiver_id

        # Mark message as read if current user is the receiver
        if current_user.id == message.receiver_id and not message.read_at:
            from app.services.message_service import message_service
            message_service.mark_message_as_read(message_id, current_user.id)

        # Redirect to conversation with the other user
        return redirect(url_for('messages_view', user_id=other_user_id))
    except Exception as e:
        logger.error(f"Error accessing message {message_id}: {e}")
        flash('Message not found.', 'error')
        return redirect(url_for('messages_wrapper'))
@app.route('/messages/view')
@login_required
def messages_view():
    """Render the chat page, optionally scoped to a contract and/or a user.

    Query parameters:
        contract_id: Optional contract; the current user must be one of its
            two parties.
        user_id: Optional conversation partner; when a contract is also
            given, this must be the contract's other party.

    Returns:
        The rendered chat page, or a redirect to the dashboard when a check
        fails or an error occurs.
    """
    from app.models import User, Contract

    try:
        # Get URL parameters for contract context
        contract_id = request.args.get('contract_id', type=int)
        user_id = request.args.get('user_id', type=int)

        # Initialize variables
        contract = None
        other_user = None

        # Validate contract context if provided
        if contract_id:
            contract = Contract.query.get_or_404(contract_id)
            # Verify current user is party to this contract
            if current_user.id not in [contract.contractor_id, contract.worker_id]:
                flash('You do not have access to this contract.', 'error')
                return redirect(url_for('dashboard'))

        # Validate target user if provided
        if user_id:
            other_user = User.query.get_or_404(user_id)
            # With a contract context, the target must be the other party
            if contract:
                expected_user_id = contract.worker_id if current_user.id == contract.contractor_id else contract.contractor_id
                if user_id != expected_user_id:
                    flash('Invalid user for this contract.', 'error')
                    return redirect(url_for('dashboard'))

        # Render messaging interface with contract context
        return render_template('messages/chat.html',
                               current_user=current_user,
                               contract=contract,
                               other_user=other_user,
                               conversations=[],
                               contract_context=True)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error accessing messages for user %s", current_user.id)
        flash('Error accessing messages. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/notifications')
@login_required
def notifications_wrapper():
    """Render the signed-in user's 50 most recent notifications.

    Also passes the unread count to the template. On any failure the user is
    warned and redirected to the dashboard.
    """
    try:
        # Get user's notifications from notification service
        notifications = notification_service.get_user_notifications(
            user_id=current_user.id,
            limit=50,  # Show recent 50 notifications
            offset=0,
            unread_only=False
        )
        # Get unread count for badge
        unread_count = notification_service.get_unread_count(current_user.id)

        # Diagnostics go through the module logger rather than stdout.
        logger.debug("Found %s notifications for user %s", len(notifications), current_user.id)
        logger.debug("Unread count: %s", unread_count)

        return render_template('notifications/list.html',
                               notifications=notifications,
                               unread_count=unread_count)
    except Exception:
        logger.exception("Error loading notifications")
        flash('Notifications temporarily unavailable.', 'warning')
        return redirect(url_for('dashboard'))
# PHASE 2: CORE USER WORKFLOW ROUTES
@app.route('/jobs/employer')
@login_required
def jobs_employer():
    """List the jobs posted by the signed-in contractor/employer, newest first.

    Users whose role is neither ``contractor`` nor ``employer`` are sent back
    to the dashboard.
    """
    from app.models import Job

    if current_user.role not in ['contractor', 'employer']:
        flash('Access denied. Employer access required.', 'error')
        return redirect(url_for('dashboard'))

    try:
        user_jobs = Job.query.filter_by(contractor_id=current_user.id).order_by(Job.date_posted.desc()).all()
        return render_template('jobs/employer_jobs.html', jobs=user_jobs)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading employer jobs for user %s", current_user.id)
        flash('Error loading jobs. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/jobs/worker')
@login_required
def jobs_worker():
    """List open jobs the signed-in worker has not applied to, newest first.

    Non-worker roles are sent back to the dashboard.
    """
    from app.models import Job, Application
    from app.extensions import db

    if current_user.role != 'worker':
        flash('Access denied. Worker access required.', 'error')
        return redirect(url_for('dashboard'))

    try:
        # Get jobs suitable for worker (exclude those already applied to)
        applied_job_ids = db.session.query(Application.job_id).filter_by(worker_id=current_user.id).subquery()
        available_jobs = Job.query.filter(
            Job.status == 'open',
            ~Job.id.in_(applied_job_ids)
        ).order_by(Job.date_posted.desc()).all()
        return render_template('jobs/worker_jobs.html', jobs=available_jobs)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading worker jobs for user %s", current_user.id)
        flash('Error loading jobs. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/applications/received')
@login_required
def applications_received():
    """List applications submitted to the signed-in employer's jobs.

    Results are ordered by application date, newest first. Users whose role
    is neither ``contractor`` nor ``employer`` are sent back to the dashboard.
    """
    from app.models import Application, Job
    from app.extensions import db

    if current_user.role not in ['contractor', 'employer']:
        flash('Access denied. Employer access required.', 'error')
        return redirect(url_for('dashboard'))

    try:
        # Get all applications for employer's jobs
        received_applications = db.session.query(Application).join(Job).filter(
            Job.contractor_id == current_user.id
        ).order_by(Application.date_applied.desc()).all()
        return render_template('applications/contractor_list.html', applications=received_applications)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading received applications for user %s", current_user.id)
        flash('Error loading applications. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/settings')
@login_required
def user_settings():
    """Render the settings page for the signed-in user."""
    page_context = {'user': current_user}
    return render_template('auth/settings.html', **page_context)
# PHASE 3: CONTRACT MANAGEMENT ROUTES
@app.route('/contracts/pending')
@login_required
def contracts_pending():
    """List the signed-in user's contracts with status ``pending_agreement``.

    Matches contracts where the user is either the contractor or the worker,
    newest first. On failure the user is redirected to the dashboard.
    """
    from app.models import Contract

    try:
        pending_contracts = Contract.query.filter_by(
            status='pending_agreement'
        ).filter(
            (Contract.contractor_id == current_user.id) |
            (Contract.worker_id == current_user.id)
        ).order_by(Contract.created_at.desc()).all()
        return render_template('contracts/list.html',
                               contracts=pending_contracts,
                               filter_type='pending',
                               title='Pending Contracts')
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading pending contracts for user %s", current_user.id)
        flash('Error loading pending contracts. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/contracts/active')
@login_required
def contracts_active():
    """List the signed-in user's contracts with status ``active``.

    Matches contracts where the user is either the contractor or the worker,
    newest first. On failure the user is redirected to the dashboard.
    """
    from app.models import Contract

    try:
        active_contracts = Contract.query.filter_by(
            status='active'
        ).filter(
            (Contract.contractor_id == current_user.id) |
            (Contract.worker_id == current_user.id)
        ).order_by(Contract.created_at.desc()).all()
        return render_template('contracts/list.html',
                               contracts=active_contracts,
                               filter_type='active',
                               title='Active Contracts')
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading active contracts for user %s", current_user.id)
        flash('Error loading active contracts. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/contracts/completed')
@login_required
def contracts_completed():
    """List the signed-in user's contracts with status ``completed``.

    Matches contracts where the user is either the contractor or the worker,
    newest first. On failure the user is redirected to the dashboard.
    """
    from app.models import Contract

    try:
        completed_contracts = Contract.query.filter_by(
            status='completed'
        ).filter(
            (Contract.contractor_id == current_user.id) |
            (Contract.worker_id == current_user.id)
        ).order_by(Contract.created_at.desc()).all()
        return render_template('contracts/list.html',
                               contracts=completed_contracts,
                               filter_type='completed',
                               title='Completed Contracts')
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading completed contracts for user %s", current_user.id)
        flash('Error loading completed contracts. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/contracts/needs-rating')
@login_required
def contracts_needs_rating():
    """List the signed-in user's contracts with status ``pending_rating``.

    Matches contracts where the user is either the contractor or the worker,
    newest first. On failure the user is redirected to the dashboard.
    """
    from app.models import Contract

    try:
        rating_contracts = Contract.query.filter_by(
            status='pending_rating'
        ).filter(
            (Contract.contractor_id == current_user.id) |
            (Contract.worker_id == current_user.id)
        ).order_by(Contract.created_at.desc()).all()
        return render_template('contracts/list.html',
                               contracts=rating_contracts,
                               filter_type='needs_rating',
                               title='Contracts Needing Rating')
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading contracts needing rating for user %s", current_user.id)
        flash('Error loading contracts needing rating. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
# PHASE 4: FINANCIAL MANAGEMENT ROUTES
@app.route('/payments')
@login_required
def payments_history():
    """Show the payments on the signed-in contractor/employer's contracts.

    Totals are sums of ``gross_amount``: "paid" counts status ``released``,
    "pending" counts statuses ``pending`` and ``held_escrow``. Other roles
    are denied and redirected to the dashboard.
    """
    from app.models.contract import Payment, Contract
    from app.extensions import db

    try:
        if current_user.role not in ['contractor', 'employer']:
            flash('Access denied.', 'error')
            return redirect(url_for('dashboard'))

        # Get payments made by contractor
        payments = db.session.query(Payment).join(Contract).filter(
            Contract.contractor_id == current_user.id
        ).order_by(Payment.date_initiated.desc()).all()

        # Calculate totals
        total_paid = sum(p.gross_amount for p in payments if p.status == 'released')
        pending_payments = sum(p.gross_amount for p in payments if p.status in ['pending', 'held_escrow'])

        return render_template('payments/history.html',
                               payments=payments,
                               total_paid=total_paid,
                               pending_payments=pending_payments)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading payments for user %s", current_user.id)
        flash('Error loading payments. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/payments/pending')
@login_required
def payments_pending():
    """List outstanding payments on the signed-in contractor/employer's contracts.

    "Outstanding" means status ``pending`` or ``held_escrow``. Other roles
    are denied and redirected to the dashboard.
    """
    from app.models.contract import Payment, Contract
    from app.extensions import db

    try:
        if current_user.role not in ['contractor', 'employer']:
            flash('Access denied.', 'error')
            return redirect(url_for('dashboard'))

        pending_payments = db.session.query(Payment).join(Contract).filter(
            Contract.contractor_id == current_user.id,
            Payment.status.in_(['pending', 'held_escrow'])
        ).order_by(Payment.date_initiated.desc()).all()
        return render_template('payments/pending.html', payments=pending_payments)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading pending payments for user %s", current_user.id)
        flash('Error loading pending payments. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/payouts/pending')
@login_required
def payouts_pending():
    """List payouts still owed to the signed-in worker.

    Selects payments on the worker's contracts with status ``pending`` or
    ``held_escrow``. Non-worker roles are denied and redirected to the
    dashboard.
    """
    from app.models.contract import Payment, Contract
    from app.extensions import db

    try:
        if current_user.role != 'worker':
            flash('Access denied.', 'error')
            return redirect(url_for('dashboard'))

        pending_payouts = db.session.query(Payment).join(Contract).filter(
            Contract.worker_id == current_user.id,
            Payment.status.in_(['pending', 'held_escrow'])
        ).order_by(Payment.date_initiated.desc()).all()
        return render_template('payouts/pending.html', payouts=pending_payouts)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading pending payouts for user %s", current_user.id)
        flash('Error loading pending payouts. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/invoices')
@login_required
def invoices_list():
    """List invoices where the signed-in user is the contractor or the worker.

    Results are ordered newest first. On failure the user is redirected to
    the dashboard.
    """
    from app.models import Invoice

    try:
        user_invoices = Invoice.query.filter(
            (Invoice.contractor_id == current_user.id) |
            (Invoice.worker_id == current_user.id)
        ).order_by(Invoice.created_at.desc()).all()
        return render_template('invoices/list.html', invoices=user_invoices)
    except Exception:
        # Keep the details in the server log; do not echo them to the browser.
        logger.exception("Error loading invoices for user %s", current_user.id)
        flash('Error loading invoices. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/contracts/<int:contract_id>/payment')
@login_required
def contract_payment_form(contract_id):
    """Render the Stripe payment form for a fully signed contract.

    Only the contract's contractor may pay, the contract must be fully
    signed, and no payment in ``pending_payment``, ``held_escrow`` or
    ``pending_release`` may already exist. The amount shown is the agreed
    rate plus a 7% platform fee, a 2.9% processing fee and 10% GST, each
    computed on the agreed rate.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rendered payment form, or a redirect to the contract review page
        when a precondition fails or an error occurs.
    """
    from app.models import Contract, Payment
    from flask_jwt_extended import create_access_token

    # Fee rates, each applied to the agreed rate.
    platform_fee_rate = 0.07     # 7% platform fee
    processing_fee_rate = 0.029  # 2.9% Stripe fee
    gst_rate = 0.1               # 10% GST

    try:
        # Get contract and verify access
        contract = Contract.query.get_or_404(contract_id)

        # Only contractor can make payments
        if contract.contractor_id != current_user.id:
            flash('Only the contract owner can make payments.', 'error')
            return redirect(url_for('contracts_review', contract_id=contract_id))

        # Check contract is ready for payment
        if not contract.is_fully_signed():
            flash('Contract must be fully signed before payment.', 'error')
            return redirect(url_for('contracts_review', contract_id=contract_id))

        # Check if payment already exists
        existing_payment = Payment.query.filter(
            Payment.contract_id == contract.id,
            Payment.status.in_(['pending_payment', 'held_escrow', 'pending_release'])
        ).first()
        if existing_payment:
            flash('Payment already exists for this contract.', 'warning')
            return redirect(url_for('contracts_review', contract_id=contract_id))

        # Calculate payment amount (including fees and GST)
        base_amount = float(contract.agreed_rate)
        platform_fee = base_amount * platform_fee_rate
        payment_processing_fee = base_amount * processing_fee_rate
        gst_amount = base_amount * gst_rate
        total_payment_amount = base_amount + platform_fee + payment_processing_fee + gst_amount

        # Get Stripe publishable key
        stripe_publishable_key = current_app.config.get('STRIPE_PUBLISHABLE_KEY')
        if not stripe_publishable_key:
            flash('Payment system not configured. Please contact support.', 'error')
            return redirect(url_for('contracts_review', contract_id=contract_id))

        # Create JWT token for API authentication
        jwt_token = create_access_token(identity=current_user.id)

        return render_template('payments/payment_form.html',
                               contract=contract,
                               payment_amount=total_payment_amount,
                               stripe_publishable_key=stripe_publishable_key,
                               jwt_token=jwt_token,
                               current_user=current_user)
    except Exception as e:
        logger.error(f"Error loading payment form for contract {contract_id}: {e}")
        flash('Error loading payment form.', 'error')
        return redirect(url_for('contracts_review', contract_id=contract_id))
# PHASE 5: PROFILE MANAGEMENT ROUTES
@app.route('/profile/edit')
@login_required
def profile_edit():
    """Render the profile editing page for the signed-in user."""
    page_context = {'user': current_user}
    return render_template('profile/edit.html', **page_context)
@app.route('/profile/verification')
@login_required
def profile_verification():
    """Render the document verification page for the signed-in user."""
    page_context = {'user': current_user}
    return render_template('profile/verification.html', **page_context)
@app.route('/profile/insurance')
@login_required
def profile_insurance():
    """Render the insurance management page for the signed-in user."""
    page_context = {'user': current_user}
    return render_template('profile/insurance.html', **page_context)
# STRIPE CONNECT ROUTES
@app.route('/api/stripe/setup-payouts')
@login_required
def stripe_setup_payouts():
    """Start (or resume) Stripe Connect onboarding for the signed-in user.

    An existing account with both charges and payouts enabled short-circuits
    to the settings page. Otherwise an account is created if needed, its id
    is saved on the user, and the browser is redirected to a Stripe-hosted
    onboarding link. Any error is logged and the user returns to settings.
    """
    from app.services.stripe_service import stripe_service
    from app.extensions import db
    from flask import url_for

    try:
        account_id = current_user.stripe_account_id
        if account_id:
            # Existing account: nothing to do when it is already fully enabled
            status = stripe_service.get_account_status(account_id)
            if status['charges_enabled'] and status['payouts_enabled']:
                flash('Your payout account is already set up and active!', 'success')
                return redirect(url_for('user_settings'))
            # Otherwise fall through and resume onboarding for this account
        else:
            # No account yet: create one and remember its id on the user
            profile = {
                'email': current_user.email,
                'first_name': current_user.first_name,
                'last_name': current_user.last_name
            }
            account_id = stripe_service.create_connect_account(profile).id
            current_user.stripe_account_id = account_id
            db.session.commit()

        # Build the hosted onboarding link and hand the browser over to Stripe
        link = stripe_service.create_onboarding_link(
            account_id,
            url_for('stripe_onboarding_return', _external=True),
            url_for('stripe_onboarding_refresh', _external=True)
        )
        return redirect(link.url)
    except Exception as exc:
        app.logger.error(f"Stripe Connect setup error: {str(exc)}")
        flash('Error setting up payouts. Please try again later.', 'error')
        return redirect(url_for('user_settings'))
@app.route('/api/stripe/onboarding/return')
@login_required
def stripe_onboarding_return():
    """Sync the user's Stripe flags after returning from hosted onboarding.

    Copies ``details_submitted``, ``payouts_enabled`` and ``charges_enabled``
    from the account status onto the user, commits, and flashes a message
    describing the resulting state. Always ends on the settings page.
    """
    from app.services.stripe_service import stripe_service
    from app.extensions import db

    try:
        account_id = current_user.stripe_account_id
        if not account_id:
            flash('Account setup error. Please try again.', 'error')
        else:
            # Check account status and mirror it onto the user record
            status = stripe_service.get_account_status(account_id)
            current_user.stripe_onboarding_complete = status['details_submitted']
            current_user.stripe_payouts_enabled = status['payouts_enabled']
            current_user.stripe_charges_enabled = status['charges_enabled']
            db.session.commit()

            if status['payouts_enabled']:
                outcome = ('Payout setup completed successfully! You can now receive payments.', 'success')
            elif status['details_submitted']:
                outcome = ('Account information submitted. Stripe is reviewing your details.', 'info')
            else:
                outcome = ('Setup incomplete. Please complete all required information.', 'warning')
            flash(*outcome)
    except Exception as exc:
        app.logger.error(f"Stripe onboarding return error: {str(exc)}")
        flash('Error checking account status.', 'error')

    return redirect(url_for('user_settings'))
@app.route('/api/stripe/onboarding/refresh')
@login_required
def stripe_onboarding_refresh():
    """Restart Stripe onboarding by bouncing back to the setup route."""
    setup_url = url_for('stripe_setup_payouts')
    return redirect(setup_url)
@app.route('/api/stripe/dashboard')
@login_required
def stripe_dashboard():
    """Send the signed-in user to their Stripe dashboard.

    Users without a Stripe account id are warned and returned to settings;
    any error is logged and also ends on the settings page.
    """
    from app.services.stripe_service import stripe_service

    try:
        account_id = current_user.stripe_account_id
        if account_id:
            # Create dashboard login link and redirect to it
            login_link = stripe_service.create_dashboard_link(account_id)
            return redirect(login_link.url)

        flash('Please set up your payout account first.', 'warning')
        return redirect(url_for('user_settings'))
    except Exception as exc:
        app.logger.error(f"Stripe dashboard error: {str(exc)}")
        flash('Error accessing dashboard. Please try again later.', 'error')
        return redirect(url_for('user_settings'))
# ==================== TIME TRACKING ROUTES ====================
@app.route('/contracts/<int:contract_id>/hours')
@login_required
def contract_hours_view(contract_id):
    """Render the time tracking page for one contract.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rendered ``contracts/hours.html`` page, or a redirect with a
        flash message when the user lacks access or the hours cannot be
        loaded. An unknown contract produces a 404.
    """
    from app.services.time_tracking_service import time_tracking_service
    from app.models import Contract

    # Looked up outside the try block so the 404 raised for an unknown
    # contract is not swallowed by the broad handler below.
    contract = Contract.query.get_or_404(contract_id)

    # Only the two parties to the contract may see its hours.
    if current_user.id not in (contract.contractor_id, contract.worker_id):
        flash('You do not have access to this contract.', 'error')
        return redirect(url_for('dashboard'))

    try:
        hours_data = time_tracking_service.get_contract_hours(contract_id)
        if not hours_data.get('success'):
            flash(f"Error loading hours: {hours_data.get('error')}", 'error')
            return redirect(url_for('contracts_review', contract_id=contract_id))
        return render_template('contracts/hours.html',
                               contract=contract,
                               hours_data=hours_data)
    except Exception as e:
        logger.error(f"Error viewing contract hours {contract_id}: {e}")
        flash('Error loading time tracking data.', 'error')
        return redirect(url_for('contracts_review', contract_id=contract_id))
@app.route('/contracts/<int:contract_id>/hours/edit', methods=['GET', 'POST'])
@login_required
def contract_hours_edit(contract_id):
    """Show and process the hour-editing form for a contract.

    GET renders ``contracts/hours_edit.html``. POST dispatches on the
    ``action`` form field (``add_hours``, ``edit_entry``, ``delete_entry``
    or ``approve_hours``), flashes the outcome reported by the time
    tracking service and redirects back to this page. Only the contract's
    contractor or worker may use the view, and only the contractor may
    approve hours.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The rendered form, or a redirect after a POST or on failure.
        An unknown contract produces a 404.
    """
    from app.services.time_tracking_service import time_tracking_service
    from app.models import Contract
    from datetime import date

    # Looked up outside the try block so the 404 raised for an unknown
    # contract is not swallowed by the broad handler below.
    contract = Contract.query.get_or_404(contract_id)
    if current_user.id not in (contract.contractor_id, contract.worker_id):
        flash('You do not have access to this contract.', 'error')
        return redirect(url_for('dashboard'))

    try:
        if request.method == 'POST':
            edit_url = url_for('contract_hours_edit', contract_id=contract_id)
            action = request.form.get('action')

            if action == 'add_hours':
                hours = request.form.get('hours', type=float)
                work_date_str = request.form.get('work_date')
                description = request.form.get('description', '')
                location = request.form.get('location', '')

                # A missing date means "today"; a malformed one is rejected.
                try:
                    work_date = date.fromisoformat(work_date_str) if work_date_str else date.today()
                except ValueError:
                    flash('Invalid work date format.', 'error')
                    return redirect(edit_url)

                if not hours or hours <= 0:
                    flash('Hours must be greater than 0.', 'error')
                    return redirect(edit_url)

                result = time_tracking_service.create_time_entry(
                    contract_id=contract_id,
                    worker_id=contract.worker_id,
                    hours=hours,
                    work_date=work_date,
                    description=description,
                    location=location
                )
                if result.get('success'):
                    flash(result.get('message'), 'success')
                else:
                    flash(f"Error adding hours: {result.get('error')}", 'error')

            elif action == 'edit_entry':
                entry_id = request.form.get('entry_id', type=int)
                hours = request.form.get('hours', type=float)
                description = request.form.get('description')
                location = request.form.get('location')
                if not entry_id:
                    flash('Entry ID is required for editing.', 'error')
                    return redirect(edit_url)

                result = time_tracking_service.edit_time_entry(
                    entry_id=entry_id,
                    hours=hours,
                    description=description,
                    location=location,
                    user_id=current_user.id
                )
                if result.get('success'):
                    flash(result.get('message'), 'success')
                else:
                    flash(f"Error editing hours: {result.get('error')}", 'error')

            elif action == 'delete_entry':
                entry_id = request.form.get('entry_id', type=int)
                if not entry_id:
                    flash('Entry ID is required for deletion.', 'error')
                    return redirect(edit_url)

                result = time_tracking_service.delete_time_entry(
                    entry_id=entry_id,
                    user_id=current_user.id
                )
                if result.get('success'):
                    flash(result.get('message'), 'success')
                else:
                    flash(f"Error deleting hours: {result.get('error')}", 'error')

            elif action == 'approve_hours':
                if contract.contractor_id != current_user.id:
                    flash('Only the contractor can approve hours.', 'error')
                    return redirect(edit_url)

                # Blank values (e.g. an empty hidden field) count as
                # "nothing selected".
                submitted = [raw.strip() for raw in request.form.getlist('entry_ids') if raw.strip()]
                # isdecimal() matches exactly the digit strings int() accepts;
                # isdigit() also admits characters such as superscripts that
                # make int() raise.
                entry_ids = [int(raw) for raw in submitted if raw.isdecimal()]
                if submitted and not entry_ids:
                    # IDs were sent but none were usable; falling through
                    # with None would approve every pending entry instead.
                    flash('No valid time entries were selected for approval.', 'error')
                    return redirect(edit_url)

                result = time_tracking_service.approve_time_entries(
                    contract_id=contract_id,
                    approver_id=current_user.id,
                    # None asks the service to approve without an ID filter.
                    entry_ids=entry_ids if entry_ids else None
                )
                if result.get('success'):
                    flash(result.get('message'), 'success')
                else:
                    flash(f"Error approving hours: {result.get('error')}", 'error')

            # Post/redirect/get: always return to the form after a POST.
            return redirect(edit_url)

        # GET request - show edit form
        hours_data = time_tracking_service.get_contract_hours(contract_id)
        if not hours_data.get('success'):
            flash(f"Error loading hours: {hours_data.get('error')}", 'error')
            return redirect(url_for('contracts_review', contract_id=contract_id))
        return render_template('contracts/hours_edit.html',
                               contract=contract,
                               hours_data=hours_data,
                               is_contractor=current_user.id == contract.contractor_id)
    except Exception as e:
        logger.error(f"Error editing contract hours {contract_id}: {e}")
        flash('Error loading hour editing interface.', 'error')
        return redirect(url_for('contracts_review', contract_id=contract_id))
@app.route('/contracts/<int:contract_id>/hours/summary')
@login_required
def contract_hours_summary(contract_id):
    """Return the hour summary for a contract as JSON.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The service summary (all hours, not only approved ones) extended
        with a ``contract`` section on success; otherwise a JSON error with
        status 400 (service failure), 403 (not a party to the contract),
        404 (unknown contract) or 500 (unexpected error).
    """
    from app.services.time_tracking_service import time_tracking_service
    from app.models import Contract

    try:
        # get() instead of get_or_404(): the 404 exception would be caught
        # by the handler below and reported as a 500.
        contract = Contract.query.get(contract_id)
        if contract is None:
            return jsonify({
                "success": False,
                "error": "Contract not found"
            }), 404

        if current_user.id not in (contract.contractor_id, contract.worker_id):
            return jsonify({
                "success": False,
                "error": "Access denied"
            }), 403

        summary = time_tracking_service.calculate_total_hours(
            contract_id=contract_id,
            approved_only=False  # Include all hours
        )
        if not summary.get('success'):
            return jsonify(summary), 400

        agreed_rate = contract.agreed_rate
        summary['contract'] = {
            'id': contract.id,
            'status': contract.status,
            # float(None) would raise; report a missing rate as null.
            'agreed_rate': float(agreed_rate) if agreed_rate is not None else None,
            'rate_type': contract.rate_type,
            'worker_name': f"{contract.worker.first_name} {contract.worker.last_name}",
            'contractor_name': f"{contract.contractor.first_name} {contract.contractor.last_name}"
        }
        return jsonify(summary)
    except Exception as e:
        logger.error(f"Error getting contract hours summary {contract_id}: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/time-entries', methods=['POST'])
@login_required
def api_create_time_entry():
    """Create a time entry from a JSON request body.

    Expected JSON fields: ``contract_id`` (int, required), ``hours``
    (positive number, required), ``work_date`` (``YYYY-MM-DD``, optional),
    ``description`` and ``location`` (optional strings).

    Returns:
        The service result with status 201 on success, or a JSON error
        with status 400 (bad input or service failure), 403 (not a party
        to the contract), 404 (unknown contract) or 500 (unexpected error).
    """
    from app.services.time_tracking_service import time_tracking_service
    from app.models import Contract
    from datetime import date

    def _coerce(value, caster):
        """Return caster(value), or None when the value cannot be converted."""
        try:
            return caster(value)
        except (TypeError, ValueError):
            return None

    try:
        # silent=True turns a missing or malformed JSON body into None so
        # it is reported as a 400 below rather than raising.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                "success": False,
                "error": "JSON data required"
            }), 400

        # The parsed body is a plain dict, whose get() has no ``type``
        # keyword (that belongs to werkzeug's MultiDict), so convert here.
        contract_id = _coerce(data.get('contract_id'), int)
        hours = _coerce(data.get('hours'), float)
        work_date_str = data.get('work_date')
        description = data.get('description', '')
        location = data.get('location', '')

        if not contract_id or not hours:
            return jsonify({
                "success": False,
                "error": "contract_id and hours are required"
            }), 400
        if hours <= 0:
            return jsonify({
                "success": False,
                "error": "hours must be greater than 0"
            }), 400

        work_date = None
        if work_date_str:
            try:
                work_date = date.fromisoformat(work_date_str)
            except (TypeError, ValueError):
                # TypeError covers non-string JSON values such as numbers.
                return jsonify({
                    "success": False,
                    "error": "Invalid work_date format. Use YYYY-MM-DD"
                }), 400

        contract = Contract.query.get(contract_id)
        if not contract:
            return jsonify({
                "success": False,
                "error": "Contract not found"
            }), 404
        if current_user.id not in (contract.contractor_id, contract.worker_id):
            return jsonify({
                "success": False,
                "error": "Access denied"
            }), 403

        result = time_tracking_service.create_time_entry(
            contract_id=contract_id,
            worker_id=contract.worker_id,
            hours=hours,
            work_date=work_date,
            description=description,
            location=location
        )
        status = 201 if result.get('success') else 400
        return jsonify(result), status
    except Exception as e:
        logger.error(f"Error creating time entry via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/time-entries/<int:entry_id>', methods=['PUT'])
@login_required
def api_edit_time_entry(entry_id):
    """Update an existing time entry from a JSON request body.

    All JSON fields are optional: ``hours`` (number), ``description``,
    ``location`` and ``work_date`` (``YYYY-MM-DD``). The current user's id
    is passed to the service together with the update.

    Args:
        entry_id: Primary key of the time entry, taken from the URL.

    Returns:
        The service result (200 on success, 400 on failure), a 400 JSON
        error for bad input, or a 500 JSON error on unexpected failure.
    """
    from app.services.time_tracking_service import time_tracking_service
    from datetime import date

    try:
        # silent=True turns a missing or malformed JSON body into None so
        # it is reported as a 400 below rather than raising.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                "success": False,
                "error": "JSON data required"
            }), 400

        # The parsed body is a plain dict, whose get() has no ``type``
        # keyword, so the conversion is done explicitly.
        hours = None
        if data.get('hours') is not None:
            try:
                hours = float(data['hours'])
            except (TypeError, ValueError):
                return jsonify({
                    "success": False,
                    "error": "hours must be a number"
                }), 400

        description = data.get('description')
        location = data.get('location')
        work_date_str = data.get('work_date')

        work_date = None
        if work_date_str:
            try:
                work_date = date.fromisoformat(work_date_str)
            except (TypeError, ValueError):
                # TypeError covers non-string JSON values such as numbers.
                return jsonify({
                    "success": False,
                    "error": "Invalid work_date format. Use YYYY-MM-DD"
                }), 400

        result = time_tracking_service.edit_time_entry(
            entry_id=entry_id,
            hours=hours,
            description=description,
            location=location,
            work_date=work_date,
            user_id=current_user.id
        )
        if result.get('success'):
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"Error editing time entry {entry_id} via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/time-entries/<int:entry_id>', methods=['DELETE'])
@login_required
def api_delete_time_entry(entry_id):
    """Delete a time entry on behalf of the current user.

    Args:
        entry_id: Primary key of the time entry, taken from the URL.

    Returns:
        The service result as JSON (200 on success, 400 on failure), or a
        500 JSON error on unexpected failure.
    """
    from app.services.time_tracking_service import time_tracking_service

    try:
        # The current user's id is handed to the service with the request.
        result = time_tracking_service.delete_time_entry(
            entry_id=entry_id,
            user_id=current_user.id
        )
        if result.get('success'):
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"Error deleting time entry {entry_id} via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/contracts/<int:contract_id>/time-entries')
@login_required
def api_get_contract_time_entries(contract_id):
    """Return all time entries for a contract as JSON.

    Access is limited to the contract's contractor and worker, matching
    the other contract hour views.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The service result (200 on success, 400 on failure), or a JSON
        error with status 403, 404 or 500.
    """
    from app.services.time_tracking_service import time_tracking_service
    from app.models import Contract

    try:
        contract = Contract.query.get(contract_id)
        if contract is None:
            return jsonify({
                "success": False,
                "error": "Contract not found"
            }), 404

        # Without this check any logged-in user could read any contract's
        # hours just by guessing its id.
        if current_user.id not in (contract.contractor_id, contract.worker_id):
            return jsonify({
                "success": False,
                "error": "Access denied"
            }), 403

        result = time_tracking_service.get_contract_hours(contract_id)
        if result.get('success'):
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"Error getting contract time entries {contract_id} via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/contracts/<int:contract_id>/approve-hours', methods=['POST'])
@login_required
def api_approve_contract_hours(contract_id):
    """Approve time entries for a contract.

    The optional JSON body may carry ``entry_ids``, a list of integer
    entry ids. When it is absent, ``None`` is passed to the service. Only
    the contract's contractor may approve, mirroring the form-based
    handler.

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        The service result (200 on success, 400 on failure), or a JSON
        error with status 400, 403, 404 or 500.
    """
    from app.services.time_tracking_service import time_tracking_service
    from app.models import Contract

    try:
        # The body is optional: silent=True yields None instead of raising
        # when the request has no (or invalid) JSON.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        entry_ids = data.get('entry_ids')  # Optional - None when not supplied
        if entry_ids is not None:
            ids_ok = isinstance(entry_ids, list) and all(
                # bool is a subclass of int; true/false are not valid ids.
                isinstance(item, int) and not isinstance(item, bool)
                for item in entry_ids
            )
            if not ids_ok:
                return jsonify({
                    "success": False,
                    "error": "entry_ids must be a list of integers"
                }), 400

        contract = Contract.query.get(contract_id)
        if contract is None:
            return jsonify({
                "success": False,
                "error": "Contract not found"
            }), 404
        if contract.contractor_id != current_user.id:
            return jsonify({
                "success": False,
                "error": "Only the contractor can approve hours."
            }), 403

        result = time_tracking_service.approve_time_entries(
            contract_id=contract_id,
            approver_id=current_user.id,
            entry_ids=entry_ids
        )
        if result.get('success'):
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"Error approving hours {contract_id} via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/time-tracking/clock-in', methods=['POST'])
@login_required
def api_clock_in():
    """Clock the current user in on a contract.

    Expected JSON fields: ``contract_id`` (int, required), ``location``
    and ``description`` (optional strings).

    Returns:
        The service result (200 on success, 400 on failure), a 400 JSON
        error for bad input, or a 500 JSON error on unexpected failure.
    """
    from app.services.time_tracking_service import time_tracking_service

    try:
        # silent=True turns a missing or malformed JSON body into None so
        # it is reported as a 400 below rather than raising.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                "success": False,
                "error": "JSON data required"
            }), 400

        # The parsed body is a plain dict, whose get() has no ``type``
        # keyword, so the conversion is done explicitly.
        try:
            contract_id = int(data.get('contract_id'))
        except (TypeError, ValueError):
            contract_id = None
        location = data.get('location', '')
        description = data.get('description', '')

        if not contract_id:
            return jsonify({
                "success": False,
                "error": "contract_id is required"
            }), 400

        result = time_tracking_service.clock_in(
            worker_id=current_user.id,
            contract_id=contract_id,
            location=location,
            description=description
        )
        if result.get('success'):
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"Error clocking in via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/time-tracking/clock-out', methods=['POST'])
@login_required
def api_clock_out():
    """Clock the current user out.

    The JSON body is optional. ``entry_id`` (int) and ``description``
    (string) are forwarded to the service; ``entry_id`` is ``None`` when
    not supplied.

    Returns:
        The service result (200 on success, 400 on failure), a 400 JSON
        error for a non-integer ``entry_id``, or a 500 JSON error on
        unexpected failure.
    """
    from app.services.time_tracking_service import time_tracking_service

    try:
        # The body is optional: silent=True yields None instead of raising
        # when the request has no (or invalid) JSON.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # The parsed body is a plain dict, whose get() has no ``type``
        # keyword, so the conversion is done explicitly.
        entry_id = None
        if data.get('entry_id') is not None:
            try:
                entry_id = int(data['entry_id'])
            except (TypeError, ValueError):
                return jsonify({
                    "success": False,
                    "error": "entry_id must be an integer"
                }), 400
        description = data.get('description', '')

        result = time_tracking_service.clock_out(
            worker_id=current_user.id,
            entry_id=entry_id,
            description=description
        )
        if result.get('success'):
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logger.error(f"Error clocking out via API: {e}")
        return jsonify({
            "success": False,
            "error": "Internal server error"
        }), 500
@app.route('/api/time-tracking/active-clock-in')
@login_required
def api_get_active_clock_in():
    """Return the current user's active clock-in, as reported by the service, as JSON.

    On any exception the error is logged and a 500 response with
    ``active: False`` is returned.
    """
    from app.services.time_tracking_service import time_tracking_service

    try:
        worker_id = current_user.id
        active_clock_in = time_tracking_service.get_active_clock_in(worker_id)
        return jsonify(active_clock_in)
    except Exception as e:
        logger.error(f"Error getting active clock-in via API: {e}")
        failure = {
            "active": False,
            "error": "Internal server error"
        }
        return jsonify(failure), 500
# ==================== EXPORT FUNCTIONALITY ====================
@app.route('/contracts/<int:contract_id>/hours/export')
@login_required
def export_contract_hours(contract_id):
    """Export a contract's hours as a CSV or PDF download.

    Query parameters: ``format`` (``pdf``; anything else means CSV),
    ``start_date`` / ``end_date`` (``YYYY-MM-DD``, optional) and
    ``approved_only`` (``true`` to restrict the export).

    Args:
        contract_id: Primary key of the contract, taken from the URL.

    Returns:
        An attachment response on success, or a redirect with a flash
        message on bad input, missing access or export failure. An unknown
        contract produces a 404.
    """
    from app.services.export_service import export_service
    from app.models import Contract
    from flask import make_response
    from datetime import date

    # Looked up outside the try block so the 404 raised for an unknown
    # contract is not swallowed by the broad handler below.
    contract = Contract.query.get_or_404(contract_id)
    if current_user.id not in (contract.contractor_id, contract.worker_id):
        flash('You do not have access to this contract.', 'error')
        return redirect(url_for('dashboard'))

    try:
        hours_url = url_for('contract_hours_view', contract_id=contract_id)

        export_format = request.args.get('format', 'csv').lower()
        start_date_str = request.args.get('start_date')
        end_date_str = request.args.get('end_date')
        approved_only = request.args.get('approved_only', 'false').lower() == 'true'

        start_date = None
        end_date = None
        if start_date_str:
            try:
                start_date = date.fromisoformat(start_date_str)
            except ValueError:
                flash('Invalid start date format.', 'error')
                return redirect(hours_url)
        if end_date_str:
            try:
                end_date = date.fromisoformat(end_date_str)
            except ValueError:
                flash('Invalid end date format.', 'error')
                return redirect(hours_url)

        if start_date and end_date and start_date > end_date:
            flash('Start date cannot be after end date.', 'error')
            return redirect(hours_url)

        # The two formats differ only in exporter, payload key and MIME type.
        if export_format == 'pdf':
            exporter = export_service.export_contract_hours_pdf
            payload_key = 'pdf_data'
            content_type = 'application/pdf'
        else:  # Default to CSV
            exporter = export_service.export_contract_hours_csv
            payload_key = 'csv_data'
            content_type = 'text/csv'

        export_result = exporter(
            contract_id=contract_id,
            start_date=start_date,
            end_date=end_date,
            approved_only=approved_only
        )
        if not export_result.get('success'):
            flash(f"Export failed: {export_result.get('error')}", 'error')
            return redirect(hours_url)

        response = make_response(export_result[payload_key])
        response.headers['Content-Type'] = content_type
        response.headers['Content-Disposition'] = f'attachment; filename="{export_result["filename"]}"'
        return response
    except Exception as e:
        logger.error(f"Error exporting contract hours {contract_id}: {e}")
        flash('Error generating export.', 'error')
        return redirect(url_for('contract_hours_view', contract_id=contract_id))
@app.route('/contracts/<int:contract_id>/payroll/export')
@login_required
def export_payroll_summary(contract_id):
    """Export a payroll summary CSV for the contract's contractor.

    Requires ``start_date`` and ``end_date`` query parameters in
    ``YYYY-MM-DD`` form. The export is requested for the current user as
    contractor and is restricted to approved hours.

    Args:
        contract_id: Primary key of the contract, taken from the URL; used
            for the permission check and for redirect targets.

    Returns:
        A CSV attachment on success, or a redirect with a flash message on
        bad input, missing permission or export failure. An unknown
        contract produces a 404.
    """
    from app.services.export_service import export_service
    from app.models import Contract
    from flask import make_response
    from datetime import date

    # Looked up outside the try block so the 404 raised for an unknown
    # contract is not swallowed by the broad handler below.
    contract = Contract.query.get_or_404(contract_id)
    if current_user.id != contract.contractor_id:
        flash('Only contractors can export payroll data.', 'error')
        return redirect(url_for('dashboard'))

    try:
        hours_url = url_for('contract_hours_view', contract_id=contract_id)

        start_date_str = request.args.get('start_date')
        end_date_str = request.args.get('end_date')
        if not start_date_str or not end_date_str:
            flash('Start date and end date are required for payroll export.', 'error')
            return redirect(hours_url)

        try:
            start_date = date.fromisoformat(start_date_str)
            end_date = date.fromisoformat(end_date_str)
        except ValueError:
            flash('Invalid date format.', 'error')
            return redirect(hours_url)

        # Same range validation as the contract hours export.
        if start_date > end_date:
            flash('Start date cannot be after end date.', 'error')
            return redirect(hours_url)

        export_result = export_service.export_payroll_summary_csv(
            contractor_id=current_user.id,
            start_date=start_date,
            end_date=end_date,
            approved_only=True  # Payroll should only include approved hours
        )
        if not export_result.get('success'):
            flash(f"Payroll export failed: {export_result.get('error')}", 'error')
            return redirect(hours_url)

        response = make_response(export_result['csv_data'])
        response.headers['Content-Type'] = 'text/csv'
        response.headers['Content-Disposition'] = f'attachment; filename="{export_result["filename"]}"'
        return response
    except Exception as e:
        logger.error(f"Error exporting payroll summary: {e}")
        flash('Error generating payroll export.', 'error')
        return redirect(url_for('contract_hours_view', contract_id=contract_id))
# ==================== OAUTH DRIVE INTEGRATION ROUTES ====================
@app.route('/oauth2callback')
@login_required
def oauth2callback():
    """Complete the Google Drive OAuth flow.

    Reads the ``code`` query parameter, hands it to ``DriveOAuthService``
    and flashes the outcome. Every path ends with a redirect to the
    dashboard.
    """
    try:
        from app.services.drive_oauth_service import DriveOAuthService

        code = request.args.get('code')
        if not code:
            flash('OAuth authorization failed - no authorization code received.', 'error')
            return redirect(url_for('dashboard'))

        oauth_service = DriveOAuthService()
        if oauth_service.handle_oauth_callback(code):
            flash('Google Drive successfully connected! Contract PDFs will now be automatically uploaded.', 'success')
        else:
            flash('Failed to connect Google Drive. Please try again.', 'error')
        return redirect(url_for('dashboard'))
    except Exception as e:
        logger.error(f"OAuth callback error: {str(e)}")
        flash('Error connecting to Google Drive. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
@app.route('/drive/connect')
@login_required
def drive_connect():
    """Start the Google Drive OAuth flow.

    Redirects to the authorization URL produced by ``DriveOAuthService``.
    If anything raises, the error is logged and the user is sent to the
    dashboard with an error flash.
    """
    try:
        from app.services.drive_oauth_service import DriveOAuthService

        authorization_url = DriveOAuthService().get_authorization_url()
        return redirect(authorization_url)
    except Exception as e:
        logger.error(f"Drive connect error: {str(e)}")
        flash('Error initiating Google Drive connection. Please try again later.', 'error')
        return redirect(url_for('dashboard'))
def register_edge_error_handlers(app):
    """Register Edge-specific error handlers.

    Args:
        app: The Flask application on which the 500 handler is installed.
    """

    @app.errorhandler(500)
    def handle_edge_500_error(error):
        """Handle a 500 error, with extra diagnostics for Edge browsers.

        Args:
            error: The exception or error object passed in by Flask.

        Returns:
            A ``(body, 500)`` tuple: the Edge compatibility page when the
            User-Agent contains ``Edg/``, otherwise a generic error body.
        """
        from flask import request
        import html

        user_agent = request.headers.get('User-Agent', '')
        is_edge = 'Edg/' in user_agent

        if is_edge:
            # Cookie and Authorization values are credentials; keep them
            # out of the diagnostic output.
            loggable_headers = {
                name: ('[redacted]' if name.lower() in ('cookie', 'authorization') else value)
                for name, value in request.headers.items()
            }
            print(f"[EDGE] 500 error detected in Edge browser")
            print(f"[EDGE] URL: {request.url}")
            print(f"[EDGE] Method: {request.method}")
            print(f"[EDGE] Headers: {loggable_headers}")
            print(f"[EDGE] Error: {str(error)}")

            # Return Edge-friendly error page
            edge_error_html = (
                "\n"
                "RateRight - Loading...\n"
                "RateRight - Microsoft Edge Compatibility\n"
                "Optimizing experience for Edge browser...\n"
                "Redirecting automatically...\n"
                "Click here if not redirected\n"
            )
            return edge_error_html, 500

        # Standard error handling for other browsers. The error text is
        # escaped because the response is served as HTML and the message
        # may contain request-derived data.
        safe_error = html.escape(str(error))
        return (
            "500 - Internal Server Error\n"
            f"RateRight encountered an error: {safe_error}\n"
        ), 500