avatar

封装websocket

封装 websocket

创建websocket类

你是一个软件研发助理,你可以根据我提供的需求描述为我生成高质量的代码.代码风格和性能优秀,达到工程线上使用要求.请使用 typescript 语言,帮助封装一个 websocket 类,这个类需要有心跳检测,并支持断开重连功能,还需要对外开放消息监听事件,支持添加多个监听函数,移除监听函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
嗯,用户让我用TypeScript封装一个WebSocket类,需要有心跳检测、断开重连,还要支持消息监听和多个监听函数的添加移除。首先,我得理清楚这些功能点该怎么实现。

首先,心跳检测。通常的做法是定时发送一个ping消息,如果对方没有响应pong,就判断为断开。所以需要两个定时器,一个用于定期发送心跳,另一个检测是否超时。如果超时了,就触发重连。

然后是断开重连。当连接关闭或出错时,应该尝试重新连接。这里需要设置一个重连的间隔,可能还要有最大重试次数,避免无限重连。重连的时候可能需要指数退避算法,但用户没特别说明,先简单用固定间隔吧。

消息监听部分,需要维护一个事件监听器列表,类似EventEmitter的模式。用户可能希望添加多个监听函数,所以用Map来存储不同事件类型的回调数组。比如,'message'事件对应一组处理函数,当收到消息时,依次调用这些函数。

接下来是类的结构。构造函数需要传入URL,可能还有一些配置选项,比如重连次数、间隔、心跳间隔等。类内部应该有状态管理,比如是否正在重连,当前重试次数等。

然后考虑WebSocket的事件处理。需要监听onopen、onclose、onerror、onmessage这些事件。特别是在onclose的时候,如果不是正常关闭,就触发重连。onmessage的时候,除了处理业务消息,还要处理心跳的pong,重置心跳检测的定时器。

心跳的具体实现,在连接成功后启动心跳定时器,定期发送ping。同时,启动一个检测pong响应的定时器,如果一定时间内没收到pong,就关闭连接,触发重连。收到pong时,重置检测定时器。

重连逻辑,应该在连接关闭后,判断是否需要重连,然后使用setTimeout延迟执行重连。重连次数需要累加,超过最大次数则停止。

事件监听部分,提供on和off方法来添加和移除监听器。使用Map结构,每个事件类型对应一个数组,添加时push,移除时过滤掉对应的函数。触发事件时,遍历数组调用回调。

还需要考虑资源的释放,比如在关闭连接时,清除所有定时器,避免内存泄漏。

测试了一下,可能还需要处理不同的消息格式,比如如果服务端发送的是Blob或ArrayBuffer,可能需要做不同的处理,但用户没特别说明,假设是文本消息。

最后,导出这个类,方便用户实例化使用。示例代码中展示了如何发送消息、添加监听器,这样用户能清楚如何使用。

可能还需要处理错误情况,比如发送消息时连接未打开,这时候应该缓存或者报错。但用户的需求中没有提到,暂时在send方法里检查readyState,如果未打开则抛错。

总之,按照这些步骤,逐步实现各个功能点,确保代码结构清晰,符合工程实践,性能良好。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
type EventType = 'open' | 'close' | 'error' | 'message' | 'reconnect'
type Listener = (data: any) => void

interface WebSocketConfig {
reconnectDelay?: number
maxReconnectAttempts?: number
heartSendRate?: number
heartTimeout?: number
}

export class WebSocketClient {
private ws: WebSocket | null = null
private url: string = ''
private listeners = new Map<EventType, Listener[]>()
private reconnectAttempts = 0
private isManualClose = false
private config: WebSocketConfig = {}
private heartBeatTimer?: NodeJS.Timeout | null
private pongTimer?: NodeJS.Timeout | null
private reconnectTimer?: NodeJS.Timeout | null

constructor(private url: string, private config: WebSocketConfig = {}) {
this.url = url
this.setDefaultConfig(config)
this.connect()
}

private setDefaultConfig(config: WebSocketConfig) {
this.config = {
reconnectInterval: 3000,
maxReconnectAttempts: 5,
heartBeatInterval: 10000,
heartTimeout: 3000,
...config,
}
}

private connect() {
this.ws = new WebSocket(this.url)
this.setupEventHandlers()
}

private setupEventHandlers() {
if (!this.ws) return

this.ws.onopen = () => {
this.reconnectAttempts = 0
this.startHeartBeat()
this.emit('open')
}

this.ws.onmessage = (event) => {
if (event.data === 'pong') {
this.resetPongTimeout()
return
}
this.emit('message', event.data)
}

this.ws.onclose = (event) => {
this.cleanupTimers()
if (!this.isManualClose && this.shouldReconnect()) {
this.reconnect()
}
this.emit('close', event)
}

this.ws.onerror = (error) => {
this.emit('error', error)
this.reconnect()
}
}

// 心跳检测相关逻辑
private startHeartBeat() {
this.heartBeatTimer = setInterval(() => {
this.send('ping')
this.resetPongTimeout()
}, this.config.heartSendRate)
}

private resetPongTimeout() {
clearTimeout(this.pongTimer!)
this.pongTimer = setTimeout(() => {
this.ws?.close()
}, this.config.heartTimeout)
}

// 重连机制
private shouldReconnect() {
return this.reconnectAttempts < this.config.maxReconnectAttempts!
}

private reconnect() {
if (this.shouldReconnect()) {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
} else {
this.reconnectAttempts++
}
this.reconnectTimer = setTimeout(() => {
this.emit('reconnect')
this.connect()
}, this.config.reconnectDelay)
}
}

// 公共方法
public send(data: string | ArrayBuffer | Blob) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(data)
}
}

public close() {
this.isManualClose = true
this.ws?.close()
}

// 事件管理
public on(event: EventType, listener: Listener) {
const listeners = this.listeners.get(event) || []
listeners.push(listener)
this.listeners.set(event, listeners)
}

public off(event: EventType, listener: Listener) {
const listeners = this.listeners.get(event) || []
this.listeners.set(
event,
listeners.filter((l) => l !== listener)
)
}

private emit(event: EventType, data?: any) {
const listeners = this.listeners.get(event) || []
listeners.forEach((listener) => listener(data))
}

private cleanupTimers() {
clearInterval(this.heartBeatTimer!)
clearTimeout(this.pongTimer!)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建实例
const ws = new WebSocketClient('wss://api.example.com/ws', {
heartBeatInterval: 15000,
maxReconnectAttempts: 10,
})

// 添加事件监听
ws.on('message', (data) => {
console.log('Received:', data)
})

// 发送消息
ws.send(JSON.stringify({ type: 'hello' }))

// 关闭连接
ws.close()

封装组合式函数

请将上述类封装成 useWebsocket 组合式函数,应用到 Vue3 的项目中,改函数可以返回 websocket 单例,以便全局使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
好的,用户希望将之前封装的WebSocketClient类转换为Vue3的组合式函数useWebSocket,并且返回单例以便全局使用。首先,我需要回忆一下Vue3的组合式函数的特点,通常使用ref、reactive等响应式API,以及如何在组合式函数中管理状态和生命周期。

首先,单例模式是关键。在Vue3中,通常可以通过模块级的变量来保存实例,确保多次调用useWebSocket时返回同一个实例。所以需要在函数外部定义一个实例变量,比如instance,然后在函数内判断是否已经存在,存在则直接返回。

接下来,需要考虑Vue的生命周期。组合式函数通常在setup()中使用,所以需要确保WebSocket连接在组件卸载时正确关闭。不过用户可能希望单例在整个应用生命周期中存在,所以是否需要自动关闭呢?可能需要根据用户需求决定。但用户没有特别说明,可能希望手动控制,所以暂时不在组合式函数中自动关闭,但提供手动关闭的方法。

然后,将原有的WebSocketClient类集成到组合式函数中。组合式函数应该返回一些响应式状态,比如连接状态、错误信息等,以及方法如send、close,还有事件监听的方法。不过原类已经提供了on和off方法,可能需要将其暴露出来。

但是,Vue的组合式函数更倾向于使用响应式引用和事件钩子。或者,可以保持原有的on和off方法,让用户手动管理事件监听。或者,可以将其转换为Vue的provide/inject,但用户要求返回单例,可能更直接的方式是导出实例的方法。

另外,需要考虑在Vue的服务端渲染(SSR)环境下,WebSocket可能无法运行,所以需要确保只在客户端执行。可以通过判断typeof window是否存在来避免服务端报错。

步骤分解:

1. 创建组合式函数useWebSocket,接受配置参数。
2. 使用模块级变量保存单例实例。
3. 在函数内部,如果实例不存在,则创建WebSocketClient实例。
4. 暴露实例的方法,如send、close、on、off,以及可能的响应式状态。
5. 添加错误处理和响应式状态,比如连接状态、重连次数等,方便Vue组件使用。
6. 处理Vue组件的生命周期,比如在onUnmounted时自动关闭,但因为是单例,可能不需要,除非用户明确要求。

另外,可能需要将事件监听转换为Vue的响应式系统,比如使用ref来存储最新消息,或者提供一个事件总线。但根据用户的需求,可能更简单的方式是保持原有的on/off方法,让用户在组件中使用onMounted来添加监听器,并在onUnmounted时移除,避免内存泄漏。

现在,具体代码实现:

在组合式函数中,创建单例实例,并返回相关方法和状态。比如:

const instance = new WebSocketClient(url, config);

然后返回:
return {
send: instance.send.bind(instance),
close: instance.close.bind(instance),
on: instance.on.bind(instance),
off: instance.off.bind(instance),
// 可能的状态
isConnected: ref(false),
// 需要在实例中暴露状态,或者通过事件更新
};

但原WebSocketClient类没有暴露连接状态,所以可能需要修改类,添加状态跟踪,比如使用一个内部的reactive对象,或者在组合式函数中监听'open'和'close'事件来更新响应式变量。

比如,在组合式函数中:

const isConnected = ref(false);
instance.on('open', () => isConnected.value = true);
instance.on('close', () => isConnected.value = false);

这样,用户就可以通过isConnected来获取当前连接状态。

另外,可能需要处理错误状态的响应式引用,比如error:

const error = ref(null);
instance.on('error', (err) => error.value = err);

这样,组合式函数返回这些响应式变量,方便组件使用。

总结代码结构:

- 创建单例实例
- 设置响应式状态
- 监听实例的事件,更新响应式状态
- 返回实例方法和响应式状态

最后,在示例中使用时,用户可以在组件中调用useWebSocket()获取单例,并通过on和off来添加/移除监听器,或者直接使用返回的响应式状态。

现在,检查原有WebSocketClient类的事件类型,包括'open'、'close'、'error'、'message',这些事件需要在组合式函数中被监听,并更新对应的响应式状态。

最后,确保在Vue客户端环境下执行,避免SSR问题,比如使用typeof window检查。

现在,编写代码:

创建src/hooks/useWebSocket.ts文件,导入WebSocketClient,定义单例实例,返回组合式函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import { ref, shallowRef } from 'vue'
import { type WebSocketConfig, WebSocketClient } from '../utils/WebSocketClient'

type WebSocketStatus = 'connecting' | 'open' | 'closed' | 'error'

export function useWebSocket(url: string, config?: WebSocketConfig) {
const wsInstance = shallowRef<WebSocketClient>()
const status = ref<WebSocketStatus>('connecting')
const lastMessage = ref<any>(null)
const error = ref<Event | null>(null)

// 单例模式实现
if (!wsInstance.value) {
const createWebSocket = () => {
const client = new WebSocketClient(url, config)

// 状态同步
client.on('open', () => (status.value = 'open'))
client.on('close', () => (status.value = 'closed'))
client.on('error', (err) => {
error.value = err
status.value = 'error'
})
client.on('message', (msg) => {
lastMessage.value = msg
})

return client
}

// 客户端环境判断
if (typeof window !== 'undefined') {
wsInstance.value = createWebSocket()
}
}

// 自动重连时重置状态
wsInstance.value?.on('reconnect', () => {
status.value = 'connecting'
})

return {
/** WebSocket连接状态 */
status,
/** 最后收到的消息内容 */
lastMessage,
/** 错误对象 */
error,

/**
* 发送消息
* @param data 支持字符串/ArrayBuffer/Blob类型
*/
send: (data: string | ArrayBuffer | Blob) => {
wsInstance.value?.send(data)
},

/**
* 手动关闭连接
*/
close: () => {
wsInstance.value?.close()
status.value = 'closed'
},

/**
* 添加事件监听
* @param event 事件类型
* @param listener 监听函数
*/
on: (event: EventType, listener: Listener) => {
wsInstance.value?.on(event, listener)
},

/**
* 移除事件监听
* @param event 事件类型
* @param listener 监听函数
*/
off: (event: EventType, listener: Listener) => {
wsInstance.value?.off(event, listener)
},
}
}

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<script setup>
import { useWebSocket } from '../hooks/useWebSocket'

const {
status,
lastMessage,
send,
on,
off
} = useWebSocket('ws://127.0.0.1:8080', {
heartSendRate: 3000,
maxReconnectAttempts: 10,
})

on('open', () => {
console.log('WebSocket connection established')
setInterval(() => {
send('hello')
}, 10000)
})

on('close', () => {
console.log('WebSocket connection closed')
})

on('error', (error) => {
console.error('WebSocket error:', error)
})

on('message', (data) => {
console.log('Received:', data)
})

const listener = (data) => {
console.log('Received2:', data)
}

on('message', listener)

setTimeout(() => {
off('message', listener)
}, 15000)
</script>

<template>
<div>
<p>连接状态: {{ status }}</p>
<p>最新消息: {{ lastMessage }}</p>
</div>
</template>

服务端示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const WebSocket = require('ws')

// 创建一个 WebSocket 服务器,监听 8080 端口
const wss = new WebSocket.Server({ port: 8080 })

// 当有新的客户端连接时触发
wss.on('connection', (ws) => {
console.log('有新的客户端连接')

// 当接收到客户端发送的消息时触发
ws.on('message', (message) => {
console.log(`接收到客户端消息: ${message}`)
if (message == 'ping') {
ws.send('pong')
} else {
// 向客户端发送消息
ws.send(`服务器已收到消息: ${message}`)
}
})

// 当客户端连接关闭时触发
ws.on('close', () => {
console.log('客户端连接已关闭')
})
})

console.log('WebSocket 服务器正在监听 8080 端口')

文章作者: pengweifu
文章链接: https://www.pengwf.com/2025/02/27/web/JS-websocket/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 麦子的博客
打赏
  • 微信
    微信
  • 支付宝
    支付宝

评论