fix(node-ws): use defineWebSocket helper (#1187)

* fix: use buffering to fix #1129

* chore: changeset

* chore: fmt

* feat(node-ws): use defineWebSocket helper

* changeset
pull/1189/head
Shotaro Nakamura 2025-06-02 21:06:25 +09:00 committed by GitHub
parent 3c70dcd6ae
commit 69a0a586f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 92 additions and 112 deletions

View File

@ -0,0 +1,5 @@
---
'@hono/node-ws': patch
---
use defineWebSocket helper

View File

@ -0,0 +1,5 @@
---
'@hono/node-ws': patch
---
fix a bug if upgrading process uses async function

View File

@ -215,26 +215,6 @@ describe('WebSocket helper', () => {
}) })
}) })
it('Should onError works well', async () => {
const mainPromise = new Promise<unknown>((resolve) =>
app.get(
'/',
upgradeWebSocket(
() => {
throw 0
},
{
onError(err) {
resolve(err)
},
}
)
)
)
const ws = new WebSocket('ws://localhost:3030/')
expect(await mainPromise).toBe(0)
})
describe('Types', () => { describe('Types', () => {
it('Should not throw a type error with an app with Variables generics', () => { it('Should not throw a type error with an app with Variables generics', () => {
const app = new Hono<{ const app = new Hono<{

View File

@ -1,4 +1,5 @@
import type { Hono } from 'hono' import type { Hono } from 'hono'
import { defineWebSocketHelper } from 'hono/ws'
import type { UpgradeWebSocket, WSContext, WSEvents } from 'hono/ws' import type { UpgradeWebSocket, WSContext, WSEvents } from 'hono/ws'
import type { WebSocket } from 'ws' import type { WebSocket } from 'ws'
import { WebSocketServer } from 'ws' import { WebSocketServer } from 'ws'
@ -94,107 +95,96 @@ export const createNodeWebSocket = (init: NodeWebSocketInit): NodeWebSocket => {
}) })
}) })
}, },
upgradeWebSocket: (createEvents, options) => upgradeWebSocket: defineWebSocketHelper(async (c, events, options) => {
async function upgradeWebSocket(c, next) { if (c.req.header('upgrade')?.toLowerCase() !== 'websocket') {
if (c.req.header('upgrade')?.toLowerCase() !== 'websocket') { // Not websocket
// Not websocket return
await next() }
return
const connectionSymbol = generateConnectionSymbol()
c.env[CONNECTION_SYMBOL_KEY] = connectionSymbol
;(async () => {
const ws = await nodeUpgradeWebSocket(c.env.incoming, connectionSymbol)
// buffer messages to handle messages received before the events are set up
const messagesReceivedInStarting: [data: WebSocket.RawData, isBinary: boolean][] = []
const bufferMessage = (data: WebSocket.RawData, isBinary: boolean) => {
messagesReceivedInStarting.push([data, isBinary])
}
ws.on('message', bufferMessage)
const ctx: WSContext<WebSocket> = {
binaryType: 'arraybuffer',
close(code, reason) {
ws.close(code, reason)
},
protocol: ws.protocol,
raw: ws,
get readyState() {
return ws.readyState
},
send(source, opts) {
ws.send(source, {
compress: opts?.compress,
})
},
url: new URL(c.req.url),
}
try {
events?.onOpen?.(new Event('open'), ctx)
} catch (e) {
;(options?.onError ?? console.error)(e)
} }
const connectionSymbol = generateConnectionSymbol() const handleMessage = (data: WebSocket.RawData, isBinary: boolean) => {
c.env[CONNECTION_SYMBOL_KEY] = connectionSymbol const datas = Array.isArray(data) ? data : [data]
;(async () => { for (const data of datas) {
const ws = await nodeUpgradeWebSocket(c.env.incoming, connectionSymbol)
// buffer messages to handle messages received before the events are set up
const messagesReceivedInStarting: [data: WebSocket.RawData, isBinary: boolean][] = []
const bufferMessage = (data: WebSocket.RawData, isBinary: boolean) => {
messagesReceivedInStarting.push([data, isBinary])
}
ws.on('message', bufferMessage)
let events: WSEvents<WebSocket>
try {
events = await createEvents(c)
} catch (e) {
;(options?.onError ?? console.error)(e)
ws.close()
return
}
const ctx: WSContext<WebSocket> = {
binaryType: 'arraybuffer',
close(code, reason) {
ws.close(code, reason)
},
protocol: ws.protocol,
raw: ws,
get readyState() {
return ws.readyState
},
send(source, opts) {
ws.send(source, {
compress: opts?.compress,
})
},
url: new URL(c.req.url),
}
try {
events?.onOpen?.(new Event('open'), ctx)
} catch (e) {
;(options?.onError ?? console.error)(e)
}
const handleMessage = (data: WebSocket.RawData, isBinary: boolean) => {
const datas = Array.isArray(data) ? data : [data]
for (const data of datas) {
try {
events?.onMessage?.(
new MessageEvent('message', {
data: isBinary
? data instanceof ArrayBuffer
? data
: data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength)
: data.toString('utf-8'),
}),
ctx
)
} catch (e) {
;(options?.onError ?? console.error)(e)
}
}
}
ws.off('message', bufferMessage)
for (const message of messagesReceivedInStarting) {
handleMessage(...message)
}
ws.on('message', (data, isBinary) => {
handleMessage(data, isBinary)
})
ws.on('close', (code, reason) => {
try { try {
events?.onClose?.(new CloseEvent('close', { code, reason: reason.toString() }), ctx) events?.onMessage?.(
} catch (e) { new MessageEvent('message', {
;(options?.onError ?? console.error)(e) data: isBinary
} ? data instanceof ArrayBuffer
}) ? data
ws.on('error', (error) => { : data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength)
try { : data.toString('utf-8'),
events?.onError?.(
new ErrorEvent('error', {
error: error,
}), }),
ctx ctx
) )
} catch (e) { } catch (e) {
;(options?.onError ?? console.error)(e) ;(options?.onError ?? console.error)(e)
} }
}) }
})() }
ws.off('message', bufferMessage)
for (const message of messagesReceivedInStarting) {
handleMessage(...message)
}
return new Response() ws.on('message', (data, isBinary) => {
}, handleMessage(data, isBinary)
})
ws.on('close', (code, reason) => {
try {
events?.onClose?.(new CloseEvent('close', { code, reason: reason.toString() }), ctx)
} catch (e) {
;(options?.onError ?? console.error)(e)
}
})
ws.on('error', (error) => {
try {
events?.onError?.(
new ErrorEvent('error', {
error: error,
}),
ctx
)
} catch (e) {
;(options?.onError ?? console.error)(e)
}
})
})()
return new Response()
}),
} }
} }