mirror of
https://github.com/fluencelabs/dweb-transports
synced 2025-03-15 18:30:49 +00:00
SW WebTorrent partial
This commit is contained in:
parent
d527236697
commit
a689e730d5
5
API.md
5
API.md
@ -225,10 +225,11 @@ returns: Dictionary of Key:Value pairs, note take care if this could
|
|||||||
```
|
```
|
||||||
|
|
||||||
###Transports - other functions
|
###Transports - other functions
|
||||||
#####static async p_f_createReadStream(url, verbose, options)
|
#####static async p_f_createReadStream(url, {wanturl, verbose})
|
||||||
Provide a function of the form needed by <VIDEO> tag and renderMedia library etc
|
Provide a function of the form needed by <VIDEO> tag and renderMedia library etc
|
||||||
```
|
```
|
||||||
url Urlsof stream
|
url Urlsof stream
|
||||||
|
wanturl True if want the URL of the stream (for service workers)
|
||||||
returns f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
|
returns f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -353,7 +354,7 @@ static async p_rawlist(urls, {verbose})|[sigs]|Tries all and combines results
|
|||||||
static async p_rawadd(urls, sig, {verbose})||Tries on all urls, error if none succeed
|
static async p_rawadd(urls, sig, {verbose})||Tries on all urls, error if none succeed
|
||||||
static listmonitor(urls, cb)||Tries on all urls (so note cb may be called multiple times)
|
static listmonitor(urls, cb)||Tries on all urls (so note cb may be called multiple times)
|
||||||
static p_newlisturls(cl, {verbose})|[urls]|Tries all and combines results
|
static p_newlisturls(cl, {verbose})|[urls]|Tries all and combines results
|
||||||
static async p_f_createReadStream(urls, verbose, options)|f(opts)=>stream|Returns first success
|
static async p_f_createReadStream(urls, options)|f(opts)=>stream|Returns first success
|
||||||
static async p_get(urls, keys, {verbose})|currently returns on first success, TODO - will combine results and relay across transports
|
static async p_get(urls, keys, {verbose})|currently returns on first success, TODO - will combine results and relay across transports
|
||||||
static async p_set(urls, keyvalues, value, {verbose})|Tries all, error if none succeed
|
static async p_set(urls, keyvalues, value, {verbose})|Tries all, error if none succeed
|
||||||
static async p_delete(urls, keys, {verbose})|Tries all, error if none succeed
|
static async p_delete(urls, keys, {verbose})|Tries all, error if none succeed
|
||||||
|
@ -309,7 +309,7 @@ class TransportIPFS extends Transport {
|
|||||||
console.log(err.message);
|
console.log(err.message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async p_f_createReadStream(url, verbose=false) { // Asynchronously return a function that can be used in createReadStream TODO-API
|
async p_f_createReadStream(url, {verbose=false}={}) { // Asynchronously return a function that can be used in createReadStream TODO-API
|
||||||
verbose = true;
|
verbose = true;
|
||||||
if (verbose) console.log("p_f_createReadStream",url);
|
if (verbose) console.log("p_f_createReadStream",url);
|
||||||
const mh = TransportIPFS.multihashFrom(url);
|
const mh = TransportIPFS.multihashFrom(url);
|
||||||
|
@ -96,14 +96,14 @@ class TransportWEBTORRENT extends Transport {
|
|||||||
|
|
||||||
webtorrentparseurl(url) {
|
webtorrentparseurl(url) {
|
||||||
/* Parse a URL
|
/* Parse a URL
|
||||||
url: URL as string or already parsed into Url
|
url: URL as string or already parsed into Url - should start magnet: or in future might support dweb:/magnet/; some other formats might be supported
|
||||||
returns: torrentid, path
|
returns: torrentid, path
|
||||||
*/
|
*/
|
||||||
if (!url) {
|
if (!url) {
|
||||||
throw new errors.CodingError("TransportWEBTORRENT.p_rawfetch: requires url");
|
throw new errors.CodingError("TransportWEBTORRENT.p_rawfetch: requires url");
|
||||||
}
|
}
|
||||||
|
|
||||||
const urlstring = typeof url === "string" ? url : url.href
|
const urlstring = (typeof url === "string" ? url : url.href)
|
||||||
const index = urlstring.indexOf('/');
|
const index = urlstring.indexOf('/');
|
||||||
|
|
||||||
if (index === -1) {
|
if (index === -1) {
|
||||||
@ -159,12 +159,10 @@ class TransportWEBTORRENT extends Transport {
|
|||||||
const file = torrent.files.find(file => {
|
const file = torrent.files.find(file => {
|
||||||
return file.path === filePath;
|
return file.path === filePath;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!file) {
|
if (!file) {
|
||||||
//debugger;
|
//debugger;
|
||||||
throw new errors.TransportError("Requested file (" + path + ") not found within torrent ");
|
throw new errors.TransportError("Requested file (" + path + ") not found within torrent ");
|
||||||
}
|
}
|
||||||
|
|
||||||
return file;
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,7 +187,7 @@ class TransportWEBTORRENT extends Transport {
|
|||||||
const { torrentId, path } = this.webtorrentparseurl(url);
|
const { torrentId, path } = this.webtorrentparseurl(url);
|
||||||
this.p_webtorrentadd(torrentId)
|
this.p_webtorrentadd(torrentId)
|
||||||
.then((torrent) => {
|
.then((torrent) => {
|
||||||
torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want
|
torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
|
||||||
const file = this.webtorrentfindfile(torrent, path);
|
const file = this.webtorrentfindfile(torrent, path);
|
||||||
file.getBuffer((err, buffer) => {
|
file.getBuffer((err, buffer) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
@ -202,16 +200,50 @@ class TransportWEBTORRENT extends Transport {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async p_f_createReadStream(url, verbose) { //TODO-API
|
async _p_fileTorrentFromUrl(url) {
|
||||||
if (verbose) console.log("TransportWEBTORRENT p_f_createreadstream %o", url);
|
/*
|
||||||
|
Then open a webtorrent for the file specified in the path part of the url
|
||||||
|
url: of form magnet:... or magnet/:...
|
||||||
|
return: Web Torrent file
|
||||||
|
*/
|
||||||
try {
|
try {
|
||||||
const {torrentId, path} = this.webtorrentparseurl(url);
|
const {torrentId, path} = this.webtorrentparseurl(url);
|
||||||
let torrent = await this.p_webtorrentadd(torrentId);
|
let torrent = await this.p_webtorrentadd(torrentId);
|
||||||
let filet = this.webtorrentfindfile(torrent, path);
|
torrent.deselect(0, torrent.pieces.length - 1, false); // Dont download entire torrent as will pull just the file we want (warning - may give problems if multiple reads from same webtorrent)
|
||||||
|
return this.webtorrentfindfile(torrent, path);
|
||||||
|
} catch(err) {
|
||||||
|
console.log(`p_fileFrom failed on ${url} ${err.message}`);
|
||||||
|
throw(err);
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
async p_f_createReadStream(url, {verbose=false, wanturl=false}={}) { //TODO-API
|
||||||
|
/*
|
||||||
|
Fetch bytes progressively, using a node.js readable stream, based on a url of the form:
|
||||||
|
|
||||||
|
|
||||||
|
No assumption is made about the data in terms of size or structure.
|
||||||
|
|
||||||
|
This is the initializtion step, which returns a function suitable for <VIDEO>
|
||||||
|
|
||||||
|
Returns a new Promise that resolves to function for a node.js readable stream.
|
||||||
|
|
||||||
|
Node.js readable stream docs: https://nodejs.org/api/stream.html#stream_readable_streams
|
||||||
|
|
||||||
|
:param string url: URL of object being retrieved of form magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
|
||||||
|
:param boolean verbose: true for debugging output
|
||||||
|
:resolves to: f({start, end}) => stream (The readable stream.)
|
||||||
|
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
|
||||||
|
*/
|
||||||
|
if (verbose) console.log("TransportWEBTORRENT p_f_createreadstream %o", url);
|
||||||
|
try {
|
||||||
|
let filet = this._p_fileTorrentFromUrl(url);
|
||||||
let self = this;
|
let self = this;
|
||||||
return function (opts) {
|
if (wanturl) {
|
||||||
return self.createReadStream(filet, opts, verbose);
|
return url;
|
||||||
};
|
} else {
|
||||||
|
return function (opts) { return self.createReadStream(filet, opts, verbose); };
|
||||||
|
}
|
||||||
} catch(err) {
|
} catch(err) {
|
||||||
console.log(`p_f_createReadStream failed on ${url} ${err.message}`);
|
console.log(`p_f_createReadStream failed on ${url} ${err.message}`);
|
||||||
throw(err);
|
throw(err);
|
||||||
@ -220,35 +252,31 @@ class TransportWEBTORRENT extends Transport {
|
|||||||
|
|
||||||
createReadStream(file, opts, verbose) {
|
createReadStream(file, opts, verbose) {
|
||||||
/*
|
/*
|
||||||
Fetch bytes progressively, using a node.js readable stream, based on a url of the form:
|
The function, encapsulated and inside another function by p_f_createReadStream (see docs)
|
||||||
|
|
||||||
magnet:xyzabc/path/to/file
|
:param file: Webtorrent "file" as returned by webtorrentfindfile
|
||||||
|
:param opts: { start: byte to start from; end: optional end byte }
|
||||||
(Where xyzabc is the typical magnet uri contents)
|
|
||||||
|
|
||||||
No assumption is made about the data in terms of size or structure. Returns a new Promise that resolves to a node.js readable stream.
|
|
||||||
|
|
||||||
Node.js readable stream docs:
|
|
||||||
https://nodejs.org/api/stream.html#stream_readable_streams
|
|
||||||
|
|
||||||
:param string url: URL of object being retrieved
|
|
||||||
:param boolean verbose: true for debugging output
|
:param boolean verbose: true for debugging output
|
||||||
:returns stream: The readable stream.
|
:returns stream: The readable stream.
|
||||||
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
|
|
||||||
*/
|
*/
|
||||||
if (verbose) console.log("TransportWEBTORRENT createreadstream %o %o", file.name, opts);
|
if (verbose) console.log("TransportWEBTORRENT createreadstream %o %o", file.name, opts);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const through = new stream.PassThrough();
|
const through = new stream.PassThrough();
|
||||||
const fileStream = file.createReadStream(opts);
|
const fileStream = file.createReadStream(opts);
|
||||||
fileStream.pipe(through);
|
fileStream.pipe(through);
|
||||||
return through;
|
return through;
|
||||||
} catch(err) {
|
} catch(err) {
|
||||||
|
console.log("TransportWEBTORRENT caught error", err)
|
||||||
if (typeof through.destroy === 'function') through.destroy(err)
|
if (typeof through.destroy === 'function') through.destroy(err)
|
||||||
else through.emit('error', err)
|
else through.emit('error', err)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async p_createReadStream(url, opts, verbose) {
|
||||||
|
let filet = await this._p_fileTorrentFromUrl(url);
|
||||||
|
return this.createReadStream(filet, opts, verbose);
|
||||||
|
}
|
||||||
|
|
||||||
static async p_test(opts, verbose) {
|
static async p_test(opts, verbose) {
|
||||||
try {
|
try {
|
||||||
let transport = await this.p_setup(opts, verbose); // Assumes IPFS already setup
|
let transport = await this.p_setup(opts, verbose); // Assumes IPFS already setup
|
||||||
|
@ -33,7 +33,10 @@ class Transports {
|
|||||||
*/
|
*/
|
||||||
return this._transports.map((t) => { return {"name": t.name, "status": t.status}})
|
return this._transports.map((t) => { return {"name": t.name, "status": t.status}})
|
||||||
}
|
}
|
||||||
|
static async p_urlsValidFor(urls, func, options) {
|
||||||
|
// Need a async version of this for serviceworker and TransportsProxy
|
||||||
|
return this.validFor(urls, func, options).map((ut) => ut[0]);
|
||||||
|
}
|
||||||
static validFor(urls, func, options) {
|
static validFor(urls, func, options) {
|
||||||
/*
|
/*
|
||||||
Finds an array or Transports that can support this URL.
|
Finds an array or Transports that can support this URL.
|
||||||
@ -65,6 +68,11 @@ class Transports {
|
|||||||
return Transports._connected().find((t) => t.name === "IPFS")
|
return Transports._connected().find((t) => t.name === "IPFS")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static webtorrent(verbose) {
|
||||||
|
// Find an ipfs transport if it exists, so for example ServiceWorker.p_respondWebTorrent can use it.
|
||||||
|
return Transports._connected().find((t) => t.name === "WEBTORRENT")
|
||||||
|
}
|
||||||
|
|
||||||
static async p_resolveNames(urls) {
|
static async p_resolveNames(urls) {
|
||||||
/* If and only if TransportNAME was loaded (it might not be as it depends on higher level classes like Domain and SmartDict)
|
/* If and only if TransportNAME was loaded (it might not be as it depends on higher level classes like Domain and SmartDict)
|
||||||
then resolve urls that might be names, returning a modified array.
|
then resolve urls that might be names, returning a modified array.
|
||||||
@ -233,12 +241,12 @@ class Transports {
|
|||||||
|
|
||||||
// Stream handling ===========================================
|
// Stream handling ===========================================
|
||||||
|
|
||||||
static async p_f_createReadStream(urls, verbose, options) { // Note options is options for selecting a stream, not the start/end in a createReadStream call
|
static async p_f_createReadStream(urls, {verbose=false, wanturl=false}={}) { // Note options is options for selecting a stream, not the start/end in a createReadStream call
|
||||||
/*
|
/*
|
||||||
urls: Urls of the stream
|
urls: Urls of the stream
|
||||||
returns: f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
|
returns: f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
|
||||||
*/
|
*/
|
||||||
let tt = this.validFor(urls, "createReadStream", options); //[ [Url,t],[Url,t]] // Passing options - most callers will ignore TODO-STREAM support options in validFor
|
let tt = this.validFor(urls, "createReadStream", {}); //[ [Url,t],[Url,t]] // Can pass options TODO-STREAM support options in validFor
|
||||||
if (!tt.length) {
|
if (!tt.length) {
|
||||||
throw new errors.TransportError("Transports.p_createReadStream cant find any transport for urls: " + urls);
|
throw new errors.TransportError("Transports.p_createReadStream cant find any transport for urls: " + urls);
|
||||||
}
|
}
|
||||||
@ -246,7 +254,7 @@ class Transports {
|
|||||||
let errs = [];
|
let errs = [];
|
||||||
for (const [url, t] of tt) {
|
for (const [url, t] of tt) {
|
||||||
try {
|
try {
|
||||||
return await t.p_f_createReadStream(url, verbose);
|
return await t.p_f_createReadStream(url, {verbose, wanturl} );
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
errs.push(err);
|
errs.push(err);
|
||||||
console.log("Could not retrieve ", url.href, "from", t.name, err.message);
|
console.log("Could not retrieve ", url.href, "from", t.name, err.message);
|
||||||
@ -510,12 +518,10 @@ class Transports {
|
|||||||
if (! tabbrevs.length) { tabbrevs = ["HTTP", "YJS", "IPFS", "WEBTORRENT"]; }
|
if (! tabbrevs.length) { tabbrevs = ["HTTP", "YJS", "IPFS", "WEBTORRENT"]; }
|
||||||
tabbrevs = tabbrevs.map(n => n.toUpperCase());
|
tabbrevs = tabbrevs.map(n => n.toUpperCase());
|
||||||
let transports = this.setup0(tabbrevs, options, verbose);
|
let transports = this.setup0(tabbrevs, options, verbose);
|
||||||
//TODO-SW copy this statuselement buildout somewhere for proxy too
|
|
||||||
if (options.statuscb) {
|
if (options.statuscb) {
|
||||||
this.statuscb = options.statuscb;
|
this.statuscb = options.statuscb;
|
||||||
}
|
}
|
||||||
if (!!options.statuselement) {
|
if (!!options.statuselement) {
|
||||||
//TODO-SW need to return status through messages
|
|
||||||
while (statuselement.lastChild) {statuselement.removeChild(statuselement.lastChild); } // Remove any exist status
|
while (statuselement.lastChild) {statuselement.removeChild(statuselement.lastChild); } // Remove any exist status
|
||||||
statuselement.appendChild(
|
statuselement.appendChild(
|
||||||
utils.createElement("UL", {}, transports.map(t => {
|
utils.createElement("UL", {}, transports.map(t => {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user