Mongodb作网站数据库时一种用户token的存储及认证方法

后端采用Flask(python),以及使用pymongo操作Mongodb数据库。下文中代码仅供参考,需按实际情况修改

基本数据类型

由于Mongodb的每条数据(文档)在_id字段缺省时会自动分配一个ObjectId,所以用户id可以由Mongodb自行设定,其次就是username和password,没什么好说的,对于需要鉴权的服务来说,还需要一个token,而每个token又对应一个过期时间,到期之后需要自动删除,避免用户登录一次就可以永久访问带来的风险。剩下的就是自定义数据了,例如用户的个人信息设定等等,这些就不是本文的重点了。以下是一条用户数据范例:

{
    "_id" : ObjectId("65a9e5a7c68f57d023ee***"),
    "username" : "miku39",
    "password" : "$2b$12$aYlU****7NSMQXs.******zvZbw0IecMaXNoie0kmoe/rLj4VZ.Ne",
    "token" : {
        "96d8378b8702a79173f88768fb4c34acede559a1f8d297c3b25b46caa7c*****" : 
            ISODate("2024-02-28T17:57:08.479+0000"),
        ...
    },
    ...
}

用户登陆时自动生成token并插入到文档中

token生成之后还要返回给客户端,客户端需要将token保存在cookies或者localstorage里,这里不细讲

import datetime
import secrets

# user: 一条用户数据 users:存储用户数据的集合
def insert_token(user, users):
    # 生成32位token
    token = secrets.token_hex(32)
    # 设定token过期时间为1天后
    expiration = datetime.datetime.now() + datetime.timedelta(days=1)
    # 插入键值对到'token'列,键为token,值为过期时间 
    user['token'][token] = expiration
    # 更新'token'列
    users.update_one({'username': user['username']}, {'$set': {'token': user['token']}})

用户发起请求时认证token

要求客户端在发起请求时在载荷中添加token信息,没有token或是错误的token将返回403错误,此时客户端视情况决定是否要求用户重新登录以获取新的token

from flask import Flask, request, abort

application = Flask(__name__)

# 在请求处理前调用
@application.before_request
def before():
    url = request.path
    # 排除一些无需认证的情况,例如登录请求
    if url == '/login' or url == '/':
        pass
    else:
        # 获取请求载荷中的token
        token = request.values.get('token')
        if not result:
            # 如果没有token,返回403错误
            abort(403)
        # 此处users替换为数据库中存放用户数据的集合,该语句作用是查询token列中是否含有该值的键
        result = users.find_one({f'token.{token}': {'$exists': True}})
        if not result:
            # 如果没有查询到结果,返回403错误
            abort(403)
        else:
            pass

定时清理过期token

配合crontab使用,设定每分钟运行一次

from datetime import datetime

def clean():
    current = datetime.now()
    # 此处users替换为数据库中存放用户数据的集合,遍历所有用户数据
    for user in users.find({}, {'_id': False}):
        expiredTokens = []
        tokens = user['token']
        # 遍历一个用户的所有token
        for token,expiration in tokens.items():
            # 如果token过期了,将该token加入到过期token的列表中
            if current > expiration:
                expiredTokens.append(token)
        # 遍历过期token的列表,将其从该用户token列中删除
        for token in expiredTokens:
            tokens.pop(token)
        # 更新删除过期token后的用户数据
        users.update_one({'username': user['username']}, {'$set': {'token': tokens}})

发表回复

电子邮件地址不会被公开。必填项已用 * 标注

Index