fp/packages/link2cid/index.js

285 lines
8.5 KiB
JavaScript
Raw Normal View History

2024-03-14 07:30:49 +00:00
'use strict';
require('dotenv').config();
const express = require('express');
const bodyParser = require('body-parser');
const cors = require('cors');
const fs = require('fs');
const fsp = require('fs/promises');
const { openAsBlob } = require('node:fs');
const { rm, stat } = require('fs/promises');
const os = require('os');
const path = require('path');
const SseStream = require('ssestream').default;
const { Transform, Readable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { differenceInSeconds } = require('date-fns');
const cidRegex = /Qm[1-9A-HJ-NP-Za-km-z]{44,}|b[A-Za-z2-7]{58,}|B[A-Z2-7]{58,}|z[1-9A-HJ-NP-Za-km-z]{48,}|F[0-9A-F]{50,}/;
const app = express();
app.use(cors());
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: false }));
// environment variables
const port = process.env.PORT || 3000;
const ipfsUrl = process.env.IPFS_URL || 'http://localhost:5001';
if (!process.env.API_KEY) throw new Error('API_KEY was missing in env');
2024-03-16 03:48:24 +00:00
if (!process.env.PORT) throw new Error('PORT is missing in env');
2024-03-14 07:30:49 +00:00
// greetz https://stackoverflow.com/a/51302466/1004931
async function downloadFile(url, filePath, sse) {
console.log(`downloading url=${url} to filePath=${filePath}`);
const res = await fetch(url);
const fileSize = res.headers.get('content-length');
const fileStream = fs.createWriteStream(filePath, { flags: 'wx' });
let downloadedBytes = 0;
const logInterval = 1 * 1024 * 1024; // 1MB in bytes
const progressLogger = new Transform({
transform(chunk, encoding, callback) {
downloadedBytes += chunk.length;
if (downloadedBytes % logInterval < chunk.length) {
console.log(`${downloadedBytes / (1024 * 1024)} MB processed`);
const progress = (downloadedBytes / fileSize) * 100;
console.log(`Download Progress: ${progress.toFixed(2)}%`);
sse.write({
event: 'dlProgress',
data: `${Math.floor(progress)}`
});
}
this.push(chunk);
callback();
}
});
await pipeline(
res.body,
progressLogger,
fileStream
)
console.log('download finished');
// verify the file
// If we don't, we get text error messages sent to kubo which gets added and it's a bad time.
console.log(`fileSize=${fileSize}. downloadedBytes=${downloadedBytes}`);
if (fileSize != downloadedBytes) throw new Error('downloadedBytes did not match fileSize');
}
async function healthRes(_, res) {
const version = await getPackageVersion();
res.json({ error: false, message: `*link2cid ${version} pisses on the floor*` });
}
async function getPackageVersion() {
const packageJsonFile = await fsp.readFile(path.join(__dirname, 'package.json'), { encoding: 'utf-8' });
const json = JSON.parse(packageJsonFile);
return json.version;
}
/**
*
* We use this to upload files and get progress notifications
*
*/
async function streamingPostFetch(
url,
formData,
basename,
sse,
filesize
) {
console.log(`streamingPostFetch with url=${url}, formData=${formData.get('file')}, basename=${basename}, sse=${sse}, filesize=${filesize}`);
try {
const res = await fetch(url, {
method: 'POST',
body: formData
});
if (!res.ok) {
throw new Error(`HTTP error! Status-- ${res.status}`);
}
const reader = res.body?.getReader();
if (!reader) {
throw new Error('Failed to get reader from response body');
}
while (true) {
const { done, value } = await reader.read();
const chunk = new TextDecoder().decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
const trimmedLine = line.trim()
if (!!trimmedLine) {
console.log(trimmedLine);
const json = JSON.parse(trimmedLine);
// console.log(`comparing json.Name=${json.Name} with basename=${basename}`);
sse.write({
event: 'addProgress',
data: `${Math.floor(json?.Size / filesize * 100)}`
})
if (json.Name === basename && json.Hash && json.Size) {
// this is the last chunk
return json;
}
}
}
if (done) {
throw new Error('Response reader finished before receiving a CID which indicates a failiure.');
}
}
} catch (error) {
console.error('An error occurred:', error);
throw error;
}
}
function authenticate(req, res, next) {
const apiKey = req.query?.token;
if (!apiKey) {
const msg = `authorization 'token' was missing from query`;
console.error(msg);
return res.status(401).json({ error: true, message: msg });
}
if (apiKey !== process.env.API_KEY) {
const msg = 'INCORRECT API_KEY (wrong token)';
console.error(msg);
return res.status(403).json({ error: true, message: msg });
} else {
next();
}
}
async function getFormStuff(filePath) {
const url = `${ipfsUrl}/api/v0/add?progress=false&cid-version=1&pin=true`;
const blob = await openAsBlob(filePath);
const basename = path.basename(filePath);
const filesize = (await stat(filePath)).size;
const formData = new FormData();
return {
url,
blob,
basename,
filesize,
formData
}
}
/**
* Add a file from URL to IPFS.
*
* uses SSE to send progress reports as the script
* downloads the file to disk and then does `ipfs add`
* finally returning a CID
*
* events:
* - heartbeat
* - dlProgress
* - addProgress
* - end
*/
async function addHandler(req, res) {
console.log(`/add`)
let url;
const urlStr = req.query.url;
if (!urlStr) return res.status(400).json({
error: 'url was missing from query'
});
try {
url = new URL(urlStr);
} catch (e) {
return res.status(400).json({
error: e?.message
})
}
const timestamp = new Date().valueOf();
const fileName = `${timestamp}-${url.pathname.split('/').at(-1)}`;
const destinationFilePath = path.join(os.tmpdir(), fileName);
console.log(`fileName=${fileName}, destinationFilePath=${destinationFilePath}`);
const sse = new SseStream(req);
sse.pipe(res);
let hbStartTime = new Date();
const heartbeat = setInterval(() => {
sse.write({
event: 'heartbeat',
data: `${differenceInSeconds(new Date(), hbStartTime)}`
});
}, 15000);
res.on('close', () => {
console.log('Connection closed.');
clearTimeout(heartbeat);
sse.unpipe(res);
});
console.log(`Downloading '${urlStr}' to destinationFilePath=${destinationFilePath}`);
await downloadFile(urlStr, destinationFilePath, sse);
sse.write({
event: 'dlProgress',
data: '100'
})
console.log(`'ipfs add' the file ${destinationFilePath}`);
const { url: kuboUrl, blob, basename, filesize, formData } = await getFormStuff(destinationFilePath);
formData.append('file', blob, basename);
let cid;
try {
const output = await streamingPostFetch(kuboUrl, formData, basename, sse, filesize);
console.log(`streamingPostFetch output as follows.`);
console.log(output);
if (!output?.Hash) throw new Error('No CID was received from remote IPFS node.');
if (!output?.Size) throw new Error(`'ipfs add' was missing Size in its output.`);
// if (output.Size !== filesize) throw new Error(`input and output sizes did not match. Expected output.Size ${output.Size} to equal ${filesize}.`);
// console.log(`filesize=${filesize} output.Size=${output.Size}`);
cid = output.Hash;
console.log('cleanup');
await rm(destinationFilePath);
console.log('end SSE');
clearTimeout(heartbeat);
} catch (e) {
return sse.end({
event: 'end',
error: true,
message: e
})
}
return sse.end({
event: 'end',
data: cid
})
}
app.get('/', authenticate, healthRes);
app.get('/health', healthRes);
app.get('/add', authenticate, addHandler);
app.listen(port, async () => {
const version = await getPackageVersion();
console.log(`link2cid ${version} listening on port ${port}`);
});