import {spawn} from 'node:child_process'
import {createHash} from 'node:crypto'
import {existsSync, readFileSync} from 'node:fs'
import http from 'node:http'
import {resolve} from 'node:path'

// Populate process.env from ./.env before any configuration constant is read.
loadDotEnv()

const HOST = process.env.WS_HOST || '0.0.0.0'
const PORT = Number(process.env.WS_PORT || 8080)
const WS_PATH = process.env.WS_PATH || '/video'
const FFMPEG_PATH = process.env.FFMPEG_PATH || 'ffmpeg'
const RTSP_URL = process.env.RTSP_URL || buildCameraRtspUrl()
const RTSP_TRANSPORT = process.env.RTSP_TRANSPORT || 'tcp'
const VIDEO_TRANSCODE = process.env.VIDEO_TRANSCODE === '1'

// Fixed GUID appended to Sec-WebSocket-Key when computing Sec-WebSocket-Accept.
const WS_ACCEPT_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

// Once the socket's own write queue holds more than this many bytes, video
// chunks are parked in a per-client buffer instead of being written.
const MAX_SOCKET_BACKLOG_BYTES = 8 * 1024 * 1024

// WebSocket opcodes used by this server.
const OPCODE_TEXT = 0x1
const OPCODE_BINARY = 0x2
const OPCODE_CLOSE = 0x8
const OPCODE_PING = 0x9
const OPCODE_PONG = 0xA

/**
 * Loads KEY=VALUE pairs from `.env` in the current working directory into
 * process.env. Blank lines, `#` comments and lines without `=` are skipped.
 * A value wrapped in matching single or double quotes is unquoted.
 * Variables that already hold a non-empty value are left untouched.
 */
function loadDotEnv() {
  const envPath = resolve(process.cwd(), '.env')
  if (!existsSync(envPath)) {
    return
  }

  for (const line of readFileSync(envPath, 'utf8').split(/\r?\n/)) {
    const trimmed = line.trim()
    if (!trimmed || trimmed.startsWith('#')) {
      continue
    }

    const separatorIndex = trimmed.indexOf('=')
    if (separatorIndex === -1) {
      continue
    }

    const key = trimmed.slice(0, separatorIndex).trim()
    if (!key) {
      // A line such as "=value" has no variable name to assign to.
      continue
    }

    let value = trimmed.slice(separatorIndex + 1).trim()
    // Require two characters so a lone quote is not treated as an empty quoted string.
    const isQuoted = value.length >= 2 && (
      (value.startsWith('"') && value.endsWith('"')) ||
      (value.startsWith("'") && value.endsWith("'"))
    )
    if (isQuoted) {
      value = value.slice(1, -1)
    }

    if (!process.env[key]) {
      process.env[key] = value
    }
  }
}

/**
 * Builds an RTSP URL from CAMERA_HOST, CAMERA_PORT, CAMERA_USER,
 * CAMERA_PASSWORD and CAMERA_RTSP_PATH.
 *
 * @returns {string} The URL with percent-encoded credentials, or '' when
 *   host, user or password is missing.
 */
function buildCameraRtspUrl() {
  const host = process.env.CAMERA_HOST
  const port = process.env.CAMERA_PORT || '554'
  const username = process.env.CAMERA_USER
  const password = process.env.CAMERA_PASSWORD
  const path = process.env.CAMERA_RTSP_PATH || '/Stream/Live/101'

  if (!host || !username || !password) {
    return ''
  }

  return `rtsp://${encodeURIComponent(username)}:${encodeURIComponent(password)}@${host}:${port}${path}`
}

/**
 * Builds the ffmpeg argument list: read RTSP_URL, drop audio and write
 * fragmented MP4 to stdout. Video is re-encoded with libx264 when
 * VIDEO_TRANSCODE is set, otherwise stream-copied.
 *
 * @returns {string[]} Arguments to pass to the ffmpeg executable.
 */
function createFfmpegArgs() {
  const inputArgs = [
    '-hide_banner',
    '-loglevel', 'warning',
    '-rtsp_transport', RTSP_TRANSPORT,
    '-i', RTSP_URL,
  ]

  const videoArgs = VIDEO_TRANSCODE
    ? [
      '-c:v', 'libx264',
      '-preset', 'veryfast',
      '-tune', 'zerolatency',
      '-profile:v', 'baseline',
      '-level', '3.1',
      '-pix_fmt', 'yuv420p',
      '-g', '25',
    ]
    : [ '-c:v', 'copy' ]

  return [
    ...inputArgs,
    ...videoArgs,
    '-an',
    '-f', 'mp4',
    '-movflags', 'frag_keyframe+empty_moov+default_base_moof',
    '-reset_timestamps', '1',
    'pipe:1',
  ]
}

/**
 * Writes one unmasked, unfragmented (FIN=1) WebSocket frame to the socket.
 * Does nothing when the socket is destroyed or no longer writable.
 *
 * @param {import('node:net').Socket} socket - Upgraded connection.
 * @param {Buffer|string} payload - Frame body; non-buffers go through Buffer.from.
 * @param {number} [opcode=0x2] - WebSocket opcode (binary by default).
 */
function sendWsFrame(socket, payload, opcode = OPCODE_BINARY) {
  if (socket.destroyed || !socket.writable) {
    return
  }

  const body = Buffer.isBuffer(payload) ? payload : Buffer.from(payload)
  const length = body.length
  let header

  if (length < 126) {
    header = Buffer.from([ 0x80 | opcode, length ])
  } else if (length <= 0xffff) {
    // Length marker 126: a 16-bit big-endian length follows.
    header = Buffer.allocUnsafe(4)
    header[0] = 0x80 | opcode
    header[1] = 126
    header.writeUInt16BE(length, 2)
  } else {
    // Length marker 127: a 64-bit big-endian length follows.
    header = Buffer.allocUnsafe(10)
    header[0] = 0x80 | opcode
    header[1] = 127
    header.writeBigUInt64BE(BigInt(length), 2)
  }

  socket.write(Buffer.concat([ header, body ]))
}

/**
 * Sends a value to the client as a JSON text frame.
 *
 * @param {import('node:net').Socket} socket - Upgraded connection.
 * @param {unknown} payload - Value serialised with JSON.stringify.
 */
function sendText(socket, payload) {
  sendWsFrame(socket, JSON.stringify(payload), OPCODE_TEXT)
}

/**
 * Sends a Close frame carrying a status code and reason, then ends the
 * writable side of the socket.
 *
 * @param {import('node:net').Socket} socket - Upgraded connection.
 * @param {number} [code=1000] - WebSocket close status code.
 * @param {string} [reason=''] - Reason text placed after the status code.
 */
function closeSocket(socket, code = 1000, reason = '') {
  const reasonBuffer = Buffer.from(reason)
  const payload = Buffer.allocUnsafe(2 + reasonBuffer.length)
  payload.writeUInt16BE(code, 0)
  reasonBuffer.copy(payload, 2)
  sendWsFrame(socket, payload, OPCODE_CLOSE)
  socket.end()
}

/**
 * Parses every complete WebSocket frame in `buffer` and reports each one
 * through `onMessage`. Masked payloads are unmasked into a fresh Buffer.
 * The FIN bit is not inspected, so fragments are reported as individual frames.
 * No size limit below Number.MAX_SAFE_INTEGER is enforced here.
 *
 * @param {Buffer} buffer - Bytes received so far and not yet consumed.
 * @param {(opcode: number, payload: Buffer) => void} onMessage - Frame callback.
 * @returns {Buffer} Trailing bytes of an incomplete frame, to be prepended
 *   to the next chunk.
 * @throws {Error} When a 64-bit length exceeds Number.MAX_SAFE_INTEGER.
 */
function readClientFrames(buffer, onMessage) {
  let offset = 0

  while (offset + 2 <= buffer.length) {
    const firstByte = buffer[offset]
    const secondByte = buffer[offset + 1]
    const opcode = firstByte & 0x0f
    const masked = (secondByte & 0x80) !== 0
    let length = secondByte & 0x7f
    let headerLength = 2

    if (length === 126) {
      if (offset + 4 > buffer.length) break
      length = buffer.readUInt16BE(offset + 2)
      headerLength = 4
    } else if (length === 127) {
      if (offset + 10 > buffer.length) break
      const bigLength = buffer.readBigUInt64BE(offset + 2)
      if (bigLength > BigInt(Number.MAX_SAFE_INTEGER)) {
        throw new Error('WebSocket frame is too large')
      }
      length = Number(bigLength)
      headerLength = 10
    }

    const maskLength = masked ? 4 : 0
    const payloadStart = offset + headerLength + maskLength
    const frameEnd = payloadStart + length
    if (frameEnd > buffer.length) break

    let payload = buffer.subarray(payloadStart, frameEnd)
    if (masked) {
      const mask = buffer.subarray(offset + headerLength, payloadStart)
      const unmasked = Buffer.allocUnsafe(length)
      for (let index = 0; index < length; index += 1) {
        unmasked[index] = payload[index] ^ mask[index % 4]
      }
      payload = unmasked
    }

    onMessage(opcode, payload)
    offset = frameEnd
  }

  return buffer.subarray(offset)
}

/**
 * Logs a client control message. The payload is expected to be JSON with
 * `type: 'control'`; anything that fails to parse is reported and ignored.
 *
 * @param {Buffer} payload - Body of a text frame.
 */
function handleControlMessage(payload) {
  try {
    const message = JSON.parse(payload.toString('utf8'))
    if (message.type === 'control') {
      console.log(`[control] action=${message.action} sentAt=${message.sentAt || ''}`)
    }
  } catch {
    console.warn('[ws] ignored non-json text message')
  }
}

/**
 * Spawns one ffmpeg process for this client and forwards its stdout to the
 * socket as binary frames. ffmpeg is sent SIGTERM when the socket closes,
 * ends or errors; the socket is closed with 1011 when ffmpeg fails or exits.
 *
 * @param {import('node:net').Socket} socket - Upgraded connection.
 */
function startClientStream(socket) {
  if (!RTSP_URL) {
    sendText(socket, {
      type: 'error',
      // These names match the variables buildCameraRtspUrl() reads.
      message: 'Missing RTSP_URL. Set RTSP_URL or CAMERA_HOST/CAMERA_USER/CAMERA_PASSWORD.',
    })
    closeSocket(socket, 1011, 'missing rtsp url')
    return
  }

  const ffmpeg = spawn(FFMPEG_PATH, createFfmpegArgs(), {
    stdio: [ 'ignore', 'pipe', 'pipe' ],
  })
  let pendingFrame = Buffer.alloc(0)

  console.log(`[ffmpeg] started pid=${ffmpeg.pid} transcode=${VIDEO_TRANSCODE ? 'yes' : 'no'}`)

  ffmpeg.stdout.on('data', (chunk) => {
    if (!socket.writable || socket.destroyed) {
      return
    }

    // NOTE(review): while the client is slow this buffer grows without an
    // upper bound, and it is only flushed when a later chunk arrives.
    if (socket.writableLength > MAX_SOCKET_BACKLOG_BYTES) {
      pendingFrame = Buffer.concat([ pendingFrame, chunk ])
      return
    }

    let outgoing = chunk
    if (pendingFrame.length > 0) {
      outgoing = Buffer.concat([ pendingFrame, chunk ])
      pendingFrame = Buffer.alloc(0)
    }

    sendWsFrame(socket, outgoing)
  })

  ffmpeg.stderr.on('data', (chunk) => {
    console.error(`[ffmpeg] ${chunk.toString().trim()}`)
  })

  ffmpeg.on('error', (error) => {
    sendText(socket, {
      type: 'error',
      message: `Failed to start ffmpeg: ${error.message}`,
    })
    closeSocket(socket, 1011, 'ffmpeg start failed')
  })

  ffmpeg.on('close', (code, signal) => {
    console.log(`[ffmpeg] stopped code=${code} signal=${signal || ''}`)
    if (!socket.destroyed) {
      closeSocket(socket, 1011, 'ffmpeg stopped')
    }
  })

  const stopFfmpeg = () => {
    ffmpeg.kill('SIGTERM')
  }
  for (const eventName of [ 'close', 'end', 'error' ]) {
    socket.on(eventName, stopFfmpeg)
  }
}

/**
 * Returns the path component of a request target, without any query string.
 *
 * @param {string} [url=''] - Raw request target such as '/video?token=1'.
 * @returns {string} Everything before the first '?'.
 */
function getRequestPath(url = '') {
  const queryIndex = url.indexOf('?')
  return queryIndex === -1 ? url : url.slice(0, queryIndex)
}

/**
 * Handles the HTTP server's 'upgrade' event: validates the request,
 * completes the WebSocket handshake, wires up frame parsing and starts the
 * video stream for this client.
 *
 * @param {import('node:http').IncomingMessage} request - Upgrade request.
 * @param {import('node:net').Socket} socket - Underlying connection.
 * @param {Buffer} [head] - Bytes of the upgraded stream that arrived together
 *   with the request; they may already contain client frames.
 */
function handleUpgrade(request, socket, head) {
  // Without a listener, a connection reset on any path below would surface
  // as an unhandled 'error' event.
  socket.on('error', (error) => {
    console.warn(`[ws] socket error: ${error.message}`)
  })

  if (getRequestPath(request.url) !== WS_PATH) {
    socket.write('HTTP/1.1 404 Not Found\r\n\r\n')
    socket.destroy()
    return
  }

  const key = request.headers['sec-websocket-key']
  if (!key) {
    socket.write('HTTP/1.1 400 Bad Request\r\n\r\n')
    socket.destroy()
    return
  }

  const accept = createHash('sha1')
    .update(`${key}${WS_ACCEPT_GUID}`)
    .digest('base64')

  socket.write(
    [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${accept}`,
      '\r\n',
    ].join('\r\n'),
  )

  let frameBuffer = Buffer.alloc(0)
  let protocolFailed = false

  const onFrame = (opcode, payload) => {
    if (opcode === OPCODE_TEXT) {
      handleControlMessage(payload)
    } else if (opcode === OPCODE_CLOSE) {
      // Answer the client's Close frame with one of our own before ending.
      closeSocket(socket, 1000)
    } else if (opcode === OPCODE_PING) {
      sendWsFrame(socket, payload, OPCODE_PONG)
    }
  }

  const onData = (chunk) => {
    if (protocolFailed) {
      // The byte stream is no longer frame-aligned after a parse error.
      return
    }

    try {
      frameBuffer = readClientFrames(Buffer.concat([ frameBuffer, chunk ]), onFrame)
    } catch (error) {
      protocolFailed = true
      frameBuffer = Buffer.alloc(0)
      console.error(`[ws] ${error.message}`)
      closeSocket(socket, 1002, 'bad websocket frame')
    }
  }

  socket.on('data', onData)

  sendText(socket, {
    type: 'ready',
    path: WS_PATH,
    transcode: VIDEO_TRANSCODE,
  })

  // Frames that arrived in the same packet as the handshake would otherwise be lost.
  if (head && head.length > 0) {
    onData(head)
  }

  // A Close frame or parse error in `head` may already have ended the socket.
  if (socket.writable && !socket.destroyed) {
    startClientStream(socket)
  }
}

// Plain HTTP requests: /health reports configuration, everything else is 404.
const server = http.createServer((request, response) => {
  if (getRequestPath(request.url) === '/health') {
    response.writeHead(200, {'content-type': 'application/json; charset=utf-8'})
    response.end(
      JSON.stringify({
        ok: true,
        wsPath: WS_PATH,
        rtspConfigured: Boolean(RTSP_URL),
        transcode: VIDEO_TRANSCODE,
      }),
    )
    return
  }

  response.writeHead(404, {'content-type': 'text/plain; charset=utf-8'})
  response.end('Not Found')
})

server.on('upgrade', handleUpgrade)

server.on('error', (error) => {
  console.error(`[server] failed to listen on ${HOST}:${PORT}: ${error.message}`)
  process.exitCode = 1
})

server.listen(PORT, HOST, () => {
  console.log(`[server] ws://${HOST}:${PORT}${WS_PATH}`)
  console.log(`[server] health http://${HOST}:${PORT}/health`)
  console.log(`[server] rtsp ${RTSP_URL ? 'configured' : 'not configured'}`)
})