Files
duty-teller/handlers/import_duty_schedule.py
Nikolay Tatarinov 3f4c7bf66c Implement date range handling for vacation and unavailable events
- Added helper functions to generate ISO 8601 formatted start and end times for calendar days.
- Introduced logic to merge consecutive vacation dates into a single record for improved data representation.
- Updated the duty schedule import process to utilize the new date handling functions for unavailable and vacation events.
- Enhanced integration tests to validate the correct handling of vacation periods and unavailable dates.
- Modified the web application to display formatted date ranges for vacation and unavailable events.
2026-02-17 23:06:23 +03:00

228 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Import duty-schedule: /import_duty_schedule (admin only). Two steps: handover time -> JSON file."""
import asyncio
import re
from datetime import date, datetime, timedelta, timezone
import config
from telegram import Update
from telegram.ext import CommandHandler, ContextTypes, MessageHandler, filters
from db.session import get_session
from db.repository import (
get_or_create_user_by_full_name,
delete_duties_in_range,
insert_duty,
)
from importers.duty_schedule import (
DutyScheduleParseError,
DutyScheduleResult,
parse_duty_schedule,
)
# HH:MM or HH:MM:SS, optional space + timezone (IANA or "UTC")
HANDOVER_TIME_RE = re.compile(
r"^\s*(\d{1,2}):(\d{2})(?::(\d{2}))?\s*(?:\s+(\S+))?\s*$", re.IGNORECASE
)
def _parse_handover_time(text: str) -> tuple[int, int] | None:
"""Parse handover time string to (hour_utc, minute_utc). Returns None on failure."""
m = HANDOVER_TIME_RE.match(text)
if not m:
return None
hour = int(m.group(1))
minute = int(m.group(2))
# second = m.group(3) ignored
tz_str = (m.group(4) or "").strip()
if not tz_str or tz_str.upper() == "UTC":
return (hour % 24, minute)
try:
from zoneinfo import ZoneInfo
except ImportError:
try:
from backports.zoneinfo import ZoneInfo # type: ignore
except ImportError:
return None
try:
tz = ZoneInfo(tz_str)
except Exception:
return None
# Build datetime in that tz and convert to UTC
dt = datetime(2000, 1, 1, hour, minute, 0, tzinfo=tz)
utc = dt.astimezone(timezone.utc)
return (utc.hour, utc.minute)
def _duty_to_iso(d: date, hour_utc: int, minute_utc: int) -> str:
"""ISO 8601 with Z for start of duty on date d at given UTC time."""
dt = datetime(d.year, d.month, d.day, hour_utc, minute_utc, 0, tzinfo=timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def _day_start_iso(d: date) -> str:
"""ISO 8601 start of calendar day UTC: YYYY-MM-DDT00:00:00Z."""
return d.isoformat() + "T00:00:00Z"
def _day_end_iso(d: date) -> str:
"""ISO 8601 end of calendar day UTC: YYYY-MM-DDT23:59:59Z."""
return d.isoformat() + "T23:59:59Z"
def _consecutive_date_ranges(dates: list[date]) -> list[tuple[date, date]]:
"""Sort dates and merge consecutive ones into (first, last) ranges. Empty list -> []."""
if not dates:
return []
sorted_dates = sorted(set(dates))
ranges: list[tuple[date, date]] = []
start_d = end_d = sorted_dates[0]
for d in sorted_dates[1:]:
if (d - end_d).days == 1:
end_d = d
else:
ranges.append((start_d, end_d))
start_d = end_d = d
ranges.append((start_d, end_d))
return ranges
async def import_duty_schedule_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Start the import dialogue: step 1 asks for the handover time.

    Does nothing when the update has no message or no effective user.
    Users that ``config.is_admin`` does not accept get a refusal reply;
    otherwise the ``awaiting_handover_time`` flag is set in ``user_data``
    and the user is prompted for the handover time and timezone.
    """
    message = update.message
    user = update.effective_user
    if not (message and user):
        return
    if config.is_admin(user.username or ""):
        context.user_data["awaiting_handover_time"] = True
        await message.reply_text(
            "Укажите время пересменки в формате ЧЧ:ММ и часовой пояс, "
            "например 09:00 Europe/Moscow или 06:00 UTC."
        )
    else:
        await message.reply_text("Доступ только для администраторов.")
async def handle_handover_time_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Step 2 of the import dialogue: read the handover time typed by the admin.

    The message is ignored unless it carries text, the ``awaiting_handover_time``
    flag is set in ``user_data`` and the sender passes ``config.is_admin``.
    On unparsable input the user is asked to try again and the state is left
    unchanged; on success the parsed ``(hour_utc, minute_utc)`` is stored and
    the dialogue moves on to waiting for the schedule file.
    """
    message = update.message
    user = update.effective_user
    if not (message and user and message.text):
        return
    state = context.user_data
    if not state.get("awaiting_handover_time") or not config.is_admin(user.username or ""):
        return
    handover = _parse_handover_time(message.text.strip())
    if handover is None:
        await message.reply_text(
            "Не удалось разобрать время. Укажите, например: 09:00 Europe/Moscow"
        )
        return
    utc_hour, utc_minute = handover
    state["handover_utc_time"] = (utc_hour, utc_minute)
    # Advance the dialogue: the time is known, the JSON file is expected next.
    state["awaiting_handover_time"] = False
    state["awaiting_duty_schedule_file"] = True
    await message.reply_text(
        "Отправьте файл в формате duty-schedule (JSON)."
    )
def _run_import(
    database_url: str,
    result: DutyScheduleResult,
    hour_utc: int,
    minute_utc: int,
) -> tuple[int, int, int, int]:
    """Store a parsed schedule in the database and report what was written.

    For each entry of ``result.entries`` the user is fetched or created by
    full name, ``delete_duties_in_range`` is called for that user over
    ``result.start_date`` .. ``result.end_date``, and then events are inserted:

    * ``duty``: one per duty date, from the handover time on that date to
      the handover time on the following date;
    * ``unavailable``: one per date, covering that calendar day;
    * ``vacation``: one per run of consecutive vacation dates.

    Returns ``(num_users, num_duty, num_unavailable, num_vacation)`` where
    ``num_users`` is the number of entries in ``result``. The session is
    closed whether or not the import succeeds.
    """
    session = get_session(database_url)
    try:
        range_from = result.start_date.isoformat()
        range_to = result.end_date.isoformat()
        duty_count = 0
        unavailable_count = 0
        vacation_count = 0
        one_day = timedelta(days=1)
        for entry in result.entries:
            user = get_or_create_user_by_full_name(session, entry.full_name)
            # Clear this user's events in the imported range before re-inserting.
            delete_duties_in_range(session, user.id, range_from, range_to)

            for duty_day in entry.duty_dates:
                insert_duty(
                    session,
                    user.id,
                    _duty_to_iso(duty_day, hour_utc, minute_utc),
                    _duty_to_iso(duty_day + one_day, hour_utc, minute_utc),
                    event_type="duty",
                )
                duty_count += 1

            for off_day in entry.unavailable_dates:
                insert_duty(
                    session,
                    user.id,
                    _day_start_iso(off_day),
                    _day_end_iso(off_day),
                    event_type="unavailable",
                )
                unavailable_count += 1

            for first_day, last_day in _consecutive_date_ranges(entry.vacation_dates):
                insert_duty(
                    session,
                    user.id,
                    _day_start_iso(first_day),
                    _day_end_iso(last_day),
                    event_type="vacation",
                )
                vacation_count += 1
        return (len(result.entries), duty_count, unavailable_count, vacation_count)
    finally:
        session.close()
async def handle_duty_schedule_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Step 3 of the import dialogue: receive the JSON file and run the import.

    The document is ignored unless the ``awaiting_duty_schedule_file`` flag
    is set, a handover time is stored in ``user_data`` and the sender passes
    ``config.is_admin``. A file without a ``.json`` name is rejected but the
    dialogue state is kept. A parse error, an import error and a successful
    import all clear the dialogue state and reply with the outcome.
    """
    message = update.message
    if not (message and message.document and update.effective_user):
        return
    state = context.user_data
    if not state.get("awaiting_duty_schedule_file"):
        return
    handover = state.get("handover_utc_time")
    if not (handover and config.is_admin(update.effective_user.username or "")):
        return
    document = message.document
    lowered_name = (document.file_name or "").lower()
    if not lowered_name.endswith(".json"):
        await message.reply_text("Нужен файл с расширением .json")
        return
    hour_utc, minute_utc = handover

    # Fetch the file content through the bot API, then parse it.
    tg_file = await context.bot.get_file(document.file_id)
    raw = bytes(await tg_file.download_as_bytearray())
    try:
        result = parse_duty_schedule(raw)
    except DutyScheduleParseError as e:
        state.pop("awaiting_duty_schedule_file", None)
        state.pop("handover_utc_time", None)
        await message.reply_text(f"Ошибка разбора файла: {e}")
        return

    def _job() -> tuple[int, int, int, int]:
        # Runs in the default executor so the database work does not block the event loop.
        return _run_import(config.DATABASE_URL, result, hour_utc, minute_utc)

    try:
        num_users, num_duty, num_unavailable, num_vacation = (
            await asyncio.get_running_loop().run_in_executor(None, _job)
        )
    except Exception as e:
        await message.reply_text(f"Ошибка импорта: {e}")
    else:
        summary = [f"{num_users} пользователей", f"{num_duty} дежурств"]
        # Optional categories are mentioned only when something was imported.
        if num_unavailable:
            summary.append(f"{num_unavailable} недоступностей")
        if num_vacation:
            summary.append(f"{num_vacation} отпусков")
        total = num_duty + num_unavailable + num_vacation
        await message.reply_text(
            "Импорт выполнен: " + ", ".join(summary) + f" (всего {total} событий)."
        )
    finally:
        state.pop("awaiting_duty_schedule_file", None)
        state.pop("handover_utc_time", None)
# Handler objects for the three steps of the import dialogue.

# Step 1: the /import_duty_schedule command.
import_duty_schedule_handler = CommandHandler(
    command="import_duty_schedule",
    callback=import_duty_schedule_cmd,
)

# Step 2: any non-command text message; the callback itself ignores messages
# that arrive while no handover time is awaited.
handover_time_handler = MessageHandler(
    filters=filters.TEXT & ~filters.COMMAND,
    callback=handle_handover_time_text,
)

# Step 3: documents whose file extension is "json".
duty_schedule_document_handler = MessageHandler(
    filters=filters.Document.FileExtension("json"),
    callback=handle_duty_schedule_document,
)