2024-03-14 07:30:49 +00:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
require('dotenv').config();
|
|
|
|
const express = require('express');
|
|
|
|
const bodyParser = require('body-parser');
|
|
|
|
const cors = require('cors');
|
|
|
|
const fs = require('fs');
|
|
|
|
const fsp = require('fs/promises');
|
|
|
|
const { openAsBlob } = require('node:fs');
|
|
|
|
const { rm, stat } = require('fs/promises');
|
|
|
|
const os = require('os');
|
|
|
|
const path = require('path');
|
|
|
|
const SseStream = require('ssestream').default;
|
|
|
|
const { Transform, Readable } = require('node:stream');
|
|
|
|
const { pipeline } = require('node:stream/promises');
|
|
|
|
const { differenceInSeconds } = require('date-fns');
|
|
|
|
|
|
|
|
// Pattern for CID-looking strings (presumably IPFS content identifiers, going by the name).
// Five alternatives, each a literal prefix followed by a minimum-length run of an alphabet:
//   Qm + 44 or more base58 characters (no 0, I, O or l)
//   b  + 58 or more characters from A-Z, a-z, 2-7
//   B  + 58 or more characters from A-Z, 2-7
//   z  + 48 or more base58 characters
//   F  + 50 or more uppercase hex digits
// The pattern is unanchored, so it matches anywhere inside a longer string.
// NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still needed.
const cidRegex = /Qm[1-9A-HJ-NP-Za-km-z]{44,}|b[A-Za-z2-7]{58,}|B[A-Z2-7]{58,}|z[1-9A-HJ-NP-Za-km-z]{48,}|F[0-9A-F]{50,}/;
|
|
|
|
// The single Express application; routes are attached to it further down the file.
const app = express();
|
|
|
|
// CORS middleware with no options passed, so the library defaults apply.
app.use(cors());
|
|
|
|
// Parse JSON request bodies.
app.use(bodyParser.json());
|
|
|
|
// Parse urlencoded request bodies with the `extended` option turned off.
app.use(bodyParser.urlencoded({ extended: false }));
|
|
|
|
|
|
|
|
// Environment variables (loaded from .env by the dotenv call at the top of the file).
|
|
|
|
// Port handed to app.listen().
// NOTE(review): the `|| 3000` fallback can never take effect, because a missing PORT
// throws a few lines below. Decide whether PORT is mandatory or should default to 3000.
const port = process.env.PORT || 3000;
|
|
|
|
// Base URL used to build the `/api/v0/add` request; defaults to a node on localhost:5001.
const ipfsUrl = process.env.IPFS_URL || 'http://localhost:5001';
|
|
|
|
// Fail fast at startup: the token check in authenticate() compares against API_KEY.
if (!process.env.API_KEY) throw new Error('API_KEY was missing in env');
|
2024-03-16 03:48:24 +00:00
|
|
|
// Fail fast at startup when PORT is not set (this makes the default above unreachable).
if (!process.env.PORT) throw new Error('PORT is missing in env');
|
2024-03-14 07:30:49 +00:00
|
|
|
|
|
|
|
// greetz https://stackoverflow.com/a/51302466/1004931
|
|
|
|
/**
 * Stream a remote file to disk, reporting progress over SSE, then verify its size.
 *
 * Verification matters because whatever lands on disk is later added to IPFS;
 * an HTTP error page must never be mistaken for the requested file.
 *
 * @param {string} url - Address to download from.
 * @param {string} filePath - Destination path. Opened with the `wx` flag, so the
 *   write fails if something already exists at that path.
 * @param {{ write: Function }} sse - Receives `dlProgress` events whose `data` is a
 *   whole-number percentage (as a string).
 * @returns {Promise<void>} Resolves once the file is fully written and verified.
 * @throws {Error} When the response status is not 2xx, the response has no body,
 *   the transfer or the write fails, or the number of bytes received differs from
 *   the Content-Length header (a missing header counts as a mismatch).
 */
async function downloadFile(url, filePath, sse) {
  console.log(`downloading url=${url} to filePath=${filePath}`);
  const res = await fetch(url);

  // An error page can carry a perfectly consistent Content-Length, so the size
  // check at the end cannot catch it. Reject non-2xx responses up front.
  if (!res.ok) {
    await res.body?.cancel().catch(() => {});
    throw new Error(`download failed with HTTP status ${res.status}`);
  }
  if (!res.body) {
    throw new Error('download response had no body');
  }

  const contentLength = res.headers.get('content-length');
  // NaN when the header is absent: progress is then not reported and the final
  // comparison fails, which preserves the original "no length, no trust" behavior.
  const fileSize = contentLength === null ? Number.NaN : Number(contentLength);

  const logInterval = 1 * 1024 * 1024; // 1MB in bytes
  let downloadedBytes = 0;

  const progressLogger = new Transform({
    transform(chunk, encoding, callback) {
      downloadedBytes += chunk.length;

      // True whenever this chunk carried the running total onto or across a 1 MB boundary.
      if (downloadedBytes % logInterval < chunk.length) {
        console.log(`${downloadedBytes / (1024 * 1024)} MB processed`);
        const progress = (downloadedBytes / fileSize) * 100;
        if (Number.isFinite(progress)) {
          console.log(`Download Progress: ${progress.toFixed(2)}%`);
          sse.write({
            event: 'dlProgress',
            data: `${Math.floor(progress)}`,
          });
        }
      }

      callback(null, chunk);
    },
  });

  const fileStream = fs.createWriteStream(filePath, { flags: 'wx' });

  try {
    // fetch() yields a web ReadableStream; bridge it to a Node stream for pipeline().
    await pipeline(Readable.fromWeb(res.body), progressLogger, fileStream);
    console.log('download finished');

    // verify the file
    // If we don't, we get text error messages sent to kubo which gets added and it's a bad time.
    console.log(`fileSize=${contentLength}. downloadedBytes=${downloadedBytes}`);
    if (fileSize !== downloadedBytes) {
      throw new Error('downloadedBytes did not match fileSize');
    }
  } catch (err) {
    // Do not leave a truncated or unverified file behind. EEXIST means the path was
    // already occupied before we started, so that file is not ours to delete.
    if (err?.code !== 'EEXIST') {
      await fsp.rm(filePath, { force: true }).catch((rmErr) => {
        console.error(`failed to remove partial download ${filePath}:`, rmErr);
      });
    }
    throw err;
  }
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Health-check handler: replies with a JSON greeting that embeds the package version.
 *
 * @param {object} _ - Incoming request (unused).
 * @param {object} res - Express response.
 * @returns {Promise<void>}
 */
async function healthRes(_, res) {
  try {
    const version = await getPackageVersion();
    res.json({ error: false, message: `*link2cid ${version} pisses on the floor*` });
  } catch (err) {
    // Without this, a rejected promise in an async handler leaves the request
    // hanging and surfaces as an unhandled rejection.
    console.error('health check failed:', err);
    res.status(500).json({ error: true, message: 'unable to read package version' });
  }
}
|
|
|
|
|
|
|
|
/**
 * Look up the `version` field of the package.json that sits next to this file.
 * The manifest is read from disk on every call.
 *
 * @returns {Promise<*>} Whatever `version` holds in the parsed manifest.
 */
async function getPackageVersion() {
  const manifestPath = path.join(__dirname, 'package.json');
  const manifestText = await fsp.readFile(manifestPath, { encoding: 'utf-8' });
  const { version } = JSON.parse(manifestText);
  return version;
}
|
|
|
|
|
|
|
|
/**
 * POST a form to the add endpoint and consume its newline-delimited JSON reply,
 * forwarding progress to the client over SSE.
 *
 * The reply is read incrementally. Network chunks do not line up with line
 * boundaries, so bytes are decoded in streaming mode and buffered until a full
 * line is available before it is parsed.
 *
 * @param {string} url - Endpoint to POST to.
 * @param {FormData} formData - Request body; expected to carry the upload under `file`.
 * @param {string} basename - File name used to recognise the final result line.
 * @param {{ write: Function }} sse - Receives `addProgress` events whose `data` is a
 *   whole-number percentage (as a string).
 * @param {number} filesize - Size of the uploaded file, used as the progress denominator.
 * @returns {Promise<object>} The first parsed line whose `Name` equals `basename`
 *   and which has truthy `Hash` and `Size`.
 * @throws {Error} On a non-2xx status, a missing response body, an unparseable line,
 *   or when the stream ends before a matching line arrives.
 */
async function streamingPostFetch(
  url,
  formData,
  basename,
  sse,
  filesize
) {
  console.log(`streamingPostFetch with url=${url}, formData=${formData.get('file')}, basename=${basename}, sse=${sse}, filesize=${filesize}`);

  try {
    const res = await fetch(url, {
      method: 'POST',
      body: formData,
    });

    if (!res.ok) {
      throw new Error(`HTTP error! Status-- ${res.status}`);
    }

    const reader = res.body?.getReader();
    if (!reader) {
      throw new Error('Failed to get reader from response body');
    }

    // One decoder for the whole response so multi-byte characters that straddle
    // two chunks are reassembled instead of being turned into replacement characters.
    const decoder = new TextDecoder();
    let pending = '';

    try {
      while (true) {
        const { done, value } = await reader.read();
        pending += done ? decoder.decode() : decoder.decode(value, { stream: true });

        const lines = pending.split('\n');
        // Until the stream ends, the last piece may be an incomplete line: hold it back.
        pending = done ? '' : lines.pop();

        for (const line of lines) {
          const trimmedLine = line.trim();
          if (!trimmedLine) continue;

          console.log(trimmedLine);
          const json = JSON.parse(trimmedLine);

          // Skip lines that carry no usable Size rather than emitting "NaN".
          const percent = Math.floor((Number(json?.Size) / filesize) * 100);
          if (Number.isFinite(percent)) {
            sse.write({
              event: 'addProgress',
              data: `${percent}`,
            });
          }

          if (json.Name === basename && json.Hash && json.Size) {
            // this is the last chunk
            return json;
          }
        }

        if (done) {
          throw new Error('Response reader finished before receiving a CID which indicates a failiure.');
        }
      }
    } finally {
      // Release the connection whether we returned early, threw, or ran to the end.
      await reader.cancel().catch(() => {});
    }
  } catch (error) {
    console.error('An error occurred:', error);
    throw error;
  }
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Express middleware guarding a route with the shared API key.
 *
 * The key is read from the `token` query parameter and compared with
 * `process.env.API_KEY`. A missing token yields 401, a wrong one 403; both
 * replies are JSON of the form `{ error: true, message }` and are also logged.
 *
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @param {Function} next - Invoked only when the token matches.
 */
function authenticate(req, res, next) {
  const token = req.query?.token;

  // Log the reason, then send it back with the given status.
  const reject = (status, message) => {
    console.error(message);
    return res.status(status).json({ error: true, message });
  };

  if (!token) {
    return reject(401, `authorization 'token' was missing from query`);
  }

  if (token === process.env.API_KEY) {
    next();
    return;
  }

  return reject(403, 'INCORRECT API_KEY (wrong token)');
}
|
|
|
|
|
|
|
|
|
|
|
|
async function getFormStuff(filePath) {
|
|
|
|
const url = `${ipfsUrl}/api/v0/add?progress=false&cid-version=1&pin=true`;
|
|
|
|
const blob = await openAsBlob(filePath);
|
|
|
|
const basename = path.basename(filePath);
|
|
|
|
const filesize = (await stat(filePath)).size;
|
|
|
|
const formData = new FormData();
|
|
|
|
return {
|
|
|
|
url,
|
|
|
|
blob,
|
|
|
|
basename,
|
|
|
|
filesize,
|
|
|
|
formData
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Add a file from URL to IPFS.
 *
 * Uses SSE to send progress reports while the file is downloaded to a
 * temporary path and then uploaded with `ipfs add`, finally reporting a CID.
 * The temporary file is removed and the heartbeat stopped on every outcome,
 * and any failure after the SSE stream is opened is reported through the
 * final `end` event rather than escaping as an unhandled rejection.
 *
 * Query parameters:
 *  - url: address of the file to fetch (required, must parse as a URL)
 *
 * events:
 *  - heartbeat   (every 15 s; data = seconds since the stream opened)
 *  - dlProgress
 *  - addProgress
 *  - end         (data = CID on success)
 *
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @returns {Promise<*>} The result of the 400 JSON reply, or of `sse.end()`.
 */
async function addHandler(req, res) {
  console.log(`/add`);

  const urlStr = req.query.url;
  if (!urlStr) {
    return res.status(400).json({
      error: 'url was missing from query',
    });
  }

  let url;
  try {
    url = new URL(urlStr);
  } catch (e) {
    return res.status(400).json({
      error: e?.message,
    });
  }

  // Prefix with a timestamp so repeated requests for the same URL do not collide.
  const timestamp = new Date().valueOf();
  const fileName = `${timestamp}-${url.pathname.split('/').at(-1)}`;
  const destinationFilePath = path.join(os.tmpdir(), fileName);
  console.log(`fileName=${fileName}, destinationFilePath=${destinationFilePath}`);

  const sse = new SseStream(req);
  sse.pipe(res);

  const hbStartTime = new Date();
  const heartbeat = setInterval(() => {
    sse.write({
      event: 'heartbeat',
      data: `${differenceInSeconds(new Date(), hbStartTime)}`,
    });
  }, 15000);

  res.on('close', () => {
    console.log('Connection closed.');
    clearInterval(heartbeat);
    sse.unpipe(res);
  });

  let endEvent;
  // Becomes false when the temp path turned out to belong to someone else.
  let ownsTempFile = true;

  try {
    console.log(`Downloading '${urlStr}' to destinationFilePath=${destinationFilePath}`);
    await downloadFile(urlStr, destinationFilePath, sse);

    sse.write({
      event: 'dlProgress',
      data: '100',
    });

    console.log(`'ipfs add' the file ${destinationFilePath}`);
    const { url: kuboUrl, blob, basename, filesize, formData } = await getFormStuff(destinationFilePath);
    formData.append('file', blob, basename);

    const output = await streamingPostFetch(kuboUrl, formData, basename, sse, filesize);
    console.log(`streamingPostFetch output as follows.`);
    console.log(output);
    if (!output?.Hash) throw new Error('No CID was received from remote IPFS node.');
    if (!output?.Size) throw new Error(`'ipfs add' was missing Size in its output.`);

    endEvent = {
      event: 'end',
      data: output.Hash,
    };
  } catch (e) {
    console.error('add failed:', e);
    // The download opens its target exclusively; EEXIST means the file was not created by us.
    if (e?.code === 'EEXIST') ownsTempFile = false;
    // NOTE(review): every other event in this file carries its payload in `data`.
    // Confirm that ssestream actually serializes `error`/`message`, otherwise
    // clients never see the failure reason.
    endEvent = {
      event: 'end',
      error: true,
      message: e?.message ?? String(e),
    };
  } finally {
    clearInterval(heartbeat);
    if (ownsTempFile) {
      console.log('cleanup');
      // force: the file may not exist if the download never got as far as creating it.
      await rm(destinationFilePath, { force: true }).catch((rmErr) => {
        console.error(`failed to remove ${destinationFilePath}:`, rmErr);
      });
    }
  }

  console.log('end SSE');
  return sse.end(endEvent);
}
|
|
|
|
|
|
|
|
// Routes. `authenticate` runs first where listed and only calls through on a matching token.
// Root: same reply as /health, but token-protected.
app.get('/', authenticate, healthRes);
|
|
|
|
// Health check without authentication.
app.get('/health', healthRes);
|
|
|
|
// Download a URL and add it to IPFS, streaming progress over SSE (token-protected).
app.get('/add', authenticate, addHandler);
|
|
|
|
|
|
|
|
|
|
// Start the HTTP server; once it is listening, log the package version and port.
// NOTE(review): the callback is async and getPackageVersion() can reject (e.g. unreadable
// package.json); nothing here handles that rejection.
app.listen(port, async () => {
|
|
|
|
const version = await getPackageVersion();
|
|
|
|
console.log(`link2cid ${version} listening on port ${port}`);
|
|
|
|
});
|