fix(node-ws): Bugfix/603 hono node ws duplicate event listeners (#605)
* add multi connection test case * fix node incoming env * fix spel miss * Simple bug fixes * Improve performance * chore add changesetpull/617/head
parent
f61e748dd2
commit
967fd48d5b
|
@ -0,0 +1,5 @@
|
||||||
|
---
|
||||||
|
'@hono/node-ws': patch
|
||||||
|
---
|
||||||
|
|
||||||
|
Fixed bug with multiple connections in node-ws
|
|
@ -5,35 +5,105 @@ import { WebSocket } from 'ws'
|
||||||
import { createNodeWebSocket } from '.'
|
import { createNodeWebSocket } from '.'
|
||||||
|
|
||||||
describe('WebSocket helper', () => {
|
describe('WebSocket helper', () => {
|
||||||
const app = new Hono()
|
let app: Hono
|
||||||
const { injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app })
|
let server: ServerType
|
||||||
|
let injectWebSocket: ReturnType<typeof createNodeWebSocket>['injectWebSocket']
|
||||||
|
let upgradeWebSocket: ReturnType<typeof createNodeWebSocket>['upgradeWebSocket']
|
||||||
|
|
||||||
const mainPromise = new Promise((resolve) =>
|
beforeEach(async () => {
|
||||||
app.get(
|
app = new Hono()
|
||||||
'/',
|
;({ injectWebSocket, upgradeWebSocket } = createNodeWebSocket({ app }))
|
||||||
upgradeWebSocket(() => ({
|
|
||||||
onOpen() {
|
|
||||||
resolve(true)
|
|
||||||
},
|
|
||||||
}))
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
it('Should be able to connect', async () => {
|
server = await new Promise<ServerType>((resolve) => {
|
||||||
const server = await new Promise<ServerType>((resolve) => {
|
const server = serve({ fetch: app.fetch, port: 3030 }, () => resolve(server))
|
||||||
const server = serve(
|
|
||||||
{
|
|
||||||
fetch: app.fetch,
|
|
||||||
port: 3030,
|
|
||||||
},
|
|
||||||
() => {
|
|
||||||
resolve(server)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
injectWebSocket(server)
|
injectWebSocket(server)
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
server.close()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should be able to connect', async () => {
|
||||||
|
const mainPromise = new Promise<boolean>((resolve) =>
|
||||||
|
app.get(
|
||||||
|
'/',
|
||||||
|
upgradeWebSocket(() => ({
|
||||||
|
onOpen() {
|
||||||
|
resolve(true)
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
new WebSocket('ws://localhost:3030/')
|
new WebSocket('ws://localhost:3030/')
|
||||||
|
|
||||||
expect(await mainPromise).toBe(true)
|
expect(await mainPromise).toBe(true)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('Should be able to send and receive messages', async () => {
|
||||||
|
const mainPromise = new Promise((resolve) =>
|
||||||
|
app.get(
|
||||||
|
'/',
|
||||||
|
upgradeWebSocket(() => ({
|
||||||
|
onMessage(data) {
|
||||||
|
resolve(data.data)
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
const ws = new WebSocket('ws://localhost:3030/')
|
||||||
|
await new Promise<void>((resolve) => ws.on('open', resolve))
|
||||||
|
ws.send('Hello')
|
||||||
|
|
||||||
|
expect(await mainPromise).toBe('Hello')
|
||||||
|
})
|
||||||
|
|
||||||
|
it('Should handle multiple concurrent connections', async () => {
|
||||||
|
const connectionCount = 5
|
||||||
|
let openConnections = 0
|
||||||
|
const messages: string[] = []
|
||||||
|
|
||||||
|
app.get(
|
||||||
|
'/',
|
||||||
|
upgradeWebSocket(() => ({
|
||||||
|
onOpen() {
|
||||||
|
openConnections++
|
||||||
|
},
|
||||||
|
onMessage(data, ws) {
|
||||||
|
messages.push(data.data as string)
|
||||||
|
ws.send(data.data as string)
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
)
|
||||||
|
|
||||||
|
const connections = await Promise.all(
|
||||||
|
Array(connectionCount)
|
||||||
|
.fill(null)
|
||||||
|
.map(async () => {
|
||||||
|
const ws = new WebSocket('ws://localhost:3030/')
|
||||||
|
await new Promise<void>((resolve) => ws.on('open', resolve))
|
||||||
|
return ws
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(openConnections).toBe(connectionCount)
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
connections.map((ws, index) => {
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
ws.send(`Hello from connection ${index + 1}`)
|
||||||
|
ws.on('message', () => resolve())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(messages.length).toBe(connectionCount)
|
||||||
|
messages.forEach((msg, index) => {
|
||||||
|
expect(msg).toBe(`Hello from connection ${index + 1}`)
|
||||||
|
})
|
||||||
|
|
||||||
|
connections.forEach((ws) => ws.close())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -3,7 +3,9 @@ import type { Server } from 'node:http'
|
||||||
import type { Http2SecureServer, Http2Server } from 'node:http2'
|
import type { Http2SecureServer, Http2Server } from 'node:http2'
|
||||||
import type { Hono } from 'hono'
|
import type { Hono } from 'hono'
|
||||||
import type { UpgradeWebSocket, WSContext } from 'hono/ws'
|
import type { UpgradeWebSocket, WSContext } from 'hono/ws'
|
||||||
|
import type { WebSocket } from 'ws'
|
||||||
import { WebSocketServer } from 'ws'
|
import { WebSocketServer } from 'ws'
|
||||||
|
import type { IncomingMessage } from 'http'
|
||||||
|
|
||||||
export interface NodeWebSocket {
|
export interface NodeWebSocket {
|
||||||
upgradeWebSocket: UpgradeWebSocket
|
upgradeWebSocket: UpgradeWebSocket
|
||||||
|
@ -20,7 +22,22 @@ export interface NodeWebSocketInit {
|
||||||
* @returns NodeWebSocket
|
* @returns NodeWebSocket
|
||||||
*/
|
*/
|
||||||
export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
|
export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
|
||||||
const wss = new WebSocketServer({noServer: true})
|
const wss = new WebSocketServer({ noServer: true })
|
||||||
|
const waiter = new Map<IncomingMessage, (ws: WebSocket) => void>()
|
||||||
|
|
||||||
|
wss.on('connection', (ws, request) => {
|
||||||
|
const waiterFn = waiter.get(request)
|
||||||
|
if (waiterFn) {
|
||||||
|
waiterFn(ws)
|
||||||
|
waiter.delete(request)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const nodeUpgradeWebSocket = (request: IncomingMessage) => {
|
||||||
|
return new Promise<WebSocket>((resolve) => {
|
||||||
|
waiter.set(request, resolve)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
injectWebSocket(server) {
|
injectWebSocket(server) {
|
||||||
|
@ -34,9 +51,11 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
|
||||||
}
|
}
|
||||||
headers.append(key, Array.isArray(value) ? value[0] : value)
|
headers.append(key, Array.isArray(value) ? value[0] : value)
|
||||||
}
|
}
|
||||||
await init.app.request(url, {
|
await init.app.request(
|
||||||
headers: headers,
|
url,
|
||||||
})
|
{ headers: headers },
|
||||||
|
{ incoming: request, outgoing: undefined }
|
||||||
|
)
|
||||||
wss.handleUpgrade(request, socket, head, (ws) => {
|
wss.handleUpgrade(request, socket, head, (ws) => {
|
||||||
wss.emit('connection', ws, request)
|
wss.emit('connection', ws, request)
|
||||||
})
|
})
|
||||||
|
@ -49,8 +68,11 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
|
||||||
await next()
|
await next()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
const events = await createEvents(c)
|
|
||||||
wss.on('connection', (ws) => {
|
;(async () => {
|
||||||
|
const events = await createEvents(c)
|
||||||
|
const ws = await nodeUpgradeWebSocket(c.env.incoming)
|
||||||
|
|
||||||
const ctx: WSContext = {
|
const ctx: WSContext = {
|
||||||
binaryType: 'arraybuffer',
|
binaryType: 'arraybuffer',
|
||||||
close(code, reason) {
|
close(code, reason) {
|
||||||
|
@ -92,7 +114,7 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
|
||||||
ctx
|
ctx
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
})
|
})()
|
||||||
|
|
||||||
return new Response()
|
return new Response()
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue