You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

590 lines
16 KiB

'use strict'
const Busboy = require('@fastify/busboy')
const os = require('os')
const fp = require('fastify-plugin')
const eos = require('end-of-stream')
const { createWriteStream } = require('fs')
const { unlink } = require('fs').promises
const path = require('path')
const hexoid = require('hexoid')
const util = require('util')
const createError = require('@fastify/error')
const sendToWormhole = require('stream-wormhole')
const deepmergeAll = require('@fastify/deepmerge')({ all: true })
const { PassThrough, pipeline } = require('stream')
const pump = util.promisify(pipeline)
const secureJSON = require('secure-json-parse')
const kMultipart = Symbol('multipart')
const kMultipartHandler = Symbol('multipartHandler')
const getDescriptor = Object.getOwnPropertyDescriptor
function setMultipart (req, payload, done) {
// nothing to do, it will be done by the Request.multipart object
req.raw[kMultipart] = true
done()
}
function attachToBody (options, req, reply, next) {
if (req.raw[kMultipart] !== true) {
next()
return
}
const consumerStream = options.onFile || defaultConsumer
const body = {}
const mp = req.multipart((field, file, filename, encoding, mimetype) => {
body[field] = body[field] || []
body[field].push({
data: [],
filename,
encoding,
mimetype,
limit: false
})
const result = consumerStream(field, file, filename, encoding, mimetype, body)
if (result && typeof result.then === 'function') {
result.catch((err) => {
// continue with the workflow
err.statusCode = 500
file.destroy(err)
})
}
}, function (err) {
if (!err) {
req.body = body
}
next(err)
}, options)
mp.on('field', (key, value) => {
if (key === '__proto__' || key === 'constructor') {
mp.destroy(new Error(`${key} is not allowed as field name`))
return
}
if (body[key] === undefined) {
body[key] = value
} else if (Array.isArray(body[key])) {
body[key].push(value)
} else {
body[key] = [body[key], value]
}
})
}
function defaultConsumer (field, file, filename, encoding, mimetype, body) {
const fileData = []
const lastFile = body[field][body[field].length - 1]
file.on('data', data => { if (!lastFile.limit) { fileData.push(data) } })
file.on('limit', () => { lastFile.limit = true })
file.on('end', () => {
if (!lastFile.limit) {
lastFile.data = Buffer.concat(fileData)
} else {
lastFile.data = undefined
}
})
}
function busboy (options) {
try {
return new Busboy(options)
} catch (error) {
const errorEmitter = new PassThrough()
process.nextTick(function () {
errorEmitter.emit('error', error)
})
return errorEmitter
}
}
function fastifyMultipart (fastify, options, done) {
if (options.addToBody === true) {
if (typeof options.sharedSchemaId === 'string') {
fastify.addSchema({
$id: options.sharedSchemaId,
type: 'object',
properties: {
encoding: { type: 'string' },
filename: { type: 'string' },
limit: { type: 'boolean' },
mimetype: { type: 'string' }
}
})
}
fastify.addHook('preValidation', function (req, reply, next) {
attachToBody(options, req, reply, next)
})
}
if (options.attachFieldsToBody === true || options.attachFieldsToBody === 'keyValues') {
if (typeof options.sharedSchemaId === 'string') {
fastify.addSchema({
$id: options.sharedSchemaId,
type: 'object',
properties: {
fieldname: { type: 'string' },
encoding: { type: 'string' },
filename: { type: 'string' },
mimetype: { type: 'string' }
}
})
}
fastify.addHook('preValidation', async function (req, reply) {
if (!req.isMultipart()) {
return
}
for await (const part of req.parts()) {
req.body = part.fields
if (part.file) {
if (options.onFile) {
await options.onFile(part)
} else {
await part.toBuffer()
}
}
}
if (options.attachFieldsToBody === 'keyValues') {
const body = {}
for (const key of Object.keys(req.body)) {
const field = req.body[key]
if (field.value !== undefined) {
body[key] = field.value
} else if (Array.isArray(field)) {
body[key] = field.map(item => {
if (item._buf !== undefined) {
return item._buf.toString()
}
return item.value
})
} else if (field._buf !== undefined) {
body[key] = field._buf.toString()
}
}
req.body = body
}
})
}
let throwFileSizeLimit = true
if (typeof options.throwFileSizeLimit === 'boolean') {
throwFileSizeLimit = options.throwFileSizeLimit
}
const PartsLimitError = createError('FST_PARTS_LIMIT', 'reach parts limit', 413)
const FilesLimitError = createError('FST_FILES_LIMIT', 'reach files limit', 413)
const FieldsLimitError = createError('FST_FIELDS_LIMIT', 'reach fields limit', 413)
const RequestFileTooLargeError = createError('FST_REQ_FILE_TOO_LARGE', 'request file too large, please check multipart config', 413)
const PrototypeViolationError = createError('FST_PROTO_VIOLATION', 'prototype property is not allowed as field name', 400)
const InvalidMultipartContentTypeError = createError('FST_INVALID_MULTIPART_CONTENT_TYPE', 'the request is not multipart', 406)
const InvalidJSONFieldError = createError('FST_INVALID_JSON_FIELD_ERROR', 'a request field is not a valid JSON as declared by its Content-Type', 406)
fastify.decorate('multipartErrors', {
PartsLimitError,
FilesLimitError,
FieldsLimitError,
PrototypeViolationError,
InvalidMultipartContentTypeError,
RequestFileTooLargeError
})
fastify.addContentTypeParser('multipart', setMultipart)
fastify.decorateRequest(kMultipartHandler, handleMultipart)
fastify.decorateRequest('parts', getMultipartIterator)
fastify.decorateRequest('isMultipart', isMultipart)
fastify.decorateRequest('tmpUploads', null)
// legacy
fastify.decorateRequest('multipart', handleLegacyMultipartApi)
// Stream mode
fastify.decorateRequest('file', getMultipartFile)
fastify.decorateRequest('files', getMultipartFiles)
// Disk mode
fastify.decorateRequest('saveRequestFiles', saveRequestFiles)
fastify.decorateRequest('cleanRequestFiles', cleanRequestFiles)
fastify.addHook('onResponse', async (request, reply) => {
await request.cleanRequestFiles()
})
const toID = hexoid()
function isMultipart () {
return this.raw[kMultipart] || false
}
// handler definition is in multipart-readstream
// handler(field, file, filename, encoding, mimetype)
// opts is a per-request override for the options object
function handleLegacyMultipartApi (handler, done, opts) {
if (typeof handler !== 'function') {
throw new Error('handler must be a function')
}
if (typeof done !== 'function') {
throw new Error('the callback must be a function')
}
if (!this.isMultipart()) {
done(new Error('the request is not multipart'))
return
}
const log = this.log
log.warn('the multipart callback-based api is deprecated in favour of the new promise api')
log.debug('starting multipart parsing')
const req = this.raw
const busboyOptions = deepmergeAll({ headers: req.headers }, options || {}, opts || {})
const stream = busboy(busboyOptions)
let completed = false
let files = 0
req.on('error', function (err) {
stream.destroy()
if (!completed) {
completed = true
done(err)
}
})
stream.on('finish', function () {
log.debug('finished receiving stream, total %d files', files)
if (!completed) {
completed = true
setImmediate(done)
}
})
stream.on('file', wrap)
req.pipe(stream)
.on('error', function (error) {
req.emit('error', error)
})
function wrap (field, file, filename, encoding, mimetype) {
log.debug({ field, filename, encoding, mimetype }, 'parsing part')
files++
eos(file, waitForFiles)
if (field === '__proto__' || field === 'constructor') {
file.destroy(new Error(`${field} is not allowed as field name`))
return
}
handler(field, file, filename, encoding, mimetype)
}
function waitForFiles (err) {
if (err) {
completed = true
done(err)
}
}
return stream
}
function handleMultipart (opts = {}) {
if (!this.isMultipart()) {
throw new InvalidMultipartContentTypeError()
}
this.log.debug('starting multipart parsing')
let values = []
let pendingHandler = null
// only one file / field can be processed at a time
// "null" will close the consumer side
const ch = (val) => {
if (pendingHandler) {
pendingHandler(val)
pendingHandler = null
} else {
values.push(val)
}
}
const handle = (handler) => {
if (values.length > 0) {
const value = values[0]
values = values.slice(1)
handler(value)
} else {
pendingHandler = handler
}
}
const parts = () => {
return new Promise((resolve, reject) => {
handle((val) => {
if (val instanceof Error) return reject(val)
resolve(val)
})
})
}
const body = {}
let lastError = null
let currentFile = null
const request = this.raw
const busboyOptions = deepmergeAll(
{ headers: request.headers },
options,
opts
)
this.log.trace({ busboyOptions }, 'Providing options to busboy')
const bb = busboy(busboyOptions)
request.on('close', cleanup)
request.on('error', cleanup)
bb
.on('field', onField)
.on('file', onFile)
.on('close', cleanup)
.on('error', onEnd)
.on('end', onEnd)
.on('finish', onEnd)
bb.on('partsLimit', function () {
onError(new PartsLimitError())
})
bb.on('filesLimit', function () {
onError(new FilesLimitError())
})
bb.on('fieldsLimit', function () {
onError(new FieldsLimitError())
})
request.pipe(bb)
function onField (name, fieldValue, fieldnameTruncated, valueTruncated, encoding, contentType) {
// don't overwrite prototypes
if (getDescriptor(Object.prototype, name)) {
onError(new PrototypeViolationError())
return
}
// If it is a JSON field, parse it
if (contentType.startsWith('application/json')) {
// If the value was truncated, it can never be a valid JSON. Don't even try to parse
if (valueTruncated) {
onError(new InvalidJSONFieldError())
return
}
try {
fieldValue = secureJSON.parse(fieldValue)
contentType = 'application/json'
} catch (e) {
onError(new InvalidJSONFieldError())
return
}
}
const value = {
fieldname: name,
mimetype: contentType,
encoding,
value: fieldValue,
fieldnameTruncated,
valueTruncated,
fields: body
}
if (body[name] === undefined) {
body[name] = value
} else if (Array.isArray(body[name])) {
body[name].push(value)
} else {
body[name] = [body[name], value]
}
ch(value)
}
function onFile (name, file, filename, encoding, mimetype) {
// don't overwrite prototypes
if (getDescriptor(Object.prototype, name)) {
// ensure that stream is consumed, any error is suppressed
sendToWormhole(file)
onError(new PrototypeViolationError())
return
}
if (typeof opts.throwFileSizeLimit === 'boolean') {
throwFileSizeLimit = opts.throwFileSizeLimit
}
const value = {
fieldname: name,
filename,
encoding,
mimetype,
file,
fields: body,
_buf: null,
async toBuffer () {
if (this._buf) {
return this._buf
}
const fileChunks = []
let err
for await (const chunk of this.file) {
fileChunks.push(chunk)
if (throwFileSizeLimit && this.file.truncated) {
err = new RequestFileTooLargeError()
err.part = this
onError(err)
fileChunks.length = 0
}
}
if (err) {
// throwing in the async iterator will
// cause the file.destroy() to be called
// The stream has already been managed by
// busboy instead
throw err
}
this._buf = Buffer.concat(fileChunks)
return this._buf
}
}
if (throwFileSizeLimit) {
file.on('limit', function () {
const err = new RequestFileTooLargeError()
err.part = value
onError(err)
})
}
if (body[name] === undefined) {
body[name] = value
} else if (Array.isArray(body[name])) {
body[name].push(value)
} else {
body[name] = [body[name], value]
}
currentFile = file
ch(value)
}
function onError (err) {
lastError = err
currentFile = null
}
function onEnd (err) {
cleanup()
ch(err || lastError)
}
function cleanup (err) {
request.unpipe(bb)
// in node 10 it seems that error handler is not called but request.aborted is set
if ((err || request.aborted) && currentFile) {
currentFile.destroy()
}
}
return parts
}
async function saveRequestFiles (options) {
const requestFiles = []
const tmpdir = (options && options.tmpdir) || os.tmpdir()
const files = await this.files(options)
this.tmpUploads = []
for await (const file of files) {
const filepath = path.join(tmpdir, toID() + path.extname(file.filename))
const target = createWriteStream(filepath)
try {
await pump(file.file, target)
requestFiles.push({ ...file, filepath })
this.tmpUploads.push(filepath)
} catch (err) {
this.log.error({ err }, 'save request file')
throw err
}
}
return requestFiles
}
async function cleanRequestFiles () {
if (!this.tmpUploads) {
return
}
for (const filepath of this.tmpUploads) {
try {
await unlink(filepath)
} catch (error) {
this.log.error(error, 'could not delete file')
}
}
}
async function getMultipartFile (options) {
const parts = this[kMultipartHandler](options)
let part
while ((part = await parts()) != null) {
if (part.file) {
return part
}
}
}
async function * getMultipartFiles (options) {
const parts = this[kMultipartHandler](options)
let part
while ((part = await parts()) != null) {
if (part.file) {
yield part
}
}
}
async function * getMultipartIterator (options) {
const parts = this[kMultipartHandler](options)
let part
while ((part = await parts()) != null) {
yield part
}
}
done()
}
const _fastifyMultipart = fp(fastifyMultipart, {
fastify: '4.x',
name: '@fastify/multipart'
})
/**
* These export configurations enable JS and TS developers
* to consumer fastify in whatever way best suits their needs.
*/
module.exports = _fastifyMultipart
module.exports.fastifyMultipart = _fastifyMultipart
module.exports.default = _fastifyMultipart