切片上传
依赖 前端: spark-md5(创建hash) 后端: multiparty(解析FormData)
上传
切片
- 获取File对象
- 使用 File.slice 切片
js
// Size of one upload chunk in bytes (1 MiB).
const CHUNK_SIZE = 1024 * 1024;

/**
 * Split a File (or any Blob-like object) into consecutive chunks of at most
 * CHUNK_SIZE bytes.
 *
 * @param {Blob} file - Object exposing `size` and `slice(start, end)`.
 * @returns {Blob[]} The chunks in file order; an empty array when `file.size` is 0.
 */
function createFileSlice(file) {
  const chunks = [];
  // The offset has to be reassignable, and the bound is strict so that a size
  // that is an exact multiple of CHUNK_SIZE does not yield a trailing empty chunk.
  for (let cur = 0; cur < file.size; cur += CHUNK_SIZE) {
    chunks.push(file.slice(cur, cur + CHUNK_SIZE));
  }
  return chunks;
}
计算hash
- 安装 spark-md5
- 计算hash
js
import SparkMD5 from 'spark-md5';

/**
 * Compute an MD5 fingerprint for a file from a sample of its chunks.
 *
 * To keep hashing fast on large files, only the first and last chunk are used
 * in full; every other chunk contributes 2 bytes from its head, 2 bytes from
 * its middle and 2 bytes from its tail. The result is therefore a sampled
 * fingerprint, not a hash of the complete content.
 *
 * @param {Blob[]} fileChunks - Chunks in file order (as built with CHUNK_SIZE).
 * @returns {Promise<string>} Resolves with the hex digest returned by
 *   `spark.end()`; rejects with the FileReader error if reading fails.
 */
function createHash(fileChunks) {
  return new Promise((resolve, reject) => {
    const sampled = [];
    fileChunks.forEach((chunk, index) => {
      if (index === 0 || index === fileChunks.length - 1) {
        sampled.push(chunk);
      } else {
        sampled.push(chunk.slice(0, 2));
        sampled.push(chunk.slice(CHUNK_SIZE / 2, CHUNK_SIZE / 2 + 2));
        sampled.push(chunk.slice(-2));
      }
    });

    const fileReader = new FileReader();
    // Attach the handlers before starting the read so no event can be missed.
    fileReader.onload = (e) => {
      const spark = new SparkMD5.ArrayBuffer();
      spark.append(e.target.result);
      resolve(spark.end());
    };
    fileReader.onerror = () => reject(fileReader.error);
    fileReader.readAsArrayBuffer(new Blob(sampled));
  });
}
并发上传
- 生成formData
js
/**
 * Build one FormData payload per chunk.
 *
 * Each payload carries `fileHash`, `fileName`, `index`, `chunk` and
 * `chunkHash` (`<hash>-<index>`). `fileName` is included because the upload
 * endpoint reads that field.
 * (The function name keeps its original spelling so existing callers still work.)
 *
 * @param {Blob[]} fileChunks - Chunks in file order.
 * @param {string} hash - Fingerprint of the whole file.
 * @param {string} fileName - Original file name.
 * @returns {FormData[]} One FormData per chunk, in chunk order.
 */
function createFromData(fileChunks, hash, fileName) {
  return fileChunks.map((chunk, index) => {
    const formData = new FormData();
    formData.append('fileHash', hash);
    formData.append('fileName', fileName);
    formData.append('index', index);
    formData.append('chunk', chunk);
    formData.append('chunkHash', `${hash}-${index}`);
    return formData;
  });
}
- 并发上传
js
// Maximum number of chunk requests allowed in flight at the same time.
const max = 6;

/**
 * POST every payload to `/upload`, keeping at most `max` requests in flight.
 *
 * All state is local to the call, so the function can be invoked repeatedly.
 *
 * @param {Array<FormData>} formDatas - Request bodies, one per chunk.
 * @returns {Promise<void>} Resolves once every request has succeeded; rejects
 *   if any request fails at the network level or answers with a non-2xx status.
 */
async function upload(formDatas) {
  const inFlight = new Set();
  const requests = [];

  for (const body of formDatas) {
    const request = fetch('/upload', { method: 'POST', body }).then((response) => {
      if (!response.ok) {
        throw new Error(`chunk upload failed with HTTP ${response.status}`);
      }
      return response;
    });
    requests.push(request);

    // `slot` settles (never rejects) once the request is done and frees its
    // place in the pool; failures are reported through `requests` below.
    const release = () => inFlight.delete(slot);
    const slot = request.then(release, release);
    inFlight.add(slot);

    if (inFlight.size >= max) {
      await Promise.race(inFlight);
    }
  }

  await Promise.all(requests);
}
upload接口
js
const multiparty = require('multiparty');
const path = require('path');
const fse = require('fs-extra');

// Root directory for uploaded chunks and merged files.
// NOTE(review): this is an absolute path at the filesystem root — confirm it is intended.
const UPLOAD_DIR = '/upload';

/**
 * POST /upload — store one uploaded chunk as UPLOAD_DIR/<fileHash>/<chunkHash>.
 *
 * Expects multipart fields `fileHash` and `chunkHash` plus a file part named
 * `chunk`. Always answers with HTTP 200 and a JSON body `{ code, msg }`,
 * where `code` is 0 on success and -1 on failure.
 * Assumes `app` is an Express application defined elsewhere.
 */
app.post('/upload', (req, res) => {
  const reply = (code, msg) => res.status(200).json({ code, msg });
  // Both values become path segments, so only accept word characters and '-'.
  const SAFE_NAME = /^[\w-]+$/;

  const form = new multiparty.Form();
  form.parse(req, async (err, fields, files) => {
    if (err) {
      return reply(-1, '上传失败' + err.message);
    }
    try {
      // multiparty maps every field/file name to an array of values.
      const fileHash = fields.fileHash && fields.fileHash[0];
      const chunkHash = fields.chunkHash && fields.chunkHash[0];
      const chunkFile = files.chunk && files.chunk[0];
      if (!SAFE_NAME.test(fileHash || '') || !SAFE_NAME.test(chunkHash || '') || !chunkFile) {
        return reply(-1, '上传失败' + 'invalid fileHash, chunkHash or chunk');
      }

      const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
      await fse.ensureDir(chunkDir);
      // Overwrite so that re-sending the same chunk does not fail.
      await fse.move(chunkFile.path, path.resolve(chunkDir, chunkHash), { overwrite: true });
      return reply(0, '上传成功');
    } catch (error) {
      return reply(-1, '上传失败' + error.message);
    }
  });
});
合并
发起合并
js
// Ask the server to merge the uploaded chunks; issued once, after all chunks
// have been uploaded. `fileHash` and `fileName` come from the surrounding code.
fetch('/marge', {
  method: 'POST',
  headers: {
    'content-type': 'application/json'
  },
  body: JSON.stringify({ fileHash, fileName, size: CHUNK_SIZE })
});
marge接口
js
/**
 * POST /marge — concatenate the chunks stored in UPLOAD_DIR/<fileHash> into
 * UPLOAD_DIR/<fileHash><ext> and remove the chunk directory afterwards.
 *
 * Reads `fileHash` and `fileName` from the JSON request body (assumes a JSON
 * body parser is registered on `app`). Always answers with HTTP 200 and a
 * JSON body `{ code, msg }`, where `code` is 0 on success and -1 on failure.
 * The request is idempotent: if the merged file already exists it reports
 * success without doing any work.
 *
 * NOTE(review): when `fileName` has no extension the merged file path equals
 * the chunk directory path — confirm how such files should be handled.
 */
app.post('/marge', async (req, res) => {
  const reply = (code, msg) => res.status(200).json({ code, msg });

  const { fileHash, fileName } = req.body || {};
  // fileHash becomes a path segment, so only accept word characters and '-'.
  if (typeof fileName !== 'string' || typeof fileHash !== 'string' || !/^[\w-]+$/.test(fileHash)) {
    return reply(-1, '没有需要合并的文件');
  }

  const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
  const filePath = path.resolve(UPLOAD_DIR, fileHash + getFileExt(fileName));

  try {
    // The file already exists: nothing left to do.
    if (await fse.pathExists(filePath)) {
      return reply(0, '合并成功');
    }
    // The chunk directory does not exist.
    if (!(await fse.pathExists(chunkDir))) {
      return reply(-1, '没有需要合并的文件');
    }
    const chunkFiles = await fse.readdir(chunkDir);
    if (chunkFiles.length === 0) {
      return reply(-1, '没有需要合并的文件');
    }
    await writeFile(chunkFiles, chunkDir, filePath);
    await fse.remove(chunkDir);
    return reply(0, '合并成功');
  } catch (err) {
    return reply(-1, '合并失败' + err.message);
  }
});

/**
 * Concatenate chunk files into `filePath`, ordered by the numeric index that
 * follows the last '-' in each chunk file name.
 *
 * Chunks are appended one after another to a temporary file that is moved
 * into place only once every chunk has been written, so a failed merge never
 * leaves a partial file at `filePath`.
 *
 * @param {string[]} chunkFiles - Chunk file names inside `chunkDir`.
 * @param {string} chunkDir - Directory that holds the chunk files.
 * @param {string} filePath - Destination path of the merged file.
 * @returns {Promise<void>}
 */
async function writeFile(chunkFiles, chunkDir, filePath) {
  const chunkIndex = (name) => Number(name.slice(name.lastIndexOf('-') + 1));
  // Sort a copy so the caller's array is left untouched.
  const ordered = [...chunkFiles].sort((a, b) => chunkIndex(a) - chunkIndex(b));

  const tempPath = `${filePath}.merging`;
  // Start from an empty temporary file in case an earlier attempt left one behind.
  await fse.writeFile(tempPath, '');
  for (const name of ordered) {
    const data = await fse.readFile(path.resolve(chunkDir, name));
    await fse.appendFile(tempPath, data);
  }
  await fse.move(tempPath, filePath, { overwrite: true });
}
/**
 * Return the extension of a file name, including the leading dot.
 *
 * @param {string} fileName - File name, e.g. "photo.png".
 * @returns {string} The extension (e.g. ".png"), or "" when there is none.
 */
function getFileExt(fileName) {
  return path.extname(fileName);
}
秒传和断点续传
- 增加verify接口,判断文件是否已存在
js
/**
 * POST /verify — tell the client whether a file still has to be uploaded.
 *
 * Reads `fileName` and `fileHash` from the JSON request body (assumes a JSON
 * body parser is registered on `app`). Answers with HTTP 200 and a JSON body:
 *  - merged file exists: `data.shouldUpload` is false (instant upload);
 *  - otherwise: `data.shouldUpload` is true and `data.existsChunk` lists the
 *    chunk file names already stored, so the client can resume.
 */
app.post('/verify', async (req, res) => {
  const succeed = (data) => res.status(200).json({ code: 0, msg: '请求成功', data });
  const fail = (reason) => res.status(200).json({ code: -1, msg: '请求失败' + reason });

  const { fileName, fileHash } = req.body || {};
  // fileHash becomes a path segment, so only accept word characters and '-'.
  if (typeof fileName !== 'string' || typeof fileHash !== 'string' || !/^[\w-]+$/.test(fileHash)) {
    return fail('invalid fileName or fileHash');
  }

  try {
    const filePath = path.resolve(UPLOAD_DIR, fileHash + getFileExt(fileName));
    if (await fse.pathExists(filePath)) {
      // Already stored: no upload needed (instant upload).
      return succeed({ shouldUpload: false });
    }

    // Not stored yet: report the chunks that were already uploaded.
    const chunkDir = path.resolve(UPLOAD_DIR, fileHash);
    // No chunk directory simply means that nothing has been uploaded so far.
    const existsChunk = (await fse.pathExists(chunkDir)) ? await fse.readdir(chunkDir) : [];
    return succeed({ shouldUpload: true, existsChunk });
  } catch (err) {
    return fail(err.message);
  }
});
两种处理方式
- 客户端拿到 existsChunk 之后,从文件分片中过滤掉已上传的分片,只上传不存在的分片。
- 客户端拿到 existsChunk 之后,又传回服务器,由服务器来过滤分片,只上传不存在的分片。