330 lines
12 KiB
Python
330 lines
12 KiB
Python
from datetime import date, timedelta
|
|
from typing import List, Dict, Tuple, Optional
|
|
|
|
from fastapi import APIRouter, Request, Depends, Query, HTTPException
|
|
from fastapi.responses import PlainTextResponse
|
|
from sqlalchemy import func, and_, case
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .db import get_session
|
|
from .models import Employee, TimeEntry, TimesheetStatus
|
|
|
|
router = APIRouter(prefix="/attendance", tags=["Attendance"])
|
|
|
|
def _daterange(d1: date, d2: date):
|
|
cur = d1
|
|
step = timedelta(days=1)
|
|
while cur <= d2:
|
|
yield cur
|
|
cur += step
|
|
|
|
def _to_date(v: Optional[str], fallback: date) -> date:
|
|
try:
|
|
if v:
|
|
return date.fromisoformat(v)
|
|
except Exception:
|
|
pass
|
|
return fallback
|
|
|
|
@router.get("", name="attendance_builder")
|
|
def attendance_builder_page(
|
|
request: Request,
|
|
start: Optional[str] = Query(None), # YYYY-MM-DD
|
|
end: Optional[str] = Query(None), # YYYY-MM-DD
|
|
employee_id: Optional[str] = Query("all"), # "all" or single id
|
|
include_weekends: int = Query(0),
|
|
db: Session = Depends(get_session),
|
|
):
|
|
# Admin-only
|
|
if not request.session.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
# Defaults: current month
|
|
today = date.today()
|
|
default_start = today.replace(day=1)
|
|
start_d = _to_date(start, default_start)
|
|
end_d = _to_date(end, today)
|
|
if end_d < start_d:
|
|
start_d, end_d = end_d, start_d
|
|
|
|
# Employees list
|
|
all_emps = db.query(Employee).order_by(Employee.name.asc()).all()
|
|
|
|
# Determine selected ids from dropdown value
|
|
selected_ids: List[int]
|
|
if not employee_id or employee_id == "all":
|
|
selected_ids = [e.id for e in all_emps]
|
|
else:
|
|
try:
|
|
selected_ids = [int(employee_id)]
|
|
except Exception:
|
|
selected_ids = [e.id for e in all_emps]
|
|
|
|
# Normalize PTO type (trim+lower)
|
|
normalized_pto = func.lower(func.trim(func.coalesce(TimeEntry.pto_type, "")))
|
|
|
|
# Hours and flags per PTO subtype
|
|
off_hours_expr = func.coalesce(
|
|
func.sum(case((normalized_pto.like("off%"), func.coalesce(TimeEntry.pto_hours, 0)), else_=0)),
|
|
0,
|
|
).label("off")
|
|
off_flag_expr = func.coalesce(func.max(case((normalized_pto.like("off%"), 1), else_=0)), 0).label("off_flag")
|
|
|
|
sick_hours_expr = func.coalesce(
|
|
func.sum(case((normalized_pto.like("sick%"), func.coalesce(TimeEntry.pto_hours, 0)), else_=0)),
|
|
0,
|
|
).label("sick")
|
|
sick_flag_expr = func.coalesce(func.max(case((normalized_pto.like("sick%"), 1), else_=0)), 0).label("sick_flag")
|
|
|
|
pto_hours_expr = func.coalesce(
|
|
func.sum(case((normalized_pto.like("pto%"), func.coalesce(TimeEntry.pto_hours, 0)), else_=0)),
|
|
0,
|
|
).label("pto")
|
|
|
|
# Aggregate per employee/date (SUBMITTED ONLY)
|
|
q = (
|
|
db.query(
|
|
TimeEntry.employee_id.label("emp_id"),
|
|
TimeEntry.work_date.label("d"),
|
|
func.coalesce(func.sum(TimeEntry.total_hours), 0).label("total"),
|
|
func.coalesce(func.sum(TimeEntry.break_hours), 0).label("breaks"),
|
|
pto_hours_expr,
|
|
off_hours_expr,
|
|
off_flag_expr,
|
|
sick_hours_expr,
|
|
sick_flag_expr,
|
|
func.coalesce(func.sum(TimeEntry.holiday_hours), 0).label("holiday"),
|
|
func.coalesce(func.sum(TimeEntry.bereavement_hours), 0).label("other"), # "Other" from bereavement bucket
|
|
)
|
|
.join(
|
|
TimesheetStatus,
|
|
and_(
|
|
TimesheetStatus.timesheet_id == TimeEntry.timesheet_id,
|
|
TimesheetStatus.employee_id == TimeEntry.employee_id,
|
|
TimesheetStatus.status == "submitted",
|
|
),
|
|
)
|
|
.filter(
|
|
TimeEntry.work_date >= start_d,
|
|
TimeEntry.work_date <= end_d,
|
|
TimeEntry.employee_id.in_(selected_ids) if selected_ids else True,
|
|
)
|
|
)
|
|
|
|
rows = (
|
|
q.group_by(TimeEntry.employee_id, TimeEntry.work_date)
|
|
.order_by(TimeEntry.employee_id.asc(), TimeEntry.work_date.asc())
|
|
.all()
|
|
)
|
|
|
|
# Build (emp_id, date) -> summary
|
|
per_day: Dict[Tuple[int, date], Dict[str, float]] = {}
|
|
for r in rows:
|
|
worked = float(r.total or 0) - float(r.breaks or 0)
|
|
if worked < 0:
|
|
worked = 0.0
|
|
per_day[(int(r.emp_id), r.d)] = {
|
|
"worked": worked,
|
|
"pto": float(r.pto or 0),
|
|
"off": float(r.off or 0),
|
|
"off_flag": int(r.off_flag or 0),
|
|
"sick": float(r.sick or 0),
|
|
"sick_flag": int(r.sick_flag or 0),
|
|
"holiday": float(r.holiday or 0),
|
|
"other": float(r.other or 0),
|
|
}
|
|
|
|
days = [d for d in _daterange(start_d, end_d) if include_weekends or d.weekday() < 5]
|
|
|
|
visual = []
|
|
for e in all_emps:
|
|
if selected_ids and e.id not in selected_ids:
|
|
continue
|
|
|
|
# Has any submitted data in range? Used for classifying gaps.
|
|
has_data = any(k[0] == e.id for k in per_day.keys())
|
|
|
|
cells = []
|
|
totals = {
|
|
"worked_days": 0,
|
|
"off_days": 0,
|
|
"sick_days": 0,
|
|
"pto_days": 0,
|
|
"holiday_days": 0,
|
|
"other_days": 0,
|
|
}
|
|
hours = {"worked": 0.0, "off": 0.0, "sick": 0.0, "pto": 0.0, "holiday": 0.0, "other": 0.0}
|
|
|
|
for d in days:
|
|
info = per_day.get((e.id, d))
|
|
weekend = d.weekday() >= 5
|
|
|
|
if info:
|
|
# Precedence: holiday > off > sick > pto > other > worked
|
|
if info["holiday"] > 0:
|
|
st = "holiday"
|
|
totals["holiday_days"] += 1
|
|
hours["holiday"] += info["holiday"]
|
|
elif info["off_flag"] > 0 or info["off"] > 0:
|
|
st = "off"
|
|
totals["off_days"] += 1
|
|
hours["off"] += info["off"]
|
|
elif info["sick_flag"] > 0 or info["sick"] > 0:
|
|
st = "sick"
|
|
totals["sick_days"] += 1
|
|
hours["sick"] += info["sick"]
|
|
elif info["pto"] > 0:
|
|
st = "pto"
|
|
totals["pto_days"] += 1
|
|
hours["pto"] += info["pto"]
|
|
elif info["other"] > 0:
|
|
st = "other"
|
|
totals["other_days"] += 1
|
|
hours["other"] += info["other"]
|
|
elif info["worked"] > 0:
|
|
st = "worked"
|
|
totals["worked_days"] += 1
|
|
hours["worked"] += info["worked"]
|
|
else:
|
|
st = "nodata"
|
|
else:
|
|
if weekend:
|
|
st = "weekend"
|
|
else:
|
|
# Gaps (weekday without submitted row) count as Off when the employee has any submissions in range
|
|
st = "off" if has_data else "nodata"
|
|
if st == "off":
|
|
totals["off_days"] += 1
|
|
# No hours added for inferred Off gap
|
|
|
|
cells.append({"date": d, "status": st})
|
|
|
|
visual.append({
|
|
"employee": e,
|
|
"cells": cells,
|
|
"totals": totals,
|
|
"hours": {k: round(v, 2) for k, v in hours.items()},
|
|
})
|
|
|
|
return request.app.state.templates.TemplateResponse(
|
|
"attendance.html",
|
|
{
|
|
"request": request,
|
|
"employees": all_emps,
|
|
"selected_ids": selected_ids,
|
|
"selected_employee_id": (None if (not employee_id or employee_id == 'all') else int(employee_id)),
|
|
"start": start_d,
|
|
"end": end_d,
|
|
"days": days,
|
|
"include_weekends": include_weekends,
|
|
"visual": visual,
|
|
},
|
|
)
|
|
|
|
@router.get("/export.csv", response_class=PlainTextResponse)
|
|
def attendance_export_csv(
|
|
request: Request,
|
|
start: str = Query(...),
|
|
end: str = Query(...),
|
|
employee_id: Optional[str] = Query("all"),
|
|
include_weekends: int = Query(0),
|
|
db: Session = Depends(get_session),
|
|
):
|
|
if not request.session.get("is_admin"):
|
|
raise HTTPException(status_code=403, detail="Admin access required")
|
|
|
|
start_d = _to_date(start, date.today())
|
|
end_d = _to_date(end, date.today())
|
|
if end_d < start_d:
|
|
start_d, end_d = end_d, start_d
|
|
|
|
ids: List[int] = []
|
|
all_emps = db.query(Employee).order_by(Employee.name.asc()).all()
|
|
if not employee_id or employee_id == "all":
|
|
ids = [e.id for e in all_emps]
|
|
else:
|
|
try:
|
|
ids = [int(employee_id)]
|
|
except Exception:
|
|
ids = [e.id for e in all_emps]
|
|
|
|
normalized_pto = func.lower(func.trim(func.coalesce(TimeEntry.pto_type, "")))
|
|
off_hours_expr = func.coalesce(
|
|
func.sum(case((normalized_pto.like("off%"), func.coalesce(TimeEntry.pto_hours, 0)), else_=0)),
|
|
0,
|
|
).label("off")
|
|
off_flag_expr = func.coalesce(func.max(case((normalized_pto.like("off%"), 1), else_=0)), 0).label("off_flag")
|
|
|
|
sick_hours_expr = func.coalesce(
|
|
func.sum(case((normalized_pto.like("sick%"), func.coalesce(TimeEntry.pto_hours, 0)), else_=0)),
|
|
0,
|
|
).label("sick")
|
|
sick_flag_expr = func.coalesce(func.max(case((normalized_pto.like("sick%"), 1), else_=0)), 0).label("sick_flag")
|
|
|
|
pto_hours_expr = func.coalesce(
|
|
func.sum(case((normalized_pto.like("pto%"), func.coalesce(TimeEntry.pto_hours, 0)), else_=0)),
|
|
0,
|
|
).label("pto")
|
|
|
|
q = (
|
|
db.query(
|
|
Employee.name,
|
|
TimeEntry.employee_id,
|
|
TimeEntry.work_date,
|
|
func.coalesce(func.sum(TimeEntry.total_hours), 0).label("total"),
|
|
func.coalesce(func.sum(TimeEntry.break_hours), 0).label("breaks"),
|
|
pto_hours_expr,
|
|
off_hours_expr,
|
|
off_flag_expr,
|
|
sick_hours_expr,
|
|
sick_flag_expr,
|
|
func.coalesce(func.sum(TimeEntry.holiday_hours), 0).label("holiday"),
|
|
func.coalesce(func.sum(TimeEntry.bereavement_hours), 0).label("other"),
|
|
)
|
|
.join(Employee, Employee.id == TimeEntry.employee_id)
|
|
.join(
|
|
TimesheetStatus,
|
|
and_(
|
|
TimesheetStatus.timesheet_id == TimeEntry.timesheet_id,
|
|
TimesheetStatus.employee_id == TimeEntry.employee_id,
|
|
TimesheetStatus.status == "submitted",
|
|
),
|
|
)
|
|
.filter(
|
|
TimeEntry.work_date >= start_d,
|
|
TimeEntry.work_date <= end_d,
|
|
TimeEntry.employee_id.in_(ids) if ids else True,
|
|
)
|
|
)
|
|
|
|
rows = (
|
|
q.group_by(Employee.name, TimeEntry.employee_id, TimeEntry.work_date)
|
|
.order_by(Employee.name.asc(), TimeEntry.work_date.asc())
|
|
.all()
|
|
)
|
|
|
|
out = ["Employee,Date,Status,WorkedHours,OffHours,SickHours,PTOHours,HolidayHours,OtherHours"]
|
|
for r in rows:
|
|
worked = float(r.total or 0) - float(r.breaks or 0)
|
|
if worked < 0:
|
|
worked = 0.0
|
|
if (r.holiday or 0) > 0:
|
|
st = "holiday"
|
|
elif int(r.off_flag or 0) > 0 or (r.off or 0) > 0:
|
|
st = "off"
|
|
elif int(r.sick_flag or 0) > 0 or (r.sick or 0) > 0:
|
|
st = "sick"
|
|
elif (r.pto or 0) > 0:
|
|
st = "pto"
|
|
elif (r.other or 0) > 0:
|
|
st = "other"
|
|
elif worked > 0:
|
|
st = "worked"
|
|
else:
|
|
st = "nodata"
|
|
out.append(
|
|
f"{r[0]},{r.work_date.isoformat()},{st},{worked:.2f},{float(r.off or 0):.2f},"
|
|
f"{float(r.sick or 0):.2f},{float(r.pto or 0):.2f},{float(r.holiday or 0):.2f},{float(r.other or 0):.2f}"
|
|
)
|
|
|
|
return PlainTextResponse("\n".join(out), media_type="text/csv") |