356 lines
13 KiB
Python
356 lines
13 KiB
Python
import openpyxl
|
|
from datetime import datetime, time, date as date_type, timedelta
|
|
from typing import Optional, Dict, Any, List
|
|
from sqlalchemy.orm import Session
|
|
from decimal import Decimal
|
|
|
|
from .models import TimeEntry, Employee
|
|
from .utils import D, q2 # use shared Decimal helpers
|
|
|
|
|
|
def safe_decimal(value, default=Decimal("0")) -> Optional[Decimal]:
    """Coerce a spreadsheet cell value to ``Decimal``, or return ``default``.

    Args:
        value: Raw cell content (number, string, ``Decimal``, ``None``, ...).
        default: Fallback returned when ``value`` is empty, not numeric, or
            cannot be parsed. May be ``None`` so callers can tell "no value"
            apart from zero; any other default is passed through ``Decimal()``.

    Returns:
        The parsed ``Decimal``, or the fallback (possibly ``None``).
    """

    def _fallback() -> Optional[Decimal]:
        # Decimal(None) raises TypeError, so a None default is returned as-is.
        return None if default is None else Decimal(default)

    if value is None:
        return _fallback()
    if isinstance(value, Decimal):
        # Already the target type; nothing to convert.
        return value
    if value == "":
        return _fallback()
    if isinstance(value, (int, float)):
        # D is the shared helper from .utils; presumably it converts via str
        # to avoid binary float artifacts -- confirm against .utils.
        return D(value)
    if isinstance(value, str):
        try:
            return D(value.strip())
        except Exception:
            return _fallback()
    # Anything else (e.g. a datetime cell) is not numeric.
    return _fallback()
|
|
|
|
|
|
def parse_excel_serial_date(serial: float) -> Optional[date_type]:
    """Convert an Excel serial day number to a calendar date.

    Day 0 is taken to be 1899-12-30; any fractional (time-of-day) part is
    dropped. Returns ``None`` when ``serial`` cannot be converted (not
    numeric, NaN, or out of the supported datetime range).
    """
    excel_day_zero = datetime(1899, 12, 30)  # Excel epoch
    try:
        moment = excel_day_zero + timedelta(days=float(serial))
    except Exception:
        return None
    return moment.date()
|
|
|
|
|
|
def parse_excel_serial_datetime(serial: float) -> Optional[datetime]:
    """Convert an Excel serial number (days since 1899-12-30) to a datetime.

    The fractional part of ``serial`` becomes the time of day. Returns
    ``None`` when ``serial`` cannot be converted.
    """
    excel_day_zero = datetime(1899, 12, 30)
    try:
        offset = timedelta(days=float(serial))
        return excel_day_zero + offset
    except Exception:
        return None
|
|
|
|
|
|
def parse_date(value) -> Optional[date_type]:
    """Interpret a cell value as a calendar date.

    Accepts ``date``/``datetime`` objects, Excel serial numbers, and strings
    in one of the formats listed below. String formats are tried in order, so
    an ambiguous value such as ``"01/02/2024"`` is read month-first.
    Returns ``None`` when nothing matches.
    """
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date_type):
        return value
    if isinstance(value, (int, float)):
        return parse_excel_serial_date(value)
    if not isinstance(value, str):
        return None

    text = value.strip()
    patterns = (
        "%Y-%m-%d",
        "%m/%d/%Y",
        "%m/%d/%y",
        "%d/%m/%Y",
        "%d/%m/%y",
        "%Y/%m/%d",
        "%m-%d-%Y",
        "%d-%m-%Y",
        "%B %d, %Y",
        "%b %d, %Y",
    )
    for pattern in patterns:
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            continue
        return parsed.date()
    return None
|
|
|
|
|
|
def parse_time_value(value) -> Optional[time]:
    """Interpret a cell value as a time of day.

    Accepts ``time``/``datetime`` objects, Excel serial numbers (the fraction
    of a day is converted to a clock time), and strings in 12- or 24-hour
    notation with optional seconds. Returns ``None`` for empty or
    unrecognised input.
    """
    if value is None or value == "":
        return None
    if isinstance(value, datetime):
        return value.time()
    if isinstance(value, time):
        return value

    if isinstance(value, (int, float)):
        # Excel serial time: fraction of a day
        try:
            elapsed = timedelta(seconds=int(round(float(value) * 86400)))
            return (datetime(1970, 1, 1) + elapsed).time()
        except Exception:
            return None

    if isinstance(value, str):
        text = value.strip()
        for pattern in ("%I:%M %p", "%H:%M", "%I:%M:%S %p", "%H:%M:%S"):
            try:
                parsed = datetime.strptime(text, pattern)
            except ValueError:
                continue
            return parsed.time()

    return None
|
|
|
|
|
|
def parse_datetime_value(value) -> Optional[datetime]:
    """Parse a cell that may contain a full datetime (date and time).

    Handles:
      - Python ``datetime`` objects (returned unchanged)
      - Excel serial datetimes (float or int)
      - Strings containing both a date and a time

    Returns ``None`` if only a time-of-day is present, so the caller can use
    ``parse_time_value`` and bind the result to the row's work date. That
    includes numeric serials in ``[0, 1)``, which are a fraction of a day
    with no date part.
    """
    if value is None or value == "":
        return None
    if isinstance(value, datetime):
        return value

    if isinstance(value, (int, float)):
        # A serial below one day carries no date; converting it would pin the
        # time to the 1899-12-30 epoch instead of the row's work date.
        if 0 <= value < 1:
            return None
        return parse_excel_serial_datetime(value)

    if isinstance(value, str):
        v = value.strip()
        # Only strings with a date separator can hold a date part.
        if "/" in v or "-" in v:
            for fmt in (
                "%m/%d/%Y %I:%M:%S %p",
                "%m/%d/%Y %I:%M %p",
                "%m/%d/%y %I:%M %p",
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%d %H:%M",
                "%m/%d/%Y %H:%M:%S",
                "%m/%d/%Y %H:%M",
            ):
                try:
                    return datetime.strptime(v, fmt)
                except ValueError:
                    continue
    return None
|
|
|
|
|
|
def norm(cell_val) -> str:
    """Return the cell value as a stripped, lower-cased string ('' for None)."""
    if cell_val is None:
        return ""
    text = str(cell_val)
    return text.strip().lower()
|
|
|
|
|
|
def pick_index(header: List[str], include: List[str], exclude: Optional[List[str]] = None) -> Optional[int]:
    """Return the index of the first header cell matching the token filters.

    A cell matches when it is non-empty, contains every token in ``include``
    as a substring, and contains none of the tokens in ``exclude``.

    Args:
        header: Header cell texts (callers pass them already normalised).
        include: Substrings that must all be present.
        exclude: Substrings that must all be absent; ``None`` means no
            exclusions.

    Returns:
        The zero-based index of the first match, or ``None`` if there is
        none. Index 0 is a valid result, so compare against ``None`` rather
        than relying on truthiness.
    """
    excluded = exclude or ()
    for i, h in enumerate(header):
        if not h:
            continue
        if all(tok in h for tok in include) and not any(tok in h for tok in excluded):
            return i
    return None
|
|
|
|
|
|
def _first_header_index(
    header: List[str],
    include_sets: List[List[str]],
    exclude: Optional[List[str]] = None,
) -> Optional[int]:
    """Return the first non-None ``pick_index`` result over ``include_sets``."""
    for include in include_sets:
        found = pick_index(header, include, exclude or [])
        # Compare with None explicitly: column index 0 is a valid hit and
        # must not fall through to the next, looser token set.
        if found is not None:
            return found
    return None


def detect_header_map(row_values: List[Any]) -> Dict[str, int]:
    """Map logical field names to column indexes for a candidate header row.

    Cell values are normalised with ``norm`` and matched by substring tokens.
    The row counts as a header only if both an employee column and a date
    column are found; otherwise an empty dict is returned.

    Possible keys in the result: ``employee``, ``work_date`` (always present
    on success), ``clock_in``, ``clock_out``, ``break_hours``,
    ``total_hours`` or ``total_minus_break`` (the latter only when no plain
    "hours worked" column exists), ``pto_hours``, ``pto_type``,
    ``holiday_hours``, ``bereavement_hours``.
    """
    h = [norm(v) for v in row_values]

    # Required
    idx_emp = _first_header_index(h, [["employee", "name"], ["employee"], ["name"]])
    idx_date = pick_index(h, ["date"], [])
    if idx_emp is None or idx_date is None:
        return {}

    idx: Dict[str, int] = {"employee": idx_emp, "work_date": idx_date}

    # Clock in/out
    ci = _first_header_index(h, [["clock", "in", "time"], ["clock", "in"], ["time", "in"]])
    co = _first_header_index(h, [["clock", "out", "time"], ["clock", "out"], ["time", "out"]])
    if ci is not None:
        idx["clock_in"] = ci
    if co is not None:
        idx["clock_out"] = co

    # Break
    br = _first_header_index(
        h,
        [
            ["breaks", "hours", "taken"],
            ["break", "hours", "taken"],
            ["breaks", "taken"],
            ["unpaid", "hours"],
            ["break"],
            ["lunch"],
        ],
    )
    if br is not None:
        idx["break_hours"] = br

    # Total: prefer "hours worked"; columns mentioning "minus" are net of
    # breaks and are only used as a fallback.
    tot = _first_header_index(h, [["hours", "worked"], ["worked"]], ["minus"])
    if tot is not None:
        idx["total_hours"] = tot
    else:
        alt = _first_header_index(
            h,
            [["hours", "worked", "minus", "break"], ["worked", "minus", "break"]],
        )
        if alt is not None:
            idx["total_minus_break"] = alt

    # PTO hours
    ptoh = _first_header_index(
        h,
        [["pto", "hours"], ["sick", "hours"], ["vacation", "hours"], ["vacation"]],
    )
    if ptoh is not None:
        idx["pto_hours"] = ptoh

    # PTO Type (optional)
    ptot = pick_index(h, ["pto", "type"], [])
    if ptot is not None:
        idx["pto_type"] = ptot

    # Holiday / Bereavement
    hol = pick_index(h, ["holiday"], [])
    ber = pick_index(h, ["bereavement"], [])
    if hol is not None:
        idx["holiday_hours"] = hol
    if ber is not None:
        idx["bereavement_hours"] = ber

    return idx
|
|
|
|
|
|
def _cell_value(vals, header_map: Dict[str, int], key: str) -> Any:
    """Return the raw cell under the column mapped to ``key``, or None if unmapped or out of range."""
    col = header_map.get(key)
    if col is None or col >= len(vals):
        return None
    return vals[col]


def _optional_decimal(raw: Any) -> Optional[Decimal]:
    """Return ``raw`` as a Decimal, or None when the cell is empty or not numeric."""
    if raw is None or raw == "":
        return None
    try:
        return safe_decimal(raw, None)
    except TypeError:
        # Guards against a safe_decimal that coerces its default with
        # Decimal(), which rejects None.
        return None


def _bind_clock_value(raw: Any, work_date: date_type) -> Optional[datetime]:
    """Return a clock cell as a datetime, binding a bare time-of-day to ``work_date``."""
    stamp = parse_datetime_value(raw)
    if stamp:
        return stamp
    t = parse_time_value(raw)
    # Explicit None check: a midnight time must still be bound to the date.
    return datetime.combine(work_date, t) if t is not None else None


def import_workbook(path: str, db: Session, timesheet_id: Optional[int] = None) -> Dict[str, Any]:
    """Import time entries from every recognisable sheet of an Excel workbook.

    For each worksheet, the first 30 rows are scanned for a header row (see
    ``detect_header_map``); sheets without one are skipped. Every following
    row with an employee name and a parseable date becomes a ``TimeEntry``.
    Employees are looked up by exact name and created when missing.

    Total hours come from, in order of preference: the "hours worked" column,
    the "hours worked minus break" column plus break hours, or the difference
    between clock-out and clock-in. Hours paid is
    ``max(total - break, 0) + PTO + holiday + bereavement``.

    Args:
        path: Filesystem path of the workbook.
        db: SQLAlchemy session; entries are added and committed once at the
            end. The session is rolled back if the import raises.
        timesheet_id: Optional id stored on each created entry.

    Returns:
        ``{"rows": <entries inserted>, "employees": <distinct employees>}``.
    """
    wb = openpyxl.load_workbook(path, data_only=True)

    header_scan_limit = 30
    inserted_total = 0
    employees_seen = set()
    sheets_processed = 0
    # Employee name -> id, so each name is queried at most once per import.
    employee_ids: Dict[str, Any] = {}

    try:
        for ws in wb.worksheets:
            # Find header row
            header_map: Dict[str, int] = {}
            header_row_idx = None
            candidate_rows = ws.iter_rows(
                min_row=1,
                max_row=min(ws.max_row, header_scan_limit),
                values_only=True,
            )
            for r, row_values in enumerate(candidate_rows, start=1):
                header_map = detect_header_map(list(row_values))
                if header_map:
                    header_row_idx = r
                    break
            if not header_map or not header_row_idx:
                continue

            sheets_processed += 1

            # Parse rows
            data_rows = ws.iter_rows(
                min_row=header_row_idx + 1,
                max_row=ws.max_row,
                values_only=True,
            )
            for vals in data_rows:
                emp_raw = _cell_value(vals, header_map, "employee")
                date_raw = _cell_value(vals, header_map, "work_date")
                if not emp_raw or not date_raw:
                    continue

                employee_name = str(emp_raw).strip()
                if not employee_name:
                    # Whitespace-only name: nothing to attribute the row to.
                    continue
                work_date = parse_date(date_raw)
                if not work_date:
                    continue

                # Clock In/Out: prefer full datetime if present in the cell;
                # else bind time to work_date
                clock_in_dt = _bind_clock_value(_cell_value(vals, header_map, "clock_in"), work_date)
                clock_out_dt = _bind_clock_value(_cell_value(vals, header_map, "clock_out"), work_date)

                # Overnight
                if clock_in_dt and clock_out_dt and clock_out_dt <= clock_in_dt:
                    clock_out_dt = clock_out_dt + timedelta(days=1)

                # Hour columns default to zero when absent or unparseable.
                break_hours_taken = safe_decimal(_cell_value(vals, header_map, "break_hours"), D(0))
                pto_hours = safe_decimal(_cell_value(vals, header_map, "pto_hours"), D(0))
                holiday_hours = safe_decimal(_cell_value(vals, header_map, "holiday_hours"), D(0))
                bereavement_hours = safe_decimal(_cell_value(vals, header_map, "bereavement_hours"), D(0))

                # Determine Total Hours (Hours Worked)
                total_from_sheet = _optional_decimal(_cell_value(vals, header_map, "total_hours"))
                alt_total_minus_break = _optional_decimal(_cell_value(vals, header_map, "total_minus_break"))
                if total_from_sheet is not None:
                    total_from_sheet = q2(total_from_sheet)
                elif alt_total_minus_break is not None:
                    # Fallback: Hours Worked Minus Break Hours -> add break back
                    total_from_sheet = q2(alt_total_minus_break + break_hours_taken)
                elif clock_in_dt and clock_out_dt:
                    total_from_sheet = q2(D((clock_out_dt - clock_in_dt).total_seconds()) / D(3600))
                else:
                    total_from_sheet = D(0)

                # PTO type is kept only when it is text and no PTO hours are
                # present; rows with PTO hours (including Vacation) get a
                # blank pto_type to force review.
                pto_type = None
                if not pto_hours > D(0):
                    v = _cell_value(vals, header_map, "pto_type")
                    if v is not None and not isinstance(v, (int, float, datetime)):
                        pto_type = str(v).strip()

                # Employee
                emp_id = employee_ids.get(employee_name)
                if emp_id is None:
                    emp = db.query(Employee).filter(Employee.name == employee_name).first()
                    if not emp:
                        emp = Employee(name=employee_name)
                        db.add(emp)
                        db.flush()  # assigns emp.id before it is referenced
                    emp_id = emp.id
                    employee_ids[employee_name] = emp_id
                employees_seen.add(emp_id)

                entry = TimeEntry(
                    employee_id=emp_id,
                    work_date=work_date,
                    clock_in=clock_in_dt,
                    clock_out=clock_out_dt,
                    break_hours=q2(break_hours_taken),
                    total_hours=q2(total_from_sheet),
                    pto_hours=q2(pto_hours),
                    pto_type=pto_type,
                    holiday_hours=q2(holiday_hours),
                    bereavement_hours=q2(bereavement_hours),
                    timesheet_id=timesheet_id,
                )

                # Hours Paid = (Total - Break) + PTO + Holiday + Bereavement
                worked = (D(entry.total_hours or 0) - D(entry.break_hours or 0))
                if worked < D(0):
                    worked = D(0)
                entry.hours_paid = q2(worked + D(entry.pto_hours or 0) + D(entry.holiday_hours or 0) + D(entry.bereavement_hours or 0))

                db.add(entry)
                inserted_total += 1

        db.commit()
    except Exception:
        # Leave the session usable for the caller instead of half-flushed.
        db.rollback()
        raise

    print(f"[import_workbook] Sheets processed: {sheets_processed}, rows inserted: {inserted_total}, employees: {len(employees_seen)}")
    return {"rows": inserted_total, "employees": len(employees_seen)}