'use strict'

const debug = require('debug')
const log = debug('libp2p:upgrader')
log.error = debug('libp2p:upgrader:error')
const Multistream = require('multistream-select')
const { Connection } = require('libp2p-interfaces/src/connection')
const ConnectionStatus = require('libp2p-interfaces/src/connection/status')
const PeerId = require('peer-id')
const pipe = require('it-pipe')
const errCode = require('err-code')
const mutableProxy = require('mutable-proxy')

const { codes } = require('./errors')

/**
 * @typedef MultiaddrConnection
 * @property {Function} sink
 * @property {AsyncIterator} source
 * @property {*} conn
 * @property {Multiaddr} remoteAddr
 */

/**
 * @typedef CryptoResult
 * @property {*} conn A duplex iterable
 * @property {PeerId} remotePeer
 * @property {string} protocol
 */

/**
 * Turns a raw transport connection (`MultiaddrConnection`) into a `Connection`
 * by optionally protecting it, negotiating an encryption protocol and, when
 * muxers are configured, negotiating a stream multiplexer.
 */
class Upgrader {
  /**
   * @param {object} options
   * @param {PeerId} options.localPeer
   * @param {Metrics} options.metrics
   * @param {Map} options.cryptos - Crypto modules keyed by protocol string
   * @param {Map} options.muxers - Muxer classes keyed by protocol string
   * @param {function(Connection)} options.onConnection - Called when a connection is upgraded
   * @param {function(Connection)} options.onConnectionEnd - Called once an upgraded connection has closed
   */
  constructor ({
    localPeer,
    metrics,
    cryptos,
    muxers,
    onConnectionEnd = () => {},
    onConnection = () => {}
  }) {
    this.localPeer = localPeer
    this.metrics = metrics
    this.cryptos = cryptos || new Map()
    this.muxers = muxers || new Map()
    // Not set by the constructor; `upgradeInbound`/`upgradeOutbound` only use it when assigned
    this.protector = null
    // Stream handlers keyed by protocol string (see `_onStream`)
    this.protocols = new Map()
    this.onConnection = onConnection
    this.onConnectionEnd = onConnectionEnd
  }

  /**
   * Upgrades an inbound connection
   *
   * If any upgrade step (protection, encryption, multiplexing) fails, the
   * transport connection is closed with the error and the error is rethrown.
   *
   * @async
   * @param {MultiaddrConnection} maConn
   * @returns {Promise<Connection>}
   */
  async upgradeInbound (maConn) {
    let encryptedConn
    let remotePeer
    let upgradedConn
    let Muxer
    let cryptoProtocol
    let setPeer
    let proxyPeer

    if (this.metrics) {
      // The remote peer is unknown until encryption completes, so metrics are
      // tracked against a proxy that initially targets a stub with a random id.
      ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
      const idString = Math.floor(Math.random() * 1e9).toString(36) + Date.now()
      setPeer({ toB58String: () => idString })
      maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer })
    }

    log('Starting the inbound connection upgrade')

    try {
      // Protect (inside the try so a protector failure also closes maConn)
      let protectedConn = maConn
      if (this.protector) {
        protectedConn = await this.protector.protect(maConn)
      }

      // Encrypt the connection
      ({
        conn: encryptedConn,
        remotePeer,
        protocol: cryptoProtocol
      } = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos))

      // Multiplex the connection
      if (this.muxers.size) {
        ({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
      } else {
        upgradedConn = encryptedConn
      }
    } catch (err) {
      log.error('Failed to upgrade inbound connection', err)
      await maConn.close(err)
      throw err
    }

    if (this.metrics) {
      // Swap the placeholder for the real peer now that it is known
      this.metrics.updatePlaceholder(proxyPeer, remotePeer)
      setPeer(remotePeer)
    }

    log('Successfully upgraded inbound connection')

    return this._createConnection({
      cryptoProtocol,
      direction: 'inbound',
      maConn,
      upgradedConn,
      Muxer,
      remotePeer
    })
  }

  /**
   * Upgrades an outbound connection
   *
   * If any upgrade step (protection, encryption, multiplexing) fails, the
   * transport connection is closed with the error and the error is rethrown.
   *
   * @async
   * @param {MultiaddrConnection} maConn
   * @returns {Promise<Connection>}
   */
  async upgradeOutbound (maConn) {
    let remotePeerId
    try {
      remotePeerId = PeerId.createFromB58String(maConn.remoteAddr.getPeerId())
    } catch (err) {
      // Deliberately non-fatal: the upgrade continues with `remotePeerId` undefined
      log.error('multiaddr did not contain a valid peer id', err)
    }

    let encryptedConn
    let remotePeer
    let upgradedConn
    let cryptoProtocol
    let Muxer
    let setPeer
    let proxyPeer

    if (this.metrics) {
      // The remote peer is only confirmed once encryption completes, so metrics
      // are tracked against a proxy that initially targets a stub with a random id.
      ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
      const idString = Math.floor(Math.random() * 1e9).toString(36) + Date.now()
      setPeer({ toB58String: () => idString })
      maConn = this.metrics.trackStream({ stream: maConn, remotePeer: proxyPeer })
    }

    log('Starting the outbound connection upgrade')

    try {
      // Protect (inside the try so a protector failure also closes maConn)
      let protectedConn = maConn
      if (this.protector) {
        protectedConn = await this.protector.protect(maConn)
      }

      // Encrypt the connection
      ({
        conn: encryptedConn,
        remotePeer,
        protocol: cryptoProtocol
      } = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos))

      // Multiplex the connection
      if (this.muxers.size) {
        ({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
      } else {
        upgradedConn = encryptedConn
      }
    } catch (err) {
      log.error('Failed to upgrade outbound connection', err)
      await maConn.close(err)
      throw err
    }

    if (this.metrics) {
      // Swap the placeholder for the real peer now that it is known
      this.metrics.updatePlaceholder(proxyPeer, remotePeer)
      setPeer(remotePeer)
    }

    log('Successfully upgraded outbound connection')

    return this._createConnection({
      cryptoProtocol,
      direction: 'outbound',
      maConn,
      upgradedConn,
      Muxer,
      remotePeer
    })
  }

  /**
   * A convenience method for generating a new `Connection`
   *
   * Invokes `onConnection` with the new connection before returning it.
   *
   * @private
   * @param {object} options
   * @param {string} options.cryptoProtocol - The crypto protocol that was negotiated
   * @param {string} options.direction - One of ['inbound', 'outbound']
   * @param {MultiaddrConnection} options.maConn - The transport layer connection
   * @param {*} options.upgradedConn - A duplex connection returned from multiplexer and/or crypto selection
   * @param {Muxer} options.Muxer - The muxer to be used for muxing
   * @param {PeerId} options.remotePeer - The peer the connection is with
   * @returns {Connection}
   */
  _createConnection ({
    cryptoProtocol,
    direction,
    maConn,
    upgradedConn,
    Muxer,
    remotePeer
  }) {
    let muxer
    let newStream
    // Assigned further down, but captured by the callbacks created before then
    // eslint-disable-next-line prefer-const
    let connection

    if (Muxer) {
      // Create the muxer
      muxer = new Muxer({
        // Run anytime a remote stream is created
        onStream: async muxedStream => {
          const mss = new Multistream.Listener(muxedStream)
          try {
            const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
            log('%s: incoming stream opened on %s', direction, protocol)
            if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
            connection.addStream(muxedStream, { protocol })
            this._onStream({ connection, stream: { ...muxedStream, ...stream }, protocol })
          } catch (err) {
            log.error(err)
          }
        },
        // Run anytime a stream closes
        onStreamEnd: muxedStream => {
          connection.removeStream(muxedStream.id)
        }
      })

      newStream = async protocols => {
        log('%s: starting new stream on %s', direction, protocols)
        const muxedStream = muxer.newStream()
        const mss = new Multistream.Dialer(muxedStream)
        try {
          const { stream, protocol } = await mss.select(protocols)
          if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
          return { stream: { ...muxedStream, ...stream }, protocol }
        } catch (err) {
          log.error('could not create new stream', err)
          throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
        }
      }

      // Pipe all data through the muxer
      pipe(upgradedConn, muxer, upgradedConn).catch(log.error)
    }

    // Wrap the timeline so that the first truthy write to `close` is observed
    const _timeline = maConn.timeline
    maConn.timeline = new Proxy(_timeline, {
      set: (...args) => {
        if (connection && args[1] === 'close' && args[2] && !_timeline.close) {
          // Wait for close to finish before notifying of the closure
          (async () => {
            try {
              if (connection.stat.status === ConnectionStatus.OPEN) {
                await connection.close()
              }
            } catch (err) {
              log.error(err)
            } finally {
              this.onConnectionEnd(connection)
            }
          })()
        }

        return Reflect.set(...args)
      }
    })
    maConn.timeline.upgraded = Date.now()

    const errConnectionNotMultiplexed = () => {
      throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED')
    }

    // Create the connection
    connection = new Connection({
      localAddr: maConn.localAddr,
      remoteAddr: maConn.remoteAddr,
      localPeer: this.localPeer,
      remotePeer: remotePeer,
      stat: {
        direction,
        timeline: maConn.timeline,
        multiplexer: Muxer && Muxer.multicodec,
        encryption: cryptoProtocol
      },
      newStream: newStream || errConnectionNotMultiplexed,
      // Call the thrower: returning the function itself would hand callers a
      // function where they expect the list of streams
      getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed(),
      close: async (err) => {
        await maConn.close(err)
        // Ensure remaining streams are aborted
        if (muxer) {
          muxer.streams.forEach(stream => stream.abort(err))
        }
      }
    })

    this.onConnection(connection)

    return connection
  }

  /**
   * Routes incoming streams to the correct handler
   *
   * Looks the handler up in `this.protocols` and invokes it; no check is made
   * that a handler is registered for `protocol`.
   *
   * @private
   * @param {object} options
   * @param {Connection} options.connection - The connection the stream belongs to
   * @param {Stream} options.stream
   * @param {string} options.protocol
   */
  _onStream ({ connection, stream, protocol }) {
    const handler = this.protocols.get(protocol)
    handler({ connection, stream, protocol })
  }

  /**
   * Attempts to encrypt the incoming `connection` with the provided `cryptos`.
   *
   * @private
   * @async
   * @param {PeerId} localPeer - The initiators PeerId
   * @param {*} connection
   * @param {Map} cryptos
   * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used
   * @throws {Error} Any negotiation or encryption error, tagged with `codes.ERR_ENCRYPTION_FAILED`
   */
  async _encryptInbound (localPeer, connection, cryptos) {
    const mss = new Multistream.Listener(connection)
    const protocols = Array.from(cryptos.keys())
    log('handling inbound crypto protocol selection', protocols)

    try {
      const { stream, protocol } = await mss.handle(protocols)
      const crypto = cryptos.get(protocol)
      log('encrypting inbound connection...')

      return {
        ...await crypto.secureInbound(localPeer, stream),
        protocol
      }
    } catch (err) {
      throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
    }
  }

  /**
   * Attempts to encrypt the given `connection` with the provided `cryptos`.
   * The first `Crypto` module to succeed will be used
   *
   * @private
   * @async
   * @param {PeerId} localPeer - The initiators PeerId
   * @param {*} connection
   * @param {PeerId} remotePeerId
   * @param {Map} cryptos
   * @returns {CryptoResult} An encrypted connection, remote peer `PeerId` and the protocol of the `Crypto` used
   * @throws {Error} Any negotiation or encryption error, tagged with `codes.ERR_ENCRYPTION_FAILED`
   */
  async _encryptOutbound (localPeer, connection, remotePeerId, cryptos) {
    const mss = new Multistream.Dialer(connection)
    const protocols = Array.from(cryptos.keys())
    log('selecting outbound crypto protocol', protocols)

    try {
      const { stream, protocol } = await mss.select(protocols)
      const crypto = cryptos.get(protocol)
      log('encrypting outbound connection to %j', remotePeerId)

      return {
        ...await crypto.secureOutbound(localPeer, stream, remotePeerId),
        protocol
      }
    } catch (err) {
      throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
    }
  }

  /**
   * Selects one of the given muxers via multistream-select. That
   * muxer will be used for all future streams on the connection.
   *
   * @private
   * @async
   * @param {*} connection - A basic duplex connection to multiplex
   * @param {Map} muxers - The muxers to attempt multiplexing with
   * @returns {*} The negotiated `stream` and the selected `Muxer` class
   * @throws {Error} Any negotiation error, tagged with `codes.ERR_MUXER_UNAVAILABLE`
   */
  async _multiplexOutbound (connection, muxers) {
    const dialer = new Multistream.Dialer(connection)
    const protocols = Array.from(muxers.keys())
    log('outbound selecting muxer %s', protocols)
    try {
      const { stream, protocol } = await dialer.select(protocols)
      log('%s selected as muxer protocol', protocol)
      const Muxer = muxers.get(protocol)
      return { stream, Muxer }
    } catch (err) {
      throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
    }
  }

  /**
   * Registers support for one of the given muxers via multistream-select. The
   * selected muxer will be used for all future streams on the connection.
   *
   * @private
   * @async
   * @param {*} connection - A basic duplex connection to multiplex
   * @param {Map} muxers - The muxers to attempt multiplexing with
   * @returns {*} The negotiated `stream` and the selected `Muxer` class
   * @throws {Error} Any negotiation error, tagged with `codes.ERR_MUXER_UNAVAILABLE`
   */
  async _multiplexInbound (connection, muxers) {
    const listener = new Multistream.Listener(connection)
    const protocols = Array.from(muxers.keys())
    log('inbound handling muxers %s', protocols)
    try {
      const { stream, protocol } = await listener.handle(protocols)
      const Muxer = muxers.get(protocol)
      return { stream, Muxer }
    } catch (err) {
      throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
    }
  }
}

module.exports = Upgrader