Socket.js

const debug = require('debug')('udp-messaging:socket');
const stunAlgorithm = require('./Stun');
const DatagramDecode = require('./Protocol');
const MessageSendSupervisor = require('./MessageSendSupervisor');
const MessageRecvSupervisor = require('./MessageRecvSupervisor');
const randomUint32 = require('./RandomUint32');
const {MAX_MTU,PROTOCOL_ID} = require('./Constants');
const dgram = require('dgram');
const EventEmitter = require('events');

/**
 * @class
 * A socket that handles receiving and sending messages.
 * @augments EventEmitter
 */
class MessagingSocket extends EventEmitter {
	/**
	 * @constructor
	 * @param {Object} [options] Pass options to the constructor.
	 * @param {String} [options.ip=0.0.0.0] IP address to bind to.
	 * @param {Number} [options.port=0] Port number to bind to. Defaults to 0 (random free port).
	 * @param {Object} [options.socket=null] Use an existing socket. If null then creates a new _udp4_ socket.
	 * @param {Number} [options.MTU=1400] Use a different MTU value. Messages longer than MTU are divided into subpackets.
	 */
	constructor(options){
		super();
		options = options || {};
		// Initialize variables
		this.port = options.port || 0;
		this.ip = options.ip || '0.0.0.0';
		this.socket = options.socket || null;
		this.pending_sends = [];
		this.pending_recv = [];
		this.pending_recv_map = {};
		this.last_known_ids = [];
		this.last_known_ids_map = {};
		this.last_known_ids_positions = {};
		this.MTU = options.MTU || MAX_MTU;
		this.queue_send = [];
		this.queue_recv = [];
		//this.think_xval = setInterval(() => this.think(), 1);
		this.think_xval = setTimeout(() => this.think(), 1);
		if(options.MTU > MAX_MTU)
			throw new Error('Cannot increase the MTU above '+MAX_MTU);
	}
	/**
	 * Binds the socket and configures datagram handling.
	 * @returns {Promise}
	 */
	bind(){
		return new Promise((res, rej) => {
			if(!this.socket)
				this.socket = dgram.createSocket('udp4');
			const handleErrors = (err) => {
				this.socket = null;
				debug(`Binding to ${this.address}:${this.port} failed with ${err}`);
				rej(err);
			};
			this.socket.once('error', handleErrors);
			debug(`Attempting to bind to ${this.address}:${this.port}`);
			this.socket.bind({
				port: this.port,
				address: this.address,
				exclusive: true
			}, () => {
				this.socket.removeListener('error', handleErrors);
				//this.socket.unref();
				if(this.port && this.port !== this.socket.address().port)
					throw Error('Could not bind to port '+this.port);
				this.port = this.socket.address().port;
				this.socket.on('message', (data, rinfo) => this.queue_recv.push([data, rinfo]));
				debug(`Binding to ${this.address}:${this.port} successful`);
				res();
			});
		});
	}
	/**
	 * Closes the socket, removes all listeners, abandons all Message Supervisors.
	 */
	close(){
		if(this.socket){
			this.socket.close();
			this.socket.removeAllListeners();
		}
		this.removeAllListeners();
		this.pending_sends.forEach(send => send.abandon());
		this.pending_recv.forEach(recv => recv.abandon());
		clearTimeout(this.think_xval);
	}
	/**
	 * Send a raw datagram to a remote host.
	 * @param {Buffer} data The datagram data.
	 * @param {String} address Target host ip address.
	 * @param {Number} port Target host port number.
	 */
	send(data, address, port){
		if(!port || !address)
			throw new Error(`Attempted to send a datagram to an invalid address ${address} port ${port}`);
		if(this.queue_send.length < 10000)
			return new Promise(done => this.queue_send.push([data, address, port, done]));
	}
	think(){
		// Send a packet from the queue_send
		for(let i = 0; i < 20; i++){
			if(this.queue_send.length > 0){
				const q = this.queue_send.shift();
				this.socket.send(q[0], q[2], q[1], q[3]);
			}else{
				break;
			}
		}
		// Process recvd
		for(let i = 0; i < 20; i++){
			if(this.queue_recv.length > 0){
				const q = this.queue_recv.shift();
				this.handleDatagram(q[0], q[1]);
			}else{
				break;
			}
		}
		this.think_xval = setTimeout(() => this.think(), 1);
	}
	/**
	 * Perform a STUN request.
	 * @param {String} [server] The STUN server to use.
	 * @returns {Array} An array containing [ip, port]
	 */
	discoverSelf(server){
		return stunAlgorithm(this.socket, server);
	}
	/**
	 * Handle an incoming datagram.
	 * @param {Buffer} data Raw datagram data.
	 * @param {Object} rinfo Remote host information.
	 * @param {String} rinfo.address Remote host ip address.
	 * @param {Number} rinfo.port Remote host port number.
	 */
	handleDatagram(data, rinfo){
		let datagram;
		try {
			datagram = DatagramDecode.decode(data);
		}catch(err){
			debug(`Unknown datagram from ${rinfo.address}:${rinfo.port} %o`, data);
			return;
		}
		const message = datagram.message;
		debug(`Port ${this.port} received ${data.length} bytes: %O`, message);
		if(message.data_part){
			// Pass to a message supervisor
			// Allow sending a re-send request just in case
			// Allow sending ACKs
			let recv_handler = this.pending_recv_map[message.data_part.message_id];
			if(!recv_handler){
				if(this.last_known_ids_map[message.data_part.message_id]){
					// Send an ACK, supervisor's one might have been lost
					// TODO: Remember which message_ids correspond to rinfos
					//		 Otherwise DDOS risk
					if(this.last_known_ids_positions[message.data_part.message_id]){
						debug(`Re-sending an ACK for message ${message.data_part.message_id}, was probably lost`);
						this.send(DatagramDecode.encode({
							protocol: PROTOCOL_ID,
							message: {
								data_ack: {
									message_id: message.data_part.message_id,
									position: this.last_known_ids_positions[message.data_part.message_id],
								}
							}
						}), rinfo.address, rinfo.port);
					}
					return;
				}
				if(message.data_part.position === 0){
					// This might be a new incomming message
					if(!recv_handler){
						if(this.last_known_ids_map[message.data_part.message_id])
							return;
						if(this.pending_recv.length > 2000)
							return debug(`A new message could be received, but already at max`);
						debug(`Starting a new message receiver supervisor for message ${message.data_part.message_id}`);
						const sup = new MessageRecvSupervisor({
							ip: rinfo.address,
							port: rinfo.port,
							message_id: message.data_part.message_id,
							parent: this,
						});
						this.pending_recv.push(sup);
						this.pending_recv_map[sup.options.message_id] = sup;
						recv_handler = sup;
					}
				}
			}
			// Don't use else since a few lines above a handler is created
			if(recv_handler){
				recv_handler.onDataPart(message.data_part.position, message.data_part.data, message.data_part.is_last);
				if(recv_handler.done){
					if(recv_handler.data){
						debug(`Receiver supervisor for ${message.data_part.message_id} reporting end of work`);
						/**
						 * Full message received.
						 * @event MessagingSocket#message
						 * @property {Buffer} data Message content.
						 * @property {String} ip Sender ip address.
						 * @property {Number} port Sender port number.
						 */
						this.emit('message', recv_handler.data, rinfo.address, rinfo.port);
					}
					// Some receivers might have timed out already
					const pending_recv_to_remove = this.pending_recv.filter(r => r.done);
					pending_recv_to_remove.forEach(p => {
						delete this.pending_recv_map[p.options.message_id];
					});
					this.pending_recv = this.pending_recv.filter(r => !r.done);

					// Remember the id for some more time
					// Prevents duplicates
					if(recv_handler.data){
						this.last_known_ids.push(message.data_part.message_id);
						this.last_known_ids_map[message.data_part.message_id] = true;
						this.last_known_ids_positions[message.data_part.message_id] = recv_handler.data.length;
					}
				}
			}
			// Else unexpected message part anyways, ignore
			return;
		}
		if(message.data_resend){
			for(let send of this.pending_sends){
				if(send.options.send_id == message.data_resend.message_id){
					send.onResendRequest(message.data_resend.position);
					return;
				}
			}
			// Unknown message id
		}
		if(message.data_ack){
			for(let send of this.pending_sends){
				if(send.options.send_id == message.data_ack.message_id){
					send.onDataACK(message.data_ack.position);
					return;
				}
			}
			// Unknown message id
		}
	}
	/**
	 * Send a message to a remote host.
	 * The remote host must also use this library to receive and understand the data.
	 * @param {Buffer} data The message contents.
	 * @param {String} ip Remote host ip address.
	 * @param {Number} port Remote host port number.
	 * @returns {Promise} Message delivery promise. Resolves when a message is fully received _or the socket is closed_. Rejects if the remote end did not respond.
	 */
	sendMessage(data, ip, port){
		return new Promise((res,rej) => {
			// Generate a unique id for this message
			// The while loop will most likely be executed only once
			let send_id;
			while(!send_id){
				send_id = randomUint32();
				if(this.last_known_ids_map[send_id])
					continue;
				for(let send of this.pending_sends){
					if(send.options.send_id === send_id)
						continue;
				}
			}
			if(this.last_known_ids.length >= 10000){
				const key = this.last_known_ids.shift();
				delete this.last_known_ids_map[key];
				delete this.last_known_ids_positions[key];
			}
			const _post = () => {
				this.last_known_ids.push(send_id);
				this.last_known_ids_map[send_id] = true;
				this.last_known_ids_positions[send_id] = 0; // Senders dont need this
				res(...arguments);
			}
			debug(`Sending ${data.length} bytes with id ${send_id}`);
			this.pending_sends.push(
				new MessageSendSupervisor({
					data, ip, port, res: _post, rej, send_id,
					parent: this,
				})
			);
		});
	}
}

module.exports = MessagingSocket;