From ef5dbca5dfdca2d316455aff4f9225c1b9968513 Mon Sep 17 00:00:00 2001 From: Nikolay Tatarinov Date: Tue, 17 Feb 2026 21:45:23 +0300 Subject: [PATCH] Implement duty schedule import functionality and enhance user management - Added a new command `/import_duty_schedule` for importing duty schedules via JSON, restricted to admin users. - Introduced a two-step import process: specifying handover time and uploading a JSON file. - Updated the database schema to allow `telegram_user_id` to be nullable for user creation by full name. - Implemented repository functions for user management, including `get_or_create_user_by_full_name` and `delete_duties_in_range`. - Enhanced README documentation with details on the new import command and JSON format requirements. - Added comprehensive tests for the duty schedule parser and integration tests for the import functionality. --- README.md | 15 ++ .../002_users_telegram_user_id_nullable.py | 35 ++++ db/__init__.py | 10 +- db/models.py | 4 +- db/repository.py | 43 +++++ handlers/__init__.py | 5 +- handlers/commands.py | 14 +- handlers/import_duty_schedule.py | 175 ++++++++++++++++++ importers/__init__.py | 1 + importers/duty_schedule.py | 89 +++++++++ tests/test_duty_schedule_parser.py | 92 +++++++++ .../test_import_duty_schedule_integration.py | 110 +++++++++++ tests/test_repository_duty_range.py | 93 ++++++++++ 13 files changed, 678 insertions(+), 8 deletions(-) create mode 100644 alembic/versions/002_users_telegram_user_id_nullable.py create mode 100644 handlers/import_duty_schedule.py create mode 100644 importers/__init__.py create mode 100644 importers/duty_schedule.py create mode 100644 tests/test_duty_schedule_parser.py create mode 100644 tests/test_import_duty_schedule_integration.py create mode 100644 tests/test_repository_duty_range.py diff --git a/README.md b/README.md index b014138..18806de 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,21 @@ Ensure `.env` exists (e.g. `cp .env.example .env`) and contains `BOT_TOKEN`. To add commands, define async handlers in `handlers/commands.py` (or a new module) and register them in `handlers/__init__.py`. +## Импорт расписания дежурств (duty-schedule) + +Команда **`/import_duty_schedule`** доступна только пользователям из `ADMIN_USERNAMES`. Импорт выполняется в два шага: + +1. **Время пересменки** — бот просит указать время и при необходимости часовой пояс (например `09:00 Europe/Moscow` или `06:00 UTC`). Время приводится к UTC и используется для границ смен при создании записей. +2. **Файл JSON** — отправьте файл в формате duty-schedule (см. ниже). + +Формат **duty-schedule**: +- **meta**: обязательное поле `start_date` (YYYY-MM-DD), опционально `weeks`; количество дней определяется по длине строки `duty`. +- **schedule**: массив объектов с полями: + - `name` — ФИО (строка); + - `duty` — строка с разделителем `;`: каждый элемент соответствует дню с `start_date` по порядку. Пусто или пробелы — нет дежурства; символы **б**, **Б**, **в**, **Н**, **О** — день дежурства. + +При повторном импорте дежурства в том же диапазоне дат для каждого пользователя заменяются новыми. + ## Tests Install dev dependencies and run pytest: diff --git a/alembic/versions/002_users_telegram_user_id_nullable.py b/alembic/versions/002_users_telegram_user_id_nullable.py new file mode 100644 index 0000000..b0c05b7 --- /dev/null +++ b/alembic/versions/002_users_telegram_user_id_nullable.py @@ -0,0 +1,35 @@ +"""Users: telegram_user_id nullable (for import by full_name) + +Revision ID: 002 +Revises: 001 +Create Date: 2025-02-17 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "002" +down_revision: Union[str, None] = "001" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.alter_column( + "telegram_user_id", + existing_type=sa.BigInteger(), + nullable=True, + ) + + +def downgrade() -> None: + with op.batch_alter_table("users", schema=None) as batch_op: + batch_op.alter_column( + "telegram_user_id", + existing_type=sa.BigInteger(), + nullable=False, + ) diff --git a/db/__init__.py b/db/__init__.py index 26d70fd..459863c 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -3,7 +3,13 @@ from db.models import Base, User, Duty from db.schemas import UserCreate, UserInDb, DutyCreate, DutyInDb, DutyWithUser from db.session import get_engine, get_session_factory, get_session -from db.repository import get_or_create_user, get_duties, insert_duty +from db.repository import ( + delete_duties_in_range, + get_or_create_user, + get_or_create_user_by_full_name, + get_duties, + insert_duty, +) __all__ = [ "Base", @@ -17,7 +23,9 @@ __all__ = [ "get_engine", "get_session_factory", "get_session", + "delete_duties_in_range", "get_or_create_user", + "get_or_create_user_by_full_name", "get_duties", "insert_duty", "init_db", diff --git a/db/models.py b/db/models.py index 72101e0..e1762aa 100644 --- a/db/models.py +++ b/db/models.py @@ -14,8 +14,8 @@ class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - telegram_user_id: Mapped[int] = mapped_column( - BigInteger, unique=True, nullable=False + telegram_user_id: Mapped[int | None] = mapped_column( + BigInteger, unique=True, nullable=True ) full_name: Mapped[str] = mapped_column(Text, nullable=False) username: Mapped[str | None] = mapped_column(Text, nullable=True) diff --git a/db/repository.py b/db/repository.py index b717e9a..e45ca10 100644 --- a/db/repository.py +++ b/db/repository.py @@ -1,5 +1,7 @@ """Repository: get_or_create_user, get_duties, insert_duty.""" +from datetime import datetime, timedelta + from sqlalchemy.orm import Session from db.models import User, Duty @@ -35,6 +37,47 @@ def get_or_create_user( return user +def get_or_create_user_by_full_name(session: Session, full_name: str) -> User: + """Find user by exact full_name or create one with telegram_user_id=None (for duty-schedule import).""" + user = session.query(User).filter(User.full_name == full_name).first() + if user: + return user + user = User( + telegram_user_id=None, + full_name=full_name, + username=None, + first_name=None, + last_name=None, + ) + session.add(user) + session.commit() + session.refresh(user) + return user + + +def delete_duties_in_range( + session: Session, + user_id: int, + from_date: str, + to_date: str, +) -> int: + """Delete all duties of the user that overlap [from_date, to_date] (YYYY-MM-DD). Returns count deleted.""" + # start_at < to_date + 1 day so duties starting on to_date are included (start_at is ISO with T) + to_next = (datetime.fromisoformat(to_date + "T00:00:00") + timedelta(days=1)).strftime("%Y-%m-%d") + q = ( + session.query(Duty) + .filter( + Duty.user_id == user_id, + Duty.start_at < to_next, + Duty.end_at >= from_date, + ) + ) + count = q.count() + q.delete(synchronize_session=False) + session.commit() + return count + + def get_duties( session: Session, from_date: str, diff --git a/handlers/__init__.py b/handlers/__init__.py index 13cf0f0..0a2539b 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -2,10 +2,13 @@ from telegram.ext import Application -from . import commands, errors +from . import commands, errors, import_duty_schedule def register_handlers(app: Application) -> None: app.add_handler(commands.start_handler) app.add_handler(commands.help_handler) + app.add_handler(import_duty_schedule.import_duty_schedule_handler) + app.add_handler(import_duty_schedule.handover_time_handler) + app.add_handler(import_duty_schedule.duty_schedule_document_handler) app.add_error_handler(errors.error_handler) diff --git a/handlers/commands.py b/handlers/commands.py index b7dbd86..8879931 100644 --- a/handlers/commands.py +++ b/handlers/commands.py @@ -46,10 +46,16 @@ async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - if update.message: - await update.message.reply_text( - "Доступные команды:\n/start — Начать\n/help — Показать эту справку" - ) + if not update.message or not update.effective_user: + return + lines = [ + "Доступные команды:", + "/start — Начать", + "/help — Показать эту справку", + ] + if config.is_admin(update.effective_user.username or ""): + lines.append("/import_duty_schedule — Импорт расписания дежурств (JSON)") + await update.message.reply_text("\n".join(lines)) start_handler = CommandHandler("start", start) diff --git a/handlers/import_duty_schedule.py b/handlers/import_duty_schedule.py new file mode 100644 index 0000000..b0af60e --- /dev/null +++ b/handlers/import_duty_schedule.py @@ -0,0 +1,175 @@ +"""Import duty-schedule: /import_duty_schedule (admin only). Two steps: handover time -> JSON file.""" + +import asyncio +import re +from datetime import date, datetime, timedelta, timezone + +import config +from telegram import Update +from telegram.ext import CommandHandler, ContextTypes, MessageHandler, filters + +from db.session import get_session +from db.repository import ( + get_or_create_user_by_full_name, + delete_duties_in_range, + insert_duty, +) +from importers.duty_schedule import ( + DutyScheduleParseError, + DutyScheduleResult, + parse_duty_schedule, +) + +# HH:MM or HH:MM:SS, optional space + timezone (IANA or "UTC") +HANDOVER_TIME_RE = re.compile( + r"^\s*(\d{1,2}):(\d{2})(?::(\d{2}))?\s*(?:\s+(\S+))?\s*$", re.IGNORECASE +) + + +def _parse_handover_time(text: str) -> tuple[int, int] | None: + """Parse handover time string to (hour_utc, minute_utc). Returns None on failure.""" + m = HANDOVER_TIME_RE.match(text) + if not m: + return None + hour = int(m.group(1)) + minute = int(m.group(2)) + # second = m.group(3) ignored + tz_str = (m.group(4) or "").strip() + if not tz_str or tz_str.upper() == "UTC": + return (hour % 24, minute) + try: + from zoneinfo import ZoneInfo + except ImportError: + try: + from backports.zoneinfo import ZoneInfo # type: ignore + except ImportError: + return None + try: + tz = ZoneInfo(tz_str) + except Exception: + return None + # Build datetime in that tz and convert to UTC + dt = datetime(2000, 1, 1, hour, minute, 0, tzinfo=tz) + utc = dt.astimezone(timezone.utc) + return (utc.hour, utc.minute) + + +def _duty_to_iso(d: date, hour_utc: int, minute_utc: int) -> str: + """ISO 8601 with Z for start of duty on date d at given UTC time.""" + dt = datetime(d.year, d.month, d.day, hour_utc, minute_utc, 0, tzinfo=timezone.utc) + return dt.strftime("%Y-%m-%dT%H:%M:%SZ") + + +async def import_duty_schedule_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message or not update.effective_user: + return + if not config.is_admin(update.effective_user.username or ""): + await update.message.reply_text("Доступ только для администраторов.") + return + context.user_data["awaiting_handover_time"] = True + await update.message.reply_text( + "Укажите время пересменки в формате ЧЧ:ММ и часовой пояс, " + "например 09:00 Europe/Moscow или 06:00 UTC." + ) + + +async def handle_handover_time_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message or not update.effective_user or not update.message.text: + return + if not context.user_data.get("awaiting_handover_time"): + return + if not config.is_admin(update.effective_user.username or ""): + return + text = update.message.text.strip() + parsed = _parse_handover_time(text) + if parsed is None: + await update.message.reply_text( + "Не удалось разобрать время. Укажите, например: 09:00 Europe/Moscow" + ) + return + hour_utc, minute_utc = parsed + context.user_data["handover_utc_time"] = (hour_utc, minute_utc) + context.user_data["awaiting_handover_time"] = False + context.user_data["awaiting_duty_schedule_file"] = True + await update.message.reply_text( + "Отправьте файл в формате duty-schedule (JSON)." + ) + + +def _run_import( + database_url: str, + result: DutyScheduleResult, + hour_utc: int, + minute_utc: int, +) -> tuple[int, int]: + session = get_session(database_url) + try: + from_date_str = result.start_date.isoformat() + to_date_str = result.end_date.isoformat() + total_duties = 0 + for full_name, duty_dates in result.entries: + user = get_or_create_user_by_full_name(session, full_name) + delete_duties_in_range(session, user.id, from_date_str, to_date_str) + for d in duty_dates: + start_at = _duty_to_iso(d, hour_utc, minute_utc) + d_next = d + timedelta(days=1) + end_at = _duty_to_iso(d_next, hour_utc, minute_utc) + insert_duty(session, user.id, start_at, end_at) + total_duties += 1 + return (len(result.entries), total_duties) + finally: + session.close() + + +async def handle_duty_schedule_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + if not update.message or not update.message.document or not update.effective_user: + return + if not context.user_data.get("awaiting_duty_schedule_file"): + return + handover = context.user_data.get("handover_utc_time") + if not handover or not config.is_admin(update.effective_user.username or ""): + return + if not (update.message.document.file_name or "").lower().endswith(".json"): + await update.message.reply_text("Нужен файл с расширением .json") + return + + hour_utc, minute_utc = handover + file_id = update.message.document.file_id + + # Download and parse in async context + file = await context.bot.get_file(file_id) + raw = bytes(await file.download_as_bytearray()) + try: + result = parse_duty_schedule(raw) + except DutyScheduleParseError as e: + context.user_data.pop("awaiting_duty_schedule_file", None) + context.user_data.pop("handover_utc_time", None) + await update.message.reply_text(f"Ошибка разбора файла: {e}") + return + + loop = asyncio.get_running_loop() + try: + num_users, num_duties = await loop.run_in_executor( + None, + lambda: _run_import(config.DATABASE_URL, result, hour_utc, minute_utc), + ) + except Exception as e: + await update.message.reply_text(f"Ошибка импорта: {e}") + else: + await update.message.reply_text( + f"Импорт выполнен: {num_users} пользователей, {num_duties} дежурств." + ) + finally: + context.user_data.pop("awaiting_duty_schedule_file", None) + context.user_data.pop("handover_utc_time", None) + + +import_duty_schedule_handler = CommandHandler("import_duty_schedule", import_duty_schedule_cmd) +handover_time_handler = MessageHandler( + filters.TEXT & ~filters.COMMAND, + handle_handover_time_text, +) +duty_schedule_document_handler = MessageHandler( + filters.Document.FileExtension("json"), + handle_duty_schedule_document, +) diff --git a/importers/__init__.py b/importers/__init__.py new file mode 100644 index 0000000..a246326 --- /dev/null +++ b/importers/__init__.py @@ -0,0 +1 @@ +"""Importers for duty data (e.g. duty-schedule JSON).""" diff --git a/importers/duty_schedule.py b/importers/duty_schedule.py new file mode 100644 index 0000000..c3f8e6a --- /dev/null +++ b/importers/duty_schedule.py @@ -0,0 +1,89 @@ +"""Parser for duty-schedule JSON format. No DB access.""" + +import json +from dataclasses import dataclass +from datetime import date, timedelta + +# Символы, обозначающие день дежурства в ячейке duty (CSV с разделителем ;) +DUTY_MARKERS = frozenset({"б", "Б", "в", "Н", "О"}) + + +@dataclass +class DutyScheduleResult: + """Parsed duty schedule: start_date, end_date, and per-person duty dates.""" + + start_date: date + end_date: date + entries: list[tuple[str, list[date]]] # (full_name, list of duty dates) + + +class DutyScheduleParseError(Exception): + """Invalid or missing fields in duty-schedule JSON.""" + + pass + + +def parse_duty_schedule(raw_bytes: bytes) -> DutyScheduleResult: + """Parse duty-schedule JSON. Returns start_date, end_date, and list of (full_name, duty_dates). + + - meta.start_date (YYYY-MM-DD) and schedule (array) required. + - meta.weeks optional; number of days from max duty string length (split by ';'). + - For each schedule item: name (required), duty = CSV with ';'; index i = start_date + i days. + - Cell value after strip in DUTY_MARKERS => duty day. + """ + try: + data = json.loads(raw_bytes.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + raise DutyScheduleParseError(f"Invalid JSON or encoding: {e}") from e + + meta = data.get("meta") + if not meta or not isinstance(meta, dict): + raise DutyScheduleParseError("Missing or invalid 'meta'") + + start_str = meta.get("start_date") + if not start_str or not isinstance(start_str, str): + raise DutyScheduleParseError("Missing or invalid meta.start_date") + try: + start_date = date.fromisoformat(start_str.strip()) + except ValueError as e: + raise DutyScheduleParseError(f"Invalid meta.start_date: {start_str}") from e + + schedule = data.get("schedule") + if not isinstance(schedule, list): + raise DutyScheduleParseError("Missing or invalid 'schedule' (must be array)") + + max_days = 0 + entries: list[tuple[str, list[date]]] = [] + + for row in schedule: + if not isinstance(row, dict): + raise DutyScheduleParseError("schedule item must be an object") + name = row.get("name") + if name is None or not isinstance(name, str): + raise DutyScheduleParseError("schedule item must have 'name' (string)") + full_name = name.strip() + if not full_name: + raise DutyScheduleParseError("schedule item 'name' cannot be empty") + + duty_str = row.get("duty") + if duty_str is None: + duty_str = "" + if not isinstance(duty_str, str): + raise DutyScheduleParseError("schedule item 'duty' must be string") + + cells = [c.strip() for c in duty_str.split(";")] + max_days = max(max_days, len(cells)) + + duty_dates: list[date] = [] + for i, cell in enumerate(cells): + if cell in DUTY_MARKERS: + d = start_date + timedelta(days=i) + duty_dates.append(d) + entries.append((full_name, duty_dates)) + + if max_days == 0: + end_date = start_date + else: + end_date = start_date + timedelta(days=max_days - 1) + + return DutyScheduleResult(start_date=start_date, end_date=end_date, entries=entries) diff --git a/tests/test_duty_schedule_parser.py b/tests/test_duty_schedule_parser.py new file mode 100644 index 0000000..6472e95 --- /dev/null +++ b/tests/test_duty_schedule_parser.py @@ -0,0 +1,92 @@ +"""Tests for duty-schedule JSON parser.""" + +from datetime import date + +import pytest + +from importers.duty_schedule import ( + DUTY_MARKERS, + DutyScheduleParseError, + parse_duty_schedule, +) + + +def test_parse_valid_schedule(): + raw = ( + '{"meta": {"start_date": "2026-02-16", "weeks": 2}, ' + '"schedule": [' + '{"name": "Ivanov I.I.", "duty": "; ; \u0431 ; \u0411 ; \u0432 ; ;"}, ' + '{"name": "Petrov P.P.", "duty": " ; \u041d ; \u041e ; ; ; ;"}' + "]}" + ).encode("utf-8") + result = parse_duty_schedule(raw) + assert result.start_date == date(2026, 2, 16) + # Petrov has 7 cells -> end = start + 6 + assert result.end_date == date(2026, 2, 22) + assert len(result.entries) == 2 + names = [e[0] for e in result.entries] + assert "Ivanov I.I." in names + assert "Petrov P.P." in names + by_name = {e[0]: e[1] for e in result.entries} + # Ivanov: indices 2, 3, 4 are duty (б, Б, в) -> 2026-02-18, 19, 20 + ivan_dates = sorted(by_name["Ivanov I.I."]) + assert ivan_dates == [date(2026, 2, 18), date(2026, 2, 19), date(2026, 2, 20)] + # Petrov: indices 1, 2 (Н, О) -> 2026-02-17, 18 + petr_dates = sorted(by_name["Petrov P.P."]) + assert petr_dates == [date(2026, 2, 17), date(2026, 2, 18)] + + +def test_parse_empty_duty_string(): + raw = b'{"meta": {"start_date": "2026-02-01"}, "schedule": [{"name": "A", "duty": ""}]}' + result = parse_duty_schedule(raw) + assert result.start_date == date(2026, 2, 1) + assert result.end_date == date(2026, 2, 1) + assert result.entries == [("A", [])] + + +def test_parse_invalid_json(): + with pytest.raises(DutyScheduleParseError, match="Invalid JSON"): + parse_duty_schedule(b"not json") + + +def test_parse_missing_meta(): + with pytest.raises(DutyScheduleParseError, match="meta"): + parse_duty_schedule(b'{"schedule": []}') + + +def test_parse_missing_start_date(): + with pytest.raises(DutyScheduleParseError, match="start_date"): + parse_duty_schedule(b'{"meta": {"weeks": 1}, "schedule": []}') + + +def test_parse_invalid_start_date(): + with pytest.raises(DutyScheduleParseError, match="start_date|Invalid"): + parse_duty_schedule(b'{"meta": {"start_date": "not-a-date"}, "schedule": []}') + + +def test_parse_missing_schedule(): + with pytest.raises(DutyScheduleParseError, match="schedule"): + parse_duty_schedule(b'{"meta": {"start_date": "2026-02-01"}}') + + +def test_parse_schedule_not_array(): + with pytest.raises(DutyScheduleParseError, match="schedule"): + parse_duty_schedule(b'{"meta": {"start_date": "2026-02-01"}, "schedule": {}}') + + +def test_parse_schedule_item_missing_name(): + with pytest.raises(DutyScheduleParseError, match="name"): + parse_duty_schedule( + b'{"meta": {"start_date": "2026-02-01"}, "schedule": [{"duty": ";"}]}' + ) + + +def test_parse_schedule_item_empty_name(): + with pytest.raises(DutyScheduleParseError, match="empty"): + parse_duty_schedule( + b'{"meta": {"start_date": "2026-02-01"}, "schedule": [{"name": " ", "duty": ""}]}' + ) + + +def test_duty_markers(): + assert "б" in DUTY_MARKERS and "Б" in DUTY_MARKERS and "в" in DUTY_MARKERS diff --git a/tests/test_import_duty_schedule_integration.py b/tests/test_import_duty_schedule_integration.py new file mode 100644 index 0000000..7b2213e --- /dev/null +++ b/tests/test_import_duty_schedule_integration.py @@ -0,0 +1,110 @@ +"""Integration tests for duty-schedule import (parser + repo, no bot).""" + +from datetime import date + +import pytest + +from db import init_db +from db.models import Base +from db.repository import get_duties +from db.session import get_session +from importers.duty_schedule import DutyScheduleResult, parse_duty_schedule + +from handlers.import_duty_schedule import _run_import + + +@pytest.fixture +def db_url(): + return "sqlite:///:memory:" + + +@pytest.fixture(autouse=True) +def _reset_db_session(db_url): + """Ensure each test uses a fresh engine for :memory: (clear global cache for test URL).""" + import db.session as session_module + session_module._engine = None + session_module._SessionLocal = None + init_db(db_url) + yield + session_module._engine = None + session_module._SessionLocal = None + + +def test_import_creates_users_and_duties(db_url): + """Import creates users by full_name and correct duty records.""" + result = DutyScheduleResult( + start_date=date(2026, 2, 16), + end_date=date(2026, 2, 18), + entries=[ + ("Ivanov I.I.", [date(2026, 2, 16), date(2026, 2, 18)]), + ("Petrov P.P.", [date(2026, 2, 17)]), + ], + ) + num_users, num_duties = _run_import(db_url, result, 6, 0) + assert num_users == 2 + assert num_duties == 3 + + session = get_session(db_url) + try: + # to_date inclusive: duty on 18th has start_at 2026-02-18T06:00:00Z, so use 2026-02-19 + duties = get_duties(session, "2026-02-16", "2026-02-19") + finally: + session.close() + + assert len(duties) == 3 + starts = {d[0].start_at for d in duties} + assert "2026-02-16T06:00:00Z" in starts + assert "2026-02-17T06:00:00Z" in starts + assert "2026-02-18T06:00:00Z" in starts + + +def test_import_replaces_duties_in_range(db_url): + """Re-importing same range replaces old duties.""" + result1 = DutyScheduleResult( + start_date=date(2026, 2, 16), + end_date=date(2026, 2, 17), + entries=[("Sidorov", [date(2026, 2, 16), date(2026, 2, 17)])], + ) + _run_import(db_url, result1, 9, 0) + + session = get_session(db_url) + try: + duties_first = get_duties(session, "2026-02-16", "2026-02-18") + finally: + session.close() + assert len(duties_first) == 2 + + result2 = DutyScheduleResult( + start_date=date(2026, 2, 16), + end_date=date(2026, 2, 17), + entries=[("Sidorov", [date(2026, 2, 17)])], + ) + _run_import(db_url, result2, 9, 0) + + session = get_session(db_url) + try: + duties_second = get_duties(session, "2026-02-16", "2026-02-18") + finally: + session.close() + assert len(duties_second) == 1 + assert duties_second[0][0].start_at == "2026-02-17T09:00:00Z" + + +def test_import_full_flow_parse_then_import(db_url): + """Parse real-looking JSON then run import.""" + raw = ( + '{"meta": {"start_date": "2026-02-16"}, ' + '"schedule": [{"name": "Alexey A.", "duty": "\u0431; ; \u0432"}]}' + ).encode("utf-8") + parsed = parse_duty_schedule(raw) + num_users, num_duties = _run_import(db_url, parsed, 6, 0) + assert num_users == 1 + assert num_duties == 2 + + session = get_session(db_url) + try: + duties = get_duties(session, "2026-02-16", "2026-02-19") + finally: + session.close() + assert len(duties) == 2 + assert duties[0][1] == "Alexey A." diff --git a/tests/test_repository_duty_range.py b/tests/test_repository_duty_range.py new file mode 100644 index 0000000..8daef8e --- /dev/null +++ b/tests/test_repository_duty_range.py @@ -0,0 +1,93 @@ +"""Tests for delete_duties_in_range and get_or_create_user_by_full_name.""" + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from db.models import Base, User, Duty +from db.repository import ( + delete_duties_in_range, + get_or_create_user_by_full_name, + get_duties, + insert_duty, +) + + +@pytest.fixture +def session(): + engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False}) + Base.metadata.create_all(engine) + Session = sessionmaker(bind=engine, autocommit=False, autoflush=False) + s = Session() + try: + yield s + finally: + s.close() + + +@pytest.fixture +def user_a(session): + u = User( + telegram_user_id=None, + full_name="User A", + username=None, + first_name=None, + last_name=None, + ) + session.add(u) + session.commit() + session.refresh(u) + return u + + +def test_get_or_create_user_by_full_name_creates(session): + u = get_or_create_user_by_full_name(session, "Новый Пользователь") + assert u.id is not None + assert u.full_name == "Новый Пользователь" + assert u.telegram_user_id is None + + +def test_get_or_create_user_by_full_name_returns_existing(session, user_a): + u = get_or_create_user_by_full_name(session, "User A") + assert u.id == user_a.id + assert u.full_name == "User A" + + +def test_delete_duties_in_range_removes_only_in_range(session, user_a): + # Duties: 2026-02-01 06:00 - 2026-02-02 06:00; 2026-02-15 - 2026-02-16; 2026-02-28 - 2026-03-01 + insert_duty( + session, + user_a.id, + "2026-02-01T06:00:00Z", + "2026-02-02T06:00:00Z", + ) + insert_duty( + session, + user_a.id, + "2026-02-15T06:00:00Z", + "2026-02-16T06:00:00Z", + ) + insert_duty( + session, + user_a.id, + "2026-02-28T06:00:00Z", + "2026-03-01T06:00:00Z", + ) + deleted = delete_duties_in_range(session, user_a.id, "2026-02-10", "2026-02-20") + assert deleted == 1 + remaining = get_duties(session, "2026-01-01", "2026-03-31") + assert len(remaining) == 2 + starts = [d[0].start_at for d in remaining] + assert "2026-02-01T06:00:00Z" in starts + assert "2026-02-28T06:00:00Z" in starts + assert "2026-02-15T06:00:00Z" not in starts + + +def test_delete_duties_in_range_other_user_unchanged(session, user_a): + user_b = get_or_create_user_by_full_name(session, "User B") + insert_duty(session, user_a.id, "2026-02-10T06:00:00Z", "2026-02-11T06:00:00Z") + insert_duty(session, user_b.id, "2026-02-10T06:00:00Z", "2026-02-11T06:00:00Z") + delete_duties_in_range(session, user_a.id, "2026-02-01", "2026-02-28") + remaining = get_duties(session, "2026-02-01", "2026-02-28") + assert len(remaining) == 1 + assert remaining[0][1] == "User B"