Skip to content

105.编写消息通知功能(1)

大家好~我是米洛
我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的教程,希望大家多多支持。
欢迎关注我的公众号米洛的测开日记,获取最新文章教程!

回顾

上一节我们编写了用例/目录分屏的功能,属于提升用户体验相关的内容。接下来我们要基于websocket,完成消息中心相关功能。这块博主也没什么经验,如果有不对的地方欢迎大家指正

本模块由于篇幅问题,会分为3~5篇。

需求背景

我们的产品要想做一些东西,必须得有一个背景在里头。比如我为啥要做这个功能,有什么用处。虽然咱们比不上专业的产品,但是也可以在做项目的过程中培养自己这块能力

我个人的想法是,当我们执行case,或者指派一些用例给他人编写,或者未来有一些任务安排的时候,我们可以先指派,接着推送给对方,让对方能知道你的安排。

但可能目前我们最需要的就是,测试任务完成以后的通知功能。所以我们需要调研一下简单的页面。

参考下语雀的页面:

可以看到十分清爽,左侧tab显示消息类型,右侧tab显示已读/未读消息数量和列表。

编写消息表

消息的话,我们需要先定义一下消息表。来看看有哪些字段吧:

from sqlalchemy import SMALLINT, Column, VARCHAR, INT

from app.models.basic import PityBase


class PityNotification(PityBase):
    msg_type = Column(SMALLINT, comment="消息类型 0: 用例执行 1: 任务分配")
    msg_content = Column(VARCHAR(200), comment="消息内容", nullable=True)
    msg_link = Column(VARCHAR(128), comment="消息链接")
    msg_status = Column(SMALLINT, comment="消息状态 0: 未读 1: 已读")
    sender = Column(INT, comment="消息发送人, 0则是CPU 非0则是其他用户")
    receiver = Column(INT, comment="消息接收人, 0为广播消息")

    __tablename__ = "pity_notification"

    def __init__(self, msg_type, msg_content, sender, receiver, user, msg_link=None, msg_status=0):
        super().__init__(user)
        self.msg_type = msg_type
        self.receiver = receiver
        self.msg_content = msg_content
        self.sender = sender
        self.msg_link = msg_link
        self.msg_status = msg_status

定义了一个通知表,里面有几个核心字段:

  • 类型(预留字段)
  • 内容
  • 链接(方便用户跳转,可为空)
  • 状态
  • 发送人
  • 接收人(这里有个广播消息的区分)

编写后端查询/已读功能

一般我们在页面上的操作,也就是查询已读功能,所以对外暴露的接口,暂时只考虑这2块就可以了。

如果不存在的目录和文件,大家就新建,否则就修改。放上路径的原因便是如此。

  • 编写Dao方法(app/crud/notification/NotificationDao.py)
from app.crud import Mapper
from app.models.notification import PityNotification
from app.utils.decorator import dao
from app.utils.logger import Log


@dao(PityNotification, Log("PityNotificationDao"))
class PityNotificationDao(Mapper):
    pass

由于之前编写了一些公有方法,所以现在很方便,我们不再需要写很重复的代码了。

  • 编写接口

在此之前,我们需要为app/crud/init.py/Mapper类新增一个update方法:

    @classmethod
    async def update_by_map(cls, *condition, **kwargs):
        try:
            async with async_session() as session:
                async with session.begin():
                    sql = update(cls.model).where(*condition).values(**kwargs, updated_at=datetime.now())
                    await session.execute(sql)
        except Exception as e:
            cls.log.error(f"更新数据失败: {e}")
            raise Exception("更新数据失败")

以往我们编写的update,都是先查询数据,如果没有数据则返回异常,如果有则更新数据。

这样做的缺点是多查询了一次数据库,优点是能够防止意外数据

上述的方法,是根据condition(查询条件)及kwargs(变更字典)更新。调用的也都是sqlalchemy基本的方法。

后续有空我也会出一个sqlalchemy crud相关demo,包括join(只能说下次一定)。

接着我们照常编写crud接口就可以了。

新建app/routers/notification/message.py文件(也别忘了在main.py引入这个router 代码大家可以参考之前的routers目录)

from typing import List

from fastapi import APIRouter, Depends

from app.crud.notification.NotificationDao import PityNotificationDao
from app.handler.fatcory import PityResponse
from app.models.notification import PityNotification
from app.routers import Permission

router = APIRouter(prefix="/notification")


@router.get("/list", description="获取用户消息列表")
async def list_msg(msg_status: int = 0, msg_type: int = 0, user_info=Depends(Permission())):
    try:
        data = await PityNotificationDao.list_record(msg_type=msg_type, msg_status=msg_status,
                                                     receiver=user_info['id'])
        return PityResponse.success(data)
    except Exception as e:
        return PityResponse.failed(str(e))


@router.post("/read", description="用户读取消息")
async def read_msg(msg_id: List[int], user_info=Depends(Permission())):
    try:
        await PityNotificationDao.update_by_map(PityNotification.id.in_(msg_id),
                                                PityNotification.receiver == user_info['id'], msg_status=1)
        return PityResponse.success()
    except Exception as e:
        return PityResponse.failed(str(e))

这边要注意的地方是已读消息的接口,接受的是一个msg_id列表,因为我们需要一个一键已读的功能,这样会比较方便。其实也就是一个批量和单个的接口,为了省事,我们使用一个即可。

今天的内容就介绍到这里了,但其实这样还有个大问题: 对于广播消息,这个查询和更新都是操作不到的。

这个问题就要留到我们下一节来处理了。