You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

376 lines
8.3 KiB
JavaScript

import {spawn} from 'node:child_process'
import {createHash} from 'node:crypto'
import {existsSync, readFileSync} from 'node:fs'
import http from 'node:http'
import {resolve} from 'node:path'
// Populate process.env from a local .env file before any setting below is
// read. The call works ahead of the definition because function declarations
// are hoisted.
loadDotEnv()
// Address and port handed to server.listen().
const HOST = process.env.WS_HOST || '0.0.0.0'
const PORT = Number(process.env.WS_PORT || 8080)
// Exact request URL that is accepted for WebSocket upgrades.
const WS_PATH = process.env.WS_PATH || '/video'
// Executable passed to spawn() for every client stream.
const FFMPEG_PATH = process.env.FFMPEG_PATH || 'ffmpeg'
// An explicit RTSP_URL wins; otherwise the URL is assembled from the CAMERA_*
// variables, yielding '' when host, user or password is missing.
const RTSP_URL = process.env.RTSP_URL || buildCameraRtspUrl()
// Value given to ffmpeg's -rtsp_transport option.
const RTSP_TRANSPORT = process.env.RTSP_TRANSPORT || 'tcp'
// Only the exact string '1' selects the libx264 re-encode arguments; any other
// value makes createFfmpegArgs() use '-c:v copy'.
const VIDEO_TRANSCODE = process.env.VIDEO_TRANSCODE === '1'
/**
 * Loads KEY=VALUE pairs from the `.env` file in the current working directory
 * into `process.env`.
 *
 * Blank lines, lines starting with `#`, lines without `=` and lines whose key
 * is empty are skipped. A value wrapped in one matching pair of single or
 * double quotes has that pair removed. Variables that already hold a non-empty
 * value are left untouched. Does nothing when the file does not exist.
 *
 * @returns {void}
 */
function loadDotEnv() {
  const envPath = resolve(process.cwd(), '.env')
  if (!existsSync(envPath)) {
    return
  }

  const lines = readFileSync(envPath, 'utf8').split(/\r?\n/)
  for (const line of lines) {
    const trimmed = line.trim()
    if (!trimmed || trimmed.startsWith('#')) {
      continue
    }

    const separatorIndex = trimmed.indexOf('=')
    if (separatorIndex === -1) {
      continue
    }

    const key = trimmed.slice(0, separatorIndex).trim()
    if (!key) {
      // A line such as "=value" names no variable to assign.
      continue
    }

    let value = trimmed.slice(separatorIndex + 1).trim()
    const quote = value[0]
    const isQuoted =
      // Two characters are required so that a value consisting of a single
      // quote mark is kept instead of being stripped to an empty string.
      value.length >= 2 &&
      (quote === '"' || quote === "'") &&
      value.endsWith(quote)
    if (isQuoted) {
      value = value.slice(1, -1)
    }

    // Values already present in the environment take precedence over the file.
    if (!process.env[key]) {
      process.env[key] = value
    }
  }
}
/**
 * Builds the camera's RTSP URL from the CAMERA_* environment variables.
 *
 * CAMERA_HOST, CAMERA_USER and CAMERA_PASSWORD are required; CAMERA_PORT
 * defaults to '554' and CAMERA_RTSP_PATH to '/Stream/Live/101'. User name and
 * password are percent-encoded so reserved characters cannot break the URL.
 *
 * @returns {string} The RTSP URL, or '' when a required variable is missing.
 */
function buildCameraRtspUrl() {
  const {
    CAMERA_HOST: host,
    CAMERA_USER: username,
    CAMERA_PASSWORD: password,
  } = process.env
  if (!host || !username || !password) {
    return ''
  }

  const port = process.env.CAMERA_PORT || '554'
  const configuredPath = process.env.CAMERA_RTSP_PATH || '/Stream/Live/101'
  // A path configured without its leading slash would otherwise be glued
  // directly onto the port number.
  const path = configuredPath.startsWith('/') ? configuredPath : `/${configuredPath}`
  const credentials = `${encodeURIComponent(username)}:${encodeURIComponent(password)}`
  return `rtsp://${credentials}@${host}:${port}${path}`
}
/**
 * Assembles the ffmpeg command line for one stream: RTSP input, video either
 * re-encoded with libx264 or copied (depending on VIDEO_TRANSCODE), audio
 * dropped, and fragmented MP4 written to stdout.
 *
 * @returns {string[]} Argument vector to pass to spawn().
 */
function createFfmpegArgs() {
  const args = [
    '-hide_banner',
    '-loglevel', 'warning',
    '-rtsp_transport', RTSP_TRANSPORT,
    '-i', RTSP_URL,
  ]

  if (VIDEO_TRANSCODE) {
    args.push(
      '-c:v', 'libx264',
      '-preset', 'veryfast',
      '-tune', 'zerolatency',
      '-profile:v', 'baseline',
      '-level', '3.1',
      '-pix_fmt', 'yuv420p',
      '-g', '25',
    )
  } else {
    args.push('-c:v', 'copy')
  }

  // Output side: no audio, fragmented MP4 on stdout.
  args.push(
    '-an',
    '-f', 'mp4',
    '-movflags', 'frag_keyframe+empty_moov+default_base_moof',
    '-reset_timestamps', '1',
    'pipe:1',
  )
  return args
}
/**
 * Writes one unmasked WebSocket frame with the FIN bit set to `socket`.
 * Nothing is written when the socket is destroyed or no longer writable.
 *
 * @param {object} socket - Stream exposing `destroyed`, `writable` and `write()`.
 * @param {Buffer|string} payload - Frame body; non-Buffers go through Buffer.from().
 * @param {number} [opcode=0x2] - Frame opcode (0x2 is binary).
 * @returns {void}
 */
function sendWsFrame(socket, payload, opcode = 0x2) {
  const canWrite = !socket.destroyed && socket.writable
  if (!canWrite) {
    return
  }

  const data = Buffer.isBuffer(payload) ? payload : Buffer.from(payload)
  const size = data.length

  // The length is encoded inline (< 126), as 16 bits (marker 126) or as
  // 64 bits (marker 127); that choice fixes the header size.
  let headerSize = 2
  if (size > 0xffff) {
    headerSize = 10
  } else if (size >= 126) {
    headerSize = 4
  }

  const head = Buffer.allocUnsafe(headerSize)
  head[0] = 0x80 | opcode
  switch (headerSize) {
    case 2:
      head[1] = size
      break
    case 4:
      head[1] = 126
      head.writeUInt16BE(size, 2)
      break
    default:
      head[1] = 127
      head.writeBigUInt64BE(BigInt(size), 2)
  }

  socket.write(Buffer.concat([head, data]))
}
/**
 * Serialises `payload` as JSON and sends it as a WebSocket text frame.
 *
 * @param {object} socket - Destination socket, forwarded to sendWsFrame().
 * @param {*} payload - Value handed to JSON.stringify().
 * @returns {void}
 */
function sendText(socket, payload) {
  const TEXT_OPCODE = 0x1
  const json = JSON.stringify(payload)
  sendWsFrame(socket, json, TEXT_OPCODE)
}
/**
 * Sends a WebSocket Close frame (status code followed by a UTF-8 reason) and
 * then ends the socket.
 *
 * Control frames may carry at most 125 payload bytes, two of which hold the
 * status code, so the reason is truncated to 123 bytes without splitting a
 * multi-byte character.
 *
 * @param {object} socket - Socket to close; must expose `end()`.
 * @param {number} [code=1000] - Close status code.
 * @param {string} [reason=''] - Human-readable close reason.
 * @returns {void}
 */
function closeSocket(socket, code = 1000, reason = '') {
  const maxReasonBytes = 123
  let reasonBuffer = Buffer.from(reason)
  if (reasonBuffer.length > maxReasonBytes) {
    let end = maxReasonBytes
    // While the first dropped byte is a UTF-8 continuation byte (10xxxxxx),
    // the cut sits inside a character; back up to drop that whole character.
    while (end > 0 && (reasonBuffer[end] & 0xc0) === 0x80) {
      end -= 1
    }
    reasonBuffer = reasonBuffer.subarray(0, end)
  }

  const payload = Buffer.allocUnsafe(2 + reasonBuffer.length)
  payload.writeUInt16BE(code, 0)
  reasonBuffer.copy(payload, 2)
  sendWsFrame(socket, payload, 0x8)
  socket.end()
}
/**
 * Parses every complete WebSocket frame at the start of `buffer`, unmasking
 * the payload when the mask bit is set, and reports each one to `onMessage`.
 * The FIN bit is not inspected, so every frame is reported on its own.
 *
 * @param {Buffer} buffer - Bytes received so far (leftover plus new data).
 * @param {(opcode: number, payload: Buffer) => void} onMessage - Called once
 *   per complete frame, in order.
 * @param {number} [maxPayloadLength=16777216] - Largest declared payload that
 *   is accepted. The check runs as soon as the header is readable, so a peer
 *   cannot make the caller buffer an arbitrarily large frame.
 * @returns {Buffer} The unconsumed tail (an incomplete frame, possibly empty)
 *   to be prepended to the next chunk.
 * @throws {Error} 'WebSocket frame is too large' when a frame declares more
 *   than `maxPayloadLength` bytes.
 */
function readClientFrames(buffer, onMessage, maxPayloadLength = 16 * 1024 * 1024) {
  let offset = 0

  while (offset + 2 <= buffer.length) {
    const firstByte = buffer[offset]
    const secondByte = buffer[offset + 1]
    const opcode = firstByte & 0x0f
    const masked = (secondByte & 0x80) !== 0
    let length = secondByte & 0x7f
    let headerLength = 2

    if (length === 126) {
      // 16-bit extended length follows the two fixed header bytes.
      if (offset + 4 > buffer.length) break
      length = buffer.readUInt16BE(offset + 2)
      headerLength = 4
    } else if (length === 127) {
      // 64-bit extended length.
      if (offset + 10 > buffer.length) break
      const bigLength = buffer.readBigUInt64BE(offset + 2)
      if (bigLength > BigInt(Number.MAX_SAFE_INTEGER)) {
        throw new Error('WebSocket frame is too large')
      }
      length = Number(bigLength)
      headerLength = 10
    }

    // Reject before waiting for the body: otherwise the caller would keep
    // accumulating bytes until the declared length had arrived.
    if (length > maxPayloadLength) {
      throw new Error('WebSocket frame is too large')
    }

    const maskLength = masked ? 4 : 0
    const payloadStart = offset + headerLength + maskLength
    const frameEnd = payloadStart + length
    if (frameEnd > buffer.length) break

    let payload = buffer.subarray(payloadStart, frameEnd)
    if (masked) {
      const mask = buffer.subarray(offset + headerLength, payloadStart)
      // Unmask into a fresh buffer so the input bytes stay untouched.
      const unmasked = Buffer.allocUnsafe(length)
      for (let index = 0; index < length; index += 1) {
        unmasked[index] = payload[index] ^ mask[index % 4]
      }
      payload = unmasked
    }

    onMessage(opcode, payload)
    offset = frameEnd
  }

  return buffer.subarray(offset)
}
/**
 * Decodes a text-frame payload as JSON and logs it when its `type` is
 * 'control'. Payloads that fail to parse, or that parse to `null`, are
 * reported with a warning; other message types are silently ignored.
 *
 * @param {Buffer} payload - Raw UTF-8 bytes of the text frame.
 * @returns {void}
 */
function handleControlMessage(payload) {
  try {
    const {type, action, sentAt} = JSON.parse(payload.toString('utf8'))
    if (type !== 'control') {
      return
    }
    const sentAtLabel = sentAt || ''
    console.log(`[control] action=${action} sentAt=${sentAtLabel}`)
  } catch {
    console.warn('[ws] ignored non-json text message')
  }
}
/**
 * Spawns one ffmpeg process for `socket` and forwards its stdout to the client
 * as binary WebSocket frames.
 *
 * When no RTSP URL is configured the client receives an error message and the
 * socket is closed with status 1011 instead. ffmpeg is sent SIGTERM as soon as
 * the socket closes, ends or errors, and the socket is closed with 1011 when
 * ffmpeg fails to start or exits.
 *
 * @param {object} socket - Upgraded client socket.
 * @returns {void}
 */
function startClientStream(socket) {
  if (!RTSP_URL) {
    sendText(socket, {
      type: 'error',
      // Name the variables buildCameraRtspUrl() actually reads.
      message: 'Missing RTSP_URL. Set RTSP_URL or CAMERA_HOST/CAMERA_USER/CAMERA_PASSWORD.',
    })
    closeSocket(socket, 1011, 'missing rtsp url')
    return
  }

  const ffmpeg = spawn(FFMPEG_PATH, createFfmpegArgs(), {
    stdio: ['ignore', 'pipe', 'pipe'],
  })
  console.log(`[ffmpeg] started pid=${ffmpeg.pid} transcode=${VIDEO_TRANSCODE ? 'yes' : 'no'}`)

  // Output held back while more than 8 MiB is still queued on the socket. The
  // pieces are joined once on flush rather than re-copied for every chunk.
  // NOTE(review): nothing caps this backlog and it is only flushed when a
  // later chunk arrives, so a permanently stalled client keeps it growing.
  const backlogThreshold = 8 * 1024 * 1024
  let pendingChunks = []

  ffmpeg.stdout.on('data', (chunk) => {
    if (!socket.writable || socket.destroyed) {
      return
    }
    if (socket.writableLength > backlogThreshold) {
      pendingChunks.push(chunk)
      return
    }

    let frame = chunk
    if (pendingChunks.length > 0) {
      pendingChunks.push(chunk)
      frame = Buffer.concat(pendingChunks)
      pendingChunks = []
    }
    sendWsFrame(socket, frame)
  })

  ffmpeg.stderr.on('data', (chunk) => {
    console.error(`[ffmpeg] ${chunk.toString().trim()}`)
  })

  ffmpeg.on('error', (error) => {
    sendText(socket, {
      type: 'error',
      message: `Failed to start ffmpeg: ${error.message}`,
    })
    closeSocket(socket, 1011, 'ffmpeg start failed')
  })

  ffmpeg.on('close', (code, signal) => {
    console.log(`[ffmpeg] stopped code=${code} signal=${signal || ''}`)
    if (!socket.destroyed) {
      closeSocket(socket, 1011, 'ffmpeg stopped')
    }
  })

  // Any way the client connection goes away must stop the encoder.
  const stopFfmpeg = () => {
    ffmpeg.kill('SIGTERM')
  }
  for (const eventName of ['close', 'end', 'error']) {
    socket.on(eventName, stopFfmpeg)
  }
}
/**
 * Handles the HTTP server's 'upgrade' event: validates the request, completes
 * the WebSocket handshake, wires up client frame handling and starts the video
 * stream for this connection.
 *
 * Requests for a URL other than WS_PATH get a 404 and requests without a
 * Sec-WebSocket-Key get a 400; both sockets are destroyed.
 *
 * @param {object} request - Incoming HTTP request (`url`, `headers`).
 * @param {object} socket - Raw network socket for the connection.
 * @param {Buffer} [head] - Bytes already read past the HTTP headers (the third
 *   argument of the 'upgrade' event); they are parsed as the first frames.
 * @returns {void}
 */
function handleUpgrade(request, socket, head = Buffer.alloc(0)) {
  // Without a listener, a socket 'error' (e.g. a connection reset) is thrown
  // as an unhandled event; the stream's own listener is only attached once
  // ffmpeg has been started.
  socket.on('error', (error) => {
    console.error(`[ws] socket error: ${error.message}`)
  })

  if (request.url !== WS_PATH) {
    socket.write('HTTP/1.1 404 Not Found\r\n\r\n')
    socket.destroy()
    return
  }

  const key = request.headers['sec-websocket-key']
  if (!key) {
    socket.write('HTTP/1.1 400 Bad Request\r\n\r\n')
    socket.destroy()
    return
  }

  // Accept token: base64(SHA-1(key + fixed protocol GUID)).
  const accept = createHash('sha1')
    .update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`)
    .digest('base64')
  socket.write(
    [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${accept}`,
      // Joining with this last element yields the blank line ending the headers.
      '\r\n',
    ].join('\r\n'),
  )

  let frameBuffer = Buffer.alloc(0)
  const consume = (chunk) => {
    try {
      frameBuffer = readClientFrames(Buffer.concat([frameBuffer, chunk]), (opcode, payload) => {
        if (opcode === 0x1) {
          handleControlMessage(payload)
        } else if (opcode === 0x8) {
          // Answer the peer's Close frame with our own before ending, so the
          // client sees a clean closing handshake.
          closeSocket(socket)
        } else if (opcode === 0x9) {
          // Ping: echo the payload back in a Pong.
          sendWsFrame(socket, payload, 0xA)
        }
      })
    } catch (error) {
      console.error(`[ws] ${error.message}`)
      closeSocket(socket, 1002, 'bad websocket frame')
    }
  }
  socket.on('data', consume)

  sendText(socket, {
    type: 'ready',
    path: WS_PATH,
    transcode: VIDEO_TRANSCODE,
  })
  startClientStream(socket)

  // Frames that arrived in the same packet as the handshake request never
  // show up as 'data' events, so feed them to the parser explicitly.
  if (head.length > 0) {
    consume(head)
  }
}
// Plain HTTP side of the server: `/health` reports the streaming
// configuration as JSON, every other URL answers 404.
const server = http.createServer((request, response) => {
  const isHealthCheck = request.url === '/health'
  if (!isHealthCheck) {
    response.writeHead(404, {'content-type': 'text/plain; charset=utf-8'})
    response.end('Not Found')
    return
  }

  const status = {
    ok: true,
    wsPath: WS_PATH,
    rtspConfigured: Boolean(RTSP_URL),
    transcode: VIDEO_TRANSCODE,
  }
  response.writeHead(200, {'content-type': 'application/json; charset=utf-8'})
  response.end(JSON.stringify(status))
})
// Wire up WebSocket upgrades and error reporting, then start listening.
// A server error is logged and sets a non-zero exit code; once the server is
// listening, the WebSocket and health endpoints are printed.
server
  .on('upgrade', handleUpgrade)
  .on('error', (error) => {
    const reason = error.message
    console.error(`[server] failed to listen on ${HOST}:${PORT}: ${reason}`)
    process.exitCode = 1
  })
  .listen(PORT, HOST, () => {
    const authority = `${HOST}:${PORT}`
    const rtspState = RTSP_URL ? 'configured' : 'not configured'
    console.log(`[server] ws://${authority}${WS_PATH}`)
    console.log(`[server] health http://${authority}/health`)
    console.log(`[server] rtsp ${rtspState}`)
  })