timekeeper/app/process_excel.py
2026-01-15 15:46:35 -05:00

356 lines
13 KiB
Python

import openpyxl
from datetime import datetime, time, date as date_type, timedelta
from typing import Optional, Dict, Any, List
from sqlalchemy.orm import Session
from decimal import Decimal
from .models import TimeEntry, Employee
from .utils import D, q2 # use shared Decimal helpers
def safe_decimal(value, default=Decimal("0")) -> Decimal:
if value is None or value == "":
return Decimal(default)
if isinstance(value, (int, float)):
# Convert via str to avoid binary float artifacts
return D(value)
if isinstance(value, datetime):
# numeric only
return Decimal(default)
if isinstance(value, str):
try:
return D(value.strip())
except Exception:
return Decimal(default)
return Decimal(default)
def parse_excel_serial_date(serial: float) -> Optional[date_type]:
try:
epoch = datetime(1899, 12, 30) # Excel epoch
return (epoch + timedelta(days=float(serial))).date()
except Exception:
return None
def parse_excel_serial_datetime(serial: float) -> Optional[datetime]:
try:
epoch = datetime(1899, 12, 30)
return epoch + timedelta(days=float(serial))
except Exception:
return None
def parse_date(value) -> Optional[date_type]:
if value is None:
return None
if isinstance(value, date_type) and not isinstance(value, datetime):
return value
if isinstance(value, datetime):
return value.date()
if isinstance(value, (int, float)):
return parse_excel_serial_date(value)
if isinstance(value, str):
v = value.strip()
for fmt in [
"%Y-%m-%d",
"%m/%d/%Y",
"%m/%d/%y",
"%d/%m/%Y",
"%d/%m/%y",
"%Y/%m/%d",
"%m-%d-%Y",
"%d-%m-%Y",
"%B %d, %Y",
"%b %d, %Y",
]:
try:
return datetime.strptime(v, fmt).date()
except ValueError:
continue
return None
def parse_time_value(value) -> Optional[time]:
if value is None or value == "":
return None
if isinstance(value, time):
return value
if isinstance(value, datetime):
return value.time()
if isinstance(value, (int, float)):
# Excel serial time: fraction of a day
try:
seconds = int(round(float(value) * 86400))
base = datetime(1970, 1, 1) + timedelta(seconds=seconds)
return base.time()
except Exception:
return None
if isinstance(value, str):
v = value.strip()
for fmt in ["%I:%M %p", "%H:%M", "%I:%M:%S %p", "%H:%M:%S"]:
try:
return datetime.strptime(v, fmt).time()
except ValueError:
continue
return None
def parse_datetime_value(value) -> Optional[datetime]:
"""
Parse a cell that may contain a full datetime (with date), handling:
- Python datetime objects
- Excel serial datetimes (float or int)
- Strings with date and time
Returns None if only time-of-day is present (use parse_time_value then bind to work_date).
"""
if value is None or value == "":
return None
if isinstance(value, datetime):
return value
if isinstance(value, (int, float)):
dt = parse_excel_serial_datetime(value)
return dt
if isinstance(value, str):
v = value.strip()
has_slash_date = "/" in v or "-" in v
if has_slash_date:
for fmt in [
"%m/%d/%Y %I:%M:%S %p",
"%m/%d/%Y %I:%M %p",
"%m/%d/%y %I:%M %p",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%m/%d/%Y %H:%M:%S",
"%m/%d/%Y %H:%M",
]:
try:
return datetime.strptime(v, fmt)
except ValueError:
continue
return None
def norm(cell_val) -> str:
return str(cell_val).strip().lower() if cell_val is not None else ""
def pick_index(header: List[str], include: List[str], exclude: List[str] = []) -> Optional[int]:
for i, h in enumerate(header):
if not h:
continue
if all(tok in h for tok in include) and all(tok not in h for tok in exclude):
return i
return None
def detect_header_map(row_values: List[Any]) -> Dict[str, int]:
h = [norm(v) for v in row_values]
idx: Dict[str, int] = {}
# Required
idx_emp = pick_index(h, ["employee", "name"], []) or pick_index(h, ["employee"], []) or pick_index(h, ["name"], [])
idx_date = pick_index(h, ["date"], [])
if idx_emp is None or idx_date is None:
return {}
idx["employee"] = idx_emp
idx["work_date"] = idx_date
# Clock in/out
ci = pick_index(h, ["clock", "in", "time"], []) or pick_index(h, ["clock", "in"], []) or pick_index(h, ["time", "in"], [])
co = pick_index(h, ["clock", "out", "time"], []) or pick_index(h, ["clock", "out"], []) or pick_index(h, ["time", "out"], [])
if ci is not None:
idx["clock_in"] = ci
if co is not None:
idx["clock_out"] = co
# Break
br = (
pick_index(h, ["breaks", "hours", "taken"], [])
or pick_index(h, ["break", "hours", "taken"], [])
or pick_index(h, ["breaks", "taken"], [])
or pick_index(h, ["unpaid", "hours"], [])
or pick_index(h, ["break"], [])
or pick_index(h, ["lunch"], [])
)
if br is not None:
idx["break_hours"] = br
# Total: prefer "hours worked"
tot = (
pick_index(h, ["hours", "worked"], ["minus"])
or pick_index(h, ["worked"], ["minus"])
)
if tot is not None:
idx["total_hours"] = tot
else:
alt = pick_index(h, ["hours", "worked", "minus", "break"], []) or pick_index(h, ["worked", "minus", "break"], [])
if alt is not None:
idx["total_minus_break"] = alt
# PTO hours
ptoh = (
pick_index(h, ["pto", "hours"], [])
or pick_index(h, ["sick", "hours"], [])
or pick_index(h, ["vacation", "hours"], [])
or pick_index(h, ["vacation"], [])
)
if ptoh is not None:
idx["pto_hours"] = ptoh
# PTO Type (optional)
ptot = pick_index(h, ["pto", "type"], [])
if ptot is not None:
idx["pto_type"] = ptot
# Holiday / Bereavement
hol = pick_index(h, ["holiday"], [])
ber = pick_index(h, ["bereavement"], [])
if hol is not None:
idx["holiday_hours"] = hol
if ber is not None:
idx["bereavement_hours"] = ber
return idx
def import_workbook(path: str, db: Session, timesheet_id: Optional[int] = None) -> Dict[str, Any]:
wb = openpyxl.load_workbook(path, data_only=True)
inserted_total = 0
employees_seen = set()
sheets_processed = 0
for ws in wb.worksheets:
# Find header row
header_map: Dict[str, int] = {}
header_row_idx = None
for r in range(1, min(ws.max_row, 30) + 1):
row_values = [cell.value for cell in ws[r]]
header_map = detect_header_map(row_values)
if header_map:
header_row_idx = r
break
if not header_map or not header_row_idx:
continue
sheets_processed += 1
# Parse rows
for r in range(header_row_idx + 1, ws.max_row + 1):
vals = [cell.value for cell in ws[r]]
emp_raw = vals[header_map["employee"]] if len(vals) > header_map["employee"] else None
date_raw = vals[header_map["work_date"]] if len(vals) > header_map["work_date"] else None
if not emp_raw or not date_raw:
continue
employee_name = str(emp_raw).strip()
work_date = parse_date(date_raw)
if not work_date:
continue
# Optional fields as Decimal
clock_in_dt: Optional[datetime] = None
clock_out_dt: Optional[datetime] = None
break_hours_taken = D(0)
pto_hours = D(0)
pto_type = None
holiday_hours = D(0)
bereavement_hours = D(0)
total_from_sheet: Optional[Decimal] = None
alt_total_minus_break: Optional[Decimal] = None
# Clock In/Out: prefer full datetime if present in the cell; else bind time to work_date
if "clock_in" in header_map and len(vals) > header_map["clock_in"]:
ci_raw = vals[header_map["clock_in"]]
clock_in_dt = parse_datetime_value(ci_raw)
if not clock_in_dt:
t = parse_time_value(ci_raw)
clock_in_dt = datetime.combine(work_date, t) if t else None
if "clock_out" in header_map and len(vals) > header_map["clock_out"]:
co_raw = vals[header_map["clock_out"]]
clock_out_dt = parse_datetime_value(co_raw)
if not clock_out_dt:
t = parse_time_value(co_raw)
clock_out_dt = datetime.combine(work_date, t) if t else None
# Overnight
if clock_in_dt and clock_out_dt and clock_out_dt <= clock_in_dt:
clock_out_dt = clock_out_dt + timedelta(days=1)
# Breaks Hours Taken
if "break_hours" in header_map and len(vals) > header_map["break_hours"]:
break_hours_taken = safe_decimal(vals[header_map["break_hours"]], D(0))
# Total = Hours Worked (preferred)
if "total_hours" in header_map and len(vals) > header_map["total_hours"]:
total_from_sheet = safe_decimal(vals[header_map["total_hours"]], None)
# Fallback: Hours Worked Minus Break Hours -> infer Hours Worked by adding back break
if "total_minus_break" in header_map and len(vals) > header_map["total_minus_break"]:
alt_total_minus_break = safe_decimal(vals[header_map["total_minus_break"]], None)
# PTO / Holiday / Bereavement
if "pto_hours" in header_map and len(vals) > header_map["pto_hours"]:
pto_hours = safe_decimal(vals[header_map["pto_hours"]], D(0))
if "pto_type" in header_map and len(vals) > header_map["pto_type"]:
v = vals[header_map["pto_type"]]
pto_type = (str(v).strip() if v is not None and not isinstance(v, (int, float, datetime)) else None)
if "holiday_hours" in header_map and len(vals) > header_map["holiday_hours"]:
holiday_hours = safe_decimal(vals[header_map["holiday_hours"]], D(0))
if "bereavement_hours" in header_map and len(vals) > header_map["bereavement_hours"]:
bereavement_hours = safe_decimal(vals[header_map["bereavement_hours"]], D(0))
# Determine Total Hours (Hours Worked)
if total_from_sheet is None:
if alt_total_minus_break is not None and break_hours_taken is not None:
total_from_sheet = q2(alt_total_minus_break + break_hours_taken)
else:
if clock_in_dt and clock_out_dt:
total_from_sheet = q2(D((clock_out_dt - clock_in_dt).total_seconds()) / D(3600))
else:
total_from_sheet = D(0)
else:
total_from_sheet = q2(total_from_sheet)
# Force PTO type review: if PTO hours are present (including Vacation), blank pto_type
if pto_hours > D(0):
pto_type = None
# Employee
emp = db.query(Employee).filter(Employee.name == employee_name).first()
if not emp:
emp = Employee(name=employee_name)
db.add(emp)
db.flush()
employees_seen.add(emp.id)
entry = TimeEntry(
employee_id=emp.id,
work_date=work_date,
clock_in=clock_in_dt,
clock_out=clock_out_dt,
break_hours=q2(break_hours_taken),
total_hours=q2(total_from_sheet),
pto_hours=q2(pto_hours),
pto_type=pto_type,
holiday_hours=q2(holiday_hours),
bereavement_hours=q2(bereavement_hours),
timesheet_id=timesheet_id,
)
# Hours Paid = (Total - Break) + PTO + Holiday + Bereavement
worked = (D(entry.total_hours or 0) - D(entry.break_hours or 0))
if worked < D(0):
worked = D(0)
entry.hours_paid = q2(worked + D(entry.pto_hours or 0) + D(entry.holiday_hours or 0) + D(entry.bereavement_hours or 0))
db.add(entry)
inserted_total += 1
db.commit()
print(f"[import_workbook] Sheets processed: {sheets_processed}, rows inserted: {inserted_total}, employees: {len(employees_seen)}")
return {"rows": inserted_total, "employees": len(employees_seen)}