2018-04-06 19:28:04 +10:00
/ *
This Transport layers builds on WebTorrent
Y Lists have listeners and generate events - see docs at ...
* /
// WebTorrent components
const WebTorrent = require ( 'webtorrent' ) ;
const stream = require ( 'readable-stream' ) ;
2018-07-21 11:09:20 -07:00
const Url = require ( 'url' ) ;
2019-05-07 12:02:39 +10:00
const path = require ( 'path' ) ;
const debug = require ( 'debug' ) ( 'dweb-transports:webtorrent' ) ;
2018-04-06 19:28:04 +10:00
// Other Dweb modules
const errors = require ( './Errors' ) ; // Standard Dweb Errors
const Transport = require ( './Transport.js' ) ; // Base class for TransportXyz
const Transports = require ( './Transports' ) ; // Manage all Transports that are loaded
let defaultoptions = {
} ;
class TransportWEBTORRENT extends Transport {
/ *
WebTorrent specific transport
Fields :
webtorrent : object returned when starting webtorrent
* /
2018-08-13 17:55:04 +10:00
constructor ( options ) {
super ( options ) ;
2018-04-06 19:28:04 +10:00
this . webtorrent = undefined ; // Undefined till start WebTorrent
this . options = options ; // Dictionary of options
this . name = "WEBTORRENT" ; // For console log etc
this . supportURLs = [ 'magnet' ] ;
2019-05-07 12:02:39 +10:00
this . supportFunctions = [ 'fetch' , 'createReadStream' , "seed" ] ;
2019-05-30 15:57:37 +10:00
this . supportFeatures = [ 'noCache' ] ; // Note doesnt actually support noCache, but immutable is same
2018-04-06 19:28:04 +10:00
this . status = Transport . STATUS _LOADED ;
}
2018-08-13 17:55:04 +10:00
p _webtorrentstart ( ) {
2018-04-06 19:28:04 +10:00
/ *
Start WebTorrent and wait until for ready .
* /
let self = this ;
return new Promise ( ( resolve , reject ) => {
2018-07-13 13:17:16 -07:00
this . webtorrent = new WebTorrent ( this . options ) ;
2018-04-06 19:28:04 +10:00
this . webtorrent . once ( "ready" , ( ) => {
2019-05-07 12:02:39 +10:00
debug ( "ready" ) ;
2018-04-06 19:28:04 +10:00
resolve ( ) ;
} ) ;
this . webtorrent . once ( "error" , ( err ) => reject ( err ) ) ;
this . webtorrent . on ( "warning" , ( err ) => {
console . warn ( "WebTorrent Torrent WARNING: " + err . message ) ;
} ) ;
} )
}
2019-08-19 17:11:26 +10:00
//TODO-SPLIT define load()
2018-08-13 17:55:04 +10:00
static setup0 ( options ) {
2018-04-06 19:28:04 +10:00
/ *
First part of setup , create obj , add to Transports but dont attempt to connect , typically called instead of p _setup if want to parallelize connections .
* /
2018-07-13 13:17:16 -07:00
let combinedoptions = Transport . mergeoptions ( defaultoptions , options . webtorrent ) ;
2019-05-07 12:02:39 +10:00
debug ( "setup0: options=%o" , combinedoptions ) ;
2018-08-13 17:55:04 +10:00
let t = new TransportWEBTORRENT ( combinedoptions ) ;
2018-04-06 19:28:04 +10:00
Transports . addtransport ( t ) ;
return t ;
}
2018-08-13 17:55:04 +10:00
async p _setup1 ( cb ) {
2018-04-06 19:28:04 +10:00
try {
this . status = Transport . STATUS _STARTING ;
2018-04-08 14:53:19 +10:00
if ( cb ) cb ( this ) ;
2018-08-13 17:55:04 +10:00
await this . p _webtorrentstart ( ) ;
await this . p _status ( ) ;
2018-04-06 19:28:04 +10:00
} catch ( err ) {
2018-08-13 17:55:04 +10:00
console . error ( this . name , "failed to connect" , err . message ) ;
2018-04-06 19:28:04 +10:00
this . status = Transport . STATUS _FAILED ;
}
2018-04-08 14:53:19 +10:00
if ( cb ) cb ( this ) ;
2018-04-06 19:28:04 +10:00
return this ;
}
2019-06-26 20:42:37 +10:00
stop ( refreshstatus , cb ) {
this . webtorrent . destroy ( ( err ) => {
this . status = Transport . STATUS _FAILED ;
if ( refreshstatus ) refreshstatus ( this ) ;
if ( err ) {
debug ( "Webtorrent error during stopping %o" , err ) ;
} else {
debug ( "Webtorrent stopped" ) ;
}
cb ( err , this ) ;
} ) ;
2019-05-07 12:02:39 +10:00
}
2018-08-13 17:55:04 +10:00
async p _status ( ) {
2018-04-06 19:28:04 +10:00
/ *
Return a string for the status of a transport . No particular format , but keep it short as it will probably be in a small area of the screen .
* /
if ( this . webtorrent && this . webtorrent . ready ) {
this . status = Transport . STATUS _CONNECTED ;
} else if ( this . webtorrent ) {
this . status = Transport . STATUS _STARTING ;
} else {
this . status = Transport . STATUS _FAILED ;
}
2018-08-13 17:55:04 +10:00
return super . p _status ( ) ;
2018-04-06 19:28:04 +10:00
}
webtorrentparseurl ( url ) {
/ * P a r s e a U R L
2018-04-16 08:56:13 +10:00
url : URL as string or already parsed into Url - should start magnet : or in future might support dweb : /magnet/ ; some other formats might be supported
2018-04-06 19:28:04 +10:00
returns : torrentid , path
* /
if ( ! url ) {
throw new errors . CodingError ( "TransportWEBTORRENT.p_rawfetch: requires url" ) ;
}
2018-07-13 13:17:16 -07:00
const urlstring = ( typeof url === "string" ? url : url . href ) ;
2018-04-06 19:28:04 +10:00
const index = urlstring . indexOf ( '/' ) ;
if ( index === - 1 ) {
throw new errors . CodingError ( "TransportWEBTORRENT.p_rawfetch: invalid url - missing path component. Should look like magnet:xyzabc/path/to/file" ) ;
}
const torrentId = urlstring . slice ( 0 , index ) ;
const path = urlstring . slice ( index + 1 ) ;
return { torrentId , path }
}
2019-03-19 16:24:11 -07:00
async p _webtorrentadd ( torrentId , opts ) {
2018-04-06 19:28:04 +10:00
return new Promise ( ( resolve , reject ) => {
// Check if this torrentId is already added to the webtorrent client
let torrent = this . webtorrent . get ( torrentId ) ;
// If not, then add the torrentId to the torrent client
if ( ! torrent ) {
2018-07-30 16:51:52 -07:00
// This can be added in to rewrite a known torrent for example to test a different tracker.
//let testid = "magnet:?xt=urn:btih:ELHVM7F4VEOTZQFDHCX7OZXUXKINUIPJ&tr=http%3A%2F%2Fbt1.archive.org%3A6969%2Fannounce&tr=http%3A%2F%2Fbt2.archive.org%3A6969%2Fannounce&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&tr=wss%3A%2F%2Ftracker.fastcast.nz&ws=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Fdownload%2F&xs=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Ftorrent%2Fcommute";
//let testidnewtrack = "magnet:?xt=urn:btih:ELHVM7F4VEOTZQFDHCX7OZXUXKINUIPJ&tr=http%3A%2F%2Fbt1.archive.org%3A6969%2Fannounce&tr=http%3A%2F%2Fbt2.archive.org%3A6969%2Fannounce&tr=wss%3A%2F%2Fdweb.archive.org:6969&ws=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Fdownload%2F&xs=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Ftorrent%2Fcommute";
//if (torrentId === testid) torrentId = testidnewtrack;
2019-03-19 16:24:11 -07:00
torrent = this . webtorrent . add ( torrentId , opts ) ;
2018-04-06 19:28:04 +10:00
torrent . once ( "error" , ( err ) => {
reject ( new errors . TransportError ( "Torrent encountered a fatal error " + err . message ) ) ;
} ) ;
torrent . on ( "warning" , ( err ) => {
console . warn ( "WebTorrent Torrent WARNING: " + err . message + " (" + torrent . name + ")" ) ;
} ) ;
}
if ( torrent . ready ) {
resolve ( torrent ) ;
} else {
torrent . once ( "ready" , ( ) => {
resolve ( torrent ) ;
} ) ;
}
} ) ;
}
webtorrentfindfile ( torrent , path ) {
/ *
Given a torrent object and a path to a file within the torrent , find the given file .
* /
const filePath = torrent . name + '/' + path ;
const file = torrent . files . find ( file => {
return file . path === filePath ;
} ) ;
if ( ! file ) {
//debugger;
throw new errors . TransportError ( "Requested file (" + path + ") not found within torrent " ) ;
}
return file ;
}
2018-08-13 17:55:04 +10:00
p _rawfetch ( url ) {
2018-04-06 19:28:04 +10:00
/ *
Fetch some bytes based on a url of the form :
magnet : xyzabc / path / to / file
( Where xyzabc is the typical magnet uri contents )
No assumption is made about the data in terms of size or structure . Returns a new Promise that resolves to a buffer .
: param string url : URL of object being retrieved
: resolve buffer : Return the object being fetched .
: throws : TransportError if url invalid - note this happens immediately , not as a catch in the promise
* /
return new Promise ( ( resolve , reject ) => {
2018-08-13 17:55:04 +10:00
// Logged by Transports
2018-04-06 19:28:04 +10:00
const { torrentId , path } = this . webtorrentparseurl ( url ) ;
this . p _webtorrentadd ( torrentId )
. then ( ( torrent ) => {
2018-04-16 08:56:13 +10:00
torrent . deselect ( 0 , torrent . pieces . length - 1 , false ) ; // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
2018-04-06 19:28:04 +10:00
const file = this . webtorrentfindfile ( torrent , path ) ;
file . getBuffer ( ( err , buffer ) => {
if ( err ) {
return reject ( new errors . TransportError ( "Torrent encountered a fatal error " + err . message + " (" + torrent . name + ")" ) ) ;
}
resolve ( buffer ) ;
} ) ;
} )
. catch ( ( err ) => reject ( err ) ) ;
} ) ;
}
2019-05-07 12:02:39 +10:00
seed ( { fileRelativePath , directoryPath , torrentRelativePath } , cb ) {
2019-05-05 21:26:53 +10:00
/* Add a file to webTorrent - this will be called each time a file is cached and adds the torrent to WT handling so its seeding this (and other) files in directory */
2019-05-07 12:02:39 +10:00
if ( ! torrentRelativePath ) { // If no torrentfile available then just skip WebTorrent, MirrorFS will often seed the file (eg to IPFS) while its fetching the torrent and then seed that.
cb ( null ) ;
} else {
const torrentfile = path . join ( directoryPath , torrentRelativePath ) ;
this . p _addTorrentFromTorrentFile ( torrentfile , directoryPath )
. then ( res => { debug ( "Added %s/%s to webtorrent" , directoryPath , fileRelativePath ) ; cb ( null ) } )
. catch ( err => {
if ( err . message . includes ( "Cannot add duplicate torrent" ) ) { // Ignore silently if already added
cb ( null ) ;
} else {
debug ( "addWebTorrent failed %s/%s" , directoryPath , fileRelativePath ) ; cb ( err ) ;
}
} ) ;
}
2019-05-05 21:26:53 +10:00
}
2018-04-16 08:56:13 +10:00
async _p _fileTorrentFromUrl ( url ) {
/ *
Then open a webtorrent for the file specified in the path part of the url
url : of form magnet : ... or magnet / : ...
return : Web Torrent file
* /
2018-04-06 19:28:04 +10:00
try {
const { torrentId , path } = this . webtorrentparseurl ( url ) ;
2018-05-24 15:23:58 -07:00
const torrent = await this . p _webtorrentadd ( torrentId ) ;
2018-04-16 08:56:13 +10:00
torrent . deselect ( 0 , torrent . pieces . length - 1 , false ) ; // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
2018-05-24 15:23:58 -07:00
const file = this . webtorrentfindfile ( torrent , path ) ;
if ( typeof window !== "undefined" ) { // Check running in browser
window . WEBTORRENT _TORRENT = torrent ;
window . WEBTORRENT _FILE = file ;
torrent . once ( 'close' , ( ) => {
window . WEBTORRENT _TORRENT = null ;
window . WEBTORRENT _FILE = null ;
} )
}
return file
2018-04-06 19:28:04 +10:00
} catch ( err ) {
2018-08-13 17:55:04 +10:00
// Logged by Transports
2018-04-06 19:28:04 +10:00
throw ( err ) ;
2018-07-13 13:17:16 -07:00
}
2018-04-06 19:28:04 +10:00
2018-04-16 08:56:13 +10:00
}
2018-04-16 18:58:18 +10:00
2019-03-19 16:24:11 -07:00
async p _addTorrentFromTorrentFile ( torrentFilePath , filesPath ) {
2019-05-22 17:04:33 +10:00
// TODO-API: doc
2019-03-19 16:24:11 -07:00
try {
const opts = { path : filesPath } ;
2019-03-19 18:14:24 -07:00
const oldTorrent = this . webtorrent . get ( torrentFilePath ) ;
2019-03-19 16:24:11 -07:00
if ( oldTorrent ) {
oldTorrent . rescanFiles ( ) ;
} else {
const torrent = await this . p _webtorrentadd ( torrentFilePath , opts ) ;
torrent . deselect ( 0 , torrent . pieces . length - 1 , false ) ; // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
}
} catch ( err ) {
// Logged by Transports
throw ( err ) ;
}
}
2018-08-13 17:55:04 +10:00
async p _f _createReadStream ( url , { wanturl = false } = { } ) {
2018-04-06 19:28:04 +10:00
/ *
Fetch bytes progressively , using a node . js readable stream , based on a url of the form :
2018-04-16 08:56:13 +10:00
No assumption is made about the data in terms of size or structure .
2018-04-06 19:28:04 +10:00
2018-04-16 18:58:18 +10:00
This is the initialisation step , which returns a function suitable for < VIDEO >
2018-04-06 19:28:04 +10:00
2018-04-16 08:56:13 +10:00
Returns a new Promise that resolves to function for a node . js readable stream .
2018-04-06 19:28:04 +10:00
2018-04-16 08:56:13 +10:00
Node . js readable stream docs : https : //nodejs.org/api/stream.html#stream_readable_streams
: param string url : URL of object being retrieved of form magnet : xyzabc / path / to / file ( Where xyzabc is the typical magnet uri contents )
2019-04-14 09:06:03 +10:00
: param boolean wanturl True if want the URL of the stream ( for service workers )
2018-04-16 08:56:13 +10:00
: resolves to : f ( { start , end } ) => stream ( The readable stream . )
2018-04-06 19:28:04 +10:00
: throws : TransportError if url invalid - note this happens immediately , not as a catch in the promise
* /
2018-08-13 17:55:04 +10:00
// Logged by Transports
2018-04-16 08:56:13 +10:00
try {
2018-04-27 11:52:27 +10:00
let filet = await this . _p _fileTorrentFromUrl ( url ) ;
2018-04-16 08:56:13 +10:00
let self = this ;
if ( wanturl ) {
return url ;
} else {
2018-08-13 17:55:04 +10:00
return function ( opts ) { return self . createReadStream ( filet , opts ) ; } ;
2018-04-16 08:56:13 +10:00
}
} catch ( err ) {
2018-08-13 17:55:04 +10:00
// Logged by Transports
2018-04-16 08:56:13 +10:00
throw ( err ) ;
2018-07-13 13:17:16 -07:00
}
2018-04-16 08:56:13 +10:00
}
2018-04-06 19:28:04 +10:00
2018-08-13 17:55:04 +10:00
createReadStream ( file , opts ) {
2018-04-16 08:56:13 +10:00
/ *
The function , encapsulated and inside another function by p _f _createReadStream ( see docs )
: param file : Webtorrent "file" as returned by webtorrentfindfile
: param opts : { start : byte to start from ; end : optional end byte }
: returns stream : The readable stream .
* /
2019-05-07 12:02:39 +10:00
debug ( "reading from stream %s %o" , file . name , opts ) ;
2018-04-27 11:52:27 +10:00
let through ;
2018-04-06 19:28:04 +10:00
try {
2018-04-27 11:52:27 +10:00
through = new stream . PassThrough ( ) ;
2018-04-06 19:28:04 +10:00
const fileStream = file . createReadStream ( opts ) ;
fileStream . pipe ( through ) ;
return through ;
} catch ( err ) {
2019-05-07 12:02:39 +10:00
debug ( "createReadStream error %s" , err ) ;
2018-04-27 11:52:27 +10:00
if ( typeof through . destroy === 'function' )
2018-07-13 13:17:16 -07:00
through . destroy ( err ) ;
2018-04-06 19:28:04 +10:00
else through . emit ( 'error' , err )
2018-04-17 10:45:02 +10:00
}
2018-04-06 19:28:04 +10:00
}
2018-08-13 17:55:04 +10:00
async p _createReadableStream ( url , opts ) {
2018-04-17 10:45:02 +10:00
//Return a readable stream (suitable for a HTTP response) from a node type stream from webtorrent.
2018-06-13 09:34:27 -07:00
// This is used by dweb-serviceworker for WebTorrent only
2018-04-16 08:56:13 +10:00
let filet = await this . _p _fileTorrentFromUrl ( url ) ;
2018-04-17 10:45:02 +10:00
return new ReadableStream ( {
start ( controller ) {
2019-05-07 12:02:39 +10:00
debug ( "start %s %o" , url , opts ) ;
2018-04-17 10:45:02 +10:00
// Create a webtorrent file stream
const filestream = filet . createReadStream ( opts ) ;
// When data comes out of webtorrent node.js style stream, put it into the WHATWG stream
filestream . on ( 'data' , value => {
controller . enqueue ( value )
} ) ;
filestream . on ( 'end' , ( ) => {
controller . close ( )
} )
} ,
cancel ( reason ) {
throw new errors . TransportError ( ` cancelled ${ url } , ${ opts } ${ reason } ` ) ;
}
} ) ;
2018-04-16 08:56:13 +10:00
}
2018-04-17 10:45:02 +10:00
2018-08-13 17:55:04 +10:00
static async p _test ( opts ) {
2018-04-06 19:28:04 +10:00
try {
2018-08-13 17:55:04 +10:00
let transport = await this . p _setup ( opts ) ; // Assumes IPFS already setup
console . log ( transport . name , "p_test setup" , opts , "complete" ) ;
let res = await transport . p _status ( ) ;
2018-04-17 10:45:02 +10:00
console . assert ( res === Transport . STATUS _CONNECTED ) ;
2018-04-06 19:28:04 +10:00
// Creative commons torrent, copied from https://webtorrent.io/free-torrents
let bigBuckBunny = 'magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c&dn=Big+Buck+Bunny&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&ws=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2F&xs=https%3A%2F%2Fwebtorrent.io%2Ftorrents%2Fbig-buck-bunny.torrent/Big Buck Bunny.en.srt' ;
2018-08-13 17:55:04 +10:00
let data1 = await transport . p _rawfetch ( bigBuckBunny ) ;
2018-04-06 19:28:04 +10:00
data1 = data1 . toString ( ) ;
assertData ( data1 ) ;
2018-08-13 17:55:04 +10:00
const stream = await transport . createReadStream ( bigBuckBunny ) ;
2018-04-06 19:28:04 +10:00
const chunks = [ ] ;
stream . on ( "data" , ( chunk ) => {
chunks . push ( chunk ) ;
} ) ;
stream . on ( "end" , ( ) => {
const data2 = Buffer . concat ( chunks ) . toString ( ) ;
assertData ( data2 ) ;
} ) ;
function assertData ( data ) {
// Test for a string that is contained within the file
let expectedWithinData = "00:00:02,000 --> 00:00:05,000" ;
console . assert ( data . indexOf ( expectedWithinData ) !== - 1 , "Should fetch 'Big Buck Bunny.en.srt' from the torrent" ) ;
// Test that the length is what we expect
console . assert ( data . length , 129 , "'Big Buck Bunny.en.srt' was " + data . length ) ;
}
} catch ( err ) {
2018-08-13 17:55:04 +10:00
console . log ( "Exception thrown in" , transport . name , "p_test():" , err . message ) ;
2018-04-06 19:28:04 +10:00
throw err ;
}
}
}
Transports . _transportclasses [ "WEBTORRENT" ] = TransportWEBTORRENT ;
exports = module . exports = TransportWEBTORRENT ;