"""Import duty schedule: delete range, insert duties/unavailable/vacation. Accepts session.""" import logging from datetime import date, timedelta from sqlalchemy.orm import Session from duty_teller.cache import invalidate_duty_related_caches from duty_teller.db.models import Duty from duty_teller.db.repository import ( delete_duties_in_range, get_or_create_user_by_full_name, get_users_by_full_names, ) from duty_teller.importers.duty_schedule import DutyScheduleResult from duty_teller.utils.dates import day_start_iso, day_end_iso, duty_to_iso logger = logging.getLogger(__name__) def _consecutive_date_ranges(dates: list[date]) -> list[tuple[date, date]]: """Sort dates and merge consecutive ones into (first, last) ranges. Empty list -> [].""" if not dates: return [] sorted_dates = sorted(set(dates)) ranges: list[tuple[date, date]] = [] start_d = end_d = sorted_dates[0] for d in sorted_dates[1:]: # Merge consecutive days into one range; gap starts a new range if (d - end_d).days == 1: end_d = d else: ranges.append((start_d, end_d)) start_d = end_d = d ranges.append((start_d, end_d)) return ranges def run_import( session: Session, result: DutyScheduleResult, hour_utc: int, minute_utc: int, ) -> tuple[int, int, int, int]: """Run duty-schedule import: delete range per user, bulk insert duties. Batched: users fetched in one query, missing created; bulk_insert_mappings. One commit at end. Args: session: DB session. result: Parsed duty schedule (start_date, end_date, entries). hour_utc: Handover hour in UTC (0-23). minute_utc: Handover minute in UTC (0-59). Returns: Tuple (num_users, num_duty, num_unavailable, num_vacation). """ logger.info( "Import started: range %s..%s, %d entries", result.start_date, result.end_date, len(result.entries), ) from_date_str = result.start_date.isoformat() to_date_str = result.end_date.isoformat() num_duty = num_unavailable = num_vacation = 0 # Batch: get all users by full_name, create missing (no commit until end) names = [e.full_name for e in result.entries] users_map = get_users_by_full_names(session, names) for name in names: if name not in users_map: users_map[name] = get_or_create_user_by_full_name( session, name, commit=False ) # Delete range per user (no commit) for entry in result.entries: user = users_map[entry.full_name] delete_duties_in_range( session, user.id, from_date_str, to_date_str, commit=False ) # Build rows for bulk insert duty_rows: list[dict] = [] for entry in result.entries: user = users_map[entry.full_name] for d in entry.duty_dates: start_at = duty_to_iso(d, hour_utc, minute_utc) d_next = d + timedelta(days=1) end_at = duty_to_iso(d_next, hour_utc, minute_utc) duty_rows.append( { "user_id": user.id, "start_at": start_at, "end_at": end_at, "event_type": "duty", } ) num_duty += 1 for d in entry.unavailable_dates: duty_rows.append( { "user_id": user.id, "start_at": day_start_iso(d), "end_at": day_end_iso(d), "event_type": "unavailable", } ) num_unavailable += 1 for start_d, end_d in _consecutive_date_ranges(entry.vacation_dates): duty_rows.append( { "user_id": user.id, "start_at": day_start_iso(start_d), "end_at": day_end_iso(end_d), "event_type": "vacation", } ) num_vacation += 1 if duty_rows: session.bulk_insert_mappings(Duty, duty_rows) session.commit() invalidate_duty_related_caches() logger.info( "Import done: %d users, %d duty, %d unavailable, %d vacation", len(result.entries), num_duty, num_unavailable, num_vacation, ) return (len(result.entries), num_duty, num_unavailable, num_vacation)