使用Node从零开始创建一个Torrent应用程序。
学习开发的最佳方法就是动手实践,尝试创建自己想要的东西。在本文中,我将引导你使用 Node.js 和 swenssonp2p 库创建一个最小的 Torrent 应用示例。
强烈建议先阅读并评论我之前关于从零开始构建 p2p 库的文章,以便理解本文。
所以,Torrent 是一种 P2P 网络,允许节点间交换文件。其核心思想是同一个文件可以同时出现在多个节点上,通过分块和分割下载流,节点可以加快文件下载速度。P2P 网络用于交换文件的元数据,而实际的下载则使用单独的 TCP 连接直接连接到种子节点。
本文中我不会实现水蛭,但您稍后可能会在代码库中找到这部分代码。
好的,首先,我需要为最终用户设计一个界面,让他们能够通过这个应用程序共享文件。我决定process.cwd()在应用程序启动时对所有内容进行索引。
为了存储文件,我决定使用 Map,其中文件的哈希值将作为键。我还决定不让这个过程阻塞用户的操作,因此我将索引操作放在一个异步函数中,该函数不会被等待。hashFile具体实现方式取决于你。
const path = require('path');
const { readdir, stat } = require('fs/promises');
const index = new Map();
async function* findFiles (folder) {
for (let filename of await readdir(folder)) {
const filepath = path.resolve(folder, filename);
const filestats = await stat(filepath);
if (filestats.isDirectory()) {
yield* findFiles(filepath);
} else {
yield { path: filepath, size: filestats.size };
}
}
}
;(async () => {
console.log('Start indexing files...');
for await (let { path, size } of findFiles(process.cwd())) {
const [name] = path.split('/').slice(-1);
const hash = await hashFile(path);
index.set(hash, { hash, size, name, path });
}
console.log(`Directory content indexed, ${index.size} files found`);
})();
接下来我想创建一个P2P网络。我使用swenssonp2p并调用该命令createNode。它会在本地设置一个通用的P2P网络节点,之后我运行该命令listen开始接受连接。
启动之后我具体要做什么还不确定,估计有很多事情要做,所以我留下了一个事件发射器调用(socket),之后可以添加监听器。为了能够在监听回调函数调用之前进行订阅,我延迟了监听调用,直到所有同步代码执行完毕。
const EventEmitter = require('events');
const createNode = require('swenssonp2p');
const main = new EventEmitter();
const node = createNode();
const port = Number(process.argv[2]);
setTimeout(() => {
node.listen(port, () => main.emit('startup', port));
}, 0);
节点启动后,我会继续并告知用户他们可以执行哪些操作。我希望使用与聊天应用程序相同的界面(通过 process.stdin 输入命令) ,但我不太清楚具体应该使用哪些命令,所以我也留了一个(实际上是两个)套接字。
main.on('startup', (port) => {
console.log(`Node is up on ${port}.`);
console.log('');
main.emit('help');
process.stdin.on('data', (data) => main.emit('command', data.toString()));
});
第一个命令,以及在聊天应用程序中的命令,都将是connect命令。
main.on('help', () => {
console.log(' write "connect IP:PORT" to connect to other nodes on the network.');
});
main.on('command', (text) => {
if (text.startsWith('connect')) {
const ipport = text.substr(8);
const [ip, port] = ipport.split(':');
console.log(`Connecting to ${ip} at ${Number(port)}...`);
node.connect(ip, Number(port), () => {
console.log(`Connection to ${ip} established.`);
});
}
});
现在我希望用户能够先搜索文件。我目前只实现了按文件名搜索的功能,但你也可以在这个命令中添加其他参数。另外,索引目前对查找文件没有任何帮助,但我们稍后会用到它,我保证。
main.on('help', () => {
console.log(' write "search FILENAME" to look for files.');
});
// Once the command arrives, we broadcast the search message on the network
main.on('command', (text) => {
if (text.startsWith('search')) {
const searchRequest = text.substr(7).trim();
console.log(`Searching for file by "${searchRequest}"...`);
node.broadcast({ type: 'search', meta: searchRequest });
}
});
// Once we receive this message (on another node), we reply with results
node.on('broadcast', ({ origin, message: { type, meta }}) => {
if (type === 'search' && origin !== node.id) {
for (let key of index.keys()) {
const data = index.get(key);
if (data.name.toLowerCase().includes(meta.toLowerCase())) {
node.direct(origin, { type: 'search/response', meta: data });
}
}
}
});
// Once we receive the response from the file holder, we display it
node.on('direct', ({ origin, message: { type, meta: { name, size, hash } }}) => {
if (type === 'search/response') {
console.log(` ${name} ${formatSize(size)} ${hash}`);
}
});
这种乒乓式的流程虽然易于实现,但感觉不太稳定,因为理论上我们可能会search/response在没有发出搜索请求的情况下收到数据,并且仍然会触发 console.log。我不认为这是一个问题,但在这里添加一个安全检查也无妨。
接下来我想做的是让用户能够开始下载。由于哈希值用于索引,我们可以将其用作命令参数,这很合理(就像你可以使用文件哈希值创建磁力链接,并让应用程序下载该链接而无需执行搜索一样)。
我不知道下载开始后该怎么办,所以我留了一个套接字在那里。
main.on('help', () => {
console.log(' write "download HASH" to start downloading file');
});
main.on('command', (text) => {
if (text.startsWith('download')) {
const hash = text.substr(9).trim();
main.emit('download', hash);
}
});
为了下载文件,我们需要与对等节点建立独立的 TCP 连接,并从中请求数据块。数据块的数量和文件名并非我们本地掌握的信息,即使我们可能通过搜索命令获取到这些信息,也无法保证。因此,首先,我希望设置一个 Ping Pong 流程,在开始下载之前交换文件元信息。这与搜索流程类似,但最终我会将交换的信息存储起来,downloads并在信息发生变化时发出事件。
如您所见,交换信息中还包含种子文件的 IP 地址,因此我可以在稍后下载时连接到它的文件服务器。
const downloads = {};
main.on('download', (hash) => {
node.broadcast({ type: 'download', meta: hash });
});
node.on('broadcast', ({ origin, message: { type, meta } }) => {
if (type === 'download' && origin !== node.id) {
const data = index.get(meta);
if (!!data) {
node.direct(origin, { type: 'download/response', meta: { ip: Array.from(node.addresses)[0], hash: data.hash, size: data.size, name: data.name } })
}
}
});
node.on('direct', ({ origin, message: { type, meta } }) => {
if (type === 'download/response') {
if (!downloads[meta.hash]) {
downloads[meta.hash] = {
hash,
name: meta.name,
size: meta.size,
seeds: [meta.ip],
chunks: [],
};
main.emit('download/ready', meta.hash);
} else {
downloads[meta.hash].seeds.push(meta.ip);
main.emit('download/update', meta.hash);
}
}
});
好了,现在该创建 TCP 服务器了,它将响应文件数据请求并发送数据。我们将分块交换数据,因此文件服务器只需要响应一种特定类型的消息并发送一种类型的消息即可。
const FILES_SERVER_PORT = 9019;
const CHUNK_SIZE = 512;
const filesServer = net.createServer((socket) => {
socket.on('data', (data) => {
const { hash, offset } = JSON.parse(data);
const meta = index.get(hash);
const chunk = Buffer.alloc(CHUNK_SIZE);
const file = await open(meta.path, 'r');
await file.read(chunk, 0, CHUNK_SIZE, offset * CHUNK_SIZE);
await file.close();
socket.write(JSON.stringify({ hash, offset, chunk }));
});
}).listen(FILES_SERVER_PORT);
好了,现在该实现实际下载了。我会先响应download/ready事件并创建一个异步循环,它会并行地从种子节点获取数据块,一次获取一个种子节点的数据块,当然你也可以根据需要进行调整。
为了跟踪每个数据块的状态,我chunks在元信息字段中填充了数据块的状态以及它用于从中下载数据的套接字。
main.on('download/ready', async (hash) => {
downloads[hash].chunks = [...new Array(Math.ceil(downloads[hash].size / CHUNK_SIZE))].map(() => ({ state: 0 }));
});
除此之外,我还需要一个临时文件来保存下载内容,我们来分配它并为其创建一个文件句柄。
downloads[hash].path = path.resolve(DOWNLOADS_PATH, `${hash}.download`);
const file = await open(downloads[hash].path, 'w');
现在我需要连接到提供的 IP 地址。downloads我知道download/ready事件触发后已经有一些 IP 地址可用,但我还需要响应download/update事件来更新列表。我为该事件添加了一个监听器,并在下载完成后将其移除。
const sockets = {};
const updateSocketsList = async ($hash) => {
if ($hash !== hash) {
return;
}
for (let ip of downloads[hash].seeds) {
if (!sockets[ip]) {
const socket = new net.Socket();
socket.connect(FILES_SERVER_PORT, ip, () => {
sockets[ip] = { socket, busy: false };
});
}
}
};
updateSocketsList(hash);
main.on('download/update', updateSocketsList);
// ... TODO
main.off('download/update', updateSocketsList);
主循环很简单:我寻找可用的数据块(数据块状态0为“就绪”、1“正在下载”或2“已下载”)和一个空闲的套接字。如果没有可用的套接字(意味着所有套接字都已占用)或数据块(意味着所有数据块都在下载中),则continue延迟 50 毫秒。如果同时存在可用的数据块和套接字,则立即下载,但不会等待下载完成。
while (!!downloads[hash].chunks.find((chunk) => chunk.state !== 2)) {
const availableChunkIndex = downloads[hash].chunks.findIndex((chunk) => chunk.state === 0);
const availableSocket = Object.values(sockets).find(({ busy }) => !busy);
if (!availableSocket || !availableChunkIndex) {
await new Promise((resolve) => setTimeout(() => resolve(), 50));
continue;
}
availableSocket.busy = true;
downloads[hash].chunks[availableChunkIndex].state = 1;
;(async () => {
const chunk = await downloadChunk(availableSocket.socket, hash, availableChunkIndex);
await file.write(Buffer.from(chunk), 0, CHUNK_SIZE, availableChunkIndex * CHUNK_SIZE);
downloads[hash].chunks[availableChunkIndex].state = 2;
availableSocket.busy = false;
})();
}
如您所见,我只需要实现downloadChunk从套接字获取数据的函数。我希望它是一个异步函数,而套接字是一个事件发射器,所以我需要执行以下操作:
const downloadChunk = (socket, hash, offset) => new Promise((resolve) => {
socket.write(JSON.stringify({ hash, offset }));
const listener = (message) => {
if (hash === message.hash && offset === message.offset) {
resolve(message.chunk);
socket.off('data', listener);
}
};
socket.on('data', listener);
});
现在我只需要清理一下,关闭文件句柄,将临时文件重命名为它应该有的文件名,移除监听器download/update并关闭种子套接字。
await file.close();
await rename(downloads[hash].path, path.resolve(DOWNLOADS_PATH, downloads[hash].name));
main.off('download/update', updateSocketsList);
for (let { socket } of Object.values(sockets)) {
socket.destroy();
}
本文将介绍如何使用 Node 和swenssonp2p,用不到 300 行代码创建一个最简单的 Torrent 应用程序。该应用程序的完整代码可以在这里找到。
文章来源:https://dev.to/lxchurbakov/create-a-torrent-application-with-node-from-scratch-1j3e