Commit 8ba714bd authored by uuo00_n's avatar uuo00_n

feat(核心配置): 迁移BaseSettings至pydantic-settings并新增APP_MODE配置

refactor(角色权限): 新增角色等级映射与版别校验模块

feat(用户认证): 在JWT令牌中增加角色等级与版别信息

feat(仪表盘): 实现基于角色等级与版别的动态视图返回

docs(模型注释): 完善用户模型字段说明并兼容Pydantic v2

本次提交主要包含以下改进:
1. 将BaseSettings从pydantic迁移至pydantic-settings包
2. 新增APP_MODE配置项支持教育版/企业版隔离
3. 创建角色权限中心化定义模块
4. 增强JWT令牌携带用户权限信息
5. 实现仪表盘接口的动态内容返回
6. 优化用户模型字段注释和类型提示
parent 0bbc8096
#!/Users/uu/Desktop/dles_prj/llm-filter/.venv/bin/python3
# -*- coding: utf-8 -*-
import re
import sys
from email_validator.__main__ import main
if __name__ == '__main__':
sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
sys.exit(main())
This is free and unencumbered software released into the public
domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a
compiled binary, for any purpose, commercial or non-commercial,
and by any means.
In jurisdictions that recognize copyright laws, the author or
authors of this software dedicate any and all copyright
interest in the software to the public domain. We make this
dedication for the benefit of the public at large and to the
detriment of our heirs and successors. We intend this
dedication to be an overt act of relinquishment in perpetuity
of all present and future rights to this software under
copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
For more information, please refer to <https://unlicense.org/>
../../../bin/email_validator,sha256=hFLktj2OUakfhW21WXjWtePBUupYbxMWCwwa_AS0-OA,262
email_validator-2.2.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
email_validator-2.2.0.dist-info/LICENSE,sha256=ZyF5dS4QkTSj-yvdB4Cyn9t6A5dPD1hqE66tUSlWLUw,1212
email_validator-2.2.0.dist-info/METADATA,sha256=vELkkg-p-qMuqNFX6uzDmMaruT7Pe5PDAQexHLAB4XM,25741
email_validator-2.2.0.dist-info/RECORD,,
email_validator-2.2.0.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
email_validator-2.2.0.dist-info/WHEEL,sha256=cpQTJ5IWu9CdaPViMhC9YzF8gZuS5-vlfoFihTBC86A,91
email_validator-2.2.0.dist-info/entry_points.txt,sha256=zRM_6bNIUSHTbNx5u6M3nK1MAguvryrc9hICC6HyrBg,66
email_validator-2.2.0.dist-info/top_level.txt,sha256=fYDOSWFZke46ut7WqdOAJjjhlpPYAaOwOwIsh3s8oWI,16
email_validator/__init__.py,sha256=g-TFM6vzpEt4dMG93giGlS343yXXXIy7EOLNFEn6DfA,4360
email_validator/__main__.py,sha256=TIvjaG_OSFRciH0J2pnEJEdX3uJy3ZgocmasEqh9EEI,2243
email_validator/__pycache__/__init__.cpython-39.pyc,,
email_validator/__pycache__/__main__.cpython-39.pyc,,
email_validator/__pycache__/deliverability.cpython-39.pyc,,
email_validator/__pycache__/exceptions_types.cpython-39.pyc,,
email_validator/__pycache__/rfc_constants.cpython-39.pyc,,
email_validator/__pycache__/syntax.cpython-39.pyc,,
email_validator/__pycache__/validate_email.cpython-39.pyc,,
email_validator/__pycache__/version.cpython-39.pyc,,
email_validator/deliverability.py,sha256=e6eODNSaLMiM29EZ3bWYDFkQDlMIdicBaykjYQJwYig,7222
email_validator/exceptions_types.py,sha256=yLxXqwtl5dXa-938K7skLP1pMFgi0oovzCs74mX7TGs,6024
email_validator/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
email_validator/rfc_constants.py,sha256=KVUshwIu699cle3UzDU2_fFBSQOO7p91Z_hrlNANtGM,2767
email_validator/syntax.py,sha256=Mo5KLgEsbQcvNzs8zO5QbhzUK4MAjL9yJFDpwsF12lY,36005
email_validator/validate_email.py,sha256=YUXY5Sv_mQ7Vuu_AmGdISza8v-VaABnNMLrlWv8EIl4,8401
email_validator/version.py,sha256=DKk-1b-rZsJFxFi1JoJ7TmEvIEQ0rf-C9HAZWwvjuM0,22
Wheel-Version: 1.0
Generator: setuptools (70.1.0)
Root-Is-Purelib: true
Tag: py3-none-any
[console_scripts]
email_validator = email_validator.__main__:main
from typing import TYPE_CHECKING
# Export the main method, helper methods, and the public data types.
from .exceptions_types import ValidatedEmail, EmailNotValidError, \
EmailSyntaxError, EmailUndeliverableError
from .validate_email import validate_email
from .version import __version__
__all__ = ["validate_email",
"ValidatedEmail", "EmailNotValidError",
"EmailSyntaxError", "EmailUndeliverableError",
"caching_resolver", "__version__"]
if TYPE_CHECKING:
from .deliverability import caching_resolver
else:
def caching_resolver(*args, **kwargs):
# Lazy load `deliverability` as it is slow to import (due to dns.resolver)
from .deliverability import caching_resolver
return caching_resolver(*args, **kwargs)
# These global attributes are a part of the library's API and can be
# changed by library users.
# Default values for keyword arguments.
ALLOW_SMTPUTF8 = True
ALLOW_QUOTED_LOCAL = False
ALLOW_DOMAIN_LITERAL = False
ALLOW_DISPLAY_NAME = False
GLOBALLY_DELIVERABLE = True
CHECK_DELIVERABILITY = True
TEST_ENVIRONMENT = False
DEFAULT_TIMEOUT = 15 # secs
# IANA Special Use Domain Names
# Last Updated 2021-09-21
# https://www.iana.org/assignments/special-use-domain-names/special-use-domain-names.txt
#
# The domain names without dots would be caught by the check that the domain
# name in an email address must have a period, but this list will also catch
# subdomains of these domains, which are also reserved.
SPECIAL_USE_DOMAIN_NAMES = [
# The "arpa" entry here is consolidated from a lot of arpa subdomains
# for private address (i.e. non-routable IP addresses like 172.16.x.x)
# reverse mapping, plus some other subdomains. Although RFC 6761 says
# that application software should not treat these domains as special,
# they are private-use domains and so cannot have globally deliverable
# email addresses, which is an assumption of this library, and probably
# all of arpa is similarly special-use, so we reject it all.
"arpa",
# RFC 6761 says applications "SHOULD NOT" treat the "example" domains
# as special, i.e. applications should accept these domains.
#
# The domain "example" alone fails our syntax validation because it
# lacks a dot (we assume no one has an email address on a TLD directly).
# "@example.com/net/org" will currently fail DNS-based deliverability
# checks because IANA publishes a NULL MX for these domains, and
# "@mail.example[.com/net/org]" and other subdomains will fail DNS-
# based deliverability checks because IANA does not publish MX or A
# DNS records for these subdomains.
# "example", # i.e. "wwww.example"
# "example.com",
# "example.net",
# "example.org",
# RFC 6761 says that applications are permitted to treat this domain
# as special and that DNS should return an immediate negative response,
# so we also immediately reject this domain, which also follows the
# purpose of the domain.
"invalid",
# RFC 6762 says that applications "may" treat ".local" as special and
# that "name resolution APIs and libraries SHOULD recognize these names
# as special," and since ".local" has no global definition, we reject
# it, as we expect email addresses to be gloally routable.
"local",
# RFC 6761 says that applications (like this library) are permitted
# to treat "localhost" as special, and since it cannot have a globally
# deliverable email address, we reject it.
"localhost",
# RFC 7686 says "applications that do not implement the Tor protocol
# SHOULD generate an error upon the use of .onion and SHOULD NOT
# perform a DNS lookup.
"onion",
# Although RFC 6761 says that application software should not treat
# these domains as special, it also warns users that the address may
# resolve differently in different systems, and therefore it cannot
# have a globally routable email address, which is an assumption of
# this library, so we reject "@test" and "@*.test" addresses, unless
# the test_environment keyword argument is given, to allow their use
# in application-level test environments. These domains will generally
# fail deliverability checks because "test" is not an actual TLD.
"test",
]
# A command-line tool for testing.
#
# Usage:
#
# python -m email_validator test@example.org
# python -m email_validator < LIST_OF_ADDRESSES.TXT
#
# Provide email addresses to validate either as a command-line argument
# or in STDIN separated by newlines. Validation errors will be printed for
# invalid email addresses. When passing an email address on the command
# line, if the email address is valid, information about it will be printed.
# When using STDIN, no output will be given for valid email addresses.
#
# Keyword arguments to validate_email can be set in environment variables
# of the same name but upprcase (see below).
import json
import os
import sys
from typing import Any, Dict, Optional
from .validate_email import validate_email, _Resolver
from .deliverability import caching_resolver
from .exceptions_types import EmailNotValidError
def main(dns_resolver: Optional[_Resolver] = None) -> None:
# The dns_resolver argument is for tests.
# Set options from environment variables.
options: Dict[str, Any] = {}
for varname in ('ALLOW_SMTPUTF8', 'ALLOW_QUOTED_LOCAL', 'ALLOW_DOMAIN_LITERAL',
'GLOBALLY_DELIVERABLE', 'CHECK_DELIVERABILITY', 'TEST_ENVIRONMENT'):
if varname in os.environ:
options[varname.lower()] = bool(os.environ[varname])
for varname in ('DEFAULT_TIMEOUT',):
if varname in os.environ:
options[varname.lower()] = float(os.environ[varname])
if len(sys.argv) == 1:
# Validate the email addresses pased line-by-line on STDIN.
dns_resolver = dns_resolver or caching_resolver()
for line in sys.stdin:
email = line.strip()
try:
validate_email(email, dns_resolver=dns_resolver, **options)
except EmailNotValidError as e:
print(f"{email} {e}")
else:
# Validate the email address passed on the command line.
email = sys.argv[1]
try:
result = validate_email(email, dns_resolver=dns_resolver, **options)
print(json.dumps(result.as_dict(), indent=2, sort_keys=True, ensure_ascii=False))
except EmailNotValidError as e:
print(e)
if __name__ == "__main__":
main()
from typing import Any, List, Optional, Tuple, TypedDict
import ipaddress
from .exceptions_types import EmailUndeliverableError
import dns.resolver
import dns.exception
def caching_resolver(*, timeout: Optional[int] = None, cache: Any = None, dns_resolver: Optional[dns.resolver.Resolver] = None) -> dns.resolver.Resolver:
if timeout is None:
from . import DEFAULT_TIMEOUT
timeout = DEFAULT_TIMEOUT
resolver = dns_resolver or dns.resolver.Resolver()
resolver.cache = cache or dns.resolver.LRUCache()
resolver.lifetime = timeout # timeout, in seconds
return resolver
DeliverabilityInfo = TypedDict("DeliverabilityInfo", {
"mx": List[Tuple[int, str]],
"mx_fallback_type": Optional[str],
"unknown-deliverability": str,
}, total=False)
def validate_email_deliverability(domain: str, domain_i18n: str, timeout: Optional[int] = None, dns_resolver: Optional[dns.resolver.Resolver] = None) -> DeliverabilityInfo:
# Check that the domain resolves to an MX record. If there is no MX record,
# try an A or AAAA record which is a deprecated fallback for deliverability.
# Raises an EmailUndeliverableError on failure. On success, returns a dict
# with deliverability information.
# If no dns.resolver.Resolver was given, get dnspython's default resolver.
# Override the default resolver's timeout. This may affect other uses of
# dnspython in this process.
if dns_resolver is None:
from . import DEFAULT_TIMEOUT
if timeout is None:
timeout = DEFAULT_TIMEOUT
dns_resolver = dns.resolver.get_default_resolver()
dns_resolver.lifetime = timeout
elif timeout is not None:
raise ValueError("It's not valid to pass both timeout and dns_resolver.")
deliverability_info: DeliverabilityInfo = {}
try:
try:
# Try resolving for MX records (RFC 5321 Section 5).
response = dns_resolver.resolve(domain, "MX")
# For reporting, put them in priority order and remove the trailing dot in the qnames.
mtas = sorted([(r.preference, str(r.exchange).rstrip('.')) for r in response])
# RFC 7505: Null MX (0, ".") records signify the domain does not accept email.
# Remove null MX records from the mtas list (but we've stripped trailing dots,
# so the 'exchange' is just "") so we can check if there are no non-null MX
# records remaining.
mtas = [(preference, exchange) for preference, exchange in mtas
if exchange != ""]
if len(mtas) == 0: # null MX only, if there were no MX records originally a NoAnswer exception would have occurred
raise EmailUndeliverableError(f"The domain name {domain_i18n} does not accept email.")
deliverability_info["mx"] = mtas
deliverability_info["mx_fallback_type"] = None
except dns.resolver.NoAnswer:
# If there was no MX record, fall back to an A or AAA record
# (RFC 5321 Section 5). Check A first since it's more common.
# If the A/AAAA response has no Globally Reachable IP address,
# treat the response as if it were NoAnswer, i.e., the following
# address types are not allowed fallbacks: Private-Use, Loopback,
# Link-Local, and some other obscure ranges. See
# https://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
# https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
# (Issue #134.)
def is_global_addr(address: Any) -> bool:
try:
ipaddr = ipaddress.ip_address(address)
except ValueError:
return False
return ipaddr.is_global
try:
response = dns_resolver.resolve(domain, "A")
if not any(is_global_addr(r.address) for r in response):
raise dns.resolver.NoAnswer # fall back to AAAA
deliverability_info["mx"] = [(0, domain)]
deliverability_info["mx_fallback_type"] = "A"
except dns.resolver.NoAnswer:
# If there was no A record, fall back to an AAAA record.
# (It's unclear if SMTP servers actually do this.)
try:
response = dns_resolver.resolve(domain, "AAAA")
if not any(is_global_addr(r.address) for r in response):
raise dns.resolver.NoAnswer
deliverability_info["mx"] = [(0, domain)]
deliverability_info["mx_fallback_type"] = "AAAA"
except dns.resolver.NoAnswer as e:
# If there was no MX, A, or AAAA record, then mail to
# this domain is not deliverable, although the domain
# name has other records (otherwise NXDOMAIN would
# have been raised).
raise EmailUndeliverableError(f"The domain name {domain_i18n} does not accept email.") from e
# Check for a SPF (RFC 7208) reject-all record ("v=spf1 -all") which indicates
# no emails are sent from this domain (similar to a Null MX record
# but for sending rather than receiving). In combination with the
# absence of an MX record, this is probably a good sign that the
# domain is not used for email.
try:
response = dns_resolver.resolve(domain, "TXT")
for rec in response:
value = b"".join(rec.strings)
if value.startswith(b"v=spf1 "):
if value == b"v=spf1 -all":
raise EmailUndeliverableError(f"The domain name {domain_i18n} does not send email.")
except dns.resolver.NoAnswer:
# No TXT records means there is no SPF policy, so we cannot take any action.
pass
except dns.resolver.NXDOMAIN as e:
# The domain name does not exist --- there are no records of any sort
# for the domain name.
raise EmailUndeliverableError(f"The domain name {domain_i18n} does not exist.") from e
except dns.resolver.NoNameservers:
# All nameservers failed to answer the query. This might be a problem
# with local nameservers, maybe? We'll allow the domain to go through.
return {
"unknown-deliverability": "no_nameservers",
}
except dns.exception.Timeout:
# A timeout could occur for various reasons, so don't treat it as a failure.
return {
"unknown-deliverability": "timeout",
}
except EmailUndeliverableError:
# Don't let these get clobbered by the wider except block below.
raise
except Exception as e:
# Unhandled conditions should not propagate.
raise EmailUndeliverableError(
"There was an error while checking if the domain name in the email address is deliverable: " + str(e)
) from e
return deliverability_info
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union
class EmailNotValidError(ValueError):
"""Parent class of all exceptions raised by this module."""
pass
class EmailSyntaxError(EmailNotValidError):
"""Exception raised when an email address fails validation because of its form."""
pass
class EmailUndeliverableError(EmailNotValidError):
"""Exception raised when an email address fails validation because its domain name does not appear deliverable."""
pass
class ValidatedEmail:
"""The validate_email function returns objects of this type holding the normalized form of the email address
and other information."""
"""The email address that was passed to validate_email. (If passed as bytes, this will be a string.)"""
original: str
"""The normalized email address, which should always be used in preference to the original address.
The normalized address converts an IDNA ASCII domain name to Unicode, if possible, and performs
Unicode normalization on the local part and on the domain (if originally Unicode). It is the
concatenation of the local_part and domain attributes, separated by an @-sign."""
normalized: str
"""The local part of the email address after Unicode normalization."""
local_part: str
"""The domain part of the email address after Unicode normalization or conversion to
Unicode from IDNA ascii."""
domain: str
"""If the domain part is a domain literal, the IPv4Address or IPv6Address object."""
domain_address: object
"""If not None, a form of the email address that uses 7-bit ASCII characters only."""
ascii_email: Optional[str]
"""If not None, the local part of the email address using 7-bit ASCII characters only."""
ascii_local_part: Optional[str]
"""A form of the domain name that uses 7-bit ASCII characters only."""
ascii_domain: str
"""If True, the SMTPUTF8 feature of your mail relay will be required to transmit messages
to this address. This flag is True just when ascii_local_part is missing. Otherwise it
is False."""
smtputf8: bool
"""If a deliverability check is performed and if it succeeds, a list of (priority, domain)
tuples of MX records specified in the DNS for the domain."""
mx: List[Tuple[int, str]]
"""If no MX records are actually specified in DNS and instead are inferred, through an obsolete
mechanism, from A or AAAA records, the value is the type of DNS record used instead (`A` or `AAAA`)."""
mx_fallback_type: Optional[str]
"""The display name in the original input text, unquoted and unescaped, or None."""
display_name: Optional[str]
def __repr__(self) -> str:
return f"<ValidatedEmail {self.normalized}>"
"""For backwards compatibility, support old field names."""
def __getattr__(self, key: str) -> str:
if key == "original_email":
return self.original
if key == "email":
return self.normalized
raise AttributeError(key)
@property
def email(self) -> str:
warnings.warn("ValidatedEmail.email is deprecated and will be removed, use ValidatedEmail.normalized instead", DeprecationWarning)
return self.normalized
"""For backwards compatibility, some fields are also exposed through a dict-like interface. Note
that some of the names changed when they became attributes."""
def __getitem__(self, key: str) -> Union[Optional[str], bool, List[Tuple[int, str]]]:
warnings.warn("dict-like access to the return value of validate_email is deprecated and may not be supported in the future.", DeprecationWarning, stacklevel=2)
if key == "email":
return self.normalized
if key == "email_ascii":
return self.ascii_email
if key == "local":
return self.local_part
if key == "domain":
return self.ascii_domain
if key == "domain_i18n":
return self.domain
if key == "smtputf8":
return self.smtputf8
if key == "mx":
return self.mx
if key == "mx-fallback":
return self.mx_fallback_type
raise KeyError()
"""Tests use this."""
def __eq__(self, other: object) -> bool:
if not isinstance(other, ValidatedEmail):
return False
return (
self.normalized == other.normalized
and self.local_part == other.local_part
and self.domain == other.domain
and getattr(self, 'ascii_email', None) == getattr(other, 'ascii_email', None)
and getattr(self, 'ascii_local_part', None) == getattr(other, 'ascii_local_part', None)
and getattr(self, 'ascii_domain', None) == getattr(other, 'ascii_domain', None)
and self.smtputf8 == other.smtputf8
and repr(sorted(self.mx) if getattr(self, 'mx', None) else None)
== repr(sorted(other.mx) if getattr(other, 'mx', None) else None)
and getattr(self, 'mx_fallback_type', None) == getattr(other, 'mx_fallback_type', None)
and getattr(self, 'display_name', None) == getattr(other, 'display_name', None)
)
"""This helps producing the README."""
def as_constructor(self) -> str:
return "ValidatedEmail(" \
+ ",".join(f"\n {key}={repr(getattr(self, key))}"
for key in ('normalized', 'local_part', 'domain',
'ascii_email', 'ascii_local_part', 'ascii_domain',
'smtputf8', 'mx', 'mx_fallback_type',
'display_name')
if hasattr(self, key)
) \
+ ")"
"""Convenience method for accessing ValidatedEmail as a dict"""
def as_dict(self) -> Dict[str, Any]:
d = self.__dict__
if d.get('domain_address'):
d['domain_address'] = repr(d['domain_address'])
return d
# These constants are defined by the email specifications.
import re
# Based on RFC 5322 3.2.3, these characters are permitted in email
# addresses (not taking into account internationalization) separated by dots:
ATEXT = r'a-zA-Z0-9_!#\$%&\'\*\+\-/=\?\^`\{\|\}~'
ATEXT_RE = re.compile('[.' + ATEXT + ']') # ATEXT plus dots
DOT_ATOM_TEXT = re.compile('[' + ATEXT + ']+(?:\\.[' + ATEXT + r']+)*\Z')
# RFC 6531 3.3 extends the allowed characters in internationalized
# addresses to also include three specific ranges of UTF8 defined in
# RFC 3629 section 4, which appear to be the Unicode code points from
# U+0080 to U+10FFFF.
ATEXT_INTL = ATEXT + "\u0080-\U0010FFFF"
ATEXT_INTL_DOT_RE = re.compile('[.' + ATEXT_INTL + ']') # ATEXT_INTL plus dots
DOT_ATOM_TEXT_INTL = re.compile('[' + ATEXT_INTL + ']+(?:\\.[' + ATEXT_INTL + r']+)*\Z')
# The domain part of the email address, after IDNA (ASCII) encoding,
# must also satisfy the requirements of RFC 952/RFC 1123 2.1 which
# restrict the allowed characters of hostnames further.
ATEXT_HOSTNAME_INTL = re.compile(r"[a-zA-Z0-9\-\." + "\u0080-\U0010FFFF" + "]")
HOSTNAME_LABEL = r'(?:(?:[a-zA-Z0-9][a-zA-Z0-9\-]*)?[a-zA-Z0-9])'
DOT_ATOM_TEXT_HOSTNAME = re.compile(HOSTNAME_LABEL + r'(?:\.' + HOSTNAME_LABEL + r')*\Z')
DOMAIN_NAME_REGEX = re.compile(r"[A-Za-z]\Z") # all TLDs currently end with a letter
# Domain literal (RFC 5322 3.4.1)
DOMAIN_LITERAL_CHARS = re.compile(r"[\u0021-\u00FA\u005E-\u007E]")
# Quoted-string local part (RFC 5321 4.1.2, internationalized by RFC 6531 3.3)
# The permitted characters in a quoted string are the characters in the range
# 32-126, except that quotes and (literal) backslashes can only appear when escaped
# by a backslash. When internationalized, UTF-8 strings are also permitted except
# the ASCII characters that are not previously permitted (see above).
# QUOTED_LOCAL_PART_ADDR = re.compile(r"^\"((?:[\u0020-\u0021\u0023-\u005B\u005D-\u007E]|\\[\u0020-\u007E])*)\"@(.*)")
QTEXT_INTL = re.compile(r"[\u0020-\u007E\u0080-\U0010FFFF]")
# Length constants
# RFC 3696 + errata 1003 + errata 1690 (https://www.rfc-editor.org/errata_search.php?rfc=3696&eid=1690)
# explains the maximum length of an email address is 254 octets.
EMAIL_MAX_LENGTH = 254
LOCAL_PART_MAX_LENGTH = 64
DNS_LABEL_LENGTH_LIMIT = 63 # in "octets", RFC 1035 2.3.1
DOMAIN_MAX_LENGTH = 253 # in "octets" as transmitted, RFC 1035 2.3.4 and RFC 5321 4.5.3.1.2, and see https://stackoverflow.com/questions/32290167/what-is-the-maximum-length-of-a-dns-name
# RFC 2142
CASE_INSENSITIVE_MAILBOX_NAMES = [
'info', 'marketing', 'sales', 'support', # section 3
'abuse', 'noc', 'security', # section 4
'postmaster', 'hostmaster', 'usenet', 'news', 'webmaster', 'www', 'uucp', 'ftp', # section 5
]
This diff is collapsed.
from typing import Optional, Union, TYPE_CHECKING
import unicodedata
from .exceptions_types import EmailSyntaxError, ValidatedEmail
from .syntax import split_email, validate_email_local_part, validate_email_domain_name, validate_email_domain_literal, validate_email_length
from .rfc_constants import CASE_INSENSITIVE_MAILBOX_NAMES
if TYPE_CHECKING:
import dns.resolver
_Resolver = dns.resolver.Resolver
else:
_Resolver = object
def validate_email(
email: Union[str, bytes],
/, # prior arguments are positional-only
*, # subsequent arguments are keyword-only
allow_smtputf8: Optional[bool] = None,
allow_empty_local: bool = False,
allow_quoted_local: Optional[bool] = None,
allow_domain_literal: Optional[bool] = None,
allow_display_name: Optional[bool] = None,
check_deliverability: Optional[bool] = None,
test_environment: Optional[bool] = None,
globally_deliverable: Optional[bool] = None,
timeout: Optional[int] = None,
dns_resolver: Optional[_Resolver] = None
) -> ValidatedEmail:
"""
Given an email address, and some options, returns a ValidatedEmail instance
with information about the address if it is valid or, if the address is not
valid, raises an EmailNotValidError. This is the main function of the module.
"""
# Fill in default values of arguments.
from . import ALLOW_SMTPUTF8, ALLOW_QUOTED_LOCAL, ALLOW_DOMAIN_LITERAL, ALLOW_DISPLAY_NAME, \
GLOBALLY_DELIVERABLE, CHECK_DELIVERABILITY, TEST_ENVIRONMENT, DEFAULT_TIMEOUT
if allow_smtputf8 is None:
allow_smtputf8 = ALLOW_SMTPUTF8
if allow_quoted_local is None:
allow_quoted_local = ALLOW_QUOTED_LOCAL
if allow_domain_literal is None:
allow_domain_literal = ALLOW_DOMAIN_LITERAL
if allow_display_name is None:
allow_display_name = ALLOW_DISPLAY_NAME
if check_deliverability is None:
check_deliverability = CHECK_DELIVERABILITY
if test_environment is None:
test_environment = TEST_ENVIRONMENT
if globally_deliverable is None:
globally_deliverable = GLOBALLY_DELIVERABLE
if timeout is None and dns_resolver is None:
timeout = DEFAULT_TIMEOUT
# Allow email to be a str or bytes instance. If bytes,
# it must be ASCII because that's how the bytes work
# on the wire with SMTP.
if not isinstance(email, str):
try:
email = email.decode("ascii")
except ValueError as e:
raise EmailSyntaxError("The email address is not valid ASCII.") from e
# Split the address into the display name (or None), the local part
# (before the @-sign), and the domain part (after the @-sign).
# Normally, there is only one @-sign. But the awkward "quoted string"
# local part form (RFC 5321 4.1.2) allows @-signs in the local
# part if the local part is quoted.
display_name, local_part, domain_part, is_quoted_local_part \
= split_email(email)
# Collect return values in this instance.
ret = ValidatedEmail()
ret.original = ((local_part if not is_quoted_local_part
else ('"' + local_part + '"'))
+ "@" + domain_part) # drop the display name, if any, for email length tests at the end
ret.display_name = display_name
# Validate the email address's local part syntax and get a normalized form.
# If the original address was quoted and the decoded local part is a valid
# unquoted local part, then we'll get back a normalized (unescaped) local
# part.
local_part_info = validate_email_local_part(local_part,
allow_smtputf8=allow_smtputf8,
allow_empty_local=allow_empty_local,
quoted_local_part=is_quoted_local_part)
ret.local_part = local_part_info["local_part"]
ret.ascii_local_part = local_part_info["ascii_local_part"]
ret.smtputf8 = local_part_info["smtputf8"]
# RFC 6532 section 3.1 says that Unicode NFC normalization should be applied,
# so we'll return the NFC-normalized local part. Since the caller may use that
# string in place of the original string, ensure it is also valid.
normalized_local_part = unicodedata.normalize("NFC", ret.local_part)
if normalized_local_part != ret.local_part:
try:
validate_email_local_part(normalized_local_part,
allow_smtputf8=allow_smtputf8,
allow_empty_local=allow_empty_local,
quoted_local_part=is_quoted_local_part)
except EmailSyntaxError as e:
raise EmailSyntaxError("After Unicode normalization: " + str(e)) from e
ret.local_part = normalized_local_part
# If a quoted local part isn't allowed but is present, now raise an exception.
# This is done after any exceptions raised by validate_email_local_part so
# that mandatory checks have highest precedence.
if is_quoted_local_part and not allow_quoted_local:
raise EmailSyntaxError("Quoting the part before the @-sign is not allowed here.")
# Some local parts are required to be case-insensitive, so we should normalize
# to lowercase.
# RFC 2142
if ret.ascii_local_part is not None \
and ret.ascii_local_part.lower() in CASE_INSENSITIVE_MAILBOX_NAMES \
and ret.local_part is not None:
ret.ascii_local_part = ret.ascii_local_part.lower()
ret.local_part = ret.local_part.lower()
# Validate the email address's domain part syntax and get a normalized form.
is_domain_literal = False
if len(domain_part) == 0:
raise EmailSyntaxError("There must be something after the @-sign.")
elif domain_part.startswith("[") and domain_part.endswith("]"):
# Parse the address in the domain literal and get back a normalized domain.
domain_literal_info = validate_email_domain_literal(domain_part[1:-1])
if not allow_domain_literal:
raise EmailSyntaxError("A bracketed IP address after the @-sign is not allowed here.")
ret.domain = domain_literal_info["domain"]
ret.ascii_domain = domain_literal_info["domain"] # Domain literals are always ASCII.
ret.domain_address = domain_literal_info["domain_address"]
is_domain_literal = True # Prevent deliverability checks.
else:
# Check the syntax of the domain and get back a normalized
# internationalized and ASCII form.
domain_name_info = validate_email_domain_name(domain_part, test_environment=test_environment, globally_deliverable=globally_deliverable)
ret.domain = domain_name_info["domain"]
ret.ascii_domain = domain_name_info["ascii_domain"]
# Construct the complete normalized form.
ret.normalized = ret.local_part + "@" + ret.domain
# If the email address has an ASCII form, add it.
if not ret.smtputf8:
if not ret.ascii_domain:
raise Exception("Missing ASCII domain.")
ret.ascii_email = (ret.ascii_local_part or "") + "@" + ret.ascii_domain
else:
ret.ascii_email = None
# Check the length of the address.
validate_email_length(ret)
# Check that a display name is permitted. It's the last syntax check
# because we always check against optional parsing features last.
if display_name is not None and not allow_display_name:
raise EmailSyntaxError("A display name and angle brackets around the email address are not permitted here.")
if check_deliverability and not test_environment:
# Validate the email address's deliverability using DNS
# and update the returned ValidatedEmail object with metadata.
if is_domain_literal:
# There is nothing to check --- skip deliverability checks.
return ret
# Lazy load `deliverability` as it is slow to import (due to dns.resolver)
from .deliverability import validate_email_deliverability
deliverability_info = validate_email_deliverability(
ret.ascii_domain, ret.domain, timeout, dns_resolver
)
mx = deliverability_info.get("mx")
if mx is not None:
ret.mx = mx
ret.mx_fallback_type = deliverability_info.get("mx_fallback_type")
return ret
Metadata-Version: 2.1
Name: pydantic-settings
Version: 2.0.3
Summary: Settings management using Pydantic
Project-URL: Homepage, https://github.com/pydantic/pydantic-settings
Project-URL: Funding, https://github.com/sponsors/samuelcolvin
Project-URL: Source, https://github.com/pydantic/pydantic-settings
Project-URL: Changelog, https://github.com/pydantic/pydantic-settings/releases
Project-URL: Documentation, https://docs.pydantic.dev/dev-v2/usage/pydantic_settings/
Author-email: Samuel Colvin <s@muelcolvin.com>, Eric Jolibois <em.jolibois@gmail.com>, Hasan Ramezani <hasan.r67@gmail.com>
License-Expression: MIT
License-File: LICENSE
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Console
Classifier: Environment :: MacOS X
Classifier: Framework :: Pydantic
Classifier: Framework :: Pydantic :: 2
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: POSIX :: Linux
Classifier: Operating System :: Unix
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Topic :: Internet
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.7
Requires-Dist: pydantic>=2.0.1
Requires-Dist: python-dotenv>=0.21.0
Description-Content-Type: text/markdown
# pydantic-settings
[![CI](https://github.com/pydantic/pydantic-settings/workflows/CI/badge.svg?event=push)](https://github.com/pydantic/pydantic-settings/actions?query=event%3Apush+branch%3Amain+workflow%3ACI)
[![Coverage](https://codecov.io/gh/pydantic/pydantic-settings/branch/main/graph/badge.svg)](https://codecov.io/gh/pydantic/pydantic-settings)
[![pypi](https://img.shields.io/pypi/v/pydantic-settings.svg)](https://pypi.python.org/pypi/pydantic-settings)
[![license](https://img.shields.io/github/license/pydantic/pydantic-settings.svg)](https://github.com/pydantic/pydantic-settings/blob/main/LICENSE)
Settings management using Pydantic, this is the new official home of Pydantic's `BaseSettings`.
This package was kindly donated to the [Pydantic organisation](https://github.com/pydantic) by Daniel Daniels, see [pydantic/pydantic#4492](https://github.com/pydantic/pydantic/pull/4492) for discussion.
For the old "Hipster-orgazmic tool to mange application settings" package, see [version 0.2.5](https://pypi.org/project/pydantic-settings/0.2.5/).
See [documentation](https://docs.pydantic.dev/latest/usage/pydantic_settings/) for more details.
pydantic_settings-2.0.3.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
pydantic_settings-2.0.3.dist-info/METADATA,sha256=iuDM6bM6VDeLKrOyfSRQiE4Bp_SqFNmDvNYxjNlojEU,2924
pydantic_settings-2.0.3.dist-info/RECORD,,
pydantic_settings-2.0.3.dist-info/REQUESTED,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
pydantic_settings-2.0.3.dist-info/WHEEL,sha256=9QBuHhg6FNW7lppboF2vKVbCGTVzsFykgRQjjlajrhA,87
pydantic_settings-2.0.3.dist-info/licenses/LICENSE,sha256=6zVadT4CA0bTPYO_l2kTW4n8YQVorFMaAcKVvO5_2Zg,1103
pydantic_settings/__init__.py,sha256=h0HRyW_I6s0YYFIB-qx8gNZOtDI8vCbXnwPbp4BqwzE,482
pydantic_settings/__pycache__/__init__.cpython-39.pyc,,
pydantic_settings/__pycache__/main.cpython-39.pyc,,
pydantic_settings/__pycache__/sources.cpython-39.pyc,,
pydantic_settings/__pycache__/utils.cpython-39.pyc,,
pydantic_settings/__pycache__/version.cpython-39.pyc,,
pydantic_settings/main.py,sha256=DPJPyjM9g7CgaB8-zuoydot1iYVuLOb05rJZUXDt1-o,7178
pydantic_settings/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
pydantic_settings/sources.py,sha256=ruCzD_1mL9e20o-33B7n46cTE5COCJ0524w29uED5BM,24857
pydantic_settings/utils.py,sha256=nomYSaFO_IegfWSL9KJ8SAtLZgyhcruLgE3dTHwSmgo,557
pydantic_settings/version.py,sha256=gemzbOzXm8MxToVh3wokBkbvZFRFfCkFQumP9kJFca4,18
Wheel-Version: 1.0
Generator: hatchling 1.18.0
Root-Is-Purelib: true
Tag: py3-none-any
The MIT License (MIT)
Copyright (c) 2022 Samuel Colvin and other contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
from .main import BaseSettings, SettingsConfigDict
from .sources import (
DotEnvSettingsSource,
EnvSettingsSource,
InitSettingsSource,
PydanticBaseSettingsSource,
SecretsSettingsSource,
)
from .version import VERSION
__all__ = (
'BaseSettings',
'DotEnvSettingsSource',
'EnvSettingsSource',
'InitSettingsSource',
'PydanticBaseSettingsSource',
'SecretsSettingsSource',
'SettingsConfigDict',
'__version__',
)
__version__ = VERSION
from __future__ import annotations as _annotations
from pathlib import Path
from typing import Any, ClassVar
from pydantic import ConfigDict
from pydantic._internal._config import config_keys
from pydantic._internal._utils import deep_update
from pydantic.main import BaseModel
from .sources import (
ENV_FILE_SENTINEL,
DotEnvSettingsSource,
DotenvType,
EnvSettingsSource,
InitSettingsSource,
PydanticBaseSettingsSource,
SecretsSettingsSource,
)
class SettingsConfigDict(ConfigDict, total=False):
case_sensitive: bool
env_prefix: str
env_file: DotenvType | None
env_file_encoding: str | None
env_nested_delimiter: str | None
secrets_dir: str | Path | None
# Extend `config_keys` by pydantic settings config keys to
# support setting config through class kwargs.
# Pydantic uses `config_keys` in `pydantic._internal._config.ConfigWrapper.for_model`
# to extract config keys from model kwargs, So, by adding pydantic settings keys to
# `config_keys`, they will be considered as valid config keys and will be collected
# by Pydantic.
config_keys |= set(SettingsConfigDict.__annotations__.keys())
class BaseSettings(BaseModel):
"""
Base class for settings, allowing values to be overridden by environment variables.
This is useful in production for secrets you do not wish to save in code, it plays nicely with docker(-compose),
Heroku and any 12 factor app design.
All the below attributes can be set via `model_config`.
Args:
_case_sensitive: Whether environment variables names should be read with case-sensitivity. Defaults to `None`.
_env_prefix: Prefix for all environment variables. Defaults to `None`.
_env_file: The env file(s) to load settings values from. Defaults to `Path('')`, which
means that the value from `model_config['env_file']` should be used. You can also pass
`None` to indicate that environment variables should not be loaded from an env file.
_env_file_encoding: The env file encoding, e.g. `'latin-1'`. Defaults to `None`.
_env_nested_delimiter: The nested env values delimiter. Defaults to `None`.
_secrets_dir: The secret files directory. Defaults to `None`.
"""
def __init__(
__pydantic_self__,
_case_sensitive: bool | None = None,
_env_prefix: str | None = None,
_env_file: DotenvType | None = ENV_FILE_SENTINEL,
_env_file_encoding: str | None = None,
_env_nested_delimiter: str | None = None,
_secrets_dir: str | Path | None = None,
**values: Any,
) -> None:
# Uses something other than `self` the first arg to allow "self" as a settable attribute
super().__init__(
**__pydantic_self__._settings_build_values(
values,
_case_sensitive=_case_sensitive,
_env_prefix=_env_prefix,
_env_file=_env_file,
_env_file_encoding=_env_file_encoding,
_env_nested_delimiter=_env_nested_delimiter,
_secrets_dir=_secrets_dir,
)
)
@classmethod
def settings_customise_sources(
cls,
settings_cls: type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
"""
Define the sources and their order for loading the settings values.
Args:
settings_cls: The Settings class.
init_settings: The `InitSettingsSource` instance.
env_settings: The `EnvSettingsSource` instance.
dotenv_settings: The `DotEnvSettingsSource` instance.
file_secret_settings: The `SecretsSettingsSource` instance.
Returns:
A tuple containing the sources and their order for loading the settings values.
"""
return init_settings, env_settings, dotenv_settings, file_secret_settings
def _settings_build_values(
self,
init_kwargs: dict[str, Any],
_case_sensitive: bool | None = None,
_env_prefix: str | None = None,
_env_file: DotenvType | None = None,
_env_file_encoding: str | None = None,
_env_nested_delimiter: str | None = None,
_secrets_dir: str | Path | None = None,
) -> dict[str, Any]:
# Determine settings config values
case_sensitive = _case_sensitive if _case_sensitive is not None else self.model_config.get('case_sensitive')
env_prefix = _env_prefix if _env_prefix is not None else self.model_config.get('env_prefix')
env_file = _env_file if _env_file != ENV_FILE_SENTINEL else self.model_config.get('env_file')
env_file_encoding = (
_env_file_encoding if _env_file_encoding is not None else self.model_config.get('env_file_encoding')
)
env_nested_delimiter = (
_env_nested_delimiter
if _env_nested_delimiter is not None
else self.model_config.get('env_nested_delimiter')
)
secrets_dir = _secrets_dir if _secrets_dir is not None else self.model_config.get('secrets_dir')
# Configure built-in sources
init_settings = InitSettingsSource(self.__class__, init_kwargs=init_kwargs)
env_settings = EnvSettingsSource(
self.__class__,
case_sensitive=case_sensitive,
env_prefix=env_prefix,
env_nested_delimiter=env_nested_delimiter,
)
dotenv_settings = DotEnvSettingsSource(
self.__class__,
env_file=env_file,
env_file_encoding=env_file_encoding,
case_sensitive=case_sensitive,
env_prefix=env_prefix,
env_nested_delimiter=env_nested_delimiter,
)
file_secret_settings = SecretsSettingsSource(
self.__class__, secrets_dir=secrets_dir, case_sensitive=case_sensitive, env_prefix=env_prefix
)
# Provide a hook to set built-in sources priority and add / remove sources
sources = self.settings_customise_sources(
self.__class__,
init_settings=init_settings,
env_settings=env_settings,
dotenv_settings=dotenv_settings,
file_secret_settings=file_secret_settings,
)
if sources:
return deep_update(*reversed([source() for source in sources]))
else:
# no one should mean to do this, but I think returning an empty dict is marginally preferable
# to an informative error and much better than a confusing error
return {}
model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
extra='forbid',
arbitrary_types_allowed=True,
validate_default=True,
case_sensitive=False,
env_prefix='',
env_file=None,
env_file_encoding=None,
env_nested_delimiter=None,
secrets_dir=None,
protected_namespaces=('model_', 'settings_'),
)
This diff is collapsed.
from pathlib import Path
path_type_labels = {
'is_dir': 'directory',
'is_file': 'file',
'is_mount': 'mount point',
'is_symlink': 'symlink',
'is_block_device': 'block device',
'is_char_device': 'char device',
'is_fifo': 'FIFO',
'is_socket': 'socket',
}
def path_type_label(p: Path) -> str:
"""
Find out what sort of thing a path is.
"""
assert p.exists(), 'path does not exist'
for method, name in path_type_labels.items():
if getattr(p, method)():
return name
return 'unknown'
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from app.services.auth import get_current_user
from app.models.role import get_role_level
from app.core.config import settings
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
......@@ -16,10 +18,57 @@ async def get_current_active_user(token: str = Depends(oauth2_scheme)):
return user
async def get_current_admin_user(current_user: dict = Depends(get_current_active_user)):
"""获取当前管理员用户"""
if current_user.get("role") != "admin":
"""获取当前管理员用户(兼容 admin/administrator 两种命名)"""
if current_user.get("role") not in {"admin", "administrator"}:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="权限不足,需要管理员权限"
)
return current_user
\ No newline at end of file
return current_user
# 通用的按角色等级校验的依赖
def require_role(min_level: int):
"""
按最小等级校验权限的依赖工厂。
使用方法:Depends(require_role(3)) 表示仅 3级及以上可访问。
关键点:
- 提前返回,避免多层嵌套;
- 容错处理:当用户缺少 role_level 字段时,根据 role 计算等级。
"""
async def _checker(current_user: dict = Depends(get_current_active_user)):
# 兼容:优先取 role_level,没有则通过 role 计算
level = current_user.get("role_level")
if level is None:
level = get_role_level(current_user.get("role"))
if level < min_level:
raise HTTPException(status_code=403, detail="权限不足")
return current_user
return _checker
def require_edition_for_mode():
"""
版别运行模式依赖:当后端设置为 APP_MODE=edu 或 APP_MODE=biz 时,限制仅允许对应版别的用户访问。
使用方式:在路由层统一挂载,例如:
router = APIRouter(dependencies=[Depends(require_edition_for_mode())])
设计要点:
- 提前返回,避免多层嵌套;
- 与现有认证依赖复用:接收 current_user,避免重复解析 token;
- 容错与默认值:当 APP_MODE 为未知值时默认放行,但建议仅使用 "edu" 或 "biz"。
"""
async def _edition_checker(current_user: dict = Depends(get_current_active_user)):
mode = (settings.APP_MODE or "edu").lower() # 默认 edu
# 仅当模式为 edu 或 biz 时进行限制;其它值(如意外)默认放行
if mode in {"edu", "biz"}:
user_edition = (current_user.get("edition") or "").lower()
if user_edition != mode:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"当前后端运行模式为 '{mode}',用户版别 '{user_edition}' 无权访问"
)
return current_user
return _edition_checker
\ No newline at end of file
......@@ -4,7 +4,7 @@ from datetime import datetime
import json
import csv
import io
from app.api.deps import get_current_admin_user
from app.api.deps import get_current_admin_user, require_edition_for_mode
from app.schemas.sensitive_word import (
SensitiveWordCreate, SensitiveWordResponse, SensitiveRecordResponse,
SensitiveWordBulkImport, CategoryCreate, CategoryResponse, CategoriesResponse
......@@ -16,7 +16,8 @@ from app.services.sensitive_word import (
)
from app.models.sensitive_word import SENSITIVE_WORD_CATEGORIES, SENSITIVE_WORD_SUBCATEGORIES
router = APIRouter()
# 在路由层挂载版别运行模式依赖,确保仅在当前模式的版别下进行管理操作
router = APIRouter(dependencies=[Depends(require_edition_for_mode())])
@router.post("/sensitive-words", response_model=dict, status_code=status.HTTP_201_CREATED)
async def create_sensitive_word(
......
......@@ -5,6 +5,7 @@ from app.core.config import settings
from app.db.mongodb import db
from app.schemas.user import UserCreate, UserResponse, Token
from app.services.auth import authenticate_user, create_access_token, get_password_hash
from app.models.role import get_role_level
router = APIRouter()
......@@ -33,7 +34,9 @@ async def register(user_data: UserCreate):
"username": user_data.username,
"email": user_data.email,
"hashed_password": hashed_password,
"role": "user" # 默认为普通用户
"role": "user", # 默认为普通用户
"role_level": 1, # 默认为 1 级
"edition": "edu", # 默认为教育版,可在后续管理员界面修改
}
result = await db.db.users.insert_one(user)
......@@ -45,7 +48,9 @@ async def register(user_data: UserCreate):
"id": str(created_user["_id"]),
"username": created_user["username"],
"email": created_user["email"],
"role": created_user["role"]
"role": created_user.get("role", "user"),
"role_level": created_user.get("role_level", 1),
"edition": created_user.get("edition", "edu"),
}
@router.post("/login", response_model=Token)
......@@ -59,11 +64,22 @@ async def login(form_data: OAuth2PasswordRequestForm = Depends()):
headers={"WWW-Authenticate": "Bearer"},
)
# 创建访问令牌
# 创建访问令牌(在载荷中加入用户的角色与版别信息)
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": str(user["_id"])},
data={
"sub": str(user["_id"]),
"role": user.get("role", "user"),
"role_level": user.get("role_level", get_role_level(user.get("role"))),
"edition": user.get("edition", "edu"),
},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
\ No newline at end of file
# 登录响应中回传基础用户信息,前端免一次 /me 调用;后端仍应以服务端鉴权为准
return {
"access_token": access_token,
"token_type": "bearer",
"role": user.get("role", "user"),
"role_level": user.get("role_level", get_role_level(user.get("role"))),
"edition": user.get("edition", "edu"),
}
\ No newline at end of file
from fastapi import APIRouter, Depends, HTTPException, status
from app.api.deps import get_current_active_user
from app.api.deps import get_current_active_user, require_edition_for_mode
from app.schemas.conversation import MessageCreate, ConversationResponse
from app.services.conversation import create_conversation, get_conversation, add_message, get_user_conversations
router = APIRouter()
# 在路由层挂载版别运行模式依赖,限制仅允许当前模式的用户访问
router = APIRouter(dependencies=[Depends(require_edition_for_mode())])
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_new_conversation(current_user: dict = Depends(get_current_active_user)):
......
from fastapi import APIRouter, Depends
from app.api.deps import require_role, require_edition_for_mode
# 在路由层挂载版别运行模式依赖,保证仅允许后端设置的版别访问
router = APIRouter(dependencies=[Depends(require_edition_for_mode())])
@router.get("/summary")
async def dashboard_summary(current_user: dict = Depends(require_role(1))):
"""
仪表盘概要接口:根据角色等级与版别返回不同的首页信息
- 最低 1 级即可访问,但返回内容随等级与版别递增
- 关键节点:避免多层嵌套,先取必要信息后按条件构造视图
"""
role = current_user.get("role", "user")
level = current_user.get("role_level", 1)
edition = current_user.get("edition", "edu")
# 基础公共信息(所有角色可见)
base = {
"welcome": f"欢迎 {current_user.get('username','')} 登录",
"edition": edition,
"role": role,
"role_level": level,
}
# 教育版视图
if edition == "edu":
if level == 1:
base.update({
"modules": ["今日出勤", "操行与教师意见"],
})
return base
if level == 2:
base.update({
"modules": ["班级出勤率", "课程/地点", "学生请假", "上级指示"],
})
return base
if level == 3:
base.update({
"modules": ["系部教师出勤", "学生考勤", "课堂异常指标", "上级指示"],
})
return base
if level == 4:
base.update({
"modules": ["校园整体安全", "教师出勤率", "资金预算", "部门进度", "本期目标"],
})
return base
# 系统管理员:与业务最高权限分离
base.update({
"modules": ["系统运行", "告警", "运维工具"],
})
return base
# 企业版视图(biz)
if level == 1:
base.update({
"modules": ["今日出勤", "绩效", "上级意见"],
})
return base
if level == 2:
base.update({
"modules": ["小组出勤率", "工单/排班", "请假审批", "负责人指示"],
})
return base
if level == 3:
base.update({
"modules": ["部门出勤", "任务进度", "异常指标", "负责人指示"],
})
return base
if level == 4:
base.update({
"modules": ["企业整体安全", "员工出勤率", "资金预算", "部门进度", "战略目标"],
})
return base
base.update({
"modules": ["系统运行", "告警", "运维工具"],
})
return base
\ No newline at end of file
from fastapi import APIRouter
from app.api.v1 import auth, conversation, admin
from app.api.v1 import auth, conversation, admin, dashboard
api_router = APIRouter()
# 注册各模块路由
api_router.include_router(auth.router, prefix="/auth", tags=["认证"])
api_router.include_router(conversation.router, prefix="/conversations", tags=["对话"])
api_router.include_router(admin.router, prefix="/admin", tags=["管理员"])
\ No newline at end of file
api_router.include_router(admin.router, prefix="/admin", tags=["管理员"])
api_router.include_router(dashboard.router, prefix="/dashboard", tags=["仪表盘"])
\ No newline at end of file
import os
from pydantic import BaseSettings
# Pydantic v2 中 BaseSettings 已迁移到 pydantic-settings
from pydantic_settings import BaseSettings
from dotenv import load_dotenv
# 加载环境变量
......@@ -23,4 +24,9 @@ class Settings(BaseSettings):
OLLAMA_BASE_URL: str = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
OLLAMA_MODEL: str = os.getenv("OLLAMA_MODEL", "llama2")
# 应用运行模式开关:仅运行教育版或企业版之一
# 允许的值:"edu" / "biz";若未设置则默认使用 "edu"
# 注意:不再提供混合模式(mixed),如需混合请显式设置并在依赖中放行
APP_MODE: str = os.getenv("APP_MODE", "edu")
settings = Settings()
\ No newline at end of file
"""
角色与权限等级的统一定义
设计说明:
- 为避免“神秘命名”和“重复代码”,将角色等级映射集中在一个模块维护。
- 同时兼容历史中的 "admin" 命名,映射为最高等级(5)。
"""
# 角色等级映射,数字越大权限越高
ROLE_ORDER = {
"user": 1, # 学生/员工
"manager": 2, # 班主任/组长/二级部门管理员
"leader": 3, # 中层干部/部门负责人/一级部门管理员
"master": 4, # 校长/集团高管/总负责人(业务最高)
"administrator": 5, # 系统管理员/运维超管(系统最高)
"admin": 5, # 历史兼容:旧代码中的 admin
}
# 合法版别(教育/企业)
VALID_EDITIONS = {"edu", "biz"}
def get_role_level(role: str) -> int:
"""根据角色字符串返回对应的权限等级,默认返回 1 级。
关键点:提前返回,避免多层嵌套。
"""
return ROLE_ORDER.get((role or "user").lower(), 1)
\ No newline at end of file
......@@ -7,37 +7,56 @@ from bson import ObjectId
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
# Pydantic v2 仍支持生成器形式的验证器
yield cls.validate
@classmethod
def validate(cls, v):
# 校验传入的值是否是合法的 ObjectId 字符串
if not ObjectId.is_valid(v):
raise ValueError("无效的ObjectId")
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type="string")
# Pydantic v2 中不再支持 __modify_schema__;如需自定义
# JSON Schema,可实现 __get_pydantic_json_schema__。当前
# 版本先保持默认 Schema,以确保运行稳定。
# 用户模型
# 用户模型(修复缩进错误:确保为顶层类定义)
class UserModel(BaseModel):
# MongoDB 主键,使用别名 _id,序列化时转换为字符串
id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
# 用户名
username: str
# 邮箱(此处仅为字符串,外层使用 EmailStr 的 Schema 做校验)
email: str
# 哈希后的密码
hashed_password: str
role: str = "user" # "user" 或 "admin"
# 角色字符串(兼容历史中的 "admin")。建议与 app/models/role.py 中的枚举保持一致
role: str = "user"
# 角色等级(1~5),用于统一的权限判断
role_level: int = 1
# 版别:"edu"(教育版)或 "biz"(企业版)
edition: str = "edu"
# 创建与更新时间
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
class Config:
# 允许使用字段名进行赋值(即使定义了 alias)
allow_population_by_field_name = True
# 允许使用自定义类型(如 ObjectId)
arbitrary_types_allowed = True
# 将 ObjectId 序列化为字符串,便于前端展示
json_encoders = {ObjectId: str}
# 示例数据,便于接口文档和调试
schema_extra = {
"example": {
"username": "user1",
"email": "user1@example.com",
"hashed_password": "hashed_password_here",
"role": "user",
"role_level": 1,
"edition": "edu",
}
}
\ No newline at end of file
from typing import Optional
from typing import Optional, Literal
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
......@@ -14,11 +14,26 @@ class UserResponse(BaseModel):
id: str
username: str
email: str
role: str
# 角色采用 Literal 强校验,同时兼容历史中的 "admin"
role: Literal["user", "manager", "leader", "master", "administrator", "admin"]
# 新增:角色等级,便于前端快速展示权限范围
role_level: int
# 版别采用 Literal 强校验:教育版/企业版
edition: Literal["edu", "biz"]
class Token(BaseModel):
access_token: str
token_type: str
# 扩展:在登录响应中返回关键用户属性,减少额外查询(也可单独提供 /me 接口)
# 使用 Literal 限定角色取值范围,保持与 UserResponse 一致
role: Optional[Literal["user", "manager", "leader", "master", "administrator", "admin"]] = None
role_level: Optional[int] = None
# 使用 Literal 限定版别取值范围
edition: Optional[Literal["edu", "biz"]] = None
class TokenData(BaseModel):
user_id: Optional[str] = None
\ No newline at end of file
user_id: Optional[str] = None
# TokenData 也保持与 Token 一致的角色限定,便于类型安全
role: Optional[Literal["user", "manager", "leader", "master", "administrator", "admin"]] = None
role_level: Optional[int] = None
edition: Optional[Literal["edu", "biz"]] = None
\ No newline at end of file
......@@ -5,6 +5,7 @@ from passlib.context import CryptContext
from app.core.config import settings
from app.db.mongodb import db
from bson import ObjectId
from app.models.role import get_role_level
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
......@@ -27,7 +28,11 @@ async def authenticate_user(username: str, password: str):
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""创建访问令牌"""
"""创建访问令牌
关键点:
- 在 JWT 载荷中加入角色与版别信息,便于前端解码后快速展示;
- 仍以服务端数据库查询为准,避免客户端伪造带来的安全问题。
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
......@@ -49,6 +54,11 @@ async def get_current_user(token: str):
if user is None:
return None
# 兼容兜底:若用户数据缺少角色等级或版别,则进行补充
if user.get("role_level") is None:
user["role_level"] = get_role_level(user.get("role"))
if user.get("edition") is None:
user["edition"] = "edu" # 默认版别为教育版
return user
except JWTError:
return None
\ No newline at end of file
import asyncio
import motor.motor_asyncio
import os
from dotenv import load_dotenv
from datetime import datetime
from bson import ObjectId
from passlib.context import CryptContext
......@@ -7,9 +9,14 @@ from passlib.context import CryptContext
# 密码加密工具
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 加载 .env 环境变量,保持与后端一致的配置来源
load_dotenv()
# MongoDB连接配置
MONGODB_URL = "mongodb://localhost:27017"
DB_NAME = "llm_filter_db"
MONGODB_URL = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
DB_NAME = os.getenv("DB_NAME", "llm_filter_db")
# 运行模式:仅运行教育版或企业版之一(不混合)
APP_MODE = (os.getenv("APP_MODE", "edu") or "edu").lower()
async def init_db():
# 连接到MongoDB
......@@ -24,32 +31,135 @@ async def init_db():
print("已清空现有集合")
# 创建用户集合并添加假数据
admin_id = ObjectId()
user_id = ObjectId()
admin_id = ObjectId() # 教育版管理员(用户名 admin)
user_id = ObjectId() # 教育版普通用户(用户名 user)
user_biz_id = ObjectId() # 企业版普通用户(用户名 user_biz)
users = [
# 系统管理员(标准:administrator,兼容:admin 用户名)
{
"_id": admin_id,
"username": "admin",
"email": "admin@example.com",
"hashed_password": pwd_context.hash("admin123"),
"role": "admin",
"role": "administrator", # 统一使用标准角色名,兼容旧数据中的 "admin"
"role_level": 5, # 映射到最高等级
"edition": "edu", # 默认教育版
"created_at": datetime.now(),
"updated_at": datetime.now()
},
# 普通用户(教育版)
{
"_id": user_id,
"username": "user",
"email": "user@example.com",
"hashed_password": pwd_context.hash("user123"),
"role": "user",
"role_level": 1,
"edition": "edu",
"created_at": datetime.now(),
"updated_at": datetime.now()
}
},
# 教育版:班主任、部门负责人、中层与校长
{
"_id": ObjectId(),
"username": "manager_edu",
"email": "manager_edu@example.com",
"hashed_password": pwd_context.hash("manager123"),
"role": "manager",
"role_level": 2,
"edition": "edu",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "leader_edu",
"email": "leader_edu@example.com",
"hashed_password": pwd_context.hash("leader123"),
"role": "leader",
"role_level": 3,
"edition": "edu",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "master_edu",
"email": "master_edu@example.com",
"hashed_password": pwd_context.hash("master123"),
"role": "master",
"role_level": 4,
"edition": "edu",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
# 企业版:员工、组长、负责人、高管与管理员
{
"_id": user_biz_id,
"username": "user_biz",
"email": "user_biz@example.com",
"hashed_password": pwd_context.hash("userbiz123"),
"role": "user",
"role_level": 1,
"edition": "biz",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "manager_biz",
"email": "manager_biz@example.com",
"hashed_password": pwd_context.hash("managerbiz123"),
"role": "manager",
"role_level": 2,
"edition": "biz",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "leader_biz",
"email": "leader_biz@example.com",
"hashed_password": pwd_context.hash("leaderbiz123"),
"role": "leader",
"role_level": 3,
"edition": "biz",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "master_biz",
"email": "master_biz@example.com",
"hashed_password": pwd_context.hash("masterbiz123"),
"role": "master",
"role_level": 4,
"edition": "biz",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
{
"_id": ObjectId(),
"username": "administrator_biz",
"email": "administrator_biz@example.com",
"hashed_password": pwd_context.hash("adminbiz123"),
"role": "administrator",
"role_level": 5,
"edition": "biz",
"created_at": datetime.now(),
"updated_at": datetime.now()
},
]
await db.users.insert_many(users)
print(f"已创建用户集合并添加 {len(users)} 条记录")
# 根据运行模式筛选用户(不混合)
mode = APP_MODE if APP_MODE in {"edu", "biz"} else "edu"
if mode != APP_MODE:
print(f"警告:APP_MODE={APP_MODE} 非法,默认使用 edu")
selected_users = [u for u in users if u["edition"] == mode]
await db.users.insert_many(selected_users)
print(f"已创建用户集合并添加 {len(selected_users)} 条记录(模式:{mode})")
# 创建敏感词集合并添加假数据
sensitive_words = [
......@@ -140,10 +250,13 @@ async def init_db():
# 创建对话集合并添加假数据
conversation_id = ObjectId()
# 根据模式选择示例用户用于演示对话与敏感词记录
sample_user_id = user_id if mode == "edu" else user_biz_id
conversations = [
{
"_id": conversation_id,
"user_id": user_id,
"user_id": sample_user_id,
"messages": [
{
"role": "user",
......@@ -171,8 +284,9 @@ async def init_db():
# 创建敏感词记录集合并添加假数据
sensitive_records = [
{
"user_id": "user123",
"conversation_id": "conv123",
# 使用真实的 ObjectId,避免与模型类型不一致
"user_id": sample_user_id,
"conversation_id": conversation_id,
"message_content": "我想了解一下赌博的事情",
"sensitive_words_found": [
{
......@@ -186,8 +300,9 @@ async def init_db():
"timestamp": datetime.now()
},
{
"user_id": "user123",
"conversation_id": "conv456",
# 第二条记录同样引用真实的 ObjectId
"user_id": sample_user_id,
"conversation_id": conversation_id,
"message_content": "如何获取毒品和色情内容",
"sensitive_words_found": [
{
......@@ -212,9 +327,13 @@ async def init_db():
print(f"已创建敏感词记录集合并添加 {len(sensitive_records)} 条记录")
print("\n数据库初始化完成!")
print("\n测试账号:")
print("管理员账号: admin / admin123")
print("用户账号: user / user123")
print("\n测试账号 (模式: %s):" % mode)
if mode == "edu":
print("教育版管理员: admin / admin123 (role=administrator, edition=edu)")
print("教育版普通用户: user / user123 (role=user, edition=edu)")
else:
print("企业版管理员: administrator_biz / adminbiz123 (role=administrator, edition=biz)")
print("企业版普通用户: user_biz / userbiz123 (role=user, edition=biz)")
if __name__ == "__main__":
asyncio.run(init_db())
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment