metamaps--metamaps/realtime/node_modules/socket.io/lib/transport.js

535 lines
10 KiB
JavaScript
Raw Normal View History

/*!
* socket.io-node
* Copyright(c) 2011 LearnBoost <dev@learnboost.com>
* MIT Licensed
*/
/**
* Module dependencies.
*/
var parser = require('./parser');
/**
* Expose the constructor.
*/
exports = module.exports = Transport;
/**
* Transport constructor.
*
* @api public
*/
function Transport (mng, data, req) {
this.manager = mng;
this.id = data.id;
this.disconnected = false;
this.drained = true;
this.handleRequest(req);
};
/**
* Access the logger.
*
* @api public
*/
Transport.prototype.__defineGetter__('log', function () {
return this.manager.log;
});
/**
* Access the store.
*
* @api public
*/
Transport.prototype.__defineGetter__('store', function () {
return this.manager.store;
});
/**
* Handles a request when it's set.
*
* @api private
*/
Transport.prototype.handleRequest = function (req) {
this.log.debug('setting request', req.method, req.url);
this.req = req;
if (req.method == 'GET') {
this.socket = req.socket;
this.open = true;
this.drained = true;
this.setHeartbeatInterval();
this.setHandlers();
this.onSocketConnect();
}
};
/**
* Called when a connection is first set.
*
* @api private
*/
Transport.prototype.onSocketConnect = function () { };
/**
* Sets transport handlers
*
* @api private
*/
Transport.prototype.setHandlers = function () {
var self = this;
// we need to do this in a pub/sub way since the client can POST the message
// over a different socket (ie: different Transport instance)
this.store.subscribe('heartbeat-clear:' + this.id, function () {
self.onHeartbeatClear();
});
this.store.subscribe('disconnect-force:' + this.id, function () {
self.onForcedDisconnect();
});
this.store.subscribe('dispatch:' + this.id, function (packet, volatile) {
self.onDispatch(packet, volatile);
});
this.bound = {
end: this.onSocketEnd.bind(this)
, close: this.onSocketClose.bind(this)
, error: this.onSocketError.bind(this)
, drain: this.onSocketDrain.bind(this)
};
this.socket.on('end', this.bound.end);
this.socket.on('close', this.bound.close);
this.socket.on('error', this.bound.error);
this.socket.on('drain', this.bound.drain);
this.handlersSet = true;
};
/**
* Removes transport handlers
*
* @api private
*/
Transport.prototype.clearHandlers = function () {
if (this.handlersSet) {
this.store.unsubscribe('disconnect-force:' + this.id);
this.store.unsubscribe('heartbeat-clear:' + this.id);
this.store.unsubscribe('dispatch:' + this.id);
this.socket.removeListener('end', this.bound.end);
this.socket.removeListener('close', this.bound.close);
this.socket.removeListener('error', this.bound.error);
this.socket.removeListener('drain', this.bound.drain);
}
};
/**
* Called when the connection dies
*
* @api private
*/
Transport.prototype.onSocketEnd = function () {
this.end('socket end');
};
/**
* Called when the connection dies
*
* @api private
*/
Transport.prototype.onSocketClose = function (error) {
this.end(error ? 'socket error' : 'socket close');
};
/**
* Called when the connection has an error.
*
* @api private
*/
Transport.prototype.onSocketError = function (err) {
if (this.open) {
this.socket.destroy();
this.onClose();
}
this.log.info('socket error ' + err.stack);
};
/**
* Called when the connection is drained.
*
* @api private
*/
Transport.prototype.onSocketDrain = function () {
this.drained = true;
};
/**
* Called upon receiving a heartbeat packet.
*
* @api private
*/
Transport.prototype.onHeartbeatClear = function () {
this.clearHeartbeatTimeout();
this.setHeartbeatInterval();
};
/**
* Called upon a forced disconnection.
*
* @api private
*/
Transport.prototype.onForcedDisconnect = function () {
if (!this.disconnected) {
this.log.info('transport end by forced client disconnection');
if (this.open) {
this.packet({ type: 'disconnect' });
}
this.end('booted');
}
};
/**
* Dispatches a packet.
*
* @api private
*/
Transport.prototype.onDispatch = function (packet, volatile) {
if (volatile) {
this.writeVolatile(packet);
} else {
this.write(packet);
}
};
/**
* Sets the close timeout.
*/
Transport.prototype.setCloseTimeout = function () {
if (!this.closeTimeout) {
var self = this;
this.closeTimeout = setTimeout(function () {
self.log.debug('fired close timeout for client', self.id);
self.closeTimeout = null;
self.end('close timeout');
}, this.manager.get('close timeout') * 1000);
this.log.debug('set close timeout for client', this.id);
}
};
/**
* Clears the close timeout.
*/
Transport.prototype.clearCloseTimeout = function () {
if (this.closeTimeout) {
clearTimeout(this.closeTimeout);
this.closeTimeout = null;
this.log.debug('cleared close timeout for client', this.id);
}
};
/**
* Sets the heartbeat timeout
*/
Transport.prototype.setHeartbeatTimeout = function () {
if (!this.heartbeatTimeout && this.manager.enabled('heartbeats')) {
var self = this;
this.heartbeatTimeout = setTimeout(function () {
self.log.debug('fired heartbeat timeout for client', self.id);
self.heartbeatTimeout = null;
self.end('heartbeat timeout');
}, this.manager.get('heartbeat timeout') * 1000);
this.log.debug('set heartbeat timeout for client', this.id);
}
};
/**
* Clears the heartbeat timeout
*
* @param text
*/
Transport.prototype.clearHeartbeatTimeout = function () {
if (this.heartbeatTimeout && this.manager.enabled('heartbeats')) {
clearTimeout(this.heartbeatTimeout);
this.heartbeatTimeout = null;
this.log.debug('cleared heartbeat timeout for client', this.id);
}
};
/**
* Sets the heartbeat interval. To be called when a connection opens and when
* a heartbeat is received.
*
* @api private
*/
Transport.prototype.setHeartbeatInterval = function () {
if (!this.heartbeatInterval && this.manager.enabled('heartbeats')) {
var self = this;
this.heartbeatInterval = setTimeout(function () {
self.heartbeat();
self.heartbeatInterval = null;
}, this.manager.get('heartbeat interval') * 1000);
this.log.debug('set heartbeat interval for client', this.id);
}
};
/**
* Clears all timeouts.
*
* @api private
*/
Transport.prototype.clearTimeouts = function () {
this.clearCloseTimeout();
this.clearHeartbeatTimeout();
this.clearHeartbeatInterval();
};
/**
* Sends a heartbeat
*
* @api private
*/
Transport.prototype.heartbeat = function () {
if (this.open) {
this.log.debug('emitting heartbeat for client', this.id);
this.packet({ type: 'heartbeat' });
this.setHeartbeatTimeout();
}
return this;
};
/**
* Handles a message.
*
* @param {Object} packet object
* @api private
*/
Transport.prototype.onMessage = function (packet) {
var current = this.manager.transports[this.id];
if ('heartbeat' == packet.type) {
this.log.debug('got heartbeat packet');
if (current && current.open) {
current.onHeartbeatClear();
} else {
this.store.publish('heartbeat-clear:' + this.id);
}
} else {
if ('disconnect' == packet.type && packet.endpoint == '') {
this.log.debug('got disconnection packet');
if (current) {
current.onForcedDisconnect();
} else {
this.store.publish('disconnect-force:' + this.id);
}
return;
}
if (packet.id && packet.ack != 'data') {
this.log.debug('acknowledging packet automatically');
var ack = parser.encodePacket({
type: 'ack'
, ackId: packet.id
, endpoint: packet.endpoint || ''
});
if (current && current.open) {
current.onDispatch(ack);
} else {
this.manager.onClientDispatch(this.id, ack);
this.store.publish('dispatch:' + this.id, ack);
}
}
// handle packet locally or publish it
if (current) {
this.manager.onClientMessage(this.id, packet);
} else {
this.store.publish('message:' + this.id, packet);
}
}
};
/**
* Clears the heartbeat interval
*
* @api private
*/
Transport.prototype.clearHeartbeatInterval = function () {
if (this.heartbeatInterval && this.manager.enabled('heartbeats')) {
clearTimeout(this.heartbeatInterval);
this.heartbeatInterval = null;
this.log.debug('cleared heartbeat interval for client', this.id);
}
};
/**
* Finishes the connection and makes sure client doesn't reopen
*
* @api private
*/
Transport.prototype.disconnect = function (reason) {
this.packet({ type: 'disconnect' });
this.end(reason);
return this;
};
/**
* Closes the connection.
*
* @api private
*/
Transport.prototype.close = function () {
if (this.open) {
this.doClose();
this.onClose();
}
};
/**
* Called upon a connection close.
*
* @api private
*/
Transport.prototype.onClose = function () {
if (this.open) {
this.setCloseTimeout();
this.clearHandlers();
this.open = false;
this.manager.onClose(this.id);
this.store.publish('close', this.id);
}
};
/**
* Cleans up the connection, considers the client disconnected.
*
* @api private
*/
Transport.prototype.end = function (reason) {
if (!this.disconnected) {
this.log.info('transport end (' + reason + ')');
var local = this.manager.transports[this.id];
this.close();
this.clearTimeouts();
this.disconnected = true;
if (local) {
this.manager.onClientDisconnect(this.id, reason, true);
} else {
this.store.publish('disconnect:' + this.id, reason);
}
}
};
/**
* Signals that the transport should pause and buffer data.
*
* @api public
*/
Transport.prototype.discard = function () {
this.log.debug('discarding transport');
this.discarded = true;
this.clearTimeouts();
this.clearHandlers();
return this;
};
/**
* Writes an error packet with the specified reason and advice.
*
* @param {Number} advice
* @param {Number} reason
* @api public
*/
Transport.prototype.error = function (reason, advice) {
this.packet({
type: 'error'
, reason: reason
, advice: advice
});
this.log.warn(reason, advice ? ('client should ' + advice) : '');
this.end('error');
};
/**
* Write a packet.
*
* @api public
*/
Transport.prototype.packet = function (obj) {
return this.write(parser.encodePacket(obj));
};
/**
* Writes a volatile message.
*
* @api private
*/
Transport.prototype.writeVolatile = function (msg) {
if (this.open) {
if (this.drained) {
this.write(msg);
} else {
this.log.debug('ignoring volatile packet, buffer not drained');
}
} else {
this.log.debug('ignoring volatile packet, transport not open');
}
};