互动直播中的前端技术——即时通讯( 二 )

消息收发与服务器或者其他客户端进行消息通讯时通常会基于业务约定协议来封装解析消息 。由于都是异步行为 , 需要有唯一标识来处理消息回调 。这里用自增seq来标记 。
发送消息【互动直播中的前端技术——即时通讯】class Ws extends EventEmitter {seq = 0;cmdTasksMap = {};// ...sendCmd(cmd, params) {return new Promise((resolve, reject) => {this.cmdTasksMap[this.seq] = {resolve,reject};const data = https://www.isolves.com/it/wl/js/2020-06-16/genPacket(cmd, params, this.seq++);this.link.send({ data });});}}接受消息class Ws extends EventEmitter {// ...onMessage(packet) {const data = https://www.isolves.com/it/wl/js/2020-06-16/parsePacket(packet);if (data.seq) {const cmdTask = this.cmdTasksMap[data.seq];if (cmdTask) {if (data.body.code === 200) {cmdTask.resolve(data.body);} else {cmdTask.reject(data.body);}delete this.cmdTasksMap[data.seq];}}}}生产环境中优化上文只介绍了基础功能的简单封装 , 在生产环境中使用 , 还需要对考虑很多因素 , 尤其是在互动直播场景中 , 礼物展示 , 麦序(进行语音通话互动的顺序) , 聊天 , 群聊等都强依赖长链接的稳定性 , 下面就介绍一些兜底与优化措施 。
连接保持为了稳定建立长链接与保持长链接 。采用了以下几个手段:

  • 超时处理
  • 心跳包
  • 重连退避机制
超时处理在实际使用中 , 并不一定每次发送消息都服务端都有响应 , 可能在客户端已经出现异常了 , 我们与服务端的通讯方式都是一问一答 。基于这一点 , 我们可以增加超时逻辑来判断是否是发送成功 。然后基于回调上层进行有友好提示 , 进入异常处理 。接下来就进一步改造发送逻辑 。
class Ws extends EventEmitter {// ...sendCmd(cmd, params) {return new Promise((resolve, reject) => {this.cmdTasksMap[this.seq] = {resolve,reject};// 加个定时器this.timeMap[this.seq] = setTimeout(() => {const err = new newTimeoutError(this.seq);reject({ ...err });}, CMDTIMEOUT);const data = https://www.isolves.com/it/wl/js/2020-06-16/genPacket(cmd, params, this.seq++);this.link.send({ data });});}onMessage(packet) {const data = parsePacket(packet);if (data.seq) {const cmdTask = this.cmdTasksMap[data.seq];if (cmdTask) {clearTimeout(this.timeMap[this.seq]);delete this.timeMap[this.seq];if (data.body.code === 200) {cmdTask.resolve(data.body);} else {cmdTask.reject(data.body);}delete this.cmdTasksMap[data.seq];}}}}心跳包心跳包: 心跳包就是在客户端和服务器间定时通知对方自己状态的一个自己定义的命令字 , 按照一定的时间间隔发送 , 类似于心跳 , 所以叫做心跳包 。
心跳包是检查长链接存活的关键手段 , 在web端我们通过心跳包是否超时来判断 。TCP中已有 keepalive选项  , 为什么要在应用层加入心跳包机制?
  • tcp keepalive检查连接是否存活
  • 应用keepalive检测应用是否正常可响应
举个栗子: 服务端死锁 , 无法处理任何业务请求 。但是操作系统仍然可以响应网络层keepalive包 。所以我们通常使用空内容的心跳包并设定合适的发送频率与超时时间来作为连接的保持的判断 。
如果服务端只认心跳包作为连接存在判断 , 那就在连接建立后定时发心跳就行 。如果以收到包为判断存活 , 那就在每次收到消息重置并起个定时器发送心跳包 。
class Ws extends EventEmitter {// ...onMessage(packet) {const data = https://www.isolves.com/it/wl/js/2020-06-16/parsePacket(packet);if (data.seq) {const cmdTask = this.cmdTasksMap[data.seq];if (cmdTask) {clearTimeout(this.timeMap[this.seq]);if (data.body.code === 200) {cmdTask.resolve(data.body);} else {cmdTask.reject(data.body);}delete this.cmdTasksMap[data.seq];}}this.startHeartBeat();}startHeartBeat() {if (this.heartBeatTimer) {clearTimeout(this.heartBeatTimer);this.heartBeatTimer = null;}this.heartBeatTimer = setTimeout(() => {// 在sendCmd中指定heartbeat类型seq为0 , 让业务包连续编号this.sendCmd('heartbeat').then(() => {// 发送成功了就不管}).catch((e) => {this.heartBeatError(e);});}, HEARTBEATINTERVAL);}}重连退避机制连不上了 , 重连 , 还连不上 , 重连 , 又连不上 , 重连 。重连是一个保活的手段 , 但总不能一直重连吧 , 因此我们要用合理策去重连 。
通常服务端会提供lbs(Location Based Services , LBS)接口 , 来提供最优节点 , 我们端上要做便是缓存这些地址并设定端上的重连退避机制 。按级别次数通常会做以下处理 。


推荐阅读