from datetime import date, timedelta
from typing import List, Dict, Tuple, Optional

from fastapi import APIRouter, Request, Depends, Query, HTTPException
from fastapi.responses import PlainTextResponse
from sqlalchemy import func, and_, case
from sqlalchemy.orm import Session

from .db import get_session
from .models import Employee, TimeEntry, TimesheetStatus

router = APIRouter(prefix="/attendance", tags=["Attendance"])


def _daterange(d1: date, d2: date):
    """Yield every date from ``d1`` through ``d2`` inclusive, one day at a time.

    Yields nothing when ``d1`` is later than ``d2``.
    """
    cur = d1
    step = timedelta(days=1)
    while cur <= d2:
        yield cur
        cur += step


def _to_date(v: Optional[str], fallback: date) -> date:
    """Parse an ISO ``YYYY-MM-DD`` string, returning ``fallback`` when it is empty or invalid."""
    if not v:
        return fallback
    try:
        return date.fromisoformat(v)
    except (TypeError, ValueError):
        # Malformed query values are treated as "not provided" rather than an error.
        return fallback


@router.get("", name="attendance_builder")
def attendance_builder_page(
    request: Request,
    start: Optional[str] = Query(None),  # YYYY-MM-DD
    end: Optional[str] = Query(None),  # YYYY-MM-DD
    employee_id: Optional[str] = Query("all"),  # "all" or single id
    include_weekends: int = Query(0),
    db: Session = Depends(get_session),
):
    """Render the attendance grid for one employee or all employees over a date range.

    Args:
        request: Incoming request; its session must carry a truthy ``is_admin``.
        start: Range start as ``YYYY-MM-DD``; defaults to the first day of the current month.
        end: Range end as ``YYYY-MM-DD``; defaults to today.
        employee_id: ``"all"`` or a single numeric employee id. Anything that is
            not a valid integer is treated as ``"all"``.
        include_weekends: Truthy to include Saturdays/Sundays in the grid.
        db: SQLAlchemy session.

    Returns:
        The rendered ``attendance.html`` template response.

    Raises:
        HTTPException: 403 when the session is not flagged as admin.
    """
    # Admin-only
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    # Defaults: current month
    today = date.today()
    default_start = today.replace(day=1)
    start_d = _to_date(start, default_start)
    end_d = _to_date(end, today)
    if end_d < start_d:
        # Tolerate a reversed range instead of returning an empty grid.
        start_d, end_d = end_d, start_d

    # Employees list
    all_emps = db.query(Employee).order_by(Employee.name.asc()).all()

    # Determine selected ids from dropdown value. The id is parsed exactly once
    # so that the query filter and the template context always agree; an
    # unparsable value falls back to "all" instead of raising later on.
    selected_employee_id: Optional[int] = None
    if employee_id and employee_id != "all":
        try:
            selected_employee_id = int(employee_id)
        except ValueError:
            selected_employee_id = None

    selected_ids: List[int]
    if selected_employee_id is None:
        selected_ids = [e.id for e in all_emps]
    else:
        selected_ids = [selected_employee_id]

    # Normalize PTO type (trim+lower)
    normalized_pto = func.lower(func.trim(func.coalesce(TimeEntry.pto_type, "")))

    def _pto_hours(prefix: str, label: str):
        # Sum of pto_hours over entries whose normalized PTO type starts with `prefix`.
        matched = case(
            (normalized_pto.like(f"{prefix}%"), func.coalesce(TimeEntry.pto_hours, 0)),
            else_=0,
        )
        return func.coalesce(func.sum(matched), 0).label(label)

    def _pto_flag(prefix: str, label: str):
        # 1 when any entry of the group has a PTO type starting with `prefix`, else 0.
        matched = case((normalized_pto.like(f"{prefix}%"), 1), else_=0)
        return func.coalesce(func.max(matched), 0).label(label)

    # Aggregate per employee/date (SUBMITTED ONLY)
    q = (
        db.query(
            TimeEntry.employee_id.label("emp_id"),
            TimeEntry.work_date.label("d"),
            func.coalesce(func.sum(TimeEntry.total_hours), 0).label("total"),
            func.coalesce(func.sum(TimeEntry.break_hours), 0).label("breaks"),
            _pto_hours("pto", "pto"),
            _pto_hours("off", "off"),
            _pto_flag("off", "off_flag"),
            _pto_hours("sick", "sick"),
            _pto_flag("sick", "sick_flag"),
            func.coalesce(func.sum(TimeEntry.holiday_hours), 0).label("holiday"),
            # "Other" from bereavement bucket
            func.coalesce(func.sum(TimeEntry.bereavement_hours), 0).label("other"),
        )
        .join(
            TimesheetStatus,
            and_(
                TimesheetStatus.timesheet_id == TimeEntry.timesheet_id,
                TimesheetStatus.employee_id == TimeEntry.employee_id,
                TimesheetStatus.status == "submitted",
            ),
        )
        .filter(
            TimeEntry.work_date >= start_d,
            TimeEntry.work_date <= end_d,
        )
    )
    if selected_ids:
        # With no employees at all there is nothing to restrict by.
        q = q.filter(TimeEntry.employee_id.in_(selected_ids))

    rows = (
        q.group_by(TimeEntry.employee_id, TimeEntry.work_date)
        .order_by(TimeEntry.employee_id.asc(), TimeEntry.work_date.asc())
        .all()
    )

    # Build (emp_id, date) -> summary
    per_day: Dict[Tuple[int, date], Dict[str, float]] = {}
    for r in rows:
        # Worked time is total minus breaks, clamped at zero.
        worked = max(float(r.total or 0) - float(r.breaks or 0), 0.0)
        per_day[(int(r.emp_id), r.d)] = {
            "worked": worked,
            "pto": float(r.pto or 0),
            "off": float(r.off or 0),
            "off_flag": int(r.off_flag or 0),
            "sick": float(r.sick or 0),
            "sick_flag": int(r.sick_flag or 0),
            "holiday": float(r.holiday or 0),
            "other": float(r.other or 0),
        }

    days = [d for d in _daterange(start_d, end_d) if include_weekends or d.weekday() < 5]

    # Precomputed lookups so the per-employee loop does no linear scans.
    selected_set = set(selected_ids)
    emps_with_data = {emp_id for emp_id, _ in per_day}

    visual = []
    for e in all_emps:
        if selected_set and e.id not in selected_set:
            continue

        # Has any submitted data in range? Used for classifying gaps.
        has_data = e.id in emps_with_data

        cells = []
        totals = {
            "worked_days": 0,
            "off_days": 0,
            "sick_days": 0,
            "pto_days": 0,
            "holiday_days": 0,
            "other_days": 0,
        }
        hours = {"worked": 0.0, "off": 0.0, "sick": 0.0, "pto": 0.0, "holiday": 0.0, "other": 0.0}

        for d in days:
            info = per_day.get((e.id, d))
            if info is not None:
                # Precedence: holiday > off > sick > pto > other > worked
                if info["holiday"] > 0:
                    st = "holiday"
                elif info["off_flag"] > 0 or info["off"] > 0:
                    st = "off"
                elif info["sick_flag"] > 0 or info["sick"] > 0:
                    st = "sick"
                elif info["pto"] > 0:
                    st = "pto"
                elif info["other"] > 0:
                    st = "other"
                elif info["worked"] > 0:
                    st = "worked"
                else:
                    st = "nodata"

                if st != "nodata":
                    # Each status counts one day and contributes its own hours bucket.
                    totals[f"{st}_days"] += 1
                    hours[st] += info[st]
            elif d.weekday() >= 5:
                st = "weekend"
            elif has_data:
                # Gaps (weekday without submitted row) count as Off when the
                # employee has any submissions in range.
                st = "off"
                totals["off_days"] += 1  # No hours added for inferred Off gap
            else:
                st = "nodata"

            cells.append({"date": d, "status": st})

        visual.append({
            "employee": e,
            "cells": cells,
            "totals": totals,
            "hours": {k: round(v, 2) for k, v in hours.items()},
        })

    return request.app.state.templates.TemplateResponse(
        "attendance.html",
        {
            "request": request,
            "employees": all_emps,
            "selected_ids": selected_ids,
            "selected_employee_id": selected_employee_id,
            "start": start_d,
            "end": end_d,
            "days": days,
            "include_weekends": include_weekends,
            "visual": visual,
        },
    )
@router.get("/export.csv", response_class=PlainTextResponse)
def attendance_export_csv(
    request: Request,
    start: str = Query(...),
    end: str = Query(...),
    employee_id: Optional[str] = Query("all"),
    include_weekends: int = Query(0),
    db: Session = Depends(get_session),
):
    """Export per-employee, per-day attendance from submitted timesheets as CSV.

    One row is emitted for each (employee, work date) that has submitted time
    entries in the range; days without entries produce no row.

    Args:
        request: Incoming request; its session must carry a truthy ``is_admin``.
        start: Range start as ``YYYY-MM-DD``; an unparsable value falls back to today.
        end: Range end as ``YYYY-MM-DD``; an unparsable value falls back to today.
        employee_id: ``"all"`` or a single numeric employee id. Anything that is
            not a valid integer is treated as ``"all"``.
        include_weekends: Accepted but not used by this export.
            NOTE(review): weekend rows are always exported when entries exist;
            confirm whether they should be filtered when this is 0.
        db: SQLAlchemy session.

    Returns:
        A ``text/csv`` response with a header row followed by the data rows.

    Raises:
        HTTPException: 403 when the session is not flagged as admin.
    """
    # Function-scope imports keep this block self-contained.
    import csv
    import io

    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    start_d = _to_date(start, date.today())
    end_d = _to_date(end, date.today())
    if end_d < start_d:
        # Tolerate a reversed range.
        start_d, end_d = end_d, start_d

    ids: List[int] = []
    if employee_id and employee_id != "all":
        try:
            ids = [int(employee_id)]
        except ValueError:
            ids = []  # Unparsable id: fall back to all employees below.
    if not ids:
        all_emps = db.query(Employee).order_by(Employee.name.asc()).all()
        ids = [e.id for e in all_emps]

    # Normalize PTO type (trim+lower)
    normalized_pto = func.lower(func.trim(func.coalesce(TimeEntry.pto_type, "")))

    def _pto_hours(prefix: str, label: str):
        # Sum of pto_hours over entries whose normalized PTO type starts with `prefix`.
        matched = case(
            (normalized_pto.like(f"{prefix}%"), func.coalesce(TimeEntry.pto_hours, 0)),
            else_=0,
        )
        return func.coalesce(func.sum(matched), 0).label(label)

    def _pto_flag(prefix: str, label: str):
        # 1 when any entry of the group has a PTO type starting with `prefix`, else 0.
        matched = case((normalized_pto.like(f"{prefix}%"), 1), else_=0)
        return func.coalesce(func.max(matched), 0).label(label)

    q = (
        db.query(
            Employee.name,
            TimeEntry.employee_id,
            TimeEntry.work_date,
            func.coalesce(func.sum(TimeEntry.total_hours), 0).label("total"),
            func.coalesce(func.sum(TimeEntry.break_hours), 0).label("breaks"),
            _pto_hours("pto", "pto"),
            _pto_hours("off", "off"),
            _pto_flag("off", "off_flag"),
            _pto_hours("sick", "sick"),
            _pto_flag("sick", "sick_flag"),
            func.coalesce(func.sum(TimeEntry.holiday_hours), 0).label("holiday"),
            func.coalesce(func.sum(TimeEntry.bereavement_hours), 0).label("other"),
        )
        .join(Employee, Employee.id == TimeEntry.employee_id)
        .join(
            TimesheetStatus,
            and_(
                TimesheetStatus.timesheet_id == TimeEntry.timesheet_id,
                TimesheetStatus.employee_id == TimeEntry.employee_id,
                TimesheetStatus.status == "submitted",
            ),
        )
        .filter(
            TimeEntry.work_date >= start_d,
            TimeEntry.work_date <= end_d,
        )
    )
    if ids:
        # With no employees at all there is nothing to restrict by.
        q = q.filter(TimeEntry.employee_id.in_(ids))

    rows = (
        q.group_by(Employee.name, TimeEntry.employee_id, TimeEntry.work_date)
        .order_by(Employee.name.asc(), TimeEntry.work_date.asc())
        .all()
    )

    # csv.writer quotes names containing commas, quotes or newlines, which the
    # previous f-string concatenation emitted verbatim and thereby broke the
    # column layout. "\n" line endings match the previous output.
    # NOTE(review): names beginning with "=", "+", "-" or "@" are written as-is;
    # decide whether spreadsheet-formula neutralisation is wanted here.
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow([
        "Employee", "Date", "Status", "WorkedHours", "OffHours",
        "SickHours", "PTOHours", "HolidayHours", "OtherHours",
    ])

    for r in rows:
        # Worked time is total minus breaks, clamped at zero.
        worked = max(float(r.total or 0) - float(r.breaks or 0), 0.0)

        # Precedence: holiday > off > sick > pto > other > worked
        if (r.holiday or 0) > 0:
            st = "holiday"
        elif int(r.off_flag or 0) > 0 or (r.off or 0) > 0:
            st = "off"
        elif int(r.sick_flag or 0) > 0 or (r.sick or 0) > 0:
            st = "sick"
        elif (r.pto or 0) > 0:
            st = "pto"
        elif (r.other or 0) > 0:
            st = "other"
        elif worked > 0:
            st = "worked"
        else:
            st = "nodata"

        writer.writerow([
            r[0],  # Employee.name (first selected column)
            r.work_date.isoformat(),
            st,
            f"{worked:.2f}",
            f"{float(r.off or 0):.2f}",
            f"{float(r.sick or 0):.2f}",
            f"{float(r.pto or 0):.2f}",
            f"{float(r.holiday or 0):.2f}",
            f"{float(r.other or 0):.2f}",
        ])

    # The previous implementation joined rows with "\n" and had no trailing
    # newline; strip the writer's final terminator to keep that exact shape.
    return PlainTextResponse(buf.getvalue().rstrip("\n"), media_type="text/csv")