Chunked File Upload

Dependencies. Frontend: spark-md5 (computes the file hash). Backend: multiparty (parses the FormData).

Upload

Slicing

  • Get the File object
  • Cut it into chunks with File.slice
js
const CHUNK_SIZE = 1024 * 1024 // 1 MB per chunk
function createFileSlice(file) {
    const chunks = []
    let cur = 0
    while (cur < file.size) {
        const chunk = file.slice(cur, cur + CHUNK_SIZE)
        chunks.push(chunk)
        cur += CHUNK_SIZE
    }
    return chunks
}
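
The File object usually comes from a file input element. A minimal sketch (the selector is illustrative):

js
const input = document.querySelector('input[type="file"]')
input.addEventListener('change', (e) => {
    const file = e.target.files[0]
    const chunks = createFileSlice(file) // array of Blob slices
})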

Computing the hash

  • Install spark-md5
  • Compute the hash
js
import SparkMD5 from 'spark-md5'
function createHash(fileChunks) {
    return new Promise((resolve) => {
        const spark = new SparkMD5.ArrayBuffer()
        const chunks = []
        // Hashing a huge file in full is slow: keep the first and last chunks
        // whole, and sample two bytes from the front, middle, and end of every
        // other chunk
        fileChunks.forEach((chunk, index) => {
            if (index === 0 || index === fileChunks.length - 1) {
                chunks.push(chunk)
            } else {
                chunks.push(chunk.slice(0, 2))
                chunks.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2))
                chunks.push(chunk.slice(-2))
            }
        })

        const fileReader = new FileReader()
        fileReader.readAsArrayBuffer(new Blob(chunks))
        fileReader.onload = function (e) {
            spark.append(e.target.result)
            resolve(spark.end())
        }
    })
}

Concurrent upload

  • Build the FormData objects
js
function createFormData(fileChunks, hash, fileName) {
    const data = fileChunks.map((chunk, index) => ({
        fileHash: hash,
        index,
        chunk,
        fileName,
        chunkHash: `${hash}-${index}`
    }))

    const formDatas = data.map((item) => {
        const formData = new FormData()
        formData.append('fileHash', item.fileHash)
        formData.append('index', item.index)
        formData.append('chunk', item.chunk)
        formData.append('fileName', item.fileName)
        formData.append('chunkHash', item.chunkHash)
        return formData
    })
    return formDatas
}
  • Upload with limited concurrency
js
async function upload(formDatas) {
    let index = 0
    const max = 6 // cap on concurrent requests
    const taskPool = []
    while (index < formDatas.length) {
        const task = fetch('/upload', {
            method: 'POST',
            body: formDatas[index]
        })
        // Remove the request from the pool once it settles
        task.finally(() => {
            taskPool.splice(taskPool.indexOf(task), 1)
        })
        taskPool.push(task)
        // Pool is full: wait for at least one request to finish
        if (taskPool.length === max) {
            await Promise.race(taskPool)
        }
        index++
    }
    // Wait for the requests still in flight
    await Promise.all(taskPool)
}
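
Putting the client-side pieces together (a minimal sketch; fileInput is illustrative):

js
const file = fileInput.files[0]
const chunks = createFileSlice(file)
const fileHash = await createHash(chunks)
const formDatas = createFormData(chunks, fileHash, file.name)
await upload(formDatas)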

The upload endpoint

js
const multiparty = require('multiparty')
const path = require('path')
const fse = require('fs-extra')
const UPLOAD_DIR = path.resolve(__dirname, 'upload')
app.post('/upload', async (req, res) => {
    const form = new multiparty.Form()

    form.parse(req, async (err, fields, files) => {
        if (err) {
            return res.status(200).json({
                code: -1,
                msg: 'Upload failed: ' + err.message
            })
        }
        const fileHash = fields['fileHash'][0]
        const chunkHash = fields['chunkHash'][0]

        // All chunks of one file live in a directory named after the file hash
        const chunkDir = path.resolve(UPLOAD_DIR, fileHash)

        if (!fse.existsSync(chunkDir)) {
            await fse.mkdirs(chunkDir)
        }

        // Move multiparty's temp file into the chunk directory, named by chunkHash
        const oldPath = files['chunk'][0].path
        await fse.move(oldPath, path.resolve(chunkDir, chunkHash))
        res.status(200).json({
            code: 0,
            msg: 'Upload succeeded'
        })
    })
})
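
The handlers here assume an Express app; a minimal bootstrap might look like this (a sketch, the port is illustrative):

js
const express = require('express')
const app = express()
app.use(express.json()) // parse the JSON bodies used by /merge and /verify
app.listen(3000)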

Merging

Requesting the merge

js
fetch('/merge', {
    method: 'POST',
    headers: {
        'content-type': 'application/json'
    },
    body: JSON.stringify({
        fileHash: fileHash,
        fileName: fileName,
        size: CHUNK_SIZE
    })
})

The merge endpoint

js
app.post('/merge', async (req, res) => {
    const { fileHash, fileName, size } = req.body
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
    const fileExt = getFileExt(fileName)
    const filePath = path.resolve(UPLOAD_DIR, fileHash + fileExt)
    // No chunk directory: nothing to merge
    if (!fse.existsSync(chunkDir)) {
        return res.status(200).json({
            code: -1,
            msg: 'No chunks to merge'
        })
    }
    // File already exists: the merge was already done
    if (fse.existsSync(filePath)) {
        return res.status(200).json({
            code: 0,
            msg: 'Merge succeeded'
        })
    }
    const chunkFiles = fse.readdirSync(chunkDir)
    await writeFile(chunkFiles, chunkDir, filePath, size)
    fse.removeSync(chunkDir)

    return res.status(200).json({
        code: 0,
        msg: 'Merge succeeded'
    })
})

async function writeFile(chunkFiles, chunkDir, filePath, size) {
    // Sort chunks by the index suffix of their chunkHash (`${hash}-${index}`)
    chunkFiles.sort((a, b) => {
        return a.split('-')[1] - b.split('-')[1]
    })
    // Pre-create the target file so each stream can open it with 'r+'
    // and write at its own offset without truncating the others
    await fse.ensureFile(filePath)
    const tasks = chunkFiles.map((fileName, index) => {
        return new Promise((resolve) => {
            const chunkPath = path.resolve(chunkDir, fileName)
            // Read the chunk file
            const readStream = fse.createReadStream(chunkPath)
            // Write it into the target file at its offset
            const writeStream = fse.createWriteStream(filePath, {
                flags: 'r+',
                start: index * size
            })
            readStream.on('end', () => {
                // Delete the chunk file once it has been fully read
                fse.unlinkSync(chunkPath)
                resolve()
            })
            readStream.pipe(writeStream)
        })
    })
    return Promise.all(tasks)
}

function getFileExt(fileName) {
    // Extension including the dot, e.g. 'video.mp4' -> '.mp4'
    return fileName.slice(fileName.lastIndexOf('.'))
}

Instant upload and resumable upload

  • Add a verify endpoint that checks whether the file already exists
js
app.post('/verify', async (req, res) => {
    const { fileName, fileHash } = req.body
    const fileExt = getFileExt(fileName)
    const filePath = path.resolve(UPLOAD_DIR, fileHash + fileExt)
    if (fse.existsSync(filePath)) {
        // File already exists: no upload needed (instant upload)
        res.status(200).json({
            code: 0,
            msg: 'OK',
            data: {
                shouldUpload: false
            }
        })
    } else {
        // File does not exist: upload is needed
        const chunkDir = path.resolve(UPLOAD_DIR, fileHash)
        // Return the hashes of the chunks that were already uploaded
        const existsChunk = fse.existsSync(chunkDir) ? fse.readdirSync(chunkDir) : []
        res.status(200).json({
            code: 0,
            msg: 'OK',
            data: {
                shouldUpload: true,
                existsChunk
            }
        })
    }
})
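
On the client, call verify before uploading (a minimal sketch, assuming file and fileHash from the earlier steps):

js
const res = await fetch('/verify', {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ fileName: file.name, fileHash })
})
const { data } = await res.json()
// data.shouldUpload === false means the server already has the file (instant upload)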

Two handling strategies

  • Once the client receives existsChunk, it filters the already-uploaded chunks out of its slices and uploads only the missing ones (see the sketch below).
  • Once the client receives existsChunk, it sends the list back to the server and lets the server do the filtering, so that only missing chunks are stored.
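
A minimal sketch of the first strategy, reusing the helpers defined above (data comes from the /verify response):

js
const formDatas = createFormData(chunks, fileHash, file.name)
    .filter((fd) => !data.existsChunk.includes(fd.get('chunkHash')))
await upload(formDatas)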