950 lines
38 KiB
Python
950 lines
38 KiB
Python
import os
|
|
import io
|
|
import csv
|
|
import json
|
|
from datetime import datetime, date as date_type, time as time_type, timedelta
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from decimal import Decimal
|
|
|
|
from fastapi import APIRouter, Request, Depends, UploadFile, File, Form, Query, HTTPException
|
|
from fastapi.responses import RedirectResponse, HTMLResponse
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, func, text
|
|
|
|
from .db import get_session
|
|
from .models import Base, Employee, TimeEntry, TimesheetPeriod
|
|
from .utils import enumerate_timesheets_global, D, q2
|
|
from .process_excel import (
|
|
detect_header_map,
|
|
parse_date,
|
|
parse_datetime_value,
|
|
parse_time_value,
|
|
safe_decimal,
|
|
)
|
|
|
|
# Router for the department-import flow; every route declared on it is served
# under the /import/department prefix and tagged "Department Import".
router = APIRouter(prefix="/import/department", tags=["Department Import"])
|
|
|
|
class ImportBatch(Base):
    """One department import run, recorded per timesheet.

    The time entries created by a run are linked to it through
    ``ImportBatchItem`` rows, which is what makes "undo last import" possible.
    """

    __tablename__ = "import_batches"

    # Surrogate key.
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Timesheet the import was executed against (indexed, plain integer, no FK).
    timesheet_id = Column(Integer, nullable=False, index=True)
    # Free-text description of where the batch came from.
    source_name = Column(String(255), nullable=True)
    # Naive UTC timestamp filled in by ``datetime.utcnow`` at insert time.
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
|
|
|
|
class ImportBatchItem(Base):
    """Link row tying a single imported time entry to its import batch."""

    __tablename__ = "import_batch_items"

    # Surrogate key.
    id = Column(Integer, autoincrement=True, primary_key=True)
    # Owning batch (import_batches.id).
    batch_id = Column(
        Integer,
        ForeignKey("import_batches.id"),
        index=True,
        nullable=False,
    )
    # Time entry created by that batch (time_entries.id).
    time_entry_id = Column(
        Integer,
        ForeignKey("time_entries.id"),
        index=True,
        nullable=False,
    )
|
|
|
|
# -------------------------
|
|
# Mapping helpers
|
|
# -------------------------
|
|
|
|
# Hide "Hours Worked Minus Break" from the mapping UI to avoid confusion; backend still auto-detects it.
|
|
# Fields offered in the column-mapping UI as (form key, display label, flag).
# The flag is True only for "employee" and "work_date", the two columns the
# preview step refuses to proceed without.
TARGET_FIELDS: List[Tuple[str, str, bool]] = [
    (field_key, field_label, field_key in ("employee", "work_date"))
    for field_key, field_label in (
        ("employee", "Employee Name"),
        ("work_date", "Work Date"),
        ("clock_in", "Clock In"),
        ("clock_out", "Clock Out"),
        ("break_hours", "Break Hours"),
        ("total_hours", "Hours Worked"),
        # "total_minus_break" ("Hours Worked Minus Break") is deliberately
        # not listed: it stays hidden from the UI.
        ("pto_hours", "PTO Hours"),
        ("pto_type", "PTO Type"),
        ("holiday_hours", "Holiday Hours"),
        ("bereavement_hours", "Bereavement/Other Hours"),
    )
]
|
|
|
|
def _store_upload_file(data: bytes, original_name: str) -> str:
    """Persist uploaded bytes under ``uploads/`` and return the relative path.

    The stored name keeps the original file extension (callers use it to tell
    Excel from CSV) and is built from a UTC timestamp, the process id and a
    random token.  The random token matters: the timestamp only has
    one-second resolution, so without it two uploads handled by the same
    process within one second would silently overwrite each other.

    Args:
        data: Raw file content.
        original_name: Client-supplied file name; only its extension is used.

    Returns:
        Path of the written file, relative to the working directory.
    """
    import uuid

    os.makedirs("uploads", exist_ok=True)
    _, ext = os.path.splitext(original_name or "")
    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    slug = f"dept-map-{stamp}-{os.getpid()}-{uuid.uuid4().hex}{ext or ''}"
    path = os.path.join("uploads", slug)
    with open(path, "wb") as f:
        f.write(data)
    return path
|
|
|
|
def _list_sheets_with_headers(xlsx_path: str) -> List[Dict[str, Any]]:
    """Summarise every worksheet of a workbook for the mapping screen.

    For each sheet the first (at most) 50 rows are scanned top-down and handed
    to ``detect_header_map``; the first row yielding a truthy map is taken as
    the header row.

    Returns:
        One dict per sheet with keys ``sheet_name``, ``header_row_idx``
        (1-based row number, or None when nothing was detected),
        ``header_vals`` (values of the detected row; when nothing was
        detected, the values of the last row that was scanned) and
        ``auto_map`` (the result of ``detect_header_map`` for that row).
    """
    import openpyxl

    workbook = openpyxl.load_workbook(xlsx_path, data_only=True)
    summaries: List[Dict[str, Any]] = []
    for sheet in workbook.worksheets:
        found_at = None
        row_values = []
        detected = {}
        last_row = min(sheet.max_row, 50)
        row_no = 1
        while row_no <= last_row:
            row_values = [cell.value for cell in sheet[row_no]]
            detected = detect_header_map(row_values)
            if detected:
                found_at = row_no
                break
            row_no += 1
        summaries.append(
            dict(
                sheet_name=sheet.title,
                header_row_idx=found_at,
                header_vals=row_values,
                auto_map=detected,
            )
        )
    return summaries
|
|
|
|
def _default_sheet_name(sheets_info: List[Dict[str, Any]]) -> Optional[str]:
    """Choose which sheet the mapping screen should preselect.

    Preference order:
      1. a sheet titled "final time clock report" (case/whitespace-insensitive),
      2. the first sheet whose ``auto_map`` is truthy,
      3. the first sheet,
      4. None when there are no sheets at all.
    """
    entries = sheets_info or []
    titles = [entry["sheet_name"] for entry in entries]

    preferred = [t for t in titles if t.lower().strip() == "final time clock report"]
    if preferred:
        return preferred[0]

    with_headers = [entry for entry in entries if entry.get("auto_map")]
    if with_headers:
        return with_headers[0]["sheet_name"]

    if not titles:
        return None
    return titles[0]
|
|
|
|
# Helpers
|
|
def _hours_from_value(value) -> Decimal:
    """Best-effort conversion of a spreadsheet cell into a number of hours.

    Rules, in order:
      * ``None`` / ``""``            -> ``D(0)``
      * ``int`` / ``float``          -> negatives clamp to 0; values ``<= 1.0``
        are multiplied by 24, larger values are used as-is.
        NOTE(review): this looks like an "Excel day fraction" heuristic; it
        also turns a literal 1 (one hour?) into 24 -- confirm that is intended.
      * ``datetime`` / ``time``      -> the time of day expressed in hours
      * ``str``                      -> ``H:MM`` or ``H:MM:SS`` if it contains
        a colon and parses, otherwise the text is passed to ``D``
      * anything else, or text that cannot be parsed -> ``D(0)``

    All non-zero-default results are rounded with ``q2``.
    """

    def clock_to_hours(hh, mm, ss):
        return q2(D(hh) + D(mm) / D(60) + D(ss) / D(3600))

    if value is None or value == "":
        return D(0)

    if isinstance(value, (int, float)):
        amount = float(value)
        amount = 0.0 if amount < 0 else amount
        if amount <= 1.0:
            amount = amount * 24.0
        return q2(D(amount))

    if isinstance(value, (datetime, time_type)):
        clock = value.time() if isinstance(value, datetime) else value
        return clock_to_hours(clock.hour, clock.minute, clock.second)

    if not isinstance(value, str):
        return D(0)

    cleaned = value.strip()
    if ":" in cleaned:
        # Any failure while reading the colon form falls through to the
        # plain-number attempt below.
        try:
            pieces = [int(piece) for piece in cleaned.split(":")]
            if len(pieces) == 2:
                hh, mm = pieces
                return q2(D(hh) + D(mm) / D(60))
            if len(pieces) == 3:
                return clock_to_hours(*pieces)
        except Exception:
            pass
    try:
        return q2(D(cleaned))
    except Exception:
        return D(0)
|
|
|
|
def _find_header_index(header_vals: List[Any], include: List[str]) -> Optional[int]:
    """Locate the first header cell whose text contains every given token.

    Header cells are stringified, stripped and lower-cased before matching
    (``None`` counts as an empty string); the tokens in ``include`` are used
    as-is, so callers should pass them lower-case.

    Returns:
        The 0-based position of the first matching cell, or None.
    """
    labels = ["" if raw is None else str(raw).strip().lower() for raw in header_vals]
    matches = (
        position
        for position, label in enumerate(labels)
        if all(token in label for token in include)
    )
    return next(matches, None)
|
|
|
|
def _cell_at(vals, idx: Optional[int]):
    """Return ``vals[idx]``, or None when ``idx`` is unset or outside the row.

    Negative indexes are rejected rather than wrapping around to the end of
    the row; mapping indexes ultimately come from a submitted form.
    """
    if idx is None or idx < 0 or idx >= len(vals):
        return None
    return vals[idx]


def _moment_on(raw, work_date: date_type) -> Optional[datetime]:
    """Parse ``raw`` as a datetime, else as a time of day on ``work_date``."""
    if raw is None:
        return None
    moment = parse_datetime_value(raw)
    if not moment:
        clock = parse_time_value(raw)
        moment = datetime.combine(work_date, clock) if clock else None
    return moment


def _elapsed_hours(start_raw, end_raw, work_date: date_type) -> Optional[Decimal]:
    """Hours between two raw start/end cells, or None if either is unusable.

    An end that is not after the start is assumed to fall on the next day.
    """
    start = _moment_on(start_raw, work_date)
    end = _moment_on(end_raw, work_date)
    if not (start and end):
        return None
    if end <= start:
        end = end + timedelta(days=1)
    return q2(D((end - start).total_seconds()) / D(3600))


def _auxiliary_columns(header_vals: List[Any]) -> Dict[str, Optional[int]]:
    """Find helper columns by header text (they are not part of the mapping UI)."""
    return {
        "break_start": _find_header_index(header_vals, ["break", "start", "time"]),
        "break_end": _find_header_index(header_vals, ["break", "end", "time"]),
        "total_minus_break": _find_header_index(header_vals, ["hours", "worked", "minus", "break"]),
        "shift_start": _find_header_index(header_vals, ["shift", "start", "time"]),
        "shift_end": _find_header_index(header_vals, ["shift", "end", "time"]),
        "hours_scheduled": _find_header_index(header_vals, ["hours", "scheduled"]),
    }


def _normalize_row(
    vals,
    mapping: Dict[str, Optional[int]],
    extra_idx: Dict[str, Optional[int]],
) -> Optional[Dict]:
    """Convert one raw data row into a normalized entry dict.

    Returns None when the row has no employee, no work date, or a work date
    that ``parse_date`` cannot read -- such rows are skipped by the callers.
    """

    def mapped(key: str):
        return _cell_at(vals, mapping.get(key))

    emp_raw = mapped("employee")
    date_raw = mapped("work_date")
    if not emp_raw or not date_raw:
        return None

    employee_name = str(emp_raw).strip()
    work_date = parse_date(date_raw)
    if not work_date:
        return None

    clock_in_dt = _moment_on(mapped("clock_in"), work_date)
    clock_out_dt = _moment_on(mapped("clock_out"), work_date)
    if clock_in_dt and clock_out_dt and clock_out_dt <= clock_in_dt:
        # Shift crosses midnight.
        clock_out_dt = clock_out_dt + timedelta(days=1)

    # Break hours: mapped column first, otherwise derived from Break Start/End.
    break_hours_taken = safe_decimal(mapped("break_hours"), D(0))
    if break_hours_taken is None or break_hours_taken == D(0):
        derived_break = _elapsed_hours(
            _cell_at(vals, extra_idx["break_start"]),
            _cell_at(vals, extra_idx["break_end"]),
            work_date,
        )
        if derived_break is not None:
            break_hours_taken = derived_break

    # Total hours ("Hours Worked") as given by the sheet, if any.
    total_raw = mapped("total_hours")
    total_from_sheet = safe_decimal(total_raw) if total_raw not in (None, "") else None

    # "Hours Worked Minus Break": an explicit mapping wins (even if it points
    # outside the row); otherwise fall back to the auto-detected column.
    if mapping.get("total_minus_break") is not None:
        alt_raw = mapped("total_minus_break")
    else:
        alt_raw = _cell_at(vals, extra_idx["total_minus_break"])
    alt_total_minus_break = safe_decimal(alt_raw) if alt_raw not in (None, "") else None

    # Scheduled hours: explicit "Hours Scheduled" or derived from Shift Start/End.
    hs_raw = _cell_at(vals, extra_idx["hours_scheduled"])
    if hs_raw not in (None, ""):
        scheduled_hours = safe_decimal(hs_raw, D(0))
    else:
        scheduled_hours = _elapsed_hours(
            _cell_at(vals, extra_idx["shift_start"]),
            _cell_at(vals, extra_idx["shift_end"]),
            work_date,
        )

    pto_hours = safe_decimal(mapped("pto_hours"), D(0))
    pto_type_val = mapped("pto_type")
    # Numeric/date cells are not meaningful PTO type labels (CSV cells are
    # always text, so this only filters Excel values).
    if pto_type_val is None or isinstance(pto_type_val, (int, float, datetime)):
        pto_type = None
    else:
        pto_type = str(pto_type_val).strip()

    holiday_hours = safe_decimal(mapped("holiday_hours"), D(0))
    bereavement_hours = safe_decimal(mapped("bereavement_hours"), D(0))

    # Determine total hours: sheet value, else (net hours + break), else the
    # clock span, else zero.
    if total_from_sheet is not None:
        total_hours = q2(total_from_sheet)
    elif alt_total_minus_break is not None and break_hours_taken is not None:
        total_hours = q2(alt_total_minus_break + break_hours_taken)
    elif clock_in_dt and clock_out_dt:
        total_hours = q2(D((clock_out_dt - clock_in_dt).total_seconds()) / D(3600))
    else:
        total_hours = D(0)

    # NOTE(review): this clears the PTO type exactly when PTO hours are
    # present, which reads like an inverted condition.  Behavior is kept
    # unchanged here -- confirm the intent before altering it.
    if pto_hours > D(0):
        pto_type = None

    return {
        "employee_name": employee_name,
        "work_date": work_date,
        "clock_in": clock_in_dt,
        "clock_out": clock_out_dt,
        "break_hours": q2(break_hours_taken or D(0)),
        "total_hours": q2(total_hours),
        "scheduled_hours": q2(scheduled_hours or D(0)),
        "pto_hours": q2(pto_hours),
        "pto_type": pto_type,
        "holiday_hours": q2(holiday_hours),
        "bereavement_hours": q2(bereavement_hours),
    }


def _parse_rows_with_mapping(xlsx_path: str, sheet_name: str, header_row_idx: int, mapping: Dict[str, Optional[int]]) -> List[Dict]:
    """Read an Excel sheet into normalized time-entry dicts.

    Args:
        xlsx_path: Workbook to open (formulas are read as cached values).
        sheet_name: Title of the worksheet to parse.
        header_row_idx: 1-based row number of the header row; data starts on
            the following row.
        mapping: Target field name -> 0-based column index (None = unmapped).

    Returns:
        One dict per usable row with keys ``employee_name``, ``work_date``,
        ``clock_in``, ``clock_out``, ``break_hours``, ``total_hours``,
        ``scheduled_hours``, ``pto_hours``, ``pto_type``, ``holiday_hours``
        and ``bereavement_hours``.  Rows without an employee or a readable
        work date are skipped.
    """
    import openpyxl

    wb = openpyxl.load_workbook(xlsx_path, data_only=True)
    ws = wb[sheet_name]

    # Detect helpful headers (even if hidden in the UI).
    header_vals = [cell.value for cell in ws[header_row_idx]]
    extra_idx = _auxiliary_columns(header_vals)

    rows_out: List[Dict] = []
    max_row = ws.max_row or 0
    for r in range((header_row_idx or 1) + 1, max_row + 1):
        vals = [cell.value for cell in ws[r]]
        normalized = _normalize_row(vals, mapping, extra_idx)
        if normalized is not None:
            rows_out.append(normalized)
    return rows_out


def _csv_headers_and_map(csv_bytes: bytes) -> Tuple[List[str], Dict[str, int]]:
    """Return the first CSV row and the header map auto-detected from it.

    The bytes are decoded as UTF-8 with an optional BOM stripped (spreadsheet
    exports commonly add one, which would otherwise end up glued to the first
    header name); undecodable bytes are replaced.  Only the first row is
    parsed.

    Returns:
        ``(header_vals, auto_map)``, or ``([], {})`` for empty input.
    """
    text_stream = io.StringIO(csv_bytes.decode("utf-8-sig", errors="replace"))
    header_vals = next(csv.reader(text_stream), None)
    if header_vals is None:
        return [], {}
    return header_vals, detect_header_map(header_vals)


def _parse_csv_with_mapping(csv_path: str, mapping: Dict[str, Optional[int]]) -> List[Dict]:
    """Read a CSV file into normalized time-entry dicts.

    The first row is treated as the header; every following row goes through
    the same normalization as Excel rows, producing dicts with the same keys
    as ``_parse_rows_with_mapping``.

    Args:
        csv_path: File to read (UTF-8, optional BOM, bad bytes replaced).
        mapping: Target field name -> 0-based column index (None = unmapped).
    """
    # newline="" lets the csv module handle line endings itself, as its
    # documentation requires for quoted fields containing newlines.
    with open(csv_path, "r", encoding="utf-8-sig", errors="replace", newline="") as f:
        all_rows = list(csv.reader(f))
    if not all_rows:
        return []

    extra_idx = _auxiliary_columns(all_rows[0])

    rows_out: List[Dict] = []
    for vals in all_rows[1:]:
        normalized = _normalize_row(vals, mapping, extra_idx)
        if normalized is not None:
            rows_out.append(normalized)
    return rows_out
|
|
|
|
# -------------------------
|
|
# Routes
|
|
# -------------------------
|
|
|
|
def _active_timesheet(db: Session) -> Optional[TimesheetPeriod]:
    """Load the period referenced by the last entry of the global timesheet list.

    Returns None when ``enumerate_timesheets_global`` yields nothing.
    """
    listing = enumerate_timesheets_global(db)
    if listing:
        # Each entry starts with the timesheet id.
        last_id = listing[-1][0]
        return db.query(TimesheetPeriod).get(last_id)
    return None
|
|
|
|
def _within_period(d: date_type, ts: TimesheetPeriod) -> bool:
    """Tell whether ``d`` lies inside the period, both end dates included."""
    first_day = ts.period_start
    last_day = ts.period_end
    return first_day <= d and d <= last_day
|
|
|
|
def _dedup_exists(db: Session, employee_id: int, timesheet_id: int, work_date: date_type, clock_in: Optional[datetime], clock_out: Optional[datetime]) -> bool:
    """Check whether an equivalent time entry is already stored.

    Two entries count as duplicates when employee, timesheet and work date
    match and both clock values are equal (falsy clock values are treated
    as None on either side).
    """
    wanted_in = clock_in or None
    wanted_out = clock_out or None
    same_day_entries = (
        db.query(TimeEntry)
        .filter(
            TimeEntry.employee_id == employee_id,
            TimeEntry.timesheet_id == timesheet_id,
            TimeEntry.work_date == work_date,
        )
        .all()
    )
    return any(
        (entry.clock_in or None) == wanted_in and (entry.clock_out or None) == wanted_out
        for entry in same_day_entries
    )
|
|
|
|
@router.get("", response_class=HTMLResponse)
def importer_home(request: Request, db: Session = Depends(get_session), timesheet_id: Optional[int] = Query(None)):
    """Render the upload page of the department importer (admin only).

    The page receives the list of selectable periods and the id of the period
    to preselect: the one named by ``timesheet_id`` if given, otherwise the
    one returned by ``_active_timesheet``.
    """
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    period_options = []
    for tid, ps, pe, name in enumerate_timesheets_global(db):
        # Unnamed periods are shown as "start..end".
        period_options.append({"timesheet_id": tid, "display": (name or f"{ps}..{pe}")})

    if timesheet_id:
        active_ts = db.query(TimesheetPeriod).get(timesheet_id)
    else:
        active_ts = _active_timesheet(db)
    active_id = active_ts.id if active_ts else None

    context = {"request": request, "period_options": period_options, "active_ts": active_id}
    return request.app.state.templates.TemplateResponse("dept_importer_upload.html", context)
|
|
|
|
@router.post("/upload", response_class=HTMLResponse)
async def importer_upload(
    request: Request,
    file: UploadFile = File(...),
    timesheet_id: int = Form(...),
    restrict_to_period: int = Form(1),
    db: Session = Depends(get_session),
):
    """Accept a department file and show the column-mapping page (admin only).

    The upload is stored on disk, its sheets/headers are inspected, and the
    resulting context is written to ``uploads/<map_slug>`` as JSON so the
    following mapping/preview requests can find it again.

    Raises:
        HTTPException 403: caller is not an admin.
        HTTPException 404: ``timesheet_id`` does not exist.
        HTTPException 400: unsupported extension, workbook without sheets,
            or empty CSV.
    """
    import uuid

    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")
    ts = db.query(TimesheetPeriod).get(timesheet_id)
    if not ts:
        raise HTTPException(status_code=404, detail="Time Period not found")

    data = await file.read()
    # The client may omit the file name; treat that as "no extension".
    filename = file.filename or ""
    ext = os.path.splitext(filename.lower())[1]
    if ext not in (".xlsx", ".xlsm", ".xls", ".csv", ".txt"):
        raise HTTPException(status_code=400, detail="Unsupported file type. Please upload XLSX/XLS or CSV/TXT.")
    uploaded_path = _store_upload_file(data, filename)

    ctx: Dict[str, Any] = {
        "kind": "excel" if ext in (".xlsx", ".xlsm", ".xls") else "csv",
        "path": uploaded_path,
        "timesheet_id": timesheet_id,
        "restrict_to_period": int(restrict_to_period),
        "mode": "department",
    }

    if ctx["kind"] == "excel":
        sheets_info = _list_sheets_with_headers(uploaded_path)
        if not sheets_info:
            raise HTTPException(status_code=400, detail="No sheets found in workbook.")
        default_sheet = _default_sheet_name(sheets_info) or sheets_info[0]["sheet_name"]
        ctx["sheets_info"] = sheets_info
        ctx["default_sheet"] = default_sheet
    else:
        header_vals, auto_map = _csv_headers_and_map(data)
        if not header_vals:
            raise HTTPException(status_code=400, detail="Empty CSV")
        ctx["sheets_info"] = [{
            "sheet_name": "CSV",
            "header_row_idx": 1,
            "header_vals": header_vals,
            "auto_map": auto_map,
        }]
        ctx["default_sheet"] = "CSV"

    os.makedirs("uploads", exist_ok=True)
    # The random token keeps two uploads in the same second (same process)
    # from sharing -- and overwriting -- one context file.
    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    map_slug = f"dept-mapctx-{stamp}-{os.getpid()}-{uuid.uuid4().hex}.json"
    map_path = os.path.join("uploads", map_slug)
    with open(map_path, "w", encoding="utf-8") as f:
        # Captured header cells may be dates/times, which json cannot encode
        # natively; store those as their string form instead of crashing.
        json.dump(ctx, f, default=str)

    return request.app.state.templates.TemplateResponse(
        "dept_importer_map.html",
        {
            "request": request,
            "map_slug": map_slug,
            "timesheet_id": timesheet_id,
            "restrict_to_period": int(restrict_to_period),
            "sheets_info": ctx["sheets_info"],
            "sheet_name": ctx["default_sheet"],
            "target_fields": TARGET_FIELDS,
            "kind": ctx["kind"],
            "mode": ctx["mode"],
        },
    )
|
|
|
|
@router.get("/start-initial", response_class=HTMLResponse)
def start_initial_mapping(
    request: Request,
    timesheet_id: int = Query(...),
    src: str = Query(...),
):
    """Open the mapping page for an already-stored file in "initial" mode (admin only).

    Works like the regular upload step, but reads the file at ``src`` instead
    of receiving an upload, never restricts rows to the period, and marks the
    context with ``mode="initial"``.

    Raises:
        HTTPException 403: caller is not an admin.
        HTTPException 400: file missing, unsupported extension, workbook
            without sheets, or empty CSV.
    """
    import uuid

    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    # NOTE(review): ``src`` is a caller-supplied filesystem path and is read
    # as-is.  It is admin-only, but consider confining it to the directory
    # the upload step writes to once the producing route is confirmed.
    if not os.path.exists(src):
        raise HTTPException(status_code=400, detail="Uploaded file not found; please re-upload.")

    _, ext = os.path.splitext(src.lower())
    if ext not in (".xlsx", ".xlsm", ".xls", ".csv", ".txt"):
        raise HTTPException(status_code=400, detail="Unsupported file type.")

    ctx: Dict[str, Any] = {
        "kind": "excel" if ext in (".xlsx", ".xlsm", ".xls") else "csv",
        "path": src,
        "timesheet_id": timesheet_id,
        "restrict_to_period": 0,
        "mode": "initial",
    }

    if ctx["kind"] == "excel":
        sheets_info = _list_sheets_with_headers(src)
        if not sheets_info:
            raise HTTPException(status_code=400, detail="No sheets found in workbook.")
        default_sheet = _default_sheet_name(sheets_info) or sheets_info[0]["sheet_name"]
        ctx["sheets_info"] = sheets_info
        ctx["default_sheet"] = default_sheet
    else:
        with open(src, "rb") as f:
            data = f.read()
        header_vals, auto_map = _csv_headers_and_map(data)
        if not header_vals:
            raise HTTPException(status_code=400, detail="Empty CSV")
        ctx["sheets_info"] = [{
            "sheet_name": "CSV",
            "header_row_idx": 1,
            "header_vals": header_vals,
            "auto_map": auto_map,
        }]
        ctx["default_sheet"] = "CSV"

    os.makedirs("uploads", exist_ok=True)
    # The random token prevents two requests in the same second (same
    # process) from writing to the same context file.
    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    map_slug = f"dept-mapctx-{stamp}-{os.getpid()}-{uuid.uuid4().hex}.json"
    map_path = os.path.join("uploads", map_slug)
    with open(map_path, "w", encoding="utf-8") as f:
        # Header cells read from a workbook may be dates/times; store them as
        # strings rather than failing on non-JSON types.
        json.dump(ctx, f, default=str)

    return request.app.state.templates.TemplateResponse(
        "dept_importer_map.html",
        {
            "request": request,
            "map_slug": map_slug,
            "timesheet_id": timesheet_id,
            "restrict_to_period": 0,
            "sheets_info": ctx["sheets_info"],
            "sheet_name": ctx["default_sheet"],
            "target_fields": TARGET_FIELDS,
            "kind": ctx["kind"],
            "mode": ctx["mode"],
        },
    )
|
|
|
|
@router.get("/map", response_class=HTMLResponse)
def importer_map_get(
    request: Request,
    map_slug: str = Query(...),
    sheet_name: Optional[str] = Query(None),
):
    """Re-render the mapping page from a stored context (admin only).

    Used to show the mapping form again for a different sheet of the same
    upload.  ``sheet_name`` defaults to the sheet chosen at upload time.

    Raises:
        HTTPException 403: caller is not an admin.
        HTTPException 400: ``map_slug`` does not name an existing context
            file directly inside ``uploads/``.
    """
    # Every other importer route is admin-only; this one exposes the headers
    # of uploaded files and must be guarded the same way.
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    path = os.path.join("uploads", map_slug)
    # Only a bare file name is acceptable: anything containing a directory
    # part (or an absolute path) would escape the uploads folder.
    if os.path.basename(map_slug) != map_slug or not os.path.isfile(path):
        raise HTTPException(status_code=400, detail="Mapping context expired. Please re-upload.")

    with open(path, "r", encoding="utf-8") as f:
        ctx = json.load(f)

    sheets_info = ctx.get("sheets_info") or []
    timesheet_id = ctx.get("timesheet_id")
    # Default to 1 only when the value is absent; a stored 0 ("do not
    # restrict") must survive instead of being flipped back to 1.
    stored_restrict = ctx.get("restrict_to_period")
    restrict_to_period = 1 if stored_restrict is None else int(stored_restrict)
    selected = sheet_name or ctx.get("default_sheet")
    return request.app.state.templates.TemplateResponse(
        "dept_importer_map.html",
        {
            "request": request,
            "map_slug": map_slug,
            "timesheet_id": timesheet_id,
            "restrict_to_period": restrict_to_period,
            "sheets_info": sheets_info,
            "sheet_name": selected,
            "target_fields": TARGET_FIELDS,
            "kind": ctx.get("kind") or "excel",
            "mode": ctx.get("mode") or "department",
        },
    )
|
|
|
|
@router.post("/preview-mapped", response_class=HTMLResponse)
async def importer_preview_mapped(
    request: Request,
    map_slug: str = Form(...),
    timesheet_id: int = Form(...),
    sheet_name: str = Form(...),
    restrict_to_period: int = Form(1),
    mode: str = Form("department"),
    db: Session = Depends(get_session),
    employee: Optional[str] = Form(None),
    work_date: Optional[str] = Form(None),
    clock_in: Optional[str] = Form(None),
    clock_out: Optional[str] = Form(None),
    break_hours: Optional[str] = Form(None),
    total_hours: Optional[str] = Form(None),
    total_minus_break: Optional[str] = Form(None),  # hidden in UI; may be None
    pto_hours: Optional[str] = Form(None),
    pto_type: Optional[str] = Form(None),
    holiday_hours: Optional[str] = Form(None),
    bereavement_hours: Optional[str] = Form(None),
):
    """Parse the uploaded file with the chosen column mapping and show a preview.

    Each field parameter carries the 0-based column index selected for that
    field (as text; blank or "none" means unmapped).  The parsed rows are
    written to ``uploads/<slug>`` as JSON for the execute step, and the page
    lists one line per employee with a status and row count.

    Raises:
        HTTPException 403: caller is not an admin.
        HTTPException 400: unknown/invalid mapping context, sheet without a
            detected header row, or employee/work-date column not selected.
        HTTPException 404: ``timesheet_id`` does not exist.
    """
    import uuid

    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    ctx_path = os.path.join("uploads", map_slug)
    # Accept only a bare file name so the slug cannot point outside uploads/.
    if os.path.basename(map_slug) != map_slug or not os.path.isfile(ctx_path):
        raise HTTPException(status_code=400, detail="Mapping context expired. Please re-upload.")

    with open(ctx_path, "r", encoding="utf-8") as f:
        ctx = json.load(f)

    kind = ctx.get("kind") or "excel"
    src_path = ctx.get("path")
    sheets_info = ctx.get("sheets_info") or []
    sheet_info = next((s for s in sheets_info if s.get("sheet_name") == sheet_name), None)
    if not sheet_info or not sheet_info.get("header_row_idx"):
        raise HTTPException(status_code=400, detail="Selected sheet has no recognizable header row.")

    def to_idx(v: Optional[str]) -> Optional[int]:
        """Form text -> column index; blank, "none", junk or negative -> None."""
        if v is None:
            return None
        v = v.strip()
        if not v or v.lower() == "none":
            return None
        try:
            idx = int(v)
        except ValueError:
            return None
        # A negative index would silently address columns from the end of
        # the row; treat it as "not selected".
        return idx if idx >= 0 else None

    mapping = {
        "employee": to_idx(employee),
        "work_date": to_idx(work_date),
        "clock_in": to_idx(clock_in),
        "clock_out": to_idx(clock_out),
        "break_hours": to_idx(break_hours),
        "total_hours": to_idx(total_hours),
        "total_minus_break": to_idx(total_minus_break),  # may be None
        "pto_hours": to_idx(pto_hours),
        "pto_type": to_idx(pto_type),
        "holiday_hours": to_idx(holiday_hours),
        "bereavement_hours": to_idx(bereavement_hours),
    }

    if mapping["employee"] is None or mapping["work_date"] is None:
        raise HTTPException(status_code=400, detail="Please select both Employee and Work Date columns.")

    if kind == "excel":
        norm_rows = _parse_rows_with_mapping(src_path, sheet_name, int(sheet_info["header_row_idx"]), mapping)
    else:
        norm_rows = _parse_csv_with_mapping(src_path, mapping)

    ts = db.query(TimesheetPeriod).get(timesheet_id)
    if not ts:
        raise HTTPException(status_code=404, detail="Time Period not found")

    total_before_filter = len(norm_rows)

    filtered_rows = norm_rows
    if restrict_to_period:
        filtered_rows = [r for r in norm_rows if _within_period(r["work_date"], ts)]
        # If the period filter would drop everything, fall back to all rows
        # rather than presenting an empty preview.
        if not filtered_rows and total_before_filter > 0:
            filtered_rows = norm_rows

    by_emp: Dict[str, List[Dict]] = {}
    for r in filtered_rows:
        by_emp.setdefault(r["employee_name"], []).append(r)

    preview = []
    for name, rows in sorted(by_emp.items(), key=lambda kv: kv[0].lower()):
        emp = db.query(Employee).filter(func.lower(Employee.name) == func.lower(name)).first()
        has_any_in_period = False
        if emp:
            has_any_in_period = db.query(TimeEntry).filter(TimeEntry.timesheet_id == timesheet_id, TimeEntry.employee_id == emp.id).first() is not None
        preview.append({
            "employee_name": name,
            "status": ("Existing in period" if has_any_in_period else ("Existing employee" if emp else "New employee")),
            "existing_employee_id": emp.id if emp else None,
            "row_count": len(rows),
        })

    os.makedirs("uploads", exist_ok=True)
    # Random token: two previews in the same second (same process) must not
    # share one payload file.
    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
    slug = f"dept-import-{stamp}-{os.getpid()}-{uuid.uuid4().hex}.json"
    path = os.path.join("uploads", slug)

    def enc(o):
        """JSON fallback: ISO text for dates/datetimes, ``str`` for the rest."""
        if isinstance(o, (datetime, date_type)):
            return o.isoformat()
        return str(o)

    with open(path, "w", encoding="utf-8") as f:
        json.dump({"timesheet_id": timesheet_id, "rows": filtered_rows}, f, default=enc)

    return request.app.state.templates.TemplateResponse(
        "dept_importer_preview.html",
        {"request": request, "slug": slug, "timesheet_id": timesheet_id, "preview": preview, "mode": mode},
    )
|
|
|
|
@router.post("/execute")
async def importer_execute(
    request: Request,
    slug: str = Form(...),
    timesheet_id: int = Form(...),
    selected_names: str = Form(...),
    mode: str = Form("department"),
    db: Session = Depends(get_session),
):
    """Insert the previewed rows for the selected employees (admin only).

    Loads the payload written by the preview step, keeps the rows whose
    employee is listed in ``selected_names`` (comma-separated), creates
    missing employees, skips unusable or duplicate rows, and records every
    created entry in a new ``ImportBatch``.  In ``mode="initial"`` the
    timesheet's period is then set from the imported dates and the user is
    sent to week assignment; otherwise the user is sent back to the viewer.

    Raises:
        HTTPException 403: caller is not an admin.
        HTTPException 400: unknown/invalid import context or timesheet mismatch.
        HTTPException 404: ``timesheet_id`` does not exist.
    """
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    path = os.path.join("uploads", slug)
    # The slug must be a bare file name: this path is read and later deleted,
    # so it must not be able to point outside uploads/.
    if os.path.basename(slug) != slug or not os.path.isfile(path):
        raise HTTPException(status_code=400, detail="Import context expired. Please re-upload.")

    with open(path, "r", encoding="utf-8") as f:
        payload = json.load(f)

    payload_tid = payload.get("timesheet_id")
    if payload_tid is None or int(payload_tid) != int(timesheet_id):
        raise HTTPException(status_code=400, detail="Timesheet mismatch. Please re-upload.")

    ts = db.query(TimesheetPeriod).get(timesheet_id)
    if not ts:
        raise HTTPException(status_code=404, detail="Time Period not found")

    def conv(r: Dict) -> Dict:
        """Turn one JSON row back into Python dates/datetimes and floats."""
        ci = r.get("clock_in")
        co = r.get("clock_out")
        wd = r.get("work_date")
        return {
            "employee_name": r.get("employee_name"),
            "work_date": date_type.fromisoformat(wd) if isinstance(wd, str) else wd,
            "clock_in": (datetime.fromisoformat(ci) if isinstance(ci, str) else ci) if ci else None,
            "clock_out": (datetime.fromisoformat(co) if isinstance(co, str) else co) if co else None,
            "break_hours": float(r.get("break_hours") or 0),
            "total_hours": float(r.get("total_hours") or 0),
            "scheduled_hours": float(r.get("scheduled_hours") or 0),
            "pto_hours": float(r.get("pto_hours") or 0),
            "pto_type": (r.get("pto_type") or None),
            "holiday_hours": float(r.get("holiday_hours") or 0),
            "bereavement_hours": float(r.get("bereavement_hours") or 0),
        }

    def as_date(raw):
        """Normalize a raw ``day_date`` value to a ``date`` when possible.

        The week query below is untyped SQL, so depending on the driver the
        column can come back as a date, a datetime or ISO text (SQLite);
        only real ``date`` keys can match the rows' work dates.
        """
        if isinstance(raw, datetime):
            return raw.date()
        if isinstance(raw, str):
            try:
                return date_type.fromisoformat(raw[:10])
            except ValueError:
                return raw
        return raw

    rows = [conv(r) for r in (payload.get("rows") or [])]

    # NOTE(review): names are split on commas, so an employee name that itself
    # contains a comma ("Doe, John") can never be selected -- confirm how the
    # preview form encodes this field before changing it.
    selected_set = {s.strip() for s in (selected_names or "").split(",") if s.strip()}
    rows = [r for r in rows if r["employee_name"] in selected_set]

    week_rows = db.execute(text("SELECT day_date, week_number FROM week_assignments WHERE timesheet_id = :tid"), {"tid": timesheet_id}).fetchall()
    week_map: Dict[date_type, int] = {as_date(row[0]): int(row[1]) for row in week_rows}

    batch = ImportBatch(timesheet_id=timesheet_id, source_name=f"Department import {slug}", created_at=datetime.utcnow())
    db.add(batch)
    db.flush()

    inserted_count = 0
    by_emp: Dict[str, List[Dict]] = {}
    for r in rows:
        by_emp.setdefault(r["employee_name"], []).append(r)

    min_date = None
    max_date = None

    for name, erows in by_emp.items():
        emp = db.query(Employee).filter(func.lower(Employee.name) == func.lower(name)).first()
        if not emp:
            emp = Employee(name=name)
            db.add(emp)
            db.flush()

        for r in erows:
            wd: date_type = r["work_date"]
            # Date range is tracked over all selected rows, including ones
            # skipped below.
            if min_date is None or wd < min_date:
                min_date = wd
            if max_date is None or wd > max_date:
                max_date = wd

            # Department imports only accept days that have a week assignment.
            if wd not in week_map and mode == "department":
                continue

            ci = r["clock_in"]
            co = r["clock_out"]
            no_special_hours = (r["pto_hours"] <= 0) and (r["holiday_hours"] <= 0) and (r["bereavement_hours"] <= 0)

            # Keep rows that indicate "scheduled but no show": scheduled_hours > 0
            # and no clocks or special hours.  With no scheduled hours either,
            # the row carries nothing and is skipped.
            if (ci is None and co is None) and no_special_hours:
                if r.get("scheduled_hours", 0) <= 0:
                    continue
            # Incomplete clocks: skip unless PTO/holiday/bereavement-only.
            if (ci is None) ^ (co is None):
                if no_special_hours:
                    continue

            if ci and co and co <= ci:
                co = co + timedelta(days=1)

            # Holiday rows are stored without clock times.
            if (r.get("holiday_hours") or 0) > 0:
                ci = None
                co = None

            if _dedup_exists(db, emp.id, timesheet_id, wd, ci, co):
                continue

            total_hours = q2(D(r["total_hours"] or 0))
            brk = q2(D(r["break_hours"] or 0))
            pto = q2(D(r["pto_hours"] or 0))
            hol = q2(D(r["holiday_hours"] or 0))
            ber = q2(D(r["bereavement_hours"] or 0))

            # Paid hours = (total - break, floored at zero) + PTO + holiday + bereavement.
            worked = q2(D(total_hours) - D(brk))
            if worked < D(0):
                worked = q2(D(0))
            hours_paid = q2(worked + D(pto) + D(hol) + D(ber))

            te = TimeEntry(
                employee_id=emp.id,
                timesheet_id=timesheet_id,
                work_date=wd,
                clock_in=ci,
                clock_out=co,
                break_hours=brk,
                total_hours=total_hours,
                pto_hours=pto,
                pto_type=(r["pto_type"] or None),
                holiday_hours=hol,
                bereavement_hours=ber,
                hours_paid=hours_paid,
            )
            db.add(te)
            db.flush()  # assigns te.id for the batch item
            db.add(ImportBatchItem(batch_id=batch.id, time_entry_id=te.id))
            inserted_count += 1

    db.commit()
    # Best-effort cleanup of the payload file; a failure here must not undo
    # a committed import.
    try:
        os.remove(path)
    except OSError:
        pass

    if mode == "initial":
        if min_date is None or max_date is None:
            row = db.query(func.min(TimeEntry.work_date), func.max(TimeEntry.work_date)).filter(TimeEntry.timesheet_id == timesheet_id).one()
            min_date, max_date = row[0], row[1]

        if not min_date:
            return RedirectResponse(url="/upload?error=No+rows+found+after+import", status_code=303)

        from .utils import _semi_monthly_period_for_date as semi
        ps1, pe1 = semi(min_date)
        ps2, pe2 = semi(max_date or min_date)
        # When the first and last dates fall in different periods, the period
        # of the last date is used.
        ps, pe = (ps2, pe2) if (ps1, pe1) != (ps2, pe2) else (ps1, pe1)

        ts = db.query(TimesheetPeriod).get(timesheet_id)
        ts.period_start = ps
        ts.period_end = pe
        db.commit()

        return RedirectResponse(url=f"/assign-weeks?timesheet_id={timesheet_id}", status_code=303)

    msg = f"Imported {inserted_count} time entries from department file."
    return RedirectResponse(url=f"/viewer?timesheet_id={timesheet_id}&msg={msg}", status_code=303)
|
|
|
|
@router.post("/undo-last")
def importer_undo_last(request: Request, timesheet_id: int = Form(...), db: Session = Depends(get_session)):
    """Remove the most recently created import batch of a timesheet (admin only).

    Deletes the batch's link rows, the time entries they point to, and the
    batch itself, then redirects to the viewer with a status message.
    """
    if not request.session.get("is_admin"):
        raise HTTPException(status_code=403, detail="Admin access required")

    latest = (
        db.query(ImportBatch)
        .filter(ImportBatch.timesheet_id == timesheet_id)
        .order_by(ImportBatch.created_at.desc())
        .first()
    )
    if latest is None:
        return RedirectResponse(url=f"/viewer?timesheet_id={timesheet_id}&msg=No+department+imports+to+undo", status_code=303)

    links = db.query(ImportBatchItem).filter(ImportBatchItem.batch_id == latest.id)
    entry_ids = [link.time_entry_id for link in links.all()]
    if entry_ids:
        # Link rows go first, then the entries they referenced.
        db.query(ImportBatchItem).filter(ImportBatchItem.batch_id == latest.id).delete(synchronize_session=False)
        db.query(TimeEntry).filter(TimeEntry.id.in_(entry_ids)).delete(synchronize_session=False)
    db.delete(latest)
    db.commit()
    return RedirectResponse(url=f"/viewer?timesheet_id={timesheet_id}&msg=Undid+last+department+import", status_code=303)