Skip to content

API Reference

Generated from the duty_teller package. The following subpackages and modules are included.

Configuration

duty_teller.config

Load configuration from environment (e.g. .env via python-dotenv).

BOT_TOKEN is not validated on import; call require_bot_token() in the entry point when running the bot.

Settings dataclass

Injectable settings built from environment. Used in tests or when env is overridden.

Source code in duty_teller/config.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@dataclass(frozen=True)
class Settings:
    """Frozen bundle of configuration values read from the environment.

    Construct it directly to inject explicit values (for example in tests), or
    call ``from_env()`` to read everything from the current process environment.
    """

    bot_token: str
    database_url: str
    mini_app_base_url: str
    http_port: int
    allowed_usernames: set[str]
    admin_usernames: set[str]
    allowed_phones: set[str]
    admin_phones: set[str]
    mini_app_skip_auth: bool
    init_data_max_age_seconds: int
    cors_origins: list[str]
    external_calendar_ics_url: str
    duty_display_tz: str
    default_language: str

    @classmethod
    def from_env(cls) -> "Settings":
        """Read every setting from ``os.environ`` and return a new instance.

        Username lists are comma-separated; each entry is trimmed, stripped of
        leading ``@`` and lower-cased. ``CORS_ORIGINS`` that is unset, empty or
        ``*`` yields ``["*"]``. ``MINI_APP_SKIP_AUTH`` is true only for the exact
        values ``1``, ``true`` or ``yes`` (after trimming whitespace).

        Returns:
            Settings instance with all fields populated from env.

        Raises:
            ValueError: If HTTP_PORT or INIT_DATA_MAX_AGE_SECONDS is not an integer.
        """
        token = os.getenv("BOT_TOKEN") or ""

        allowed_names: set[str] = set()
        for entry in os.getenv("ALLOWED_USERNAMES", "").strip().split(","):
            if entry.strip():
                allowed_names.add(entry.strip().lstrip("@").lower())

        admin_names: set[str] = set()
        for entry in os.getenv("ADMIN_USERNAMES", "").strip().split(","):
            if entry.strip():
                admin_names.add(entry.strip().lstrip("@").lower())

        phones_allowed = _parse_phone_list(os.getenv("ALLOWED_PHONES", ""))
        phones_admin = _parse_phone_list(os.getenv("ADMIN_PHONES", ""))

        cors_raw = os.getenv("CORS_ORIGINS", "").strip()
        if not cors_raw or cors_raw == "*":
            origins = ["*"]
        else:
            origins = [item.strip() for item in cors_raw.split(",") if item.strip()]

        db_url = os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db")
        base_url = os.getenv("MINI_APP_BASE_URL", "").rstrip("/")
        port = int(os.getenv("HTTP_PORT", "8080"))
        skip_auth = os.getenv("MINI_APP_SKIP_AUTH", "").strip() in ("1", "true", "yes")
        max_age = int(os.getenv("INIT_DATA_MAX_AGE_SECONDS", "0"))
        ics_url = os.getenv("EXTERNAL_CALENDAR_ICS_URL", "").strip()
        # A blank DUTY_DISPLAY_TZ falls back to the same default as an unset one.
        display_tz = os.getenv("DUTY_DISPLAY_TZ", "Europe/Moscow").strip() or "Europe/Moscow"
        language = _normalize_default_language(
            os.getenv("DEFAULT_LANGUAGE", "en").strip()
        )

        return cls(
            bot_token=token,
            database_url=db_url,
            mini_app_base_url=base_url,
            http_port=port,
            allowed_usernames=allowed_names,
            admin_usernames=admin_names,
            allowed_phones=phones_allowed,
            admin_phones=phones_admin,
            mini_app_skip_auth=skip_auth,
            init_data_max_age_seconds=max_age,
            cors_origins=origins,
            external_calendar_ics_url=ics_url,
            duty_display_tz=display_tz,
            default_language=language,
        )

from_env() classmethod

Build Settings from current environment (same logic as module-level variables).

Returns:

Type Description
Settings

Settings instance with all fields populated from env.

Source code in duty_teller/config.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@classmethod
def from_env(cls) -> "Settings":
    """Read every setting from ``os.environ`` and return a new instance.

    Username lists are comma-separated; each entry is trimmed, stripped of
    leading ``@`` and lower-cased. ``CORS_ORIGINS`` that is unset, empty or
    ``*`` yields ``["*"]``. ``MINI_APP_SKIP_AUTH`` is true only for the exact
    values ``1``, ``true`` or ``yes`` (after trimming whitespace).

    Returns:
        Settings instance with all fields populated from env.

    Raises:
        ValueError: If HTTP_PORT or INIT_DATA_MAX_AGE_SECONDS is not an integer.
    """
    token = os.getenv("BOT_TOKEN") or ""

    allowed_names: set[str] = set()
    for entry in os.getenv("ALLOWED_USERNAMES", "").strip().split(","):
        if entry.strip():
            allowed_names.add(entry.strip().lstrip("@").lower())

    admin_names: set[str] = set()
    for entry in os.getenv("ADMIN_USERNAMES", "").strip().split(","):
        if entry.strip():
            admin_names.add(entry.strip().lstrip("@").lower())

    phones_allowed = _parse_phone_list(os.getenv("ALLOWED_PHONES", ""))
    phones_admin = _parse_phone_list(os.getenv("ADMIN_PHONES", ""))

    cors_raw = os.getenv("CORS_ORIGINS", "").strip()
    if not cors_raw or cors_raw == "*":
        origins = ["*"]
    else:
        origins = [item.strip() for item in cors_raw.split(",") if item.strip()]

    db_url = os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db")
    base_url = os.getenv("MINI_APP_BASE_URL", "").rstrip("/")
    port = int(os.getenv("HTTP_PORT", "8080"))
    skip_auth = os.getenv("MINI_APP_SKIP_AUTH", "").strip() in ("1", "true", "yes")
    max_age = int(os.getenv("INIT_DATA_MAX_AGE_SECONDS", "0"))
    ics_url = os.getenv("EXTERNAL_CALENDAR_ICS_URL", "").strip()
    # A blank DUTY_DISPLAY_TZ falls back to the same default as an unset one.
    display_tz = os.getenv("DUTY_DISPLAY_TZ", "Europe/Moscow").strip() or "Europe/Moscow"
    language = _normalize_default_language(
        os.getenv("DEFAULT_LANGUAGE", "en").strip()
    )

    return cls(
        bot_token=token,
        database_url=db_url,
        mini_app_base_url=base_url,
        http_port=port,
        allowed_usernames=allowed_names,
        admin_usernames=admin_names,
        allowed_phones=phones_allowed,
        admin_phones=phones_admin,
        mini_app_skip_auth=skip_auth,
        init_data_max_age_seconds=max_age,
        cors_origins=origins,
        external_calendar_ics_url=ics_url,
        duty_display_tz=display_tz,
        default_language=language,
    )

can_access_miniapp(username)

Check if username is allowed to open the calendar Miniapp.

Parameters:

Name Type Description Default
username str

Telegram username (with or without @; case-insensitive).

required

Returns:

Type Description
bool

True if in ALLOWED_USERNAMES or ADMIN_USERNAMES.

Source code in duty_teller/config.py
172
173
174
175
176
177
178
179
180
181
182
def can_access_miniapp(username: str) -> bool:
    """Check if username is allowed to open the calendar Miniapp.

    The input is normalized the same way the allowlists are built from the
    environment (trimmed, leading ``@`` removed, lower-cased), so ``"@Alice"``
    and ``"alice"`` are treated alike.

    Args:
        username: Telegram username (with or without @; case-insensitive).
            ``None`` or an empty value is never allowed.

    Returns:
        True if in ALLOWED_USERNAMES or ADMIN_USERNAMES.
    """
    normalized = (username or "").strip().lstrip("@").lower()
    # An empty identifier must never match, even if a stray "" entry ended up
    # in an allowlist (e.g. from a lone "@" item in the env variable).
    if not normalized:
        return False
    return normalized in ALLOWED_USERNAMES or normalized in ADMIN_USERNAMES

can_access_miniapp_by_phone(phone)

Check if phone (set via /set_phone) is allowed to open the Miniapp.

Parameters:

Name Type Description Default
phone str | None

Raw phone string or None.

required

Returns:

Type Description
bool

True if normalized phone is in ALLOWED_PHONES or ADMIN_PHONES.

Source code in duty_teller/config.py
185
186
187
188
189
190
191
192
193
194
195
196
197
def can_access_miniapp_by_phone(phone: str | None) -> bool:
    """Check if phone (set via /set_phone) is allowed to open the Miniapp.

    Args:
        phone: Raw phone string or None.

    Returns:
        True if normalized phone is in ALLOWED_PHONES or ADMIN_PHONES.
    """
    normalized = normalize_phone(phone)
    if not normalized:
        return False
    return normalized in ALLOWED_PHONES or normalized in ADMIN_PHONES

is_admin(username)

Check if Telegram username is in ADMIN_USERNAMES.

Parameters:

Name Type Description Default
username str

Telegram username (with or without @; case-insensitive).

required

Returns:

Type Description
bool

True if in ADMIN_USERNAMES.

Source code in duty_teller/config.py
160
161
162
163
164
165
166
167
168
169
def is_admin(username: str) -> bool:
    """Check if Telegram username is in ADMIN_USERNAMES.

    The input is normalized the same way the admin list is built from the
    environment (trimmed, leading ``@`` removed, lower-cased).

    Args:
        username: Telegram username (with or without @; case-insensitive).
            ``None`` or an empty value is never an admin.

    Returns:
        True if in ADMIN_USERNAMES.
    """
    normalized = (username or "").strip().lstrip("@").lower()
    # Guard against an accidental "" entry in the admin list matching a
    # missing username.
    return bool(normalized) and normalized in ADMIN_USERNAMES

is_admin_by_phone(phone)

Check if phone is in ADMIN_PHONES.

Parameters:

Name Type Description Default
phone str | None

Raw phone string or None.

required

Returns:

Type Description
bool

True if normalized phone is in ADMIN_PHONES.

Source code in duty_teller/config.py
200
201
202
203
204
205
206
207
208
209
210
def is_admin_by_phone(phone: str | None) -> bool:
    """Check if phone is in ADMIN_PHONES.

    Args:
        phone: Raw phone string or None.

    Returns:
        True if normalized phone is in ADMIN_PHONES.
    """
    normalized = normalize_phone(phone)
    return bool(normalized and normalized in ADMIN_PHONES)

normalize_phone(phone)

Return phone as digits only (spaces, +, parentheses, dashes removed).

Parameters:

Name Type Description Default
phone str | None

Raw phone string or None.

required

Returns:

Type Description
str

Digits-only string, or empty string if None or empty.

Source code in duty_teller/config.py
23
24
25
26
27
28
29
30
31
32
33
34
def normalize_phone(phone: str | None) -> str:
    """Return phone as digits only (spaces, +, parentheses, dashes removed).

    Args:
        phone: Raw phone string or None.

    Returns:
        Digits-only string, or empty string if None or empty.
    """
    if not phone or not isinstance(phone, str):
        return ""
    return _PHONE_DIGITS_RE.sub("", phone.strip())

require_bot_token()

Raise SystemExit with a clear message if BOT_TOKEN is not set.

Call from the application entry point (e.g. main.py or duty_teller.run) so the process exits with a helpful message instead of failing later.

Raises:

Type Description
SystemExit

If BOT_TOKEN is empty.

Source code in duty_teller/config.py
213
214
215
216
217
218
219
220
221
222
223
224
225
def require_bot_token() -> None:
    """Abort the process early when no bot token is configured.

    Intended to be called from the application entry point (e.g. main.py or
    duty_teller.run) so a missing token is reported up front with a helpful
    message instead of surfacing later as an obscure failure.

    Raises:
        SystemExit: If BOT_TOKEN is empty.
    """
    if BOT_TOKEN:
        return
    message = (
        "BOT_TOKEN is not set. Copy .env.example to .env and set your token from @BotFather."
    )
    raise SystemExit(message)

API (FastAPI and auth)

duty_teller.api

HTTP API for the calendar Mini App: duties, calendar events, and static webapp.

duty_teller.api.app

FastAPI app: /api/duties, /api/calendar-events, personal ICS, and static webapp at /app.

get_personal_calendar_ical(token, session=Depends(get_db_session))

Return ICS calendar with only the subscribing user's duties. No Telegram auth; access is by secret token in the URL.

Source code in duty_teller/api/app.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@app.get(
    "/api/calendar/ical/{token}.ics",
    summary="Personal calendar ICS",
    description="Returns an ICS calendar with only the subscribing user's duties. No Telegram auth; access is by secret token in the URL.",
)
def get_personal_calendar_ical(
    token: str,
    session: Session = Depends(get_db_session),
) -> Response:
    """Serve the personal ICS feed identified by a secret calendar token.

    The token is resolved to a user; an unknown token yields a plain 404.
    Otherwise the user's duties from 365 days before today up to 730 days
    after today are rendered as ``text/calendar``. No Telegram auth is
    performed here; the token in the URL is the only credential.
    """
    subscriber = get_user_by_calendar_token(session, token)
    if subscriber is None:
        return Response(status_code=404, content="Not found")

    date_format = "%Y-%m-%d"
    today = date.today()
    window_start = today - timedelta(days=365)
    window_end = today + timedelta(days=365 * 2)

    duties = get_duties_for_user(
        session,
        subscriber.id,
        from_date=window_start.strftime(date_format),
        to_date=window_end.strftime(date_format),
    )
    return Response(
        content=build_personal_ics(duties),
        media_type="text/calendar; charset=utf-8",
    )

duty_teller.api.dependencies

FastAPI dependencies: DB session, Miniapp auth (initData/allowlist), date validation.

fetch_duties_response(session, from_date, to_date)

Load duties in range and return as DutyWithUser list for API response.

Parameters:

Name Type Description Default
session Session

DB session.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
list[DutyWithUser]

List of DutyWithUser (id, user_id, start_at, end_at, full_name, event_type).

Source code in duty_teller/api/dependencies.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def fetch_duties_response(
    session: Session, from_date: str, to_date: str
) -> list[DutyWithUser]:
    """Query duties for a date range and convert them to API response models.

    Args:
        session: DB session.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of DutyWithUser (id, user_id, start_at, end_at, full_name, event_type).
        Any stored event_type other than "duty", "unavailable" or "vacation" is
        reported as "duty".
    """
    known_types = ("duty", "unavailable", "vacation")
    response: list[DutyWithUser] = []
    for duty, full_name in get_duties(session, from_date=from_date, to_date=to_date):
        event_type = duty.event_type
        if event_type not in known_types:
            # Unknown values would fail the response model's Literal check.
            event_type = "duty"
        response.append(
            DutyWithUser(
                id=duty.id,
                user_id=duty.user_id,
                start_at=duty.start_at,
                end_at=duty.end_at,
                full_name=full_name,
                event_type=event_type,
            )
        )
    return response

get_authenticated_username(request, x_telegram_init_data, session)

Return identifier for miniapp auth (username or full_name or id:...); empty if skip-auth.

Parameters:

Name Type Description Default
request Request

FastAPI request (client host for private-IP bypass).

required
x_telegram_init_data str | None

Raw X-Telegram-Init-Data header value.

required
session Session

DB session (for phone allowlist lookup).

required

Returns:

Type Description
str

Username, full_name, or "id:<telegram_id>"; empty string if MINI_APP_SKIP_AUTH

str

or private IP and no initData.

Raises:

Type Description
HTTPException

403 if initData missing/invalid or user not in allowlist.

Source code in duty_teller/api/dependencies.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_authenticated_username(
    request: Request,
    x_telegram_init_data: str | None,
    session: Session,
) -> str:
    """Return identifier for miniapp auth (username or full_name or id:...); empty if skip-auth.

    Checks are applied in this order:

    1. If ``config.MINI_APP_SKIP_AUTH`` is set, access is granted with no checks.
    2. If no initData header was sent, access is granted only when
       ``_is_private_client`` accepts the client host; otherwise 403.
    3. initData is validated against ``config.BOT_TOKEN``; any reason other
       than "ok" is a 403.
    4. The username is checked against the username allowlist.
    5. Failing that, the stored phone of the user with this Telegram id is
       checked against the phone allowlist.

    Args:
        request: FastAPI request (client host for private-IP bypass).
        x_telegram_init_data: Raw X-Telegram-Init-Data header value.
        session: DB session (for phone allowlist lookup).

    Returns:
        Username, full_name, or "id:<telegram_id>"; empty string if MINI_APP_SKIP_AUTH
        or private IP and no initData.

    Raises:
        HTTPException: 403 if initData missing/invalid or user not in allowlist.
    """
    # Global bypass: every request is let through; logged as a warning each time.
    if config.MINI_APP_SKIP_AUTH:
        log.warning("allowing without any auth check (MINI_APP_SKIP_AUTH is set)")
        return ""
    init_data = (x_telegram_init_data or "").strip()
    if not init_data:
        # No header at all: only clients accepted by _is_private_client pass.
        client_host = request.client.host if request.client else None
        if _is_private_client(client_host):
            return ""
        log.warning("no X-Telegram-Init-Data header (client=%s)", client_host)
        # Without initData there is no user language; use Accept-Language instead.
        lang = _lang_from_accept_language(request.headers.get("Accept-Language"))
        raise HTTPException(status_code=403, detail=t(lang, "api.open_from_telegram"))
    # A falsy setting (e.g. 0) is passed on as None, i.e. no age limit.
    max_age = config.INIT_DATA_MAX_AGE_SECONDS or None
    telegram_user_id, username, auth_reason, lang = validate_init_data_with_reason(
        init_data, config.BOT_TOKEN, max_age_seconds=max_age
    )
    if auth_reason != "ok":
        log.warning("initData validation failed: %s", auth_reason)
        raise HTTPException(
            status_code=403, detail=_auth_error_detail(auth_reason, lang)
        )
    # First choice: username allowlist.
    if username and config.can_access_miniapp(username):
        return username
    # Fallback: phone allowlist, using the phone stored for this Telegram id.
    failed_phone: str | None = None
    if telegram_user_id is not None:
        user = get_user_by_telegram_id(session, telegram_user_id)
        if user and user.phone and config.can_access_miniapp_by_phone(user.phone):
            # Prefer username, then full_name, then a synthetic "id:<telegram_id>".
            return username or (user.full_name or "") or f"id:{telegram_user_id}"
        if user and user.phone:
            # Kept only so the denial log line below can show the rejected phone.
            failed_phone = config.normalize_phone(user.phone)
    log.warning(
        "username/phone not in allowlist (username=%s, telegram_id=%s, phone=%s)",
        username,
        telegram_user_id,
        failed_phone if failed_phone else "—",
    )
    raise HTTPException(status_code=403, detail=t(lang, "api.access_denied"))

get_db_session()

Yield a DB session for the request; closed automatically by FastAPI.

Source code in duty_teller/api/dependencies.py
86
87
88
89
def get_db_session() -> Generator[Session, None, None]:
    """Provide a per-request DB session as a generator dependency.

    The session comes from ``session_scope(config.DATABASE_URL)``; the scope's
    context manager exits once the generator is resumed or closed after the
    request has been handled.
    """
    with session_scope(config.DATABASE_URL) as db:
        yield db

get_validated_dates(request, from_date=Query(..., description='ISO date YYYY-MM-DD', alias='from'), to_date=Query(..., description='ISO date YYYY-MM-DD', alias='to'))

Validate from/to date query params; use Accept-Language for error messages.

Parameters:

Name Type Description Default
request Request

FastAPI request (for Accept-Language).

required
from_date str

Start date YYYY-MM-DD.

Query(..., description='ISO date YYYY-MM-DD', alias='from')
to_date str

End date YYYY-MM-DD.

Query(..., description='ISO date YYYY-MM-DD', alias='to')

Returns:

Type Description
tuple[str, str]

(from_date, to_date) as strings.

Raises:

Type Description
HTTPException

400 if format invalid or from_date > to_date.

Source code in duty_teller/api/dependencies.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def get_validated_dates(
    request: Request,
    from_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="from"),
    to_date: str = Query(..., description="ISO date YYYY-MM-DD", alias="to"),
) -> tuple[str, str]:
    """Check the ``from``/``to`` query parameters and hand them back unchanged.

    The language for error messages is derived from the request's
    Accept-Language header.

    Args:
        request: FastAPI request (for Accept-Language).
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        (from_date, to_date) as strings.

    Raises:
        HTTPException: 400 if format invalid or from_date > to_date.
    """
    accept_language = request.headers.get("Accept-Language")
    _validate_duty_dates(
        from_date, to_date, _lang_from_accept_language(accept_language)
    )
    return from_date, to_date

require_miniapp_username(request, x_telegram_init_data=None, session=Depends(get_db_session))

FastAPI dependency: require valid Miniapp auth; return username/identifier.

Raises:

Type Description
HTTPException

403 if initData missing/invalid or user not in allowlist.

Source code in duty_teller/api/dependencies.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def require_miniapp_username(
    request: Request,
    x_telegram_init_data: Annotated[
        str | None, Header(alias="X-Telegram-Init-Data")
    ] = None,
    session: Session = Depends(get_db_session),
) -> str:
    """FastAPI dependency that enforces Miniapp auth for a route.

    Binds the ``X-Telegram-Init-Data`` header and a DB session, then delegates
    the decision to ``get_authenticated_username``.

    Returns:
        The username/identifier produced by ``get_authenticated_username``.

    Raises:
        HTTPException: 403 if initData missing/invalid or user not in allowlist.
    """
    identifier = get_authenticated_username(
        request=request,
        x_telegram_init_data=x_telegram_init_data,
        session=session,
    )
    return identifier

duty_teller.api.telegram_auth

Validate Telegram Web App initData and extract the user's username.

validate_init_data(init_data, bot_token, max_age_seconds=None)

Validate Telegram Web App initData and return username if valid.

Parameters:

Name Type Description Default
init_data str

Raw initData string from tgWebAppData.

required
bot_token str

Bot token (must match the bot that signed the data).

required
max_age_seconds int | None

Reject if auth_date older than this; None to disable.

None

Returns:

Type Description
str | None

Username (lowercase, no @) or None if validation fails.

Source code in duty_teller/api/telegram_auth.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def validate_init_data(
    init_data: str,
    bot_token: str,
    max_age_seconds: int | None = None,
) -> str | None:
    """Validate Telegram Web App initData and return only the username.

    Convenience wrapper around ``validate_init_data_with_reason`` that drops
    the user id, failure reason and language.

    Args:
        init_data: Raw initData string from tgWebAppData.
        bot_token: Bot token (must match the bot that signed the data).
        max_age_seconds: Reject if auth_date older than this; None to disable.

    Returns:
        Username (lowercase, no @), or None if validation fails or the
        validated user has no username.
    """
    outcome = validate_init_data_with_reason(
        init_data, bot_token, max_age_seconds
    )
    _user_id, username, _reason, _lang = outcome
    return username

validate_init_data_with_reason(init_data, bot_token, max_age_seconds=None)

Validate initData signature and return user id, username, reason, and lang.

Parameters:

Name Type Description Default
init_data str

Raw initData string from tgWebAppData.

required
bot_token str

Bot token (must match the bot that signed the data).

required
max_age_seconds int | None

Reject if auth_date older than this; None to disable.

None

Returns:

Type Description
tuple[int | None, str | None, str, str]

Tuple (telegram_user_id, username, reason, lang). reason is one of: "ok", "empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user", "user_invalid", "no_user_id". lang is from user.language_code normalized to 'ru' or 'en'; 'en' when no user. On success: (user.id, username or None, "ok", lang).

Source code in duty_teller/api/telegram_auth.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def validate_init_data_with_reason(
    init_data: str,
    bot_token: str,
    max_age_seconds: int | None = None,
) -> tuple[int | None, str | None, str, str]:
    """Validate initData signature and return user id, username, reason, and lang.

    Args:
        init_data: Raw initData string from tgWebAppData.
        bot_token: Bot token (must match the bot that signed the data).
        max_age_seconds: Reject if auth_date older than this; None to disable.

    Returns:
        Tuple (telegram_user_id, username, reason, lang). reason is one of: "ok",
        "empty", "no_hash", "hash_mismatch", "auth_date_expired", "no_user",
        "user_invalid", "no_user_id". lang is from user.language_code normalized
        to 'ru' or 'en'; 'en' when no user. On success: (user.id, username or None,
        "ok", lang).
    """
    if not init_data or not bot_token:
        return (None, None, "empty", "en")
    init_data = init_data.strip()
    params = {}
    for part in init_data.split("&"):
        if "=" not in part:
            continue
        key, _, value = part.partition("=")
        if not key:
            continue
        params[key] = value
    hash_val = params.pop("hash", None)
    if not hash_val:
        return (None, None, "no_hash", "en")
    data_pairs = sorted(params.items())
    # Data-check string: key=value with URL-decoded values (per Telegram example)
    data_string = "\n".join(f"{k}={unquote(v)}" for k, v in data_pairs)
    # HMAC-SHA256(key=WebAppData, message=bot_token) per reference implementations
    secret_key = hmac.new(
        b"WebAppData",
        msg=bot_token.encode(),
        digestmod=hashlib.sha256,
    ).digest()
    computed = hmac.new(
        secret_key,
        msg=data_string.encode(),
        digestmod=hashlib.sha256,
    ).hexdigest()
    if not hmac.compare_digest(computed.lower(), hash_val.lower()):
        return (None, None, "hash_mismatch", "en")
    if max_age_seconds is not None and max_age_seconds > 0:
        auth_date_raw = params.get("auth_date")
        if not auth_date_raw:
            return (None, None, "auth_date_expired", "en")
        try:
            auth_date = int(float(auth_date_raw))
        except (ValueError, TypeError):
            return (None, None, "auth_date_expired", "en")
        if time.time() - auth_date > max_age_seconds:
            return (None, None, "auth_date_expired", "en")
    user_raw = params.get("user")
    if not user_raw:
        return (None, None, "no_user", "en")
    try:
        user = json.loads(unquote(user_raw))
    except (json.JSONDecodeError, TypeError):
        return (None, None, "user_invalid", "en")
    if not isinstance(user, dict):
        return (None, None, "user_invalid", "en")
    lang = _normalize_lang(user.get("language_code"))
    raw_id = user.get("id")
    if raw_id is None:
        return (None, None, "no_user_id", lang)
    try:
        telegram_user_id = int(raw_id)
    except (TypeError, ValueError):
        return (None, None, "no_user_id", lang)
    username = user.get("username")
    if username and isinstance(username, str):
        username = username.strip().lstrip("@").lower()
    else:
        username = None
    return (telegram_user_id, username, "ok", lang)

duty_teller.api.calendar_ics

Fetch and parse external ICS calendar; in-memory cache with 7-day TTL.

get_calendar_events(url, from_date, to_date)

Fetch ICS from URL and return events in the given date range.

Uses in-memory cache with TTL 7 days. Recurring events are skipped. On fetch or parse error returns an empty list.

Parameters:

Name Type Description Default
url str

URL of the ICS calendar.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
list[dict]

List of dicts with keys "date" (YYYY-MM-DD) and "summary". Empty on error.

Source code in duty_teller/api/calendar_ics.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def get_calendar_events(
    url: str,
    from_date: str,
    to_date: str,
) -> list[dict]:
    """Return events from an external ICS calendar that fall in a date range.

    The raw ICS payload is kept in the module-level ``_ics_cache`` keyed by
    URL and reused while younger than ``CACHE_TTL_SECONDS``; otherwise it is
    fetched again and the cache entry is replaced. A failed fetch is not cached.

    Args:
        url: URL of the ICS calendar.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of dicts with keys "date" (YYYY-MM-DD) and "summary". Empty when
        ``url`` is empty, when ``from_date`` sorts after ``to_date``, or when
        the calendar could not be fetched.
    """
    if not url:
        return []
    if from_date > to_date:
        return []

    current_ts = datetime.now().timestamp()
    payload: bytes | None = None

    if url in _ics_cache:
        stored_at, stored_payload = _ics_cache[url]
        is_fresh = current_ts - stored_at < CACHE_TTL_SECONDS
        if is_fresh:
            payload = stored_payload

    if payload is None:
        payload = _fetch_ics(url)
        if payload is None:
            return []
        _ics_cache[url] = (current_ts, payload)

    return _get_events_from_ics(payload, from_date, to_date)

duty_teller.api.personal_calendar_ics

Generate ICS calendar containing only one user's duties (for subscription link).

build_personal_ics(duties_with_name)

Build a VCALENDAR (ICS) with one VEVENT per duty.

Parameters:

Name Type Description Default
duties_with_name list[tuple[Duty, str]]

List of (Duty, full_name). full_name is available for DESCRIPTION; SUMMARY is taken from event_type (duty/unavailable/vacation).

required

Returns:

Type Description
bytes

ICS file content as bytes (UTF-8).

Source code in duty_teller/api/personal_calendar_ics.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def build_personal_ics(duties_with_name: list[tuple[Duty, str]]) -> bytes:
    """Render duties as an iCalendar document with one VEVENT each.

    Args:
        duties_with_name: List of (Duty, full_name). The full_name element is
            currently not written to the event; SUMMARY is looked up from
            event_type via SUMMARY_BY_TYPE, falling back to "Duty".

    Returns:
        ICS file content as bytes (UTF-8).
    """
    calendar = Calendar()
    header = (
        ("prodid", "-//Duty Teller//Personal Calendar//EN"),
        ("version", "2.0"),
        ("calscale", "GREGORIAN"),
    )
    for prop_name, prop_value in header:
        calendar.add(prop_name, prop_value)

    for duty, _name in duties_with_name:
        vevent = Event()
        starts = _parse_utc_iso(duty.start_at)
        ends = _parse_utc_iso(duty.end_at)
        # Naive datetimes are interpreted as UTC so the event carries a timezone.
        if starts.tzinfo is None:
            starts = starts.replace(tzinfo=timezone.utc)
        if ends.tzinfo is None:
            ends = ends.replace(tzinfo=timezone.utc)
        vevent.add("dtstart", starts)
        vevent.add("dtend", ends)
        # A missing/empty event_type is treated as a regular duty.
        type_key = duty.event_type or "duty"
        vevent.add("summary", SUMMARY_BY_TYPE.get(type_key, "Duty"))
        vevent.add("uid", f"duty-{duty.id}@duty-teller")
        vevent.add("dtstamp", datetime.now(timezone.utc))
        calendar.add_component(vevent)

    return calendar.to_ical()

Database

duty_teller.db

Database layer: SQLAlchemy models, Pydantic schemas, repository, init.

Base

Bases: DeclarativeBase

Declarative base for all models.

Source code in duty_teller/db/models.py
 7
 8
 9
10
class Base(DeclarativeBase):
    """Common declarative base that the ORM models inherit from."""

Duty

Bases: Base

Single duty/unavailable/vacation slot (UTC start_at/end_at, event_type).

Source code in duty_teller/db/models.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Duty(Base):
    """Single duty/unavailable/vacation slot (UTC start_at/end_at, event_type).

    Stored in the ``duties`` table; every row points at its owner in ``users``.
    """

    __tablename__ = "duties"

    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Owner of the slot; required foreign key to users.id.
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id"), nullable=False
    )
    # UTC, ISO 8601 with Z suffix (e.g. 2025-01-15T09:00:00Z); stored as text.
    start_at: Mapped[str] = mapped_column(Text, nullable=False)
    end_at: Mapped[str] = mapped_column(Text, nullable=False)
    # duty | unavailable | vacation; the database fills in "duty" when omitted.
    event_type: Mapped[str] = mapped_column(Text, nullable=False, server_default="duty")

    # Owning user; counterpart of User.duties.
    user: Mapped["User"] = relationship("User", back_populates="duties")

DutyCreate

Bases: DutyBase

Duty creation payload.

Source code in duty_teller/db/schemas.py
40
41
42
43
class DutyCreate(DutyBase):
    """Duty creation payload; adds no fields beyond those of DutyBase."""

DutyInDb

Bases: DutyBase

Duty as stored in DB (includes id).

Source code in duty_teller/db/schemas.py
46
47
48
49
50
51
class DutyInDb(DutyBase):
    """Duty as stored in DB (includes id)."""

    # from_attributes lets the model be validated from attribute-bearing
    # objects rather than only from dicts.
    model_config = ConfigDict(from_attributes=True)

    id: int

DutyWithUser

Bases: DutyInDb

Duty with full_name and event_type for calendar display.

event_type: only these values are returned; unknown DB values are mapped to "duty" in the API.

Source code in duty_teller/db/schemas.py
54
55
56
57
58
59
60
61
62
63
class DutyWithUser(DutyInDb):
    """Duty with full_name and event_type for calendar display.

    event_type: only these values are returned; unknown DB values are mapped to "duty" in the API.
    """

    # from_attributes lets the model be validated from attribute-bearing
    # objects rather than only from dicts.
    model_config = ConfigDict(from_attributes=True)

    full_name: str
    event_type: Literal["duty", "unavailable", "vacation"] = "duty"

User

Bases: Base

Telegram user and display name; may have telegram_user_id=None for import-only users.

Source code in duty_teller/db/models.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class User(Base):
    """Telegram user and display name; may have telegram_user_id=None for import-only users."""

    __tablename__ = "users"

    # Surrogate primary key, auto-incremented.
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Telegram account id; unique when set, NULL for import-only users.
    telegram_user_id: Mapped[int | None] = mapped_column(
        BigInteger, unique=True, nullable=True
    )
    # Display name; the only mandatory name field.
    full_name: Mapped[str] = mapped_column(Text, nullable=False)
    # Optional Telegram profile fields.
    username: Mapped[str | None] = mapped_column(Text, nullable=True)
    first_name: Mapped[str | None] = mapped_column(Text, nullable=True)
    last_name: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Optional phone string.
    phone: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Marks the display name as manually edited. Defaults to False both in
    # Python (default=False) and in the database (server_default="0").
    name_manually_edited: Mapped[bool] = mapped_column(
        Boolean, nullable=False, server_default="0", default=False
    )

    # All Duty rows owned by this user; mirrored by Duty.user via back_populates.
    duties: Mapped[list["Duty"]] = relationship("Duty", back_populates="user")

UserCreate

Bases: UserBase

User creation payload including Telegram user id.

Source code in duty_teller/db/schemas.py
17
18
19
20
class UserCreate(UserBase):
    """User creation payload including Telegram user id.

    Extends UserBase with a mandatory telegram_user_id.
    """

    # Required here (no default), unlike the optional name fields of UserBase.
    telegram_user_id: int

UserInDb

Bases: UserBase

User as stored in DB (includes id and telegram_user_id).

Source code in duty_teller/db/schemas.py
23
24
25
26
27
28
29
class UserInDb(UserBase):
    """User as stored in DB (includes id and telegram_user_id).

    telegram_user_id is nullable because the users table allows NULL there
    (import-only users have no Telegram account); the field itself is still
    required, so callers must pass it explicitly, possibly as None.
    """

    id: int
    # Mirrors the nullable users.telegram_user_id column so that import-only
    # users can be validated instead of failing on a None value.
    telegram_user_id: int | None

    # Allows building the schema from attribute-bearing objects (e.g. ORM rows).
    model_config = ConfigDict(from_attributes=True)

delete_duties_in_range(session, user_id, from_date, to_date)

Delete all duties of the user that overlap the given date range.

Parameters:

Name Type Description Default
session Session

DB session.

required
user_id int

User id.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
int

Number of duties deleted.

Source code in duty_teller/db/repository.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def delete_duties_in_range(
    session: Session,
    user_id: int,
    from_date: str,
    to_date: str,
) -> int:
    """Delete all duties of the user that overlap the given date range.

    Both from_date and to_date are inclusive calendar days. The deletion is
    committed before returning.

    Args:
        session: DB session.
        user_id: User id.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        Number of duties deleted.

    Raises:
        ValueError: If to_date cannot be parsed as an ISO date.
    """
    # Exclusive upper bound: the day after to_date, so duties starting at any
    # time on to_date itself still satisfy "start_at < to_next".
    to_next = (
        datetime.fromisoformat(to_date + "T00:00:00") + timedelta(days=1)
    ).strftime("%Y-%m-%d")
    # The bulk DELETE reports how many rows it matched, so a separate COUNT
    # query is unnecessary and the result cannot drift from what was removed.
    deleted = (
        session.query(Duty)
        .filter(
            Duty.user_id == user_id,
            Duty.start_at < to_next,
            Duty.end_at >= from_date,
        )
        .delete(synchronize_session=False)
    )
    session.commit()
    return deleted

get_duties(session, from_date, to_date)

Return duties overlapping the given date range with user full_name.

Parameters:

Name Type Description Default
session Session

DB session.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
list[tuple[Duty, str]]

List of (Duty, full_name) tuples.

Source code in duty_teller/db/repository.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def get_duties(
    session: Session,
    from_date: str,
    to_date: str,
) -> list[tuple[Duty, str]]:
    """Fetch every duty that overlaps the date range, paired with its owner's name.

    Args:
        session: DB session.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of (Duty, full_name) tuples.
    """
    # Day after to_date, used as an exclusive upper bound on start_at.
    day_after = datetime.fromisoformat(to_date + "T00:00:00") + timedelta(days=1)
    upper_bound = day_after.strftime("%Y-%m-%d")

    overlapping = session.query(Duty, User.full_name).join(
        User, Duty.user_id == User.id
    )
    overlapping = overlapping.filter(Duty.start_at < upper_bound)
    overlapping = overlapping.filter(Duty.end_at >= from_date)
    return list(overlapping.all())

get_engine(database_url)

Return cached SQLAlchemy engine for the given URL (one per process).

Source code in duty_teller/db/session.py
42
43
44
45
46
47
48
49
50
51
52
53
def get_engine(database_url: str):
    """Return cached SQLAlchemy engine for the given URL (one per process).

    The engine is created on the first call only; later calls return the same
    engine and ignore database_url.

    Args:
        database_url: SQLAlchemy database URL.

    Returns:
        The process-wide engine.
    """
    global _engine
    if _engine is None:
        # check_same_thread is a SQLite-only connect argument. Test the URL
        # scheme prefix rather than a substring, so a non-SQLite URL that
        # merely contains "sqlite" (e.g. in a database name) is not affected.
        is_sqlite = database_url.startswith("sqlite")
        _engine = create_engine(
            database_url,
            connect_args={"check_same_thread": False} if is_sqlite else {},
            echo=False,
        )
    return _engine

get_or_create_user(session, telegram_user_id, full_name, username=None, first_name=None, last_name=None)

Get or create user by Telegram user ID.

On create, name fields come from Telegram. On update: username is always synced; full_name, first_name, last_name are updated only if name_manually_edited is False (otherwise existing display name is kept).

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required
full_name str

Display full name.

required
username str | None

Telegram username (optional).

None
first_name str | None

Telegram first name (optional).

None
last_name str | None

Telegram last name (optional).

None

Returns:

Type Description
User

User instance (created or updated).

Source code in duty_teller/db/repository.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get_or_create_user(
    session: Session,
    telegram_user_id: int,
    full_name: str,
    username: str | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
) -> User:
    """Look up a user by Telegram user ID, creating the row when it is missing.

    A newly created user takes all name fields from the arguments and starts
    with name_manually_edited=False. For an existing user the username is
    always overwritten, while full_name, first_name and last_name are only
    overwritten when name_manually_edited is False.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.
        full_name: Display full name.
        username: Telegram username (optional).
        first_name: Telegram first name (optional).
        last_name: Telegram last name (optional).

    Returns:
        User instance (created or updated).
    """
    user = get_user_by_telegram_id(session, telegram_user_id)
    if not user:
        user = User(
            telegram_user_id=telegram_user_id,
            full_name=full_name,
            username=username,
            first_name=first_name,
            last_name=last_name,
            name_manually_edited=False,
        )
        session.add(user)
    else:
        user.username = username
        # A manually edited display name takes precedence over Telegram data.
        if not user.name_manually_edited:
            user.full_name = full_name
            user.first_name = first_name
            user.last_name = last_name
    # Both paths end the same way: persist, then reload the row state.
    session.commit()
    session.refresh(user)
    return user

get_or_create_user_by_full_name(session, full_name)

Find user by exact full_name or create one (for duty-schedule import).

New users have telegram_user_id=None and name_manually_edited=True.

Parameters:

Name Type Description Default
session Session

DB session.

required
full_name str

Exact full name to match or set.

required

Returns:

Type Description
User

User instance (existing or newly created).

Source code in duty_teller/db/repository.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def get_or_create_user_by_full_name(session: Session, full_name: str) -> User:
    """Return the first user whose full_name matches exactly, creating one if none exists.

    Intended for duty-schedule import. A created user has no Telegram data
    (telegram_user_id, username, first_name, last_name are None) and
    name_manually_edited=True.

    Args:
        session: DB session.
        full_name: Exact full name to match or set.

    Returns:
        User instance (existing or newly created).
    """
    found = session.query(User).filter_by(full_name=full_name).first()
    if not found:
        found = User(
            telegram_user_id=None,
            full_name=full_name,
            username=None,
            first_name=None,
            last_name=None,
            name_manually_edited=True,
        )
        session.add(found)
        session.commit()
        session.refresh(found)
    return found

get_session(database_url)

Create a new session from the factory for the given URL.

Source code in duty_teller/db/session.py
65
66
67
def get_session(database_url: str) -> Session:
    """Open a fresh session using the session factory obtained for the URL."""
    factory = get_session_factory(database_url)
    return factory()

get_session_factory(database_url)

Return cached session factory for the given URL (one per process).

Source code in duty_teller/db/session.py
56
57
58
59
60
61
62
def get_session_factory(database_url: str) -> sessionmaker[Session]:
    """Return the process-wide session factory, building it on first use.

    Once built, the same factory is returned regardless of database_url.
    """
    global _SessionLocal
    if _SessionLocal is not None:
        return _SessionLocal
    _SessionLocal = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=get_engine(database_url),
    )
    return _SessionLocal

init_db(database_url)

Create all tables from SQLAlchemy metadata.

Prefer Alembic migrations for schema changes in production.

Parameters:

Name Type Description Default
database_url str

SQLAlchemy database URL.

required
Source code in duty_teller/db/__init__.py
51
52
53
54
55
56
57
58
59
60
def init_db(database_url: str) -> None:
    """Create every table declared on the SQLAlchemy metadata.

    Prefer Alembic migrations for schema changes in production.

    Args:
        database_url: SQLAlchemy database URL.
    """
    Base.metadata.create_all(bind=get_engine(database_url))

insert_duty(session, user_id, start_at, end_at, event_type='duty')

Create a duty record.

Parameters:

Name Type Description Default
session Session

DB session.

required
user_id int

User id.

required
start_at str

Start time UTC, ISO 8601 with Z (e.g. 2025-01-15T09:00:00Z).

required
end_at str

End time UTC, ISO 8601 with Z.

required
event_type str

One of "duty", "unavailable", "vacation". Default "duty".

'duty'

Returns:

Type Description
Duty

Created Duty instance.

Source code in duty_teller/db/repository.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def insert_duty(
    session: Session,
    user_id: int,
    start_at: str,
    end_at: str,
    event_type: str = "duty",
) -> Duty:
    """Insert a new duty row, commit it and return the refreshed instance.

    Args:
        session: DB session.
        user_id: User id.
        start_at: Start time UTC, ISO 8601 with Z (e.g. 2025-01-15T09:00:00Z).
        end_at: End time UTC, ISO 8601 with Z.
        event_type: One of "duty", "unavailable", "vacation". Default "duty".

    Returns:
        Created Duty instance.
    """
    fields = {
        "user_id": user_id,
        "start_at": start_at,
        "end_at": end_at,
        "event_type": event_type,
    }
    new_duty = Duty(**fields)
    session.add(new_duty)
    session.commit()
    # Reload so database-generated values (such as the id) are populated.
    session.refresh(new_duty)
    return new_duty

session_scope(database_url)

Context manager that yields a session; rolls back on exception, closes on exit.

Parameters:

Name Type Description Default
database_url str

SQLAlchemy database URL.

required

Yields:

Type Description
Session

Session instance. Caller must not use it after exit.

Source code in duty_teller/db/session.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@contextmanager
def session_scope(database_url: str) -> Generator[Session, None, None]:
    """Yield a session that is rolled back on error and always closed afterwards.

    This context manager never commits; committing is left to the code that
    runs inside the ``with`` block.

    Args:
        database_url: SQLAlchemy database URL.

    Yields:
        Session instance. Caller must not use it after exit.
    """
    db_session = get_session(database_url)
    try:
        yield db_session
    except Exception:
        # Undo pending work, then let the original error propagate unchanged.
        db_session.rollback()
        raise
    finally:
        db_session.close()

set_user_phone(session, telegram_user_id, phone)

Set or clear phone for user by Telegram user id.

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required
phone str | None

Phone string or None to clear.

required

Returns:

Type Description
User | None

Updated User or None if not found.

Source code in duty_teller/db/repository.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def set_user_phone(
    session: Session, telegram_user_id: int, phone: str | None
) -> User | None:
    """Store a phone value (or None to clear it) on the user with the given Telegram id.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.
        phone: Phone string or None to clear.

    Returns:
        Updated User or None if not found.
    """
    target = session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
    if target:
        target.phone = phone
        session.commit()
        session.refresh(target)
        return target
    return None

update_user_display_name(session, telegram_user_id, full_name, first_name=None, last_name=None)

Update display name and set name_manually_edited=True.

Use from API or admin when name is changed manually; subsequent get_or_create_user will not overwrite these fields.

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required
full_name str

New full name.

required
first_name str | None

New first name (optional).

None
last_name str | None

New last name (optional).

None

Returns:

Type Description
User | None

Updated User or None if not found.

Source code in duty_teller/db/repository.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def update_user_display_name(
    session: Session,
    telegram_user_id: int,
    full_name: str,
    first_name: str | None = None,
    last_name: str | None = None,
) -> User | None:
    """Overwrite the user's name fields and flag them as manually edited.

    Use from API or admin when name is changed manually; once
    name_manually_edited is True, get_or_create_user stops overwriting the
    name fields.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.
        full_name: New full name.
        first_name: New first name (optional).
        last_name: New last name (optional).

    Returns:
        Updated User or None if not found.
    """
    target = session.query(User).filter_by(telegram_user_id=telegram_user_id).first()
    if target:
        target.full_name = full_name
        target.first_name = first_name
        target.last_name = last_name
        target.name_manually_edited = True
        session.commit()
        session.refresh(target)
        return target
    return None

duty_teller.db.models

SQLAlchemy ORM models for users and duties.

Base

Bases: DeclarativeBase

Declarative base for all models.

Source code in duty_teller/db/models.py
 7
 8
 9
10
class Base(DeclarativeBase):
    """Shared declarative base that every ORM model in this module derives from."""

CalendarSubscriptionToken

Bases: Base

One active calendar subscription token per user; token_hash is unique.

Source code in duty_teller/db/models.py
34
35
36
37
38
39
40
41
42
43
44
class CalendarSubscriptionToken(Base):
    """One active calendar subscription token per user; token_hash is unique.

    Note: the table itself only enforces uniqueness of token_hash; nothing in
    this model constrains a user to a single row.
    """

    __tablename__ = "calendar_subscription_tokens"

    # Surrogate primary key, auto-incremented.
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Owner of the token; mandatory reference to users.id.
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id"), nullable=False
    )
    # Hash of the token; unique across the table.
    token_hash: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
    # Creation timestamp stored as text.
    created_at: Mapped[str] = mapped_column(Text, nullable=False)

Duty

Bases: Base

Single duty/unavailable/vacation slot (UTC start_at/end_at, event_type).

Source code in duty_teller/db/models.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Duty(Base):
    """Single duty/unavailable/vacation slot (UTC start_at/end_at, event_type).

    start_at and end_at are stored as text, not as a native datetime type, so
    every row must use the same ISO 8601 layout for comparisons to be meaningful.
    """

    __tablename__ = "duties"

    # Surrogate primary key, auto-incremented.
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Owner of the slot; mandatory reference to users.id.
    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id"), nullable=False
    )
    # UTC, ISO 8601 with Z suffix (e.g. 2025-01-15T09:00:00Z)
    start_at: Mapped[str] = mapped_column(Text, nullable=False)
    end_at: Mapped[str] = mapped_column(Text, nullable=False)
    # duty | unavailable | vacation; server-side default is "duty".
    event_type: Mapped[str] = mapped_column(Text, nullable=False, server_default="duty")

    # ORM link to the owning User; mirrored by User.duties via back_populates.
    user: Mapped["User"] = relationship("User", back_populates="duties")

GroupDutyPin

Bases: Base

Stores which message to update in each group for the pinned duty notice.

Source code in duty_teller/db/models.py
65
66
67
68
69
70
71
class GroupDutyPin(Base):
    """Stores which message to update in each group for the pinned duty notice.

    chat_id is the primary key, so there is at most one row per chat.
    """

    __tablename__ = "group_duty_pins"

    # Chat identifier; BigInteger primary key (one row per chat).
    chat_id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Identifier of the message to update in that chat.
    message_id: Mapped[int] = mapped_column(Integer, nullable=False)

User

Bases: Base

Telegram user and display name; may have telegram_user_id=None for import-only users.

Source code in duty_teller/db/models.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class User(Base):
    """Telegram user and display name; may have telegram_user_id=None for import-only users."""

    __tablename__ = "users"

    # Surrogate primary key, auto-incremented.
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Telegram account id; unique when set, NULL for import-only users.
    telegram_user_id: Mapped[int | None] = mapped_column(
        BigInteger, unique=True, nullable=True
    )
    # Display name; the only mandatory name field.
    full_name: Mapped[str] = mapped_column(Text, nullable=False)
    # Optional Telegram profile fields.
    username: Mapped[str | None] = mapped_column(Text, nullable=True)
    first_name: Mapped[str | None] = mapped_column(Text, nullable=True)
    last_name: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Optional phone string.
    phone: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Marks the display name as manually edited. Defaults to False both in
    # Python (default=False) and in the database (server_default="0").
    name_manually_edited: Mapped[bool] = mapped_column(
        Boolean, nullable=False, server_default="0", default=False
    )

    # All Duty rows owned by this user; mirrored by Duty.user via back_populates.
    duties: Mapped[list["Duty"]] = relationship("Duty", back_populates="user")

duty_teller.db.schemas

Pydantic schemas for API request/response and validation.

CalendarEvent

Bases: BaseModel

External calendar event (e.g. holiday) for a single day.

Source code in duty_teller/db/schemas.py
66
67
68
69
70
class CalendarEvent(BaseModel):
    """External calendar event (e.g. holiday) for a single day.

    Both fields are required plain strings; the date format is documented
    on the field but not validated by this schema.
    """

    date: str  # YYYY-MM-DD
    summary: str  # Text of the event.

DutyBase

Bases: BaseModel

Duty fields: user_id, start_at, end_at (UTC ISO 8601 with Z).

Source code in duty_teller/db/schemas.py
32
33
34
35
36
37
class DutyBase(BaseModel):
    """Duty fields: user_id, start_at, end_at (UTC ISO 8601 with Z).

    The timestamps are typed as plain strings, so their format is a documented
    convention rather than something this schema validates.
    """

    user_id: int  # Id of the user the duty belongs to.
    start_at: str  # UTC, ISO 8601 with Z
    end_at: str  # UTC, ISO 8601 with Z

DutyCreate

Bases: DutyBase

Duty creation payload.

Source code in duty_teller/db/schemas.py
40
41
42
43
class DutyCreate(DutyBase):
    """Payload for creating a duty; adds no fields beyond those of DutyBase."""

DutyInDb

Bases: DutyBase

Duty as stored in DB (includes id).

Source code in duty_teller/db/schemas.py
46
47
48
49
50
51
class DutyInDb(DutyBase):
    """Duty as read back from the database: the DutyBase fields plus the row id."""

    # from_attributes allows building the schema from objects exposing the
    # fields as attributes (such as ORM rows), not only from mappings.
    model_config = ConfigDict(from_attributes=True)

    id: int

DutyWithUser

Bases: DutyInDb

Duty with full_name and event_type for calendar display.

event_type: only "duty", "unavailable" and "vacation" are returned; unknown DB values are mapped to "duty" in the API.

Source code in duty_teller/db/schemas.py
54
55
56
57
58
59
60
61
62
63
class DutyWithUser(DutyInDb):
    """Duty enriched with the owner's full_name and an event_type, for calendar display.

    event_type is restricted to "duty", "unavailable" and "vacation"; unknown
    DB values are mapped to "duty" in the API.
    """

    # Allows building the schema from attribute-bearing objects (e.g. ORM rows).
    model_config = ConfigDict(from_attributes=True)

    full_name: str
    event_type: Literal["duty", "unavailable", "vacation"] = "duty"

UserBase

Bases: BaseModel

Base user fields (full_name, username, first/last name).

Source code in duty_teller/db/schemas.py
 8
 9
10
11
12
13
14
class UserBase(BaseModel):
    """Base user fields (full_name, username, first/last name).

    Only full_name is required; the remaining fields default to None.
    """

    full_name: str  # Display name; mandatory.
    username: str | None = None  # Optional.
    first_name: str | None = None  # Optional.
    last_name: str | None = None  # Optional.

UserCreate

Bases: UserBase

User creation payload including Telegram user id.

Source code in duty_teller/db/schemas.py
17
18
19
20
class UserCreate(UserBase):
    """User creation payload including Telegram user id.

    Extends UserBase with a mandatory telegram_user_id.
    """

    # Required here (no default), unlike the optional name fields of UserBase.
    telegram_user_id: int

UserInDb

Bases: UserBase

User as stored in DB (includes id and telegram_user_id).

Source code in duty_teller/db/schemas.py
23
24
25
26
27
28
29
class UserInDb(UserBase):
    """User as stored in DB (includes id and telegram_user_id).

    telegram_user_id is nullable because the users table allows NULL there
    (import-only users have no Telegram account); the field itself is still
    required, so callers must pass it explicitly, possibly as None.
    """

    id: int
    # Mirrors the nullable users.telegram_user_id column so that import-only
    # users can be validated instead of failing on a None value.
    telegram_user_id: int | None

    # Allows building the schema from attribute-bearing objects (e.g. ORM rows).
    model_config = ConfigDict(from_attributes=True)

duty_teller.db.session

SQLAlchemy engine and session factory.

Engine and session factory are cached globally per process. Only one DATABASE_URL is effectively used for the process lifetime. Using a different URL later (e.g. in tests with in-memory SQLite) would still use the first engine. To use a different URL in tests, set env (e.g. DATABASE_URL) before the first import of this module, or clear _engine and _SessionLocal in test fixtures. Prefer session_scope() for all callers so sessions are always closed and rolled back on error.

get_engine(database_url)

Return cached SQLAlchemy engine for the given URL (one per process).

Source code in duty_teller/db/session.py
42
43
44
45
46
47
48
49
50
51
52
53
def get_engine(database_url: str):
    """Return cached SQLAlchemy engine for the given URL (one per process).

    The engine is created on the first call only; later calls return the same
    engine and ignore database_url.

    Args:
        database_url: SQLAlchemy database URL.

    Returns:
        The process-wide engine.
    """
    global _engine
    if _engine is None:
        # check_same_thread is a SQLite-only connect argument. Test the URL
        # scheme prefix rather than a substring, so a non-SQLite URL that
        # merely contains "sqlite" (e.g. in a database name) is not affected.
        is_sqlite = database_url.startswith("sqlite")
        _engine = create_engine(
            database_url,
            connect_args={"check_same_thread": False} if is_sqlite else {},
            echo=False,
        )
    return _engine

get_session(database_url)

Create a new session from the factory for the given URL.

Source code in duty_teller/db/session.py
65
66
67
def get_session(database_url: str) -> Session:
    """Open a fresh session using the session factory obtained for the URL."""
    factory = get_session_factory(database_url)
    return factory()

get_session_factory(database_url)

Return cached session factory for the given URL (one per process).

Source code in duty_teller/db/session.py
56
57
58
59
60
61
62
def get_session_factory(database_url: str) -> sessionmaker[Session]:
    """Return the process-wide session factory, building it on first use.

    Once built, the same factory is returned regardless of database_url.
    """
    global _SessionLocal
    if _SessionLocal is not None:
        return _SessionLocal
    _SessionLocal = sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=get_engine(database_url),
    )
    return _SessionLocal

session_scope(database_url)

Context manager that yields a session; rolls back on exception, closes on exit.

Parameters:

Name Type Description Default
database_url str

SQLAlchemy database URL.

required

Yields:

Type Description
Session

Session instance. Caller must not use it after exit.

Source code in duty_teller/db/session.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@contextmanager
def session_scope(database_url: str) -> Generator[Session, None, None]:
    """Yield a session that is rolled back on error and always closed afterwards.

    This context manager never commits; committing is left to the code that
    runs inside the ``with`` block.

    Args:
        database_url: SQLAlchemy database URL.

    Yields:
        Session instance. Caller must not use it after exit.
    """
    db_session = get_session(database_url)
    try:
        yield db_session
    except Exception:
        # Undo pending work, then let the original error propagate unchanged.
        db_session.rollback()
        raise
    finally:
        db_session.close()

duty_teller.db.repository

Repository: get_or_create_user, get_duties, insert_duty, get_current_duty, group_duty_pins.

create_calendar_token(session, user_id)

Create a new calendar subscription token for the user.

Any existing tokens for this user are removed. The raw token is returned only once (not stored in plain text).

Parameters:

Name Type Description Default
session Session

DB session.

required
user_id int

User id.

required

Returns:

Type Description
str

Raw token string (e.g. for URL /api/calendar/ical/{token}.ics).

Source code in duty_teller/db/repository.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def create_calendar_token(session: Session, user_id: int) -> str:
    """Issue a fresh calendar subscription token for the user, replacing any old ones.

    All existing token rows of the user are deleted first. Only the hash of
    the new token is stored, so the raw value returned here cannot be read
    back from the database later.

    Args:
        session: DB session.
        user_id: User id.

    Returns:
        Raw token string (e.g. for URL /api/calendar/ical/{token}.ics).
    """
    previous_tokens = session.query(CalendarSubscriptionToken).filter(
        CalendarSubscriptionToken.user_id == user_id
    )
    previous_tokens.delete(synchronize_session=False)

    raw_token = secrets.token_urlsafe(32)
    session.add(
        CalendarSubscriptionToken(
            user_id=user_id,
            token_hash=_token_hash(raw_token),
            # Creation time in UTC, formatted with a literal Z suffix.
            created_at=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        )
    )
    session.commit()
    return raw_token

delete_duties_in_range(session, user_id, from_date, to_date)

Delete all duties of the user that overlap the given date range.

Parameters:

Name Type Description Default
session Session

DB session.

required
user_id int

User id.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
int

Number of duties deleted.

Source code in duty_teller/db/repository.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def delete_duties_in_range(
    session: Session,
    user_id: int,
    from_date: str,
    to_date: str,
) -> int:
    """Delete all duties of the user that overlap the given date range.

    Both from_date and to_date are inclusive calendar days. The deletion is
    committed before returning.

    Args:
        session: DB session.
        user_id: User id.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        Number of duties deleted.

    Raises:
        ValueError: If to_date cannot be parsed as an ISO date.
    """
    # Exclusive upper bound: the day after to_date, so duties starting at any
    # time on to_date itself still satisfy "start_at < to_next".
    to_next = (
        datetime.fromisoformat(to_date + "T00:00:00") + timedelta(days=1)
    ).strftime("%Y-%m-%d")
    # The bulk DELETE reports how many rows it matched, so a separate COUNT
    # query is unnecessary and the result cannot drift from what was removed.
    deleted = (
        session.query(Duty)
        .filter(
            Duty.user_id == user_id,
            Duty.start_at < to_next,
            Duty.end_at >= from_date,
        )
        .delete(synchronize_session=False)
    )
    session.commit()
    return deleted

delete_group_duty_pin(session, chat_id)

Remove the pinned duty message record for the chat (e.g. when bot leaves group).

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required
Source code in duty_teller/db/repository.py
427
428
429
430
431
432
433
434
435
def delete_group_duty_pin(session: Session, chat_id: int) -> None:
    """Drop the stored pinned-duty-message record of a chat and commit.

    Typically used when the bot leaves the group.

    Args:
        session: DB session.
        chat_id: Telegram chat id.
    """
    pins_for_chat = session.query(GroupDutyPin).filter_by(chat_id=chat_id)
    pins_for_chat.delete()
    session.commit()

get_all_group_duty_pin_chat_ids(session)

Return all chat_ids that have a pinned duty message.

Used to restore update jobs on bot startup.

Parameters:

Name Type Description Default
session Session

DB session.

required

Returns:

Type Description
list[int]

List of chat ids.

Source code in duty_teller/db/repository.py
438
439
440
441
442
443
444
445
446
447
448
449
450
def get_all_group_duty_pin_chat_ids(session: Session) -> list[int]:
    """List the chat_id of every stored pinned duty message.

    Used to restore update jobs on bot startup.

    Args:
        session: DB session.

    Returns:
        List of chat ids.
    """
    # Each result row is a one-element tuple holding just the chat_id column.
    return [chat_id for (chat_id,) in session.query(GroupDutyPin.chat_id).all()]

get_current_duty(session, at_utc)

Return the duty and user active at the given UTC time (event_type='duty').

Parameters:

Name Type Description Default
session Session

DB session.

required
at_utc datetime

Point in time (timezone-aware or naive UTC).

required

Returns:

Type Description
tuple[Duty, User] | None

(Duty, User) or None if no duty at that time.

Source code in duty_teller/db/repository.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def get_current_duty(session: Session, at_utc: datetime) -> tuple[Duty, User] | None:
    """Find a duty (event_type='duty') covering the given instant, with its user.

    A duty covers the instant when start_at <= instant < end_at. An aware
    datetime is converted to UTC first; a naive one is taken as UTC already.

    Args:
        session: DB session.
        at_utc: Point in time (timezone-aware or naive UTC).

    Returns:
        (Duty, User) or None if no duty at that time.
    """
    from datetime import timezone

    moment = at_utc if at_utc.tzinfo is None else at_utc.astimezone(timezone.utc)
    # Same text layout as the stored timestamps, so the columns can be compared.
    moment_iso = moment.strftime("%Y-%m-%dT%H:%M:%S") + "Z"

    active = (
        session.query(Duty, User)
        .join(User, Duty.user_id == User.id)
        .filter(Duty.event_type == "duty")
        .filter(Duty.start_at <= moment_iso)
        .filter(Duty.end_at > moment_iso)
        .first()
    )
    return None if active is None else (active[0], active[1])

get_duties(session, from_date, to_date)

Return duties overlapping the given date range with user full_name.

Parameters:

Name Type Description Default
session Session

DB session.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
list[tuple[Duty, str]]

List of (Duty, full_name) tuples.

Source code in duty_teller/db/repository.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def get_duties(
    session: Session,
    from_date: str,
    to_date: str,
) -> list[tuple[Duty, str]]:
    """Fetch every duty that overlaps the date range, paired with its owner's name.

    Args:
        session: DB session.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of (Duty, full_name) tuples.
    """
    # Day after to_date, used as an exclusive upper bound on start_at.
    day_after = datetime.fromisoformat(to_date + "T00:00:00") + timedelta(days=1)
    upper_bound = day_after.strftime("%Y-%m-%d")

    overlapping = session.query(Duty, User.full_name).join(
        User, Duty.user_id == User.id
    )
    overlapping = overlapping.filter(Duty.start_at < upper_bound)
    overlapping = overlapping.filter(Duty.end_at >= from_date)
    return list(overlapping.all())

get_duties_for_user(session, user_id, from_date, to_date)

Return duties for one user overlapping the date range.

Parameters:

Name Type Description Default
session Session

DB session.

required
user_id int

User id.

required
from_date str

Start date YYYY-MM-DD.

required
to_date str

End date YYYY-MM-DD.

required

Returns:

Type Description
list[tuple[Duty, str]]

List of (Duty, full_name) tuples.

Source code in duty_teller/db/repository.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def get_duties_for_user(
    session: Session,
    user_id: int,
    from_date: str,
    to_date: str,
) -> list[tuple[Duty, str]]:
    """Fetch one user's duties that overlap the date range, paired with the user's name.

    Args:
        session: DB session.
        user_id: User id.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of (Duty, full_name) tuples.
    """
    # Day after to_date, used as an exclusive upper bound on start_at.
    day_after = datetime.fromisoformat(to_date + "T00:00:00") + timedelta(days=1)
    upper_bound = day_after.strftime("%Y-%m-%d")

    own_duties = session.query(Duty, User.full_name).join(
        User, Duty.user_id == User.id
    )
    own_duties = own_duties.filter(Duty.user_id == user_id)
    own_duties = own_duties.filter(Duty.start_at < upper_bound)
    own_duties = own_duties.filter(Duty.end_at >= from_date)
    return list(own_duties.all())

get_group_duty_pin(session, chat_id)

Get the pinned duty message record for a chat.

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required

Returns:

Type Description
GroupDutyPin | None

GroupDutyPin or None.

Source code in duty_teller/db/repository.py
390
391
392
393
394
395
396
397
398
399
400
def get_group_duty_pin(session: Session, chat_id: int) -> GroupDutyPin | None:
    """Look up the stored pinned-duty-message record of one chat.

    Args:
        session: DB session.
        chat_id: Telegram chat id to look up.

    Returns:
        The first matching GroupDutyPin, or None when the chat has no record.
    """
    pins_for_chat = session.query(GroupDutyPin).filter(GroupDutyPin.chat_id == chat_id)
    return pins_for_chat.first()

get_next_shift_end(session, after_utc)

Return the end_at of the current or next duty (event_type='duty').

Parameters:

Name Type Description Default
session Session

DB session.

required
after_utc datetime

Point in time (timezone-aware or naive UTC).

required

Returns:

Type Description
datetime | None

End datetime (naive UTC) or None if no current or future duty.

Source code in duty_teller/db/repository.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def get_next_shift_end(session: Session, after_utc: datetime) -> datetime | None:
    """Return the end_at of the current or next duty (event_type='duty').

    A duty is "current" when start_at <= after_utc < end_at. If several
    current duties overlap, the one ending first is used so the result is
    deterministic. When no duty is current, the duty with the earliest
    start_at after after_utc is used instead.

    Args:
        session: DB session.
        after_utc: Point in time (timezone-aware or naive UTC).

    Returns:
        End datetime (naive UTC) or None if no current or future duty.
    """
    from datetime import timezone

    # Aware datetimes are converted to UTC; naive ones are taken as UTC as-is.
    if after_utc.tzinfo is not None:
        after_utc = after_utc.astimezone(timezone.utc)
    after_iso = after_utc.strftime("%Y-%m-%dT%H:%M:%S") + "Z"

    # Without an explicit ordering, first() may return any of several
    # overlapping current duties; order by end_at to pick the earliest end.
    duty = (
        session.query(Duty)
        .filter(
            Duty.event_type == "duty",
            Duty.start_at <= after_iso,
            Duty.end_at > after_iso,
        )
        .order_by(Duty.end_at)
        .first()
    )
    if not duty:
        duty = (
            session.query(Duty)
            .filter(Duty.event_type == "duty", Duty.start_at > after_iso)
            .order_by(Duty.start_at)
            .first()
        )
    if not duty:
        return None

    # Stored value ends in "Z"; parse it as +00:00, then drop tzinfo to
    # hand back a naive UTC datetime.
    end_aware = datetime.fromisoformat(duty.end_at.replace("Z", "+00:00"))
    return end_aware.replace(tzinfo=None)

get_or_create_user(session, telegram_user_id, full_name, username=None, first_name=None, last_name=None)

Get or create user by Telegram user ID.

On create, name fields come from Telegram. On update: username is always synced; full_name, first_name, last_name are updated only if name_manually_edited is False (otherwise existing display name is kept).

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required
full_name str

Display full name.

required
username str | None

Telegram username (optional).

None
first_name str | None

Telegram first name (optional).

None
last_name str | None

Telegram last name (optional).

None

Returns:

Type Description
User

User instance (created or updated).

Source code in duty_teller/db/repository.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def get_or_create_user(
    session: Session,
    telegram_user_id: int,
    full_name: str,
    username: str | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
) -> User:
    """Return the user with this Telegram id, creating the row if needed.

    A new user takes all name fields from the arguments and starts with
    name_manually_edited=False. For an existing user the username is always
    overwritten, while full_name, first_name and last_name are overwritten
    only when name_manually_edited is False.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.
        full_name: Display full name.
        username: Telegram username (optional).
        first_name: Telegram first name (optional).
        last_name: Telegram last name (optional).

    Returns:
        User instance (created or updated), refreshed after commit.
    """
    user = get_user_by_telegram_id(session, telegram_user_id)
    if user:
        user.username = username
        # A manually edited display name must survive Telegram syncs.
        if not user.name_manually_edited:
            user.full_name = full_name
            user.first_name = first_name
            user.last_name = last_name
    else:
        user = User(
            telegram_user_id=telegram_user_id,
            full_name=full_name,
            username=username,
            first_name=first_name,
            last_name=last_name,
            name_manually_edited=False,
        )
        session.add(user)
    session.commit()
    session.refresh(user)
    return user

get_or_create_user_by_full_name(session, full_name)

Find user by exact full_name or create one (for duty-schedule import).

New users have telegram_user_id=None and name_manually_edited=True.

Parameters:

Name Type Description Default
session Session

DB session.

required
full_name str

Exact full name to match or set.

required

Returns:

Type Description
User

User instance (existing or newly created).

Source code in duty_teller/db/repository.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def get_or_create_user_by_full_name(session: Session, full_name: str) -> User:
    """Return the user whose full_name matches exactly, creating one if absent.

    A created user has no Telegram identity (telegram_user_id, username,
    first_name and last_name are all None) and name_manually_edited=True.

    Args:
        session: DB session.
        full_name: Exact full name to match or set.

    Returns:
        User instance (existing or newly created).
    """
    existing = session.query(User).filter(User.full_name == full_name).first()
    if existing:
        return existing

    placeholder = User(
        telegram_user_id=None,
        full_name=full_name,
        username=None,
        first_name=None,
        last_name=None,
        name_manually_edited=True,
    )
    session.add(placeholder)
    session.commit()
    session.refresh(placeholder)
    return placeholder

get_user_by_calendar_token(session, token)

Find user by calendar subscription token.

Uses constant-time comparison to avoid timing leaks.

Parameters:

Name Type Description Default
session Session

DB session.

required
token str

Raw token from URL.

required

Returns:

Type Description
User | None

User or None if token is invalid or not found.

Source code in duty_teller/db/repository.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_user_by_calendar_token(session: Session, token: str) -> User | None:
    """Resolve a calendar subscription token to its owning user.

    The raw token is hashed with _token_hash and matched against the stored
    hash; the stored and computed hashes are then compared once more with
    hmac.compare_digest.

    Args:
        session: DB session.
        token: Raw token from URL.

    Returns:
        User or None if token is invalid or not found.
    """
    expected_hash = _token_hash(token)
    match = (
        session.query(CalendarSubscriptionToken, User)
        .join(User, CalendarSubscriptionToken.user_id == User.id)
        .filter(CalendarSubscriptionToken.token_hash == expected_hash)
        .first()
    )
    if match is None:
        return None
    stored_token, owner = match
    # Constant-time comparison of the two hashes to avoid timing leaks.
    if hmac.compare_digest(stored_token.token_hash, expected_hash):
        return owner
    return None

get_user_by_telegram_id(session, telegram_user_id)

Find user by Telegram user ID.

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required

Returns:

Type Description
User | None

User or None if not found. Does not create a user.

Source code in duty_teller/db/repository.py
13
14
15
16
17
18
19
20
21
22
23
def get_user_by_telegram_id(session: Session, telegram_user_id: int) -> User | None:
    """Look up a user by Telegram user id without creating one.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.

    Returns:
        The first matching User, or None if there is no match.
    """
    candidates = session.query(User).filter(User.telegram_user_id == telegram_user_id)
    return candidates.first()

insert_duty(session, user_id, start_at, end_at, event_type='duty')

Create a duty record.

Parameters:

Name Type Description Default
session Session

DB session.

required
user_id int

User id.

required
start_at str

Start time UTC, ISO 8601 with Z (e.g. 2025-01-15T09:00:00Z).

required
end_at str

End time UTC, ISO 8601 with Z.

required
event_type str

One of "duty", "unavailable", "vacation". Default "duty".

'duty'

Returns:

Type Description
Duty

Created Duty instance.

Source code in duty_teller/db/repository.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def insert_duty(
    session: Session,
    user_id: int,
    start_at: str,
    end_at: str,
    event_type: str = "duty",
) -> Duty:
    """Persist a new duty row and return it.

    Args:
        session: DB session.
        user_id: User id.
        start_at: Start time UTC, ISO 8601 with Z (e.g. 2025-01-15T09:00:00Z).
        end_at: End time UTC, ISO 8601 with Z.
        event_type: One of "duty", "unavailable", "vacation". Default "duty".

    Returns:
        The created Duty, refreshed after commit.
    """
    fields = {
        "user_id": user_id,
        "start_at": start_at,
        "end_at": end_at,
        "event_type": event_type,
    }
    record = Duty(**fields)
    session.add(record)
    session.commit()
    session.refresh(record)
    return record

save_group_duty_pin(session, chat_id, message_id)

Save or update the pinned duty message for a chat.

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required
message_id int

Message id to pin/update.

required

Returns:

Type Description
GroupDutyPin

GroupDutyPin instance (created or updated).

Source code in duty_teller/db/repository.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def save_group_duty_pin(
    session: Session, chat_id: int, message_id: int
) -> GroupDutyPin:
    """Store the pinned duty message id for a chat, inserting or updating.

    Args:
        session: DB session.
        chat_id: Telegram chat id.
        message_id: Message id to pin/update.

    Returns:
        GroupDutyPin instance (created or updated), refreshed after commit.
    """
    record = session.query(GroupDutyPin).filter(GroupDutyPin.chat_id == chat_id).first()
    if not record:
        # First pin for this chat: create the row.
        record = GroupDutyPin(chat_id=chat_id, message_id=message_id)
        session.add(record)
    else:
        record.message_id = message_id
    session.commit()
    session.refresh(record)
    return record

set_user_phone(session, telegram_user_id, phone)

Set or clear phone for user by Telegram user id.

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required
phone str | None

Phone string or None to clear.

required

Returns:

Type Description
User | None

Updated User or None if not found.

Source code in duty_teller/db/repository.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def set_user_phone(
    session: Session, telegram_user_id: int, phone: str | None
) -> User | None:
    """Assign a phone value to the user with the given Telegram id.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.
        phone: Phone string or None to clear.

    Returns:
        Updated User, or None if no user has this Telegram id.
    """
    by_telegram_id = session.query(User).filter(
        User.telegram_user_id == telegram_user_id
    )
    target = by_telegram_id.first()
    if target:
        target.phone = phone
        session.commit()
        session.refresh(target)
        return target
    return None

update_user_display_name(session, telegram_user_id, full_name, first_name=None, last_name=None)

Update display name and set name_manually_edited=True.

Use from API or admin when name is changed manually; subsequent get_or_create_user will not overwrite these fields.

Parameters:

Name Type Description Default
session Session

DB session.

required
telegram_user_id int

Telegram user id.

required
full_name str

New full name.

required
first_name str | None

New first name (optional).

None
last_name str | None

New last name (optional).

None

Returns:

Type Description
User | None

Updated User or None if not found.

Source code in duty_teller/db/repository.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def update_user_display_name(
    session: Session,
    telegram_user_id: int,
    full_name: str,
    first_name: str | None = None,
    last_name: str | None = None,
) -> User | None:
    """Overwrite a user's name fields and flag the name as manually edited.

    Sets name_manually_edited=True on the user, alongside the three name
    fields.

    Args:
        session: DB session.
        telegram_user_id: Telegram user id.
        full_name: New full name.
        first_name: New first name (optional).
        last_name: New last name (optional).

    Returns:
        Updated User, or None if no user has this Telegram id.
    """
    by_telegram_id = session.query(User).filter(
        User.telegram_user_id == telegram_user_id
    )
    target = by_telegram_id.first()
    if target:
        target.full_name = full_name
        target.first_name = first_name
        target.last_name = last_name
        target.name_manually_edited = True
        session.commit()
        session.refresh(target)
        return target
    return None

Services

duty_teller.services

Service layer: business logic and orchestration.

delete_pin(session, chat_id)

Remove the pinned message record for the chat (e.g. when bot leaves).

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required
Source code in duty_teller/services/group_duty_pin_service.py
107
108
109
110
111
112
113
114
def delete_pin(session: Session, chat_id: int) -> None:
    """Drop the stored pinned-message record of a chat (e.g. when bot leaves).

    Delegates to delete_group_duty_pin and discards its result.

    Args:
        session: DB session.
        chat_id: Telegram chat id.
    """
    delete_group_duty_pin(session, chat_id)
    return None

format_duty_message(duty, user, tz_name, lang='en')

Build the text for the pinned duty message.

Parameters:

Name Type Description Default
duty

Duty instance or None.

required
user

User instance or None.

required
tz_name str

Timezone name for display (e.g. Europe/Moscow).

required
lang str

Language code for i18n ('ru' or 'en').

'en'

Returns:

Type Description
str

Formatted message string; "No duty" if duty or user is None.

Source code in duty_teller/services/group_duty_pin_service.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def format_duty_message(duty, user, tz_name: str, lang: str = "en") -> str:
    """Render the pinned duty message for a duty and its user.

    The message has a time-range line (local times plus a UTC-offset hint
    taken from the start time), the user's full name, and optional phone
    and @username lines.

    Args:
        duty: Duty instance or None.
        user: User instance or None.
        tz_name: Timezone name for display (e.g. Europe/Moscow).
        lang: Language code for i18n ('ru' or 'en').

    Returns:
        Formatted message string; the translated "duty.no_duty" text if duty
        or user is None.
    """
    if duty is None or user is None:
        return t(lang, "duty.no_duty")

    try:
        zone = ZoneInfo(tz_name)
    except Exception:
        # Zone could not be loaded: fall back to Moscow and show that name.
        tz_name = "Europe/Moscow"
        zone = ZoneInfo(tz_name)

    starts_utc = datetime.fromisoformat(duty.start_at.replace("Z", "+00:00"))
    ends_utc = datetime.fromisoformat(duty.end_at.replace("Z", "+00:00"))
    start_local = starts_utc.astimezone(zone)
    end_local = ends_utc.astimezone(zone)

    offset = start_local.utcoffset()
    offset_sec = offset.total_seconds() if offset else 0
    sign = "-" if offset_sec < 0 else "+"
    # Split the absolute offset into whole hours and leftover minutes.
    h, m = divmod(abs(int(offset_sec)) // 60, 60)
    tz_hint = f"UTC{sign}{h:d}:{m:02d}, {tz_name}"

    stamp = "%d.%m.%Y %H:%M"
    time_range = (
        f"{start_local.strftime(stamp)} — {end_local.strftime(stamp)} ({tz_hint})"
    )
    label = t(lang, "duty.label")

    lines = [f"🕐 {label} {time_range}", f"👤 {user.full_name}"]
    if user.phone:
        lines.append(f"📞 {user.phone}")
    if user.username:
        lines.append(f"@{user.username}")
    return "\n".join(lines)

get_all_pin_chat_ids(session)

Return all chat_ids that have a pinned duty message.

Used to restore update jobs on bot startup.

Parameters:

Name Type Description Default
session Session

DB session.

required

Returns:

Type Description
list[int]

List of chat ids.

Source code in duty_teller/services/group_duty_pin_service.py
131
132
133
134
135
136
137
138
139
140
141
142
def get_all_pin_chat_ids(session: Session) -> list[int]:
    """List the chat ids that have a pinned duty message record.

    Thin wrapper over get_all_group_duty_pin_chat_ids; used to restore
    update jobs on bot startup.

    Args:
        session: DB session.

    Returns:
        List of chat ids.
    """
    chat_ids = get_all_group_duty_pin_chat_ids(session)
    return chat_ids

get_duty_message_text(session, tz_name, lang='en')

Get current duty from DB and return formatted message text.

Parameters:

Name Type Description Default
session Session

DB session.

required
tz_name str

Timezone name for display.

required
lang str

Language code for i18n.

'en'

Returns:

Type Description
str

Formatted duty message or "No duty" if none.

Source code in duty_teller/services/group_duty_pin_service.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def get_duty_message_text(session: Session, tz_name: str, lang: str = "en") -> str:
    """Build the duty message text for whoever is on duty right now.

    Args:
        session: DB session.
        tz_name: Timezone name for display.
        lang: Language code for i18n.

    Returns:
        Formatted duty message, or the translated "duty.no_duty" text when
        get_current_duty returns None.
    """
    found = get_current_duty(session, datetime.now(timezone.utc))
    if found is None:
        return t(lang, "duty.no_duty")
    current_duty, on_duty_user = found
    return format_duty_message(current_duty, on_duty_user, tz_name, lang)

get_message_id(session, chat_id)

Return message_id for the pinned duty message in this chat.

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required

Returns:

Type Description
int | None

Message id or None if no pin record.

Source code in duty_teller/services/group_duty_pin_service.py
117
118
119
120
121
122
123
124
125
126
127
128
def get_message_id(session: Session, chat_id: int) -> int | None:
    """Look up the message id of the pinned duty message in a chat.

    Args:
        session: DB session.
        chat_id: Telegram chat id.

    Returns:
        Message id or None if no pin record.
    """
    record = get_group_duty_pin(session, chat_id)
    if record:
        return record.message_id
    return None

get_next_shift_end_utc(session)

Return next shift end as naive UTC datetime for job scheduling.

Parameters:

Name Type Description Default
session Session

DB session.

required

Returns:

Type Description
datetime | None

Next shift end (naive UTC) or None.

Source code in duty_teller/services/group_duty_pin_service.py
84
85
86
87
88
89
90
91
92
93
def get_next_shift_end_utc(session: Session) -> datetime | None:
    """Return the next shift end relative to the current UTC time.

    Calls get_next_shift_end with a timezone-aware "now" in UTC; the result
    is meant for job scheduling.

    Args:
        session: DB session.

    Returns:
        Next shift end (naive UTC) or None.
    """
    now_utc = datetime.now(timezone.utc)
    return get_next_shift_end(session, now_utc)

run_import(session, result, hour_utc, minute_utc)

Run duty-schedule import: delete range per user, insert duty/unavailable/vacation.

For each entry: get_or_create_user_by_full_name, delete_duties_in_range for the result date range, then insert duties (handover time in UTC), unavailable (all-day), and vacation (consecutive ranges).

Parameters:

Name Type Description Default
session Session

DB session.

required
result DutyScheduleResult

Parsed duty schedule (start_date, end_date, entries).

required
hour_utc int

Handover hour in UTC (0-23).

required
minute_utc int

Handover minute in UTC (0-59).

required

Returns:

Type Description
tuple[int, int, int, int]

Tuple (num_users, num_duty, num_unavailable, num_vacation).

Source code in duty_teller/services/import_service.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def run_import(
    session: Session,
    result: DutyScheduleResult,
    hour_utc: int,
    minute_utc: int,
) -> tuple[int, int, int, int]:
    """Import a parsed duty schedule, replacing each user's events in range.

    Per entry: resolve the user with get_or_create_user_by_full_name, call
    delete_duties_in_range for the schedule's date range, then insert one
    "duty" per duty date (handover time on that day to the same time on the
    next day), one "unavailable" per unavailable date, and one "vacation"
    per range from _consecutive_date_ranges.

    Args:
        session: DB session.
        result: Parsed duty schedule (start_date, end_date, entries).
        hour_utc: Handover hour in UTC (0-23).
        minute_utc: Handover minute in UTC (0-59).

    Returns:
        Tuple (num_users, num_duty, num_unavailable, num_vacation).
    """
    range_start = result.start_date.isoformat()
    range_end = result.end_date.isoformat()
    duty_count = 0
    unavailable_count = 0
    vacation_count = 0

    for entry in result.entries:
        user = get_or_create_user_by_full_name(session, entry.full_name)
        delete_duties_in_range(session, user.id, range_start, range_end)

        for day in entry.duty_dates:
            shift_start = duty_to_iso(day, hour_utc, minute_utc)
            following_day = day + timedelta(days=1)
            shift_end = duty_to_iso(following_day, hour_utc, minute_utc)
            insert_duty(session, user.id, shift_start, shift_end, event_type="duty")
            duty_count += 1

        for day in entry.unavailable_dates:
            insert_duty(
                session,
                user.id,
                day_start_iso(day),
                day_end_iso(day),
                event_type="unavailable",
            )
            unavailable_count += 1

        for first_day, last_day in _consecutive_date_ranges(entry.vacation_dates):
            insert_duty(
                session,
                user.id,
                day_start_iso(first_day),
                day_end_iso(last_day),
                event_type="vacation",
            )
            vacation_count += 1

    return (len(result.entries), duty_count, unavailable_count, vacation_count)

save_pin(session, chat_id, message_id)

Save or update the pinned duty message record for a chat.

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required
message_id int

Message id to store.

required
Source code in duty_teller/services/group_duty_pin_service.py
 96
 97
 98
 99
100
101
102
103
104
def save_pin(session: Session, chat_id: int, message_id: int) -> None:
    """Store or update the pinned duty message record of a chat.

    Delegates to save_group_duty_pin and discards the returned record.

    Args:
        session: DB session.
        chat_id: Telegram chat id.
        message_id: Message id to store.
    """
    save_group_duty_pin(session, chat_id, message_id)
    return None

duty_teller.services.import_service

Import duty schedule: delete range, insert duties/unavailable/vacation. Accepts session.

run_import(session, result, hour_utc, minute_utc)

Run duty-schedule import: delete range per user, insert duty/unavailable/vacation.

For each entry: get_or_create_user_by_full_name, delete_duties_in_range for the result date range, then insert duties (handover time in UTC), unavailable (all-day), and vacation (consecutive ranges).

Parameters:

Name Type Description Default
session Session

DB session.

required
result DutyScheduleResult

Parsed duty schedule (start_date, end_date, entries).

required
hour_utc int

Handover hour in UTC (0-23).

required
minute_utc int

Handover minute in UTC (0-59).

required

Returns:

Type Description
tuple[int, int, int, int]

Tuple (num_users, num_duty, num_unavailable, num_vacation).

Source code in duty_teller/services/import_service.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def run_import(
    session: Session,
    result: DutyScheduleResult,
    hour_utc: int,
    minute_utc: int,
) -> tuple[int, int, int, int]:
    """Import a parsed duty schedule, replacing each user's events in range.

    Per entry: resolve the user with get_or_create_user_by_full_name, call
    delete_duties_in_range for the schedule's date range, then insert one
    "duty" per duty date (handover time on that day to the same time on the
    next day), one "unavailable" per unavailable date, and one "vacation"
    per range from _consecutive_date_ranges.

    Args:
        session: DB session.
        result: Parsed duty schedule (start_date, end_date, entries).
        hour_utc: Handover hour in UTC (0-23).
        minute_utc: Handover minute in UTC (0-59).

    Returns:
        Tuple (num_users, num_duty, num_unavailable, num_vacation).
    """
    range_start = result.start_date.isoformat()
    range_end = result.end_date.isoformat()
    duty_count = 0
    unavailable_count = 0
    vacation_count = 0

    for entry in result.entries:
        user = get_or_create_user_by_full_name(session, entry.full_name)
        delete_duties_in_range(session, user.id, range_start, range_end)

        for day in entry.duty_dates:
            shift_start = duty_to_iso(day, hour_utc, minute_utc)
            following_day = day + timedelta(days=1)
            shift_end = duty_to_iso(following_day, hour_utc, minute_utc)
            insert_duty(session, user.id, shift_start, shift_end, event_type="duty")
            duty_count += 1

        for day in entry.unavailable_dates:
            insert_duty(
                session,
                user.id,
                day_start_iso(day),
                day_end_iso(day),
                event_type="unavailable",
            )
            unavailable_count += 1

        for first_day, last_day in _consecutive_date_ranges(entry.vacation_dates):
            insert_duty(
                session,
                user.id,
                day_start_iso(first_day),
                day_end_iso(last_day),
                event_type="vacation",
            )
            vacation_count += 1

    return (len(result.entries), duty_count, unavailable_count, vacation_count)

duty_teller.services.group_duty_pin_service

Group duty pin: current duty message text, next shift end, pin CRUD. All accept session.

delete_pin(session, chat_id)

Remove the pinned message record for the chat (e.g. when bot leaves).

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required
Source code in duty_teller/services/group_duty_pin_service.py
107
108
109
110
111
112
113
114
def delete_pin(session: Session, chat_id: int) -> None:
    """Drop the stored pinned-message record of a chat (e.g. when bot leaves).

    Delegates to delete_group_duty_pin and discards its result.

    Args:
        session: DB session.
        chat_id: Telegram chat id.
    """
    delete_group_duty_pin(session, chat_id)
    return None

format_duty_message(duty, user, tz_name, lang='en')

Build the text for the pinned duty message.

Parameters:

Name Type Description Default
duty

Duty instance or None.

required
user

User instance or None.

required
tz_name str

Timezone name for display (e.g. Europe/Moscow).

required
lang str

Language code for i18n ('ru' or 'en').

'en'

Returns:

Type Description
str

Formatted message string; "No duty" if duty or user is None.

Source code in duty_teller/services/group_duty_pin_service.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def format_duty_message(duty, user, tz_name: str, lang: str = "en") -> str:
    """Render the pinned duty message for a duty and its user.

    The message has a time-range line (local times plus a UTC-offset hint
    taken from the start time), the user's full name, and optional phone
    and @username lines.

    Args:
        duty: Duty instance or None.
        user: User instance or None.
        tz_name: Timezone name for display (e.g. Europe/Moscow).
        lang: Language code for i18n ('ru' or 'en').

    Returns:
        Formatted message string; the translated "duty.no_duty" text if duty
        or user is None.
    """
    if duty is None or user is None:
        return t(lang, "duty.no_duty")

    try:
        zone = ZoneInfo(tz_name)
    except Exception:
        # Zone could not be loaded: fall back to Moscow and show that name.
        tz_name = "Europe/Moscow"
        zone = ZoneInfo(tz_name)

    starts_utc = datetime.fromisoformat(duty.start_at.replace("Z", "+00:00"))
    ends_utc = datetime.fromisoformat(duty.end_at.replace("Z", "+00:00"))
    start_local = starts_utc.astimezone(zone)
    end_local = ends_utc.astimezone(zone)

    offset = start_local.utcoffset()
    offset_sec = offset.total_seconds() if offset else 0
    sign = "-" if offset_sec < 0 else "+"
    # Split the absolute offset into whole hours and leftover minutes.
    h, m = divmod(abs(int(offset_sec)) // 60, 60)
    tz_hint = f"UTC{sign}{h:d}:{m:02d}, {tz_name}"

    stamp = "%d.%m.%Y %H:%M"
    time_range = (
        f"{start_local.strftime(stamp)} — {end_local.strftime(stamp)} ({tz_hint})"
    )
    label = t(lang, "duty.label")

    lines = [f"🕐 {label} {time_range}", f"👤 {user.full_name}"]
    if user.phone:
        lines.append(f"📞 {user.phone}")
    if user.username:
        lines.append(f"@{user.username}")
    return "\n".join(lines)

get_all_pin_chat_ids(session)

Return all chat_ids that have a pinned duty message.

Used to restore update jobs on bot startup.

Parameters:

Name Type Description Default
session Session

DB session.

required

Returns:

Type Description
list[int]

List of chat ids.

Source code in duty_teller/services/group_duty_pin_service.py
131
132
133
134
135
136
137
138
139
140
141
142
def get_all_pin_chat_ids(session: Session) -> list[int]:
    """List the chat ids that have a pinned duty message record.

    Thin wrapper over get_all_group_duty_pin_chat_ids; used to restore
    update jobs on bot startup.

    Args:
        session: DB session.

    Returns:
        List of chat ids.
    """
    chat_ids = get_all_group_duty_pin_chat_ids(session)
    return chat_ids

get_duty_message_text(session, tz_name, lang='en')

Get current duty from DB and return formatted message text.

Parameters:

Name Type Description Default
session Session

DB session.

required
tz_name str

Timezone name for display.

required
lang str

Language code for i18n.

'en'

Returns:

Type Description
str

Formatted duty message or "No duty" if none.

Source code in duty_teller/services/group_duty_pin_service.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def get_duty_message_text(session: Session, tz_name: str, lang: str = "en") -> str:
    """Build the duty message text for whoever is on duty right now.

    Args:
        session: DB session.
        tz_name: Timezone name for display.
        lang: Language code for i18n.

    Returns:
        Formatted duty message, or the translated "duty.no_duty" text when
        get_current_duty returns None.
    """
    found = get_current_duty(session, datetime.now(timezone.utc))
    if found is None:
        return t(lang, "duty.no_duty")
    current_duty, on_duty_user = found
    return format_duty_message(current_duty, on_duty_user, tz_name, lang)

get_message_id(session, chat_id)

Return message_id for the pinned duty message in this chat.

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required

Returns:

Type Description
int | None

Message id or None if no pin record.

Source code in duty_teller/services/group_duty_pin_service.py
117
118
119
120
121
122
123
124
125
126
127
128
def get_message_id(session: Session, chat_id: int) -> int | None:
    """Look up the message id of the pinned duty message in a chat.

    Args:
        session: DB session.
        chat_id: Telegram chat id.

    Returns:
        Message id or None if no pin record.
    """
    record = get_group_duty_pin(session, chat_id)
    if record:
        return record.message_id
    return None

get_next_shift_end_utc(session)

Return next shift end as naive UTC datetime for job scheduling.

Parameters:

Name Type Description Default
session Session

DB session.

required

Returns:

Type Description
datetime | None

Next shift end (naive UTC) or None.

Source code in duty_teller/services/group_duty_pin_service.py
84
85
86
87
88
89
90
91
92
93
def get_next_shift_end_utc(session: Session) -> datetime | None:
    """Find when the next shift ends, measured from the current UTC time.

    Args:
        session: DB session.

    Returns:
        Whatever ``get_next_shift_end`` yields for "now": the next shift end
        (documented as naive UTC, used for job scheduling) or None.
    """
    utc_now = datetime.now(timezone.utc)
    return get_next_shift_end(session, utc_now)

save_pin(session, chat_id, message_id)

Save or update the pinned duty message record for a chat.

Parameters:

Name Type Description Default
session Session

DB session.

required
chat_id int

Telegram chat id.

required
message_id int

Message id to store.

required
Source code in duty_teller/services/group_duty_pin_service.py
 96
 97
 98
 99
100
101
102
103
104
def save_pin(session: Session, chat_id: int, message_id: int) -> None:
    """Save or update the pinned duty message record for a chat.

    Thin service-level wrapper: all work is delegated to
    ``save_group_duty_pin`` with the arguments passed through unchanged.

    Args:
        session: DB session.
        chat_id: Telegram chat id.
        message_id: Message id to store.
    """
    save_group_duty_pin(session, chat_id, message_id)

Handlers

duty_teller.handlers

Expose a single register_handlers(app) that registers all handlers.

register_handlers(app)

Register all Telegram handlers (commands, import, group pin, error handler) on the application.

Parameters:

Name Type Description Default
app Application

python-telegram-bot Application instance.

required
Source code in duty_teller/handlers/__init__.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def register_handlers(app: Application) -> None:
    """Attach every Telegram handler and the global error handler to ``app``.

    Args:
        app: python-telegram-bot Application instance.
    """
    # Handlers are added in exactly this order.
    ordered_handlers = (
        commands.start_handler,
        commands.help_handler,
        commands.set_phone_handler,
        commands.calendar_link_handler,
        import_duty_schedule.import_duty_schedule_handler,
        import_duty_schedule.handover_time_handler,
        import_duty_schedule.duty_schedule_document_handler,
        group_duty_pin.group_duty_pin_handler,
        group_duty_pin.pin_duty_handler,
    )
    for handler in ordered_handlers:
        app.add_handler(handler)
    app.add_error_handler(errors.error_handler)

duty_teller.handlers.commands

Command handlers: /start, /help; /start registers user.

calendar_link(update, context) async

Handle /calendar_link: send personal ICS URL (private chat only; user must be in allowlist).

Source code in duty_teller/handlers/commands.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def calendar_link(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /calendar_link: reply with the caller's personal ICS URL.

    Only answers in private chats. The user is created in the DB if missing,
    then must pass either the username or the phone access check; otherwise an
    access-denied message is sent. If no base URL is configured, a generic
    error message is sent instead of a link.
    """
    message = update.message
    tg_user = update.effective_user
    if not message or not tg_user:
        return
    lang = get_lang(tg_user)
    chat = update.effective_chat
    if chat and chat.type != "private":
        await message.reply_text(t(lang, "calendar_link.private_only"))
        return
    telegram_user_id = tg_user.id
    username = (tg_user.username or "").strip()
    full_name = build_full_name(tg_user.first_name, tg_user.last_name)

    def do_calendar_link() -> tuple[str | None, str | None]:
        # Runs in a worker thread; returns (url, error_code).
        with session_scope(config.DATABASE_URL) as session:
            db_user = get_or_create_user(
                session,
                telegram_user_id=telegram_user_id,
                full_name=full_name,
                username=tg_user.username,
                first_name=tg_user.first_name,
                last_name=tg_user.last_name,
            )
            allowed = config.can_access_miniapp(
                username
            ) or config.can_access_miniapp_by_phone(db_user.phone)
            if not allowed:
                return (None, "denied")
            token = create_calendar_token(session, db_user.id)
            base = (config.MINI_APP_BASE_URL or "").rstrip("/")
            if not base:
                return (None, None)
            return (f"{base}/api/calendar/ical/{token}.ics", None)

    loop = asyncio.get_running_loop()
    result_url, error = await loop.run_in_executor(None, do_calendar_link)
    if error == "denied":
        await message.reply_text(t(lang, "calendar_link.access_denied"))
        return
    if not result_url:
        await message.reply_text(t(lang, "calendar_link.error"))
        return
    success_text = t(lang, "calendar_link.success", url=result_url)
    hint_text = t(lang, "calendar_link.help_hint")
    await message.reply_text(success_text + "\n\n" + hint_text)

help_cmd(update, context) async

Handle /help: send list of commands (admins see import_duty_schedule).

Source code in duty_teller/handlers/commands.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: reply with the command list.

    Admins (by username) additionally see the import-schedule entry.
    """
    if not update.message or not update.effective_user:
        return
    tg_user = update.effective_user
    lang = get_lang(tg_user)
    base_keys = (
        "help.title",
        "help.start",
        "help.help",
        "help.set_phone",
        "help.calendar_link",
        "help.pin_duty",
    )
    lines = [t(lang, key) for key in base_keys]
    if config.is_admin(tg_user.username or ""):
        lines.append(t(lang, "help.import_schedule"))
    await update.message.reply_text("\n".join(lines))

set_phone(update, context) async

Handle /set_phone [number]: set or clear phone (private chat only).

Source code in duty_teller/handlers/commands.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def set_phone(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /set_phone [number]: store or clear the caller's phone.

    Only answers in private chats. With arguments the joined, stripped text is
    stored as the phone; without arguments (or with blank text) the phone is
    cleared. The user row is created first if it does not exist yet.
    """
    message = update.message
    tg_user = update.effective_user
    if not message or not tg_user:
        return
    lang = get_lang(tg_user)
    chat = update.effective_chat
    if chat and chat.type != "private":
        await message.reply_text(t(lang, "set_phone.private_only"))
        return
    args = context.args or []
    phone = " ".join(args).strip() if args else None
    telegram_user_id = tg_user.id

    def do_set_phone() -> str | None:
        # Runs in a worker thread; returns "error", "saved" or "cleared".
        with session_scope(config.DATABASE_URL) as session:
            get_or_create_user(
                session,
                telegram_user_id=telegram_user_id,
                full_name=build_full_name(tg_user.first_name, tg_user.last_name),
                username=tg_user.username,
                first_name=tg_user.first_name,
                last_name=tg_user.last_name,
            )
            updated = set_user_phone(session, telegram_user_id, phone or None)
            if updated is None:
                return "error"
            return "saved" if phone else "cleared"

    outcome = await asyncio.get_running_loop().run_in_executor(None, do_set_phone)
    if outcome == "error":
        reply = t(lang, "set_phone.error")
    elif outcome == "saved":
        reply = t(lang, "set_phone.saved", phone=phone or "")
    else:
        reply = t(lang, "set_phone.cleared")
    await message.reply_text(reply)

start(update, context) async

Handle /start: register user in DB and send greeting.

Source code in duty_teller/handlers/commands.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: make sure the user exists in the DB, then greet them."""
    tg_user = update.effective_user
    if not update.message or not tg_user:
        return

    # Snapshot the Telegram fields up front so the worker thread only sees plain values.
    user_fields = {
        "telegram_user_id": tg_user.id,
        "full_name": build_full_name(tg_user.first_name, tg_user.last_name),
        "username": tg_user.username,
        "first_name": tg_user.first_name,
        "last_name": tg_user.last_name,
    }

    def do_get_or_create() -> None:
        with session_scope(config.DATABASE_URL) as session:
            get_or_create_user(session, **user_fields)

    await asyncio.get_running_loop().run_in_executor(None, do_get_or_create)

    greeting = t(get_lang(tg_user), "start.greeting")
    await update.message.reply_text(greeting)

duty_teller.handlers.import_duty_schedule

Import duty-schedule: /import_duty_schedule (admin only). Two steps: handover time -> JSON file.

handle_duty_schedule_document(update, context) async

Handle uploaded JSON file: parse duty-schedule and run import.

Source code in duty_teller/handlers/import_duty_schedule.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
async def handle_duty_schedule_document(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Second import step: take the uploaded JSON file, parse it and import it.

    Silently ignores the upload unless the user is in the "awaiting file"
    state, a handover time is stored, and the user is an admin. Non-``.json``
    file names get a hint and leave the state untouched. After a parse error
    or after the import attempt (success or failure) the import state is
    cleared from ``user_data``.
    """
    message = update.message
    tg_user = update.effective_user
    if not message or not message.document or not tg_user:
        return
    state = context.user_data
    if not state.get("awaiting_duty_schedule_file"):
        return
    lang = get_lang(tg_user)
    handover = state.get("handover_utc_time")
    if not handover or not config.is_admin(tg_user.username or ""):
        return
    document = message.document
    if not (document.file_name or "").lower().endswith(".json"):
        await message.reply_text(t(lang, "import.need_json"))
        return

    hour_utc, minute_utc = handover

    def clear_import_state() -> None:
        # Drop both markers of the two-step import conversation.
        state.pop("awaiting_duty_schedule_file", None)
        state.pop("handover_utc_time", None)

    tg_file = await context.bot.get_file(document.file_id)
    raw = bytes(await tg_file.download_as_bytearray())
    try:
        parsed = parse_duty_schedule(raw)
    except DutyScheduleParseError as e:
        clear_import_state()
        await message.reply_text(t(lang, "import.parse_error", error=str(e)))
        return

    def run_import_with_scope():
        # Executed in a worker thread with its own DB session.
        with session_scope(config.DATABASE_URL) as session:
            return run_import(session, parsed, hour_utc, minute_utc)

    loop = asyncio.get_running_loop()
    try:
        counts = await loop.run_in_executor(None, run_import_with_scope)
        num_users, num_duty, num_unavailable, num_vacation = counts
    except Exception as e:
        await message.reply_text(t(lang, "import.import_error", error=str(e)))
    else:
        total = num_duty + num_unavailable + num_vacation
        # Optional suffixes are only rendered for non-zero counts.
        unavailable_suffix = ""
        if num_unavailable:
            unavailable_suffix = t(
                lang, "import.done_unavailable", count=str(num_unavailable)
            )
        vacation_suffix = ""
        if num_vacation:
            vacation_suffix = t(lang, "import.done_vacation", count=str(num_vacation))
        summary = t(
            lang,
            "import.done",
            users=str(num_users),
            duties=str(num_duty),
            unavailable=unavailable_suffix,
            vacation=vacation_suffix,
            total=str(total),
        )
        await message.reply_text(summary)
    finally:
        clear_import_state()

handle_handover_time_text(update, context) async

Handle text message when awaiting handover time (e.g. 09:00 Europe/Moscow).

Source code in duty_teller/handlers/import_duty_schedule.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
async def handle_handover_time_text(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """First import step: read the handover time from a text message.

    Ignored unless the user is in the "awaiting handover time" state and is an
    admin. On a parseable time (e.g. 09:00 Europe/Moscow) the UTC hour/minute
    are stored and the conversation moves on to awaiting the JSON file.
    """
    message = update.message
    tg_user = update.effective_user
    if not message or not tg_user or not message.text:
        return
    state = context.user_data
    if not state.get("awaiting_handover_time"):
        return
    if not config.is_admin(tg_user.username or ""):
        return
    lang = get_lang(tg_user)
    parsed = parse_handover_time(message.text.strip())
    if parsed is None:
        await message.reply_text(t(lang, "import.parse_time_error"))
        return
    hour_utc, minute_utc = parsed
    state.update(
        {
            "handover_utc_time": (hour_utc, minute_utc),
            "awaiting_handover_time": False,
            "awaiting_duty_schedule_file": True,
        }
    )
    await message.reply_text(t(lang, "import.send_json"))

import_duty_schedule_cmd(update, context) async

Handle /import_duty_schedule: start two-step import (admin only); asks for handover time.

Source code in duty_teller/handlers/import_duty_schedule.py
19
20
21
22
23
24
25
26
27
28
29
30
async def import_duty_schedule_cmd(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Handle /import_duty_schedule: begin the two-step import for admins.

    Non-admins get an "admin only" reply. Admins are put into the
    "awaiting handover time" state and asked for the handover time format.
    """
    tg_user = update.effective_user
    if not update.message or not tg_user:
        return
    lang = get_lang(tg_user)
    if config.is_admin(tg_user.username or ""):
        context.user_data["awaiting_handover_time"] = True
        reply_key = "import.handover_format"
    else:
        reply_key = "import.admin_only"
    await update.message.reply_text(t(lang, reply_key))

duty_teller.handlers.group_duty_pin

Pinned duty message in groups: handle bot add/remove, schedule updates at shift end.

my_chat_member_handler(update, context) async

Handle bot added to or removed from group: send/pin duty message or delete pin record.

Source code in duty_teller/handlers/group_duty_pin.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def my_chat_member_handler(
    update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """React to the bot's own membership changing in a group or supergroup.

    When the bot goes from LEFT/BANNED to MEMBER/ADMINISTRATOR it sends the
    current duty message, tries to pin it, stores the message id and schedules
    the next refresh. When the bot becomes LEFT/BANNED the stored pin record
    and the chat's scheduled jobs are removed.

    Args:
        update: Incoming update; ignored unless it carries ``my_chat_member``.
        context: Callback context providing the bot and the application.
    """
    if not update.my_chat_member or not update.effective_user:
        return
    old = update.my_chat_member.old_chat_member
    new = update.my_chat_member.new_chat_member
    chat = update.effective_chat
    if not chat or chat.type not in ("group", "supergroup"):
        return
    # Only membership changes of this bot itself are relevant.
    if new.user.id != context.bot.id:
        return
    chat_id = chat.id

    if new.status in (
        ChatMemberStatus.MEMBER,
        ChatMemberStatus.ADMINISTRATOR,
    ) and old.status in (
        ChatMemberStatus.LEFT,
        ChatMemberStatus.BANNED,
    ):
        loop = asyncio.get_running_loop()
        lang = get_lang(update.effective_user)
        text = await loop.run_in_executor(
            None, lambda: _get_duty_message_text_sync(lang)
        )
        try:
            msg = await context.bot.send_message(chat_id=chat_id, text=text)
        except (BadRequest, Forbidden) as e:
            logger.warning("Failed to send duty message in chat_id=%s: %s", chat_id, e)
            return
        pinned = False
        try:
            await context.bot.pin_chat_message(
                chat_id=chat_id,
                message_id=msg.message_id,
                disable_notification=True,
            )
            pinned = True
        except (BadRequest, Forbidden) as e:
            logger.warning("Failed to pin message in chat_id=%s: %s", chat_id, e)
        # The message id is stored even when pinning failed, so it can be
        # pinned later once the bot has the rights.
        await loop.run_in_executor(None, _sync_save_pin, chat_id, msg.message_id)
        if not pinned:
            try:
                await context.bot.send_message(
                    chat_id=chat_id,
                    text=t(lang, "pin_duty.could_not_pin_make_admin"),
                )
            except (BadRequest, Forbidden) as e:
                # Best effort: the hint is optional, but leave a trace in the
                # log instead of dropping the failure silently.
                logger.warning(
                    "Failed to send pin hint in chat_id=%s: %s", chat_id, e
                )
        next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
        await _schedule_next_update(context.application, chat_id, next_end)
        return

    if new.status in (ChatMemberStatus.LEFT, ChatMemberStatus.BANNED):
        await asyncio.get_running_loop().run_in_executor(
            None, _sync_delete_pin, chat_id
        )
        name = f"{JOB_NAME_PREFIX}{chat_id}"
        if context.application.job_queue:
            for job in context.application.job_queue.get_jobs_by_name(name):
                job.schedule_removal()
        logger.info("Bot left chat_id=%s, removed pin record and jobs", chat_id)

pin_duty_cmd(update, context) async

Handle /pin_duty: pin the bot's stored duty message in the group (group chats only; requires an existing pin record for the chat).

Source code in duty_teller/handlers/group_duty_pin.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
async def pin_duty_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /pin_duty: pin the stored duty message of this group.

    Works only in groups and supergroups. The message id is read from the
    chat's pin record; if there is none, the user is told so. Pin failures
    (BadRequest/Forbidden) are logged and reported with a failure reply.
    """
    message = update.message
    chat = update.effective_chat
    tg_user = update.effective_user
    if not message or not chat or not tg_user:
        return
    lang = get_lang(tg_user)
    if chat.type not in ("group", "supergroup"):
        await message.reply_text(t(lang, "pin_duty.group_only"))
        return
    chat_id = chat.id
    stored_id = await asyncio.get_running_loop().run_in_executor(
        None, _sync_get_message_id, chat_id
    )
    if stored_id is None:
        await message.reply_text(t(lang, "pin_duty.no_message"))
        return
    try:
        await context.bot.pin_chat_message(
            chat_id=chat_id,
            message_id=stored_id,
            disable_notification=True,
        )
        await message.reply_text(t(lang, "pin_duty.pinned"))
    except (BadRequest, Forbidden) as e:
        logger.warning("pin_duty failed chat_id=%s: %s", chat_id, e)
        await message.reply_text(t(lang, "pin_duty.failed"))

restore_group_pin_jobs(application) async

Restore scheduled pin-update jobs for all chats that have a pinned message (on startup).

Source code in duty_teller/handlers/group_duty_pin.py
189
190
191
192
193
194
195
196
async def restore_group_pin_jobs(application) -> None:
    """Re-create the scheduled pin-update job for every chat with a pin record.

    Intended to run on startup. The next shift end is not chat-specific, so it
    is looked up once and reused for all chats instead of being queried again
    on every loop iteration.

    Args:
        application: Application whose job queue receives the restored jobs.
    """
    loop = asyncio.get_running_loop()
    chat_ids = await loop.run_in_executor(None, _get_all_pin_chat_ids_sync)
    if chat_ids:
        # Single lookup shared by all chats; skipped entirely when there is nothing to restore.
        next_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
        for chat_id in chat_ids:
            await _schedule_next_update(application, chat_id, next_end)
    logger.info("Restored %s group pin jobs", len(chat_ids))

update_group_pin(context) async

Job callback: refresh pinned duty message and schedule next update at shift end.

Source code in duty_teller/handlers/group_duty_pin.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
async def update_group_pin(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Job callback: rewrite the stored duty message and schedule the next run.

    Does nothing when the job carries no ``chat_id`` or the chat has no pin
    record. A failed edit (BadRequest/Forbidden) is only logged; the next
    update is scheduled regardless.
    """
    chat_id = context.job.data.get("chat_id")
    if chat_id is None:
        return
    loop = asyncio.get_running_loop()
    stored_id = await loop.run_in_executor(None, _sync_get_message_id, chat_id)
    if stored_id is None:
        logger.info("No pin record for chat_id=%s, skipping update", chat_id)
        return

    def build_text():
        # Job-driven updates have no user, so the default language is used.
        return _get_duty_message_text_sync(config.DEFAULT_LANGUAGE)

    text = await loop.run_in_executor(None, build_text)
    try:
        await context.bot.edit_message_text(
            chat_id=chat_id,
            message_id=stored_id,
            text=text,
        )
    except (BadRequest, Forbidden) as e:
        logger.warning("Failed to edit pinned message chat_id=%s: %s", chat_id, e)
    upcoming_end = await loop.run_in_executor(None, _get_next_shift_end_sync)
    await _schedule_next_update(context.application, chat_id, upcoming_end)

duty_teller.handlers.errors

Global error handler: log exception and notify user.

error_handler(update, context) async

Global error handler: log exception and reply with generic message if possible.

Parameters:

Name Type Description Default
update Update | None

Update that caused the error (may be None).

required
context DEFAULT_TYPE

Callback context.

required
Source code in duty_teller/handlers/errors.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def error_handler(
    update: Update | None, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Global error handler: log exception and reply with generic message if possible.

    Args:
        update: Update that caused the error (may be None).
        context: Callback context; ``context.error`` holds the raised exception.
    """
    # Pass the exception explicitly: this callback is not guaranteed to run
    # inside an active ``except`` block, so relying on the implicit exception
    # context could log without a traceback.
    logger.error("Exception while handling an update", exc_info=context.error)
    if isinstance(update, Update) and update.effective_message:
        user = getattr(update, "effective_user", None)
        # Fall back to the configured default language when no user is attached.
        lang = get_lang(user) if user else config.DEFAULT_LANGUAGE
        await update.effective_message.reply_text(t(lang, "errors.generic"))

Importers

duty_teller.importers

Importers for duty data (e.g. duty-schedule JSON).

duty_teller.importers.duty_schedule

Parser for duty-schedule JSON format. No DB access.

DutyScheduleEntry dataclass

One person's schedule: full_name and three lists of dates by event type.

Source code in duty_teller/importers/duty_schedule.py
13
14
15
16
17
18
19
20
@dataclass
class DutyScheduleEntry:
    """One person's schedule: full_name and three lists of dates by event type.

    Each list holds the calendar dates on which the person's schedule cell
    carried the corresponding marker (duty, unavailable, vacation).
    """

    # Person's name as given in the schedule item.
    full_name: str
    # Dates marked as duty.
    duty_dates: list[date]
    # Dates marked as unavailable.
    unavailable_dates: list[date]
    # Dates marked as vacation.
    vacation_dates: list[date]

DutyScheduleParseError

Bases: Exception

Invalid or missing fields in duty-schedule JSON.

Source code in duty_teller/importers/duty_schedule.py
32
33
34
35
class DutyScheduleParseError(Exception):
    """Raised when duty-schedule JSON is malformed or lacks required fields."""

DutyScheduleResult dataclass

Parsed duty schedule: start_date, end_date, and per-person entries.

Source code in duty_teller/importers/duty_schedule.py
23
24
25
26
27
28
29
@dataclass
class DutyScheduleResult:
    """Parsed duty schedule: start_date, end_date, and per-person entries."""

    # First day covered by the schedule.
    start_date: date
    # Last day covered by the schedule.
    end_date: date
    # One entry per schedule item, in input order.
    entries: list[DutyScheduleEntry]

parse_duty_schedule(raw_bytes)

Parse duty-schedule JSON into DutyScheduleResult.

Expects meta.start_date (YYYY-MM-DD) and schedule (array). For each schedule item: name (required), duty string with ';' separator; index i = start_date + i days. Cell values: в/В/б/Б => duty, Н => unavailable, О => vacation; rest ignored.

Parameters:

Name Type Description Default
raw_bytes bytes

UTF-8 encoded JSON bytes.

required

Returns:

Type Description
DutyScheduleResult

DutyScheduleResult with start_date, end_date, and entries (per-person dates).

Raises:

Type Description
DutyScheduleParseError

On invalid JSON, missing/invalid meta or schedule, or invalid item fields.

Source code in duty_teller/importers/duty_schedule.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def parse_duty_schedule(raw_bytes: bytes) -> DutyScheduleResult:
    """Parse duty-schedule JSON into DutyScheduleResult.

    Expects a top-level JSON object with meta.start_date (YYYY-MM-DD) and
    schedule (array). For each schedule item: name (required), duty string with
    ';' separator; index i = start_date + i days.
    Cell values: в/В/б/Б => duty, Н => unavailable, О => vacation; rest ignored.

    Args:
        raw_bytes: UTF-8 encoded JSON bytes.

    Returns:
        DutyScheduleResult with start_date, end_date, and entries (per-person dates).
        end_date is start_date plus the longest row's cell count minus one, or
        start_date itself when the schedule has no rows.

    Raises:
        DutyScheduleParseError: On invalid JSON, a non-object top-level value,
            missing/invalid meta or schedule, or invalid item fields.
    """
    try:
        data = json.loads(raw_bytes.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise DutyScheduleParseError(f"Invalid JSON or encoding: {e}") from e

    # Valid JSON can also be a list, string, number, etc.; without this check
    # the ``.get`` calls below would raise AttributeError instead of the
    # documented parse error.
    if not isinstance(data, dict):
        raise DutyScheduleParseError("Top-level JSON value must be an object")

    meta = data.get("meta")
    if not meta or not isinstance(meta, dict):
        raise DutyScheduleParseError("Missing or invalid 'meta'")

    start_str = meta.get("start_date")
    if not start_str or not isinstance(start_str, str):
        raise DutyScheduleParseError("Missing or invalid meta.start_date")
    try:
        start_date = date.fromisoformat(start_str.strip())
    except ValueError as e:
        raise DutyScheduleParseError(f"Invalid meta.start_date: {start_str}") from e

    schedule = data.get("schedule")
    if not isinstance(schedule, list):
        raise DutyScheduleParseError("Missing or invalid 'schedule' (must be array)")

    # Longest row (in cells) seen so far; determines end_date.
    max_days = 0
    entries: list[DutyScheduleEntry] = []

    for row in schedule:
        if not isinstance(row, dict):
            raise DutyScheduleParseError("schedule item must be an object")
        name = row.get("name")
        if name is None or not isinstance(name, str):
            raise DutyScheduleParseError("schedule item must have 'name' (string)")
        full_name = name.strip()
        if not full_name:
            raise DutyScheduleParseError("schedule item 'name' cannot be empty")

        # A missing or null 'duty' is treated as an empty string.
        duty_str = row.get("duty")
        if duty_str is None:
            duty_str = ""
        if not isinstance(duty_str, str):
            raise DutyScheduleParseError("schedule item 'duty' must be string")

        cells = [c.strip() for c in duty_str.split(";")]
        max_days = max(max_days, len(cells))

        duty_dates: list[date] = []
        unavailable_dates: list[date] = []
        vacation_dates: list[date] = []
        for i, cell in enumerate(cells):
            # Cell index i maps to the i-th day after start_date.
            d = start_date + timedelta(days=i)
            if cell in DUTY_MARKERS:
                duty_dates.append(d)
            elif cell == UNAVAILABLE_MARKER:
                unavailable_dates.append(d)
            elif cell == VACATION_MARKER:
                vacation_dates.append(d)
        entries.append(
            DutyScheduleEntry(
                full_name=full_name,
                duty_dates=duty_dates,
                unavailable_dates=unavailable_dates,
                vacation_dates=vacation_dates,
            )
        )

    if max_days == 0:
        end_date = start_date
    else:
        end_date = start_date + timedelta(days=max_days - 1)

    return DutyScheduleResult(start_date=start_date, end_date=end_date, entries=entries)