timekeeper/app/attendance.py
2026-01-15 15:46:35 -05:00

330 lines
12 KiB
Python

import csv
import io
from datetime import date, timedelta
from typing import List, Dict, Tuple, Optional

from fastapi import APIRouter, Request, Depends, Query, HTTPException
from fastapi.responses import PlainTextResponse
from sqlalchemy import func, and_, case
from sqlalchemy.orm import Session

from .db import get_session
from .models import Employee, TimeEntry, TimesheetStatus
# Router for the attendance views; every route below is registered under the
# "/attendance" prefix and tagged "Attendance".
router = APIRouter(prefix="/attendance", tags=["Attendance"])
def _daterange(d1: date, d2: date):
cur = d1
step = timedelta(days=1)
while cur <= d2:
yield cur
cur += step
def _to_date(v: Optional[str], fallback: date) -> date:
try:
if v:
return date.fromisoformat(v)
except Exception:
pass
return fallback
def _classify_attendance_day(info: Dict[str, float]) -> str:
    """Return the display status for one aggregated employee/day summary.

    Precedence: holiday > off > sick > pto > other > worked. Returns
    ``"nodata"`` when none of the buckets carries a positive value.
    """
    if info["holiday"] > 0:
        return "holiday"
    if info["off_flag"] > 0 or info["off"] > 0:
        return "off"
    if info["sick_flag"] > 0 or info["sick"] > 0:
        return "sick"
    if info["pto"] > 0:
        return "pto"
    if info["other"] > 0:
        return "other"
    if info["worked"] > 0:
        return "worked"
    return "nodata"


@router.get("", name="attendance_builder")
def attendance_builder_page(
    request: Request,
    start: Optional[str] = Query(None),  # YYYY-MM-DD
    end: Optional[str] = Query(None),  # YYYY-MM-DD
    employee_id: Optional[str] = Query("all"),  # "all" or single id
    include_weekends: int = Query(0),
    db: Session = Depends(get_session),
):
    """Render the attendance grid (one row per employee, one cell per day).

    Only time entries whose timesheet status is ``"submitted"`` are counted.
    The date range defaults to the first of the current month through today;
    a reversed range is swapped. Weekends are omitted from the grid unless
    ``include_weekends`` is truthy.

    Args:
        request: Incoming request; its session must carry ``is_admin``.
        start: Range start as ``YYYY-MM-DD`` (invalid/missing -> default).
        end: Range end as ``YYYY-MM-DD`` (invalid/missing -> today).
        employee_id: ``"all"`` or a single numeric employee id. A value that
            is not numeric is treated as ``"all"``.
        include_weekends: Non-zero to include Saturdays and Sundays.
        db: Database session.

    Raises:
        HTTPException: 403 when the session is not flagged as admin.
    """
    # Admin-only
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    # Defaults: current month
    today = date.today()
    start_d = _to_date(start, today.replace(day=1))
    end_d = _to_date(end, today)
    if end_d < start_d:
        start_d, end_d = end_d, start_d

    # Employees list
    all_emps = db.query(Employee).order_by(Employee.name.asc()).all()

    # Resolve the dropdown value exactly once so the query filter and the
    # template context always agree. A non-numeric id falls back to "all"
    # rather than crashing later when the context is built.
    selected_employee_id: Optional[int] = None
    if employee_id and employee_id != "all":
        try:
            selected_employee_id = int(employee_id)
        except (TypeError, ValueError):
            selected_employee_id = None
    selected_ids: List[int]
    if selected_employee_id is None:
        selected_ids = [e.id for e in all_emps]
    else:
        selected_ids = [selected_employee_id]
    selected_lookup = set(selected_ids)

    # Normalize PTO type (trim+lower)
    normalized_pto = func.lower(func.trim(func.coalesce(TimeEntry.pto_type, "")))

    def pto_hours_for(prefix: str, label: str):
        # Sum of pto_hours over entries whose PTO type starts with `prefix`.
        matched = case(
            (normalized_pto.like(f"{prefix}%"), func.coalesce(TimeEntry.pto_hours, 0)),
            else_=0,
        )
        return func.coalesce(func.sum(matched), 0).label(label)

    def pto_flag_for(prefix: str, label: str):
        # 1 when any entry has a PTO type starting with `prefix`, even if it
        # carries zero hours.
        matched = case((normalized_pto.like(f"{prefix}%"), 1), else_=0)
        return func.coalesce(func.max(matched), 0).label(label)

    # Aggregate per employee/date (SUBMITTED ONLY)
    q = (
        db.query(
            TimeEntry.employee_id.label("emp_id"),
            TimeEntry.work_date.label("d"),
            func.coalesce(func.sum(TimeEntry.total_hours), 0).label("total"),
            func.coalesce(func.sum(TimeEntry.break_hours), 0).label("breaks"),
            pto_hours_for("pto", "pto"),
            pto_hours_for("off", "off"),
            pto_flag_for("off", "off_flag"),
            pto_hours_for("sick", "sick"),
            pto_flag_for("sick", "sick_flag"),
            func.coalesce(func.sum(TimeEntry.holiday_hours), 0).label("holiday"),
            # "Other" from bereavement bucket
            func.coalesce(func.sum(TimeEntry.bereavement_hours), 0).label("other"),
        )
        .join(
            TimesheetStatus,
            and_(
                TimesheetStatus.timesheet_id == TimeEntry.timesheet_id,
                TimesheetStatus.employee_id == TimeEntry.employee_id,
                TimesheetStatus.status == "submitted",
            ),
        )
        .filter(
            TimeEntry.work_date >= start_d,
            TimeEntry.work_date <= end_d,
        )
    )
    if selected_ids:
        q = q.filter(TimeEntry.employee_id.in_(selected_ids))
    rows = (
        q.group_by(TimeEntry.employee_id, TimeEntry.work_date)
        .order_by(TimeEntry.employee_id.asc(), TimeEntry.work_date.asc())
        .all()
    )

    # Build (emp_id, date) -> summary
    per_day: Dict[Tuple[int, date], Dict[str, float]] = {}
    for r in rows:
        # Worked time is total minus breaks, clamped at zero.
        worked = max(float(r.total or 0) - float(r.breaks or 0), 0.0)
        per_day[(int(r.emp_id), r.d)] = {
            "worked": worked,
            "pto": float(r.pto or 0),
            "off": float(r.off or 0),
            "off_flag": int(r.off_flag or 0),
            "sick": float(r.sick or 0),
            "sick_flag": int(r.sick_flag or 0),
            "holiday": float(r.holiday or 0),
            "other": float(r.other or 0),
        }
    # Employees with at least one submitted row in range; used to classify
    # gaps. Built once instead of rescanning per_day for every employee.
    emps_with_data = {emp_id for emp_id, _ in per_day}

    days = [d for d in _daterange(start_d, end_d) if include_weekends or d.weekday() < 5]

    visual = []
    for e in all_emps:
        if selected_lookup and e.id not in selected_lookup:
            continue
        has_data = e.id in emps_with_data
        cells = []
        totals = {
            "worked_days": 0,
            "off_days": 0,
            "sick_days": 0,
            "pto_days": 0,
            "holiday_days": 0,
            "other_days": 0,
        }
        hours = {"worked": 0.0, "off": 0.0, "sick": 0.0, "pto": 0.0, "holiday": 0.0, "other": 0.0}
        for d in days:
            info = per_day.get((e.id, d))
            if info:
                st = _classify_attendance_day(info)
                if st != "nodata":
                    totals[f"{st}_days"] += 1
                    hours[st] += info[st]
            elif d.weekday() >= 5:
                st = "weekend"
            elif has_data:
                # Gaps (weekday without submitted row) count as Off when the
                # employee has any submissions in range. No hours are added
                # for an inferred Off gap.
                st = "off"
                totals["off_days"] += 1
            else:
                st = "nodata"
            cells.append({"date": d, "status": st})
        visual.append({
            "employee": e,
            "cells": cells,
            "totals": totals,
            "hours": {k: round(v, 2) for k, v in hours.items()},
        })

    return request.app.state.templates.TemplateResponse(
        "attendance.html",
        {
            "request": request,
            "employees": all_emps,
            "selected_ids": selected_ids,
            "selected_employee_id": selected_employee_id,
            "start": start_d,
            "end": end_d,
            "days": days,
            "include_weekends": include_weekends,
            "visual": visual,
        },
    )
@router.get("/export.csv", response_class=PlainTextResponse)
def attendance_export_csv(
    request: Request,
    start: str = Query(...),
    end: str = Query(...),
    employee_id: Optional[str] = Query("all"),
    include_weekends: int = Query(0),
    db: Session = Depends(get_session),
):
    """Export per-employee, per-day attendance as CSV (submitted entries only).

    One row is emitted for every employee/date that has at least one time
    entry on a submitted timesheet within the range; days without entries
    are not listed. Status precedence is
    holiday > off > sick > pto > other > worked, else ``nodata``.

    Args:
        request: Incoming request; its session must carry ``is_admin``.
        start: Range start as ``YYYY-MM-DD`` (invalid -> today).
        end: Range end as ``YYYY-MM-DD`` (invalid -> today).
        employee_id: ``"all"`` or a single numeric employee id. A value that
            is not numeric is treated as ``"all"``.
        include_weekends: Accepted for parity with the builder page.
            NOTE(review): currently not applied here, so weekend rows are
            always exported - confirm whether they should be filtered.
        db: Database session.

    Returns:
        A ``text/csv`` response with a header line followed by data rows.

    Raises:
        HTTPException: 403 when the session is not flagged as admin.
    """
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    today = date.today()
    start_d = _to_date(start, today)
    end_d = _to_date(end, today)
    if end_d < start_d:
        start_d, end_d = end_d, start_d

    all_emps = db.query(Employee).order_by(Employee.name.asc()).all()
    ids: List[int]
    if not employee_id or employee_id == "all":
        ids = [e.id for e in all_emps]
    else:
        try:
            ids = [int(employee_id)]
        except (TypeError, ValueError):
            # Unparseable id: fall back to exporting everyone.
            ids = [e.id for e in all_emps]

    # Normalize PTO type (trim+lower)
    normalized_pto = func.lower(func.trim(func.coalesce(TimeEntry.pto_type, "")))

    def pto_hours_for(prefix: str, label: str):
        # Sum of pto_hours over entries whose PTO type starts with `prefix`.
        matched = case(
            (normalized_pto.like(f"{prefix}%"), func.coalesce(TimeEntry.pto_hours, 0)),
            else_=0,
        )
        return func.coalesce(func.sum(matched), 0).label(label)

    def pto_flag_for(prefix: str, label: str):
        # 1 when any entry has a PTO type starting with `prefix`, even if it
        # carries zero hours.
        matched = case((normalized_pto.like(f"{prefix}%"), 1), else_=0)
        return func.coalesce(func.max(matched), 0).label(label)

    q = (
        db.query(
            Employee.name.label("emp_name"),
            TimeEntry.employee_id,
            TimeEntry.work_date,
            func.coalesce(func.sum(TimeEntry.total_hours), 0).label("total"),
            func.coalesce(func.sum(TimeEntry.break_hours), 0).label("breaks"),
            pto_hours_for("pto", "pto"),
            pto_hours_for("off", "off"),
            pto_flag_for("off", "off_flag"),
            pto_hours_for("sick", "sick"),
            pto_flag_for("sick", "sick_flag"),
            func.coalesce(func.sum(TimeEntry.holiday_hours), 0).label("holiday"),
            func.coalesce(func.sum(TimeEntry.bereavement_hours), 0).label("other"),
        )
        .join(Employee, Employee.id == TimeEntry.employee_id)
        .join(
            TimesheetStatus,
            and_(
                TimesheetStatus.timesheet_id == TimeEntry.timesheet_id,
                TimesheetStatus.employee_id == TimeEntry.employee_id,
                TimesheetStatus.status == "submitted",
            ),
        )
        .filter(
            TimeEntry.work_date >= start_d,
            TimeEntry.work_date <= end_d,
        )
    )
    if ids:
        q = q.filter(TimeEntry.employee_id.in_(ids))
    rows = (
        q.group_by(Employee.name, TimeEntry.employee_id, TimeEntry.work_date)
        .order_by(Employee.name.asc(), TimeEntry.work_date.asc())
        .all()
    )

    # csv.writer quotes fields containing commas, quotes or newlines, so an
    # employee name such as "Doe, Jane" no longer shifts the columns.
    # lineterminator="\n" keeps the line separator the export has always used.
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow([
        "Employee", "Date", "Status", "WorkedHours", "OffHours",
        "SickHours", "PTOHours", "HolidayHours", "OtherHours",
    ])
    for r in rows:
        # Worked time is total minus breaks, clamped at zero.
        worked = max(float(r.total or 0) - float(r.breaks or 0), 0.0)
        off = float(r.off or 0)
        sick = float(r.sick or 0)
        pto = float(r.pto or 0)
        holiday = float(r.holiday or 0)
        other = float(r.other or 0)

        if holiday > 0:
            st = "holiday"
        elif int(r.off_flag or 0) > 0 or off > 0:
            st = "off"
        elif int(r.sick_flag or 0) > 0 or sick > 0:
            st = "sick"
        elif pto > 0:
            st = "pto"
        elif other > 0:
            st = "other"
        elif worked > 0:
            st = "worked"
        else:
            st = "nodata"

        writer.writerow([
            r.emp_name,
            r.work_date.isoformat(),
            st,
            f"{worked:.2f}",
            f"{off:.2f}",
            f"{sick:.2f}",
            f"{pto:.2f}",
            f"{holiday:.2f}",
            f"{other:.2f}",
        ])

    payload = buf.getvalue()
    # The writer terminates every row; drop the final terminator so the body
    # has no trailing newline, matching the previous output.
    if payload.endswith("\n"):
        payload = payload[:-1]
    return PlainTextResponse(payload, media_type="text/csv")