You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
376 lines
8.3 KiB
JavaScript
376 lines
8.3 KiB
JavaScript
import {spawn} from 'node:child_process'
|
|
import {createHash} from 'node:crypto'
|
|
import {existsSync, readFileSync} from 'node:fs'
|
|
import http from 'node:http'
|
|
import {resolve} from 'node:path'
|
|
|
|
// Pull values from a local .env file into process.env before any setting
// below is read.
loadDotEnv()

const {env} = process

// Interface and port the HTTP/WebSocket server listens on.
const HOST = env.WS_HOST || '0.0.0.0'
const PORT = Number(env.WS_PORT || 8080)

// Request path accepted for WebSocket upgrades.
const WS_PATH = env.WS_PATH || '/video'

// ffmpeg executable and the RTSP source it reads from. When RTSP_URL is not
// set, the URL is assembled from the CAMERA_* variables (empty string when
// those are incomplete).
const FFMPEG_PATH = env.FFMPEG_PATH || 'ffmpeg'
const RTSP_URL = env.RTSP_URL || buildCameraRtspUrl()
const RTSP_TRANSPORT = env.RTSP_TRANSPORT || 'tcp'

// Only the exact string '1' enables re-encoding; any other value copies the
// camera's video stream as-is.
const VIDEO_TRANSCODE = env.VIDEO_TRANSCODE === '1'
|
|
|
|
/**
 * Loads KEY=VALUE pairs from a `.env` file in the current working directory
 * into `process.env`.
 *
 * Blank lines, lines starting with `#`, lines without `=` and lines with an
 * empty variable name are skipped. A value wrapped in a matching pair of
 * single or double quotes has that pair removed. Variables that already hold
 * a non-empty value in `process.env` are left untouched.
 *
 * Does nothing when the file does not exist.
 */
function loadDotEnv() {
  const envPath = resolve(process.cwd(), '.env')

  if (!existsSync(envPath)) {
    return
  }

  const lines = readFileSync(envPath, 'utf8').split(/\r?\n/)
  for (const line of lines) {
    const trimmed = line.trim()
    if (!trimmed || trimmed.startsWith('#')) {
      continue
    }

    const separatorIndex = trimmed.indexOf('=')
    // `<= 0` rejects both "no separator" and lines such as `=value`, which
    // would otherwise produce an empty variable name.
    if (separatorIndex <= 0) {
      continue
    }

    const key = trimmed.slice(0, separatorIndex).trim()
    let value = trimmed.slice(separatorIndex + 1).trim()

    // Require at least two characters so that a lone quote character is kept
    // as a literal value instead of being treated as an empty quoted string.
    const quote = value[0]
    const isQuoted =
      value.length >= 2 &&
      (quote === '"' || quote === "'") &&
      value.endsWith(quote)
    if (isQuoted) {
      value = value.slice(1, -1)
    }

    if (!process.env[key]) {
      process.env[key] = value
    }
  }
}
|
|
|
|
/**
 * Builds the camera's RTSP URL from the CAMERA_* environment variables.
 *
 * CAMERA_HOST, CAMERA_USER and CAMERA_PASSWORD are required; CAMERA_PORT
 * defaults to '554' and CAMERA_RTSP_PATH to '/Stream/Live/101'. The user name
 * and password are percent-encoded so characters such as `@`, `:` or `/` do
 * not break the URL.
 *
 * @returns {string} The RTSP URL, or an empty string when host, user or
 *   password is missing.
 */
function buildCameraRtspUrl() {
  const {
    CAMERA_HOST: host,
    CAMERA_USER: username,
    CAMERA_PASSWORD: password,
  } = process.env
  const port = process.env.CAMERA_PORT || '554'
  const configuredPath = process.env.CAMERA_RTSP_PATH || '/Stream/Live/101'

  if (!host || !username || !password) {
    return ''
  }

  // A path configured without its leading slash would otherwise be glued
  // directly onto the port number.
  const path = configuredPath.startsWith('/') ? configuredPath : `/${configuredPath}`
  const credentials = `${encodeURIComponent(username)}:${encodeURIComponent(password)}`

  return `rtsp://${credentials}@${host}:${port}${path}`
}
|
|
|
|
/**
 * Assembles the ffmpeg command line: read RTSP_URL over RTSP_TRANSPORT,
 * drop audio, and write fragmented MP4 to stdout (`pipe:1`).
 *
 * When VIDEO_TRANSCODE is set the video is re-encoded with libx264 using the
 * options listed below; otherwise the video stream is copied unchanged.
 *
 * @returns {string[]} Argument list to pass to the ffmpeg executable.
 */
function createFfmpegArgs() {
  const args = [
    '-hide_banner',
    '-loglevel', 'warning',
    '-rtsp_transport', RTSP_TRANSPORT,
    '-i', RTSP_URL,
  ]

  if (VIDEO_TRANSCODE) {
    args.push(
      '-c:v', 'libx264',
      '-preset', 'veryfast',
      '-tune', 'zerolatency',
      '-profile:v', 'baseline',
      '-level', '3.1',
      '-pix_fmt', 'yuv420p',
      '-g', '25',
    )
  } else {
    args.push('-c:v', 'copy')
  }

  args.push(
    '-an',
    '-f', 'mp4',
    '-movflags', 'frag_keyframe+empty_moov+default_base_moof',
    '-reset_timestamps', '1',
    'pipe:1',
  )

  return args
}
|
|
|
|
/**
 * Writes one unmasked, final (FIN bit set) WebSocket frame to the socket.
 *
 * Nothing is written when the socket is destroyed or no longer writable.
 * The payload length is encoded in 7 bits, 16 bits or 64 bits depending on
 * its size.
 *
 * @param {object} socket - Stream with `destroyed`, `writable` and `write()`.
 * @param {Buffer|string} payload - Frame body; non-Buffers go through
 *   `Buffer.from`.
 * @param {number} [opcode=0x2] - WebSocket opcode (defaults to binary).
 */
function sendWsFrame(socket, payload, opcode = 0x2) {
  const unusable = socket.destroyed || !socket.writable
  if (unusable) {
    return
  }

  const data = Buffer.isBuffer(payload) ? payload : Buffer.from(payload)
  const size = data.length

  // Header is 2 bytes, plus 2 or 8 bytes of extended length when needed.
  let headerSize = 2
  if (size > 0xffff) {
    headerSize = 10
  } else if (size >= 126) {
    headerSize = 4
  }

  const frame = Buffer.allocUnsafe(headerSize + size)
  frame[0] = 0x80 | opcode

  switch (headerSize) {
    case 2:
      frame[1] = size
      break
    case 4:
      frame[1] = 126
      frame.writeUInt16BE(size, 2)
      break
    default:
      frame[1] = 127
      frame.writeBigUInt64BE(BigInt(size), 2)
  }

  data.copy(frame, headerSize)
  socket.write(frame)
}
|
|
|
|
/**
 * Serializes `payload` as JSON and sends it as a WebSocket text frame.
 *
 * @param {object} socket - Socket passed through to `sendWsFrame`.
 * @param {*} payload - Value handed to `JSON.stringify`.
 */
function sendText(socket, payload) {
  const TEXT_OPCODE = 0x1
  const json = JSON.stringify(payload)
  sendWsFrame(socket, json, TEXT_OPCODE)
}
|
|
|
|
/**
 * Sends a WebSocket Close frame (opcode 0x8) and then ends the socket.
 *
 * The frame body is the 2-byte big-endian status code followed by the bytes
 * of `reason`.
 *
 * @param {object} socket - Socket to close.
 * @param {number} [code=1000] - WebSocket close status code.
 * @param {string} [reason=''] - Close reason text.
 */
function closeSocket(socket, code = 1000, reason = '') {
  const reasonBytes = Buffer.from(reason)
  const statusBytes = Buffer.allocUnsafe(2)
  statusBytes.writeUInt16BE(code, 0)

  const closeBody = Buffer.concat([statusBytes, reasonBytes])
  sendWsFrame(socket, closeBody, 0x8)
  socket.end()
}
|
|
|
|
/**
 * Parses every complete WebSocket frame found at the start of `buffer` and
 * reports each one through `onMessage`.
 *
 * Parsing stops at the first incomplete frame; the unparsed remainder is
 * returned so the caller can prepend it to the next chunk. Masked payloads
 * are unmasked into a fresh Buffer (the input buffer is not modified);
 * unmasked payloads are passed as a view into `buffer`. The FIN bit is not
 * inspected, so each frame is delivered on its own, without reassembly.
 *
 * A frame whose declared payload length exceeds `maxPayloadLength` is
 * rejected as soon as its header has been read, so a peer cannot make the
 * caller accumulate an arbitrarily large frame in memory.
 *
 * @param {Buffer} buffer - Bytes received so far.
 * @param {(opcode: number, payload: Buffer) => void} onMessage - Called once
 *   per complete frame, in order.
 * @param {number} [maxPayloadLength=1048576] - Largest accepted payload, in
 *   bytes.
 * @returns {Buffer} The bytes that follow the last complete frame.
 * @throws {Error} When a frame declares a payload that is too large.
 */
function readClientFrames(buffer, onMessage, maxPayloadLength = 1024 * 1024) {
  let offset = 0

  while (offset + 2 <= buffer.length) {
    const opcode = buffer[offset] & 0x0f
    const secondByte = buffer[offset + 1]
    const masked = (secondByte & 0x80) !== 0
    let length = secondByte & 0x7f
    let headerLength = 2

    if (length === 126) {
      // 16-bit extended payload length.
      if (offset + 4 > buffer.length) break
      length = buffer.readUInt16BE(offset + 2)
      headerLength = 4
    } else if (length === 127) {
      // 64-bit extended payload length.
      if (offset + 10 > buffer.length) break
      const bigLength = buffer.readBigUInt64BE(offset + 2)
      if (bigLength > BigInt(Number.MAX_SAFE_INTEGER)) {
        throw new Error('WebSocket frame is too large')
      }
      length = Number(bigLength)
      headerLength = 10
    }

    // Checked before waiting for the body, so an oversized frame is refused
    // without buffering any of it.
    if (length > maxPayloadLength) {
      throw new Error('WebSocket frame is too large')
    }

    const maskStart = offset + headerLength
    const payloadStart = maskStart + (masked ? 4 : 0)
    const frameEnd = payloadStart + length
    if (frameEnd > buffer.length) break

    let payload = buffer.subarray(payloadStart, frameEnd)
    if (masked) {
      const mask = buffer.subarray(maskStart, payloadStart)
      const unmasked = Buffer.allocUnsafe(length)
      for (let index = 0; index < length; index += 1) {
        unmasked[index] = payload[index] ^ mask[index & 3]
      }
      payload = unmasked
    }

    onMessage(opcode, payload)
    offset = frameEnd
  }

  return buffer.subarray(offset)
}
|
|
|
|
/**
 * Handles a text frame from the client.
 *
 * The payload is decoded as UTF-8 JSON. Messages whose `type` is 'control'
 * are logged with their `action` and `sentAt` fields; other message types are
 * ignored. Anything that fails to parse (or otherwise throws while being
 * inspected) results in a warning instead of an exception.
 *
 * @param {Buffer} payload - Raw text-frame payload.
 */
function handleControlMessage(payload) {
  try {
    const {type, action, sentAt} = JSON.parse(payload.toString('utf8'))
    if (type !== 'control') {
      return
    }

    const sentAtLabel = sentAt || ''
    console.log(`[control] action=${action} sentAt=${sentAtLabel}`)
  } catch {
    console.warn('[ws] ignored non-json text message')
  }
}
|
|
|
|
/**
 * Spawns one ffmpeg process for the given client and forwards its stdout to
 * the socket as binary WebSocket frames.
 *
 * When RTSP_URL is empty, the client receives an error message and the
 * connection is closed with status 1011 without starting ffmpeg.
 *
 * Backpressure: while more than 8 MiB is queued on the socket, ffmpeg output
 * is held back and sent together with the next chunk once the queue drains.
 * If the held-back data itself exceeds 64 MiB the client is considered
 * stalled and its socket is destroyed, so memory use per client stays
 * bounded.
 *
 * ffmpeg is sent SIGTERM when the socket closes, ends or errors; the socket
 * is closed with status 1011 when ffmpeg exits or fails to start.
 *
 * @param {import('node:net').Socket} socket - Upgraded client connection.
 */
function startClientStream(socket) {
  if (!RTSP_URL) {
    sendText(socket, {
      type: 'error',
      message: 'Missing RTSP_URL. Set RTSP_URL or CAMERA_HOST/CAMERA_USER/CAMERA_PASSWORD.',
    })
    closeSocket(socket, 1011, 'missing rtsp url')
    return
  }

  const SOCKET_HIGH_WATER_MARK = 8 * 1024 * 1024
  const MAX_PENDING_BYTES = 64 * 1024 * 1024

  const ffmpeg = spawn(FFMPEG_PATH, createFfmpegArgs(), {
    stdio: ['ignore', 'pipe', 'pipe'],
  })

  // Chunks held back while the socket is congested. Kept as a list and joined
  // once on flush, rather than re-concatenated for every incoming chunk.
  let pendingChunks = []
  let pendingBytes = 0

  const stopFfmpeg = () => {
    if (!ffmpeg.killed) {
      ffmpeg.kill('SIGTERM')
    }
  }

  console.log(`[ffmpeg] started pid=${ffmpeg.pid} transcode=${VIDEO_TRANSCODE ? 'yes' : 'no'}`)

  ffmpeg.stdout.on('data', (chunk) => {
    if (!socket.writable || socket.destroyed) {
      return
    }

    if (socket.writableLength > SOCKET_HIGH_WATER_MARK) {
      pendingChunks.push(chunk)
      pendingBytes += chunk.length

      if (pendingBytes > MAX_PENDING_BYTES) {
        console.warn(`[ws] client is not draining data, dropping connection pendingBytes=${pendingBytes}`)
        pendingChunks = []
        pendingBytes = 0
        // A graceful close would only queue more data behind the backlog;
        // the socket's 'close' handler below stops ffmpeg.
        socket.destroy()
      }
      return
    }

    let frame = chunk
    if (pendingBytes > 0) {
      pendingChunks.push(chunk)
      frame = Buffer.concat(pendingChunks, pendingBytes + chunk.length)
      pendingChunks = []
      pendingBytes = 0
    }

    sendWsFrame(socket, frame)
  })

  ffmpeg.stderr.on('data', (chunk) => {
    console.error(`[ffmpeg] ${chunk.toString().trim()}`)
  })

  ffmpeg.on('error', (error) => {
    sendText(socket, {
      type: 'error',
      message: `Failed to start ffmpeg: ${error.message}`,
    })
    closeSocket(socket, 1011, 'ffmpeg start failed')
  })

  ffmpeg.on('close', (code, signal) => {
    console.log(`[ffmpeg] stopped code=${code} signal=${signal || ''}`)
    if (!socket.destroyed) {
      closeSocket(socket, 1011, 'ffmpeg stopped')
    }
  })

  socket.on('close', stopFfmpeg)
  socket.on('end', stopFfmpeg)
  socket.on('error', (error) => {
    console.error(`[ws] socket error: ${error.message}`)
    stopFfmpeg()
  })
}
|
|
|
|
/**
 * Handles the HTTP server's 'upgrade' event: completes the WebSocket
 * handshake, wires up client frame handling, and starts the video stream.
 *
 * Requests for a path other than WS_PATH get a 404 and requests without a
 * `Sec-WebSocket-Key` header get a 400. A query string on the request URL is
 * ignored when matching the path.
 *
 * Client frames: text frames go to `handleControlMessage`, a Close frame is
 * answered with a Close frame, and a Ping is answered with a Pong carrying
 * the same payload. A frame that cannot be parsed closes the connection with
 * status 1002.
 *
 * @param {import('node:http').IncomingMessage} request - The upgrade request.
 * @param {import('node:net').Socket} socket - The underlying connection.
 * @param {Buffer} [head] - Bytes already read past the HTTP headers; they are
 *   treated as the beginning of the WebSocket frame stream.
 */
function handleUpgrade(request, socket, head = Buffer.alloc(0)) {
  const pathname = (request.url || '').split('?', 1)[0]
  if (request.url !== WS_PATH && pathname !== WS_PATH) {
    // end() flushes the response before closing; destroy() right after
    // write() can discard it.
    socket.end('HTTP/1.1 404 Not Found\r\n\r\n')
    return
  }

  const key = request.headers['sec-websocket-key']
  if (!key) {
    socket.end('HTTP/1.1 400 Bad Request\r\n\r\n')
    return
  }

  const accept = createHash('sha1')
    .update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`)
    .digest('base64')

  socket.write(
    [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${accept}`,
      '\r\n',
    ].join('\r\n'),
  )

  // Without a listener, an 'error' event on the socket (for example a
  // connection reset) would be thrown as an uncaught exception.
  socket.on('error', (error) => {
    console.error(`[ws] socket error: ${error.message}`)
  })

  let frameBuffer = Buffer.alloc(0)

  const onFrame = (opcode, payload) => {
    if (opcode === 0x1) {
      handleControlMessage(payload)
    } else if (opcode === 0x8) {
      // Complete the closing handshake instead of just dropping the
      // connection.
      closeSocket(socket, 1000)
    } else if (opcode === 0x9) {
      sendWsFrame(socket, payload, 0xA)
    }
  }

  const onData = (chunk) => {
    try {
      frameBuffer = readClientFrames(Buffer.concat([frameBuffer, chunk]), onFrame)
    } catch (error) {
      console.error(`[ws] ${error.message}`)
      // Drop the unparseable bytes and stop parsing so later chunks do not
      // re-trigger the same failure.
      frameBuffer = Buffer.alloc(0)
      socket.off('data', onData)
      closeSocket(socket, 1002, 'bad websocket frame')
    }
  }

  socket.on('data', onData)

  sendText(socket, {
    type: 'ready',
    path: WS_PATH,
    transcode: VIDEO_TRANSCODE,
  })
  startClientStream(socket)

  if (head.length > 0) {
    onData(head)
  }
}
|
|
|
|
// Plain HTTP endpoint: `/health` reports the current configuration as JSON,
// every other URL answers 404. WebSocket clients arrive via the 'upgrade'
// event registered below.
const server = http.createServer((request, response) => {
  if (request.url !== '/health') {
    response.writeHead(404, {'content-type': 'text/plain; charset=utf-8'})
    response.end('Not Found')
    return
  }

  const health = {
    ok: true,
    wsPath: WS_PATH,
    rtspConfigured: Boolean(RTSP_URL),
    transcode: VIDEO_TRANSCODE,
  }
  response.writeHead(200, {'content-type': 'application/json; charset=utf-8'})
  response.end(JSON.stringify(health))
})

server.on('upgrade', handleUpgrade)

// Report the failure and mark the process as failed via its exit code.
server.on('error', ({message}) => {
  console.error(`[server] failed to listen on ${HOST}:${PORT}: ${message}`)
  process.exitCode = 1
})

server.listen(PORT, HOST, () => {
  const address = `${HOST}:${PORT}`
  const rtspState = RTSP_URL ? 'configured' : 'not configured'

  console.log(`[server] ws://${address}${WS_PATH}`)
  console.log(`[server] health http://${address}/health`)
  console.log(`[server] rtsp ${rtspState}`)
})
|