DEV Community

Aleksandr Churbakov
Aleksandr Churbakov

Posted on

Create a Torrent Application with Node from scratch.

The best way to learn stuff in development is go and try to create your own whatever it is. In this article I'll walk you through creating a minimal example of Torrent application using Node JS and swenssonp2p library.

It is strongly advised to read and comment my previous article about making p2p library from scratch in order to understand this one.


So, Torrent is a P2P network, that lets peers exchange files. The main idea behind it is that one file may appear at different peers and, by chunking and splitting download streams, peers can speed up files download. P2P network is used to exchange meta information about files, while the actual download uses a separate TCP connection directly to the seed.

In this article I won't implement leeches, but you may find this code in the repo later.

Okay, so first of all, I need to come up with some sort of interface for the end user to share the files using this application. I've decided to just index everything inside process.cwd() on application startup.

To store the files I decided to use Map where file's hash will be the key. I also decided that I don't want this process to block user from doing what they want to do and I put indexing inside async function that is not being awaited for. hashFile implementation is up to you.

const path = require('path');
const { readdir, stat } = require('fs/promises');

const index = new Map();

async function* findFiles (folder) {
  for (let filename of await readdir(folder)) {
    const filepath = path.resolve(folder, filename);
    const filestats = await stat(filepath);

    if (filestats.isDirectory()) {
      yield* findFiles(filepath);
    } else {
      yield { path: filepath, size: filestats.size };
    }
  }
}

;(async () => {
  console.log('Start indexing files...');

  for await (let { path, size } of findFiles(process.cwd())) {
    const [name] = path.split('/').slice(-1);
    const hash = await hashFile(path);

    index.set(hash, { hash, size, name, path });
  }

  console.log(`Directory content indexed, ${index.size} files found`);
})();
Enter fullscreen mode Exit fullscreen mode

The next thing I want to do is to create a P2P network. I use swenssonp2p and just call createNode. It will set up a generic p2p network node locally, after that I run listen to start accepting connections.

I don't know exactly what I'll be doing after the startup, I guess there should be many things, so I leave an event emitter call (socket), that I can fill with the listeners later. In order to be able to subscribe before listen callback is called, I delay the listen call until all the synchronous code is completed.

const EventEmitter = require('events');
const createNode = require('swenssonp2p');

const main = new EventEmitter();

const node = createNode();
const port = Number(process.argv[2]);

setTimeout(() => {
  node.listen(port, () => main.emit('startup', port));
}, 0);
Enter fullscreen mode Exit fullscreen mode

After the node is up I proceed and let user know what they can do. I want to use the same interface (typing commands into process.stdin) that I've used in chat application, but I don't know exactly what commands I should have, so I leave a socket (actually two) there as well.

main.on('startup', (port) => {
  console.log(`Node is up on ${port}.`);
  console.log('');

  main.emit('help');

  process.stdin.on('data', (data) => main.emit('command', data.toString()));
});
Enter fullscreen mode Exit fullscreen mode

The first command, as well as in chat application, will be connect command.

main.on('help', () => {
  console.log('  write "connect IP:PORT" to connect to other nodes on the network.');
});

main.on('command', (text) => {
  if (text.startsWith('connect')) {
    const ipport = text.substr(8);
    const [ip, port] = ipport.split(':');

    console.log(`Connecting to ${ip} at ${Number(port)}...`);
    node.connect(ip, Number(port), () => {
      console.log(`Connection to ${ip} established.`);
    });
  }
});
Enter fullscreen mode Exit fullscreen mode

Now I want the user to be able to search for the files first. I will only implement the search by name, but you can add other parameters in this command as well. Also the index does not help us looking for files at all, but we will use it later, I promise.

main.on('help', () => {
  console.log('  write "search FILENAME" to look for files.');
});

// Once the command arrives, we broadcast the search message on the network
main.on('command', (text) => {
  if (text.startsWith('search')) {
    const searchRequest = text.substr(7).trim();

    console.log(`Searching for file by "${searchRequest}"...`);
    node.broadcast({ type: 'search', meta: searchRequest });
  }
});

// Once we receive this message (on another node), we reply with results
node.on('broadcast', ({ origin, message: { type, meta }}) => {
  if (type === 'search' && origin !== node.id) {
    for (let key of index.keys()) {
      const data = index.get(key);

      if (data.name.toLowerCase().includes(meta.toLowerCase())) {
        node.direct(origin, { type: 'search/response', meta: data });
      }
    }
  }
});

// Once we receive the response from the file holder, we display it
node.on('direct', ({ origin, message: { type, meta: { name, size, hash } }}) => {
  if (type === 'search/response') {
    console.log(`  ${name} ${formatSize(size)} ${hash}`);
  }
});
Enter fullscreen mode Exit fullscreen mode

This ping-pong style flow is easy to implement, but feels unstable as we, in theory, can receive search/response when no search has been issued, and it still trigger the console.log. I don't consider this an issue, but a safety check here won't hurt.

The next thing I want to do is that I want the user to be able to start download. Since hash is used for the index, we can use that as command's param, which makes sense (like you can create magnet links with file hashes and ask the application to download that without performing a search).

I don't know what I will do when download start right now, so I leave a socket there.

main.on('help', () => {
  console.log('  write "download HASH" to start downloading file');
});

main.on('command', (text) => {
  if (text.startsWith('download')) {
    const hash = text.substr(9).trim();

    main.emit('download', hash);
  }
});
Enter fullscreen mode Exit fullscreen mode

In order to download file, we should establish a separate TCP connection to the peers and request chunks of data from them. The amount of chunks and the file name are not the information we have locally, even though we may have received it through search command, it's not guaranteed. So first of all, I want to setup a ping pong flow to exchange file meta information before starting download. It will be kinda the same as search flow, but in the end I will store the exchanged information in downloads and emit events once they change.

As you can see, the exchange information also contains the IP address of a seed, so I can connect to it's file server while downloading later.

const downloads = {};

main.on('download', (hash) => {
  node.broadcast({ type: 'download', meta: hash });
});

node.on('broadcast', ({ origin, message: { type, meta } }) => {
  if (type === 'download' && origin !== node.id) {
    const data = index.get(meta);

    if (!!data) {
      node.direct(origin, { type: 'download/response', meta: { ip: Array.from(node.addresses)[0], hash: data.hash, size: data.size, name: data.name } })
    }
  }
});

node.on('direct', ({ origin, message: { type, meta } }) => {
  if (type === 'download/response') {
    if (!downloads[meta.hash]) {
      downloads[meta.hash] = {
        hash,
        name: meta.name,
        size: meta.size,
        seeds: [meta.ip],
        chunks: [],
      };

      main.emit('download/ready', meta.hash);
    } else {
      downloads[meta.hash].seeds.push(meta.ip);
      main.emit('download/update', meta.hash);
    }
  }
});
Enter fullscreen mode Exit fullscreen mode

Okay, now it's time to create TCP server that will react on file data requests and send data. We will exchange data in chunks, so the file server will only need to react on one specific type of message and send one type of message back.

const FILES_SERVER_PORT = 9019;
const CHUNK_SIZE = 512;

const filesServer = net.createServer((socket) => {
  socket.on('data', (data) => {
    const { hash, offset } = JSON.parse(data);
    const meta = index.get(hash);

    const chunk = Buffer.alloc(CHUNK_SIZE);
    const file = await open(meta.path, 'r');

    await file.read(chunk, 0, CHUNK_SIZE, offset * CHUNK_SIZE);
    await file.close();

    socket.write(JSON.stringify({ hash, offset, chunk }));
  });
}).listen(FILES_SERVER_PORT);
Enter fullscreen mode Exit fullscreen mode

Alright, now it's time to implement actual download. I'll start by reacting to download/ready event and making an async loop, that will fetch chunks from seeds in parallel, one chunk by one seed at a time, but you can definitely tweak that.

In order to keep track of what chunk what state is, I fill the chunks field of the meta information with it's status and socket it is using to download data from.

main.on('download/ready', async (hash) => {
  downloads[hash].chunks = [...new Array(Math.ceil(downloads[hash].size / CHUNK_SIZE))].map(() => ({ state: 0 }));
});
Enter fullscreen mode Exit fullscreen mode

In addition to that, I need a temporary file to keep the download, let's assign it and create a file handle for it.

downloads[hash].path = path.resolve(DOWNLOADS_PATH, `${hash}.download`);

const file = await open(downloads[hash].path, 'w');
Enter fullscreen mode Exit fullscreen mode

Now I need to connect to IP addresses provided in downloads I know that once download/ready event is triggered, there are already some, but I also have to react to download/update events to update the list. I attach a listener to this event and detach it when download is finished.

const sockets = {};

const updateSocketsList = async ($hash) => {
  if ($hash !== hash) {
    return;
  }

  for (let ip of downloads[hash].seeds) {
    if (!sockets[ip]) {
      const socket = new net.Socket();

      socket.connect(FILES_SERVER_PORT, ip, () => {
        sockets[ip] = { socket, busy: false };
      });
    }
  }
};

updateSocketsList(hash);

main.on('download/update', updateSocketsList);

// ... TODO

main.off('download/update', updateSocketsList);
Enter fullscreen mode Exit fullscreen mode

The main cycle is pretty simple, I look for an available chunk (chunk state 0 is ready, 1 is being downloaded and 2 is already downloaded) to download and a socket, that is not busy. If there is no socket (meaning all of them are busy) or no chunk (meaning they all are being downloaded), I just continue after 50ms delay. If both available chunk and socket are presented, I download, but do not await for this download to finish.

while (!!downloads[hash].chunks.find((chunk) => chunk.state !== 2)) {
  const availableChunkIndex = downloads[hash].chunks.findIndex((chunk) => chunk.state === 0);
  const availableSocket = Object.values(sockets).find(({ busy }) => !busy);

  if (!availableSocket || !availableChunkIndex) {
    await new Promise((resolve) => setTimeout(() => resolve(), 50));
    continue;
  }

  availableSocket.busy = true;
  downloads[hash].chunks[availableChunkIndex].state = 1;

  ;(async () => {
    const chunk = await downloadChunk(availableSocket.socket, hash, availableChunkIndex);

    await file.write(Buffer.from(chunk), 0, CHUNK_SIZE, availableChunkIndex * CHUNK_SIZE);

    downloads[hash].chunks[availableChunkIndex].state = 2;
    availableSocket.busy = false;
  })();
}
Enter fullscreen mode Exit fullscreen mode

As you can see, I only have to implement the downloadChunk function that will actually grab data from socket. I want it to be an async function, while socket is an event emitter, so I need to do the following:

const downloadChunk = (socket, hash, offset) => new Promise((resolve) => {
  socket.write(JSON.stringify({ hash, offset }));

  const listener = (message) => {
    if (hash === message.hash && offset === message.offset) {
      resolve(message.chunk);
      socket.off('data', listener);
    }
  };

  socket.on('data', listener);
});
Enter fullscreen mode Exit fullscreen mode

Now I only need to cleanup by closing the file handle, renaming the temp file to whatever the filename it should have, removing listeners to download/update and closing seed sockets.

await file.close();
await rename(downloads[hash].path, path.resolve(DOWNLOADS_PATH, downloads[hash].name));

main.off('download/update', updateSocketsList);

for (let { socket } of Object.values(sockets)) {
  socket.destroy();
}
Enter fullscreen mode Exit fullscreen mode

This is how you can make a simplest Torrent application in less that 300 lines of code with Node and swenssonp2p. Full code of this app can be found here.

Top comments (1)

Collapse
 
lxchurbakov profile image
Aleksandr Churbakov • Edited

I've updated the description and deployed a Node of this torrent on the internet. Now you can run the example app and connect to network much easier. More information here github.com/swensson/swenssonp2p/tr...