IPFS streams partial

This commit is contained in:
Mitra Ardron 2018-06-13 09:34:27 -07:00
parent 935de30f8d
commit 68746d0c14
2 changed files with 82 additions and 6 deletions

View File

@ -62,7 +62,7 @@ class TransportIPFS extends Transport {
this.options = options; // Dictionary of options { ipfs: {...}, "yarrays", yarray: {...} } this.options = options; // Dictionary of options { ipfs: {...}, "yarrays", yarray: {...} }
this.name = "IPFS"; // For console log etc this.name = "IPFS"; // For console log etc
this.supportURLs = ['ipfs']; this.supportURLs = ['ipfs'];
this.supportFunctions = ['fetch', 'store']; // Does not support reverse, createReadStream fails on files uploaded with urlstore TODO reenable when Kyle fixes urlstore this.supportFunctions = ['fetch', 'store', 'createReadStream']; // Does not support reverse
this.status = Transport.STATUS_LOADED; this.status = Transport.STATUS_LOADED;
} }
@ -297,7 +297,7 @@ class TransportIPFS extends Transport {
return TransportIPFS.urlFrom(res); return TransportIPFS.urlFrom(res);
} }
// Based on https://github.com/ipfs/js-ipfs/pull/1231/files /* OLD WAY Based on https://github.com/ipfs/js-ipfs/pull/1231/files
async p_offsetStream(stream, links, startByte, endByte) { async p_offsetStream(stream, links, startByte, endByte) {
let streamPosition = 0 let streamPosition = 0
@ -345,9 +345,8 @@ class TransportIPFS extends Transport {
function crs(opts) { // This is a synchronous function function crs(opts) { // This is a synchronous function
// Return a readable stream that provides the bytes between offsets "start" and "end" inclusive // Return a readable stream that provides the bytes between offsets "start" and "end" inclusive
console.log("opts=",JSON.stringify(opts)); console.log("opts=",JSON.stringify(opts));
/* Can replace rest of crs with this when https://github.com/ipfs/js-ipfs/pull/1231/files lands (hopefully v0.28.3) // Can replace rest of crs with this when https://github.com/ipfs/js-ipfs/pull/1231/files lands (hopefully v0.28.3)
return self.ipfs.catReadableStream(mh, opts ? opts.start : 0, opts && opts.end) ? opts.end+1 : undefined) // return self.ipfs.catReadableStream(mh, opts ? opts.start : 0, opts && opts.end) ? opts.end+1 : undefined)
*/
if (!opts) return throughstream; //TODO-STREAM unclear why called without opts - take this out when figured out if (!opts) return throughstream; //TODO-STREAM unclear why called without opts - take this out when figured out
if (throughstream && throughstream.destroy) throughstream.destroy(); if (throughstream && throughstream.destroy) throughstream.destroy();
throughstream = new stream.PassThrough(); throughstream = new stream.PassThrough();
@ -361,6 +360,82 @@ class TransportIPFS extends Transport {
} }
return crs; return crs;
} }
*/
async p_f_createReadStream(url, {verbose=false, wanturl=false}={}) {
/*
Fetch bytes progressively, using a node.js readable stream, based on a url of the form:
No assumption is made about the data in terms of size or structure.
This is the initialisation step, which returns a function suitable for <VIDEO>
Returns a new Promise that resolves to function for a node.js readable stream.
Node.js readable stream docs: https://nodejs.org/api/stream.html#stream_readable_streams
:param string url: URL of object being retrieved of form:
magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
ipfs:/ipfs/Q123
:param boolean verbose: true for debugging output
:resolves to: f({start, end}) => stream (The readable stream.)
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
*/
if (verbose) console.log(this.name, "p_f_createreadstream %o", url);
let stream;
try {
let multihash = url.pathname.split('/ipfs/')[1]
if (multihash.includes('/'))
throw new CodingError("Should not be seeing URLS with a path here:"+url);
let self = this;
if (wanturl) { // In ServiceWorker
return url;
} else {
return function createReadStream(opts) {
/*
The function, encapsulated and inside another function by p_f_createReadStream (see docs)
:param opts: { start: byte to start from; end: optional end byte }
:param boolean verbose: true for debugging output
:returns stream: The readable stream.
FOR IPFS this is copied and adapted from git repo js-ipfs/examples/browser-readablestream/index.js
*/
if (verbose) console.log("TransportIPFS createreadstream %o %o", multihash, opts);
const start = opts ? opts.start : 0
// The videostream library does not always pass an end byte but when
// it does, it wants bytes between start & end inclusive.
// catReadableStream returns the bytes exclusive so increment the end
// byte if it's been requested
const end = (opts && opts.end) ? start + opts.end + 1 : undefined
// If we've streamed before, clean up the existing stream
if (stream && stream.destroy) {
stream.destroy()
}
// This stream will contain the requested bytes
stream = self.ipfs.files.catReadableStream(multihash, {
offset: start,
length: end && end - start
})
// Log error messages
stream.on('error', (err) => console.error(err));
/* Gimmick from example :-)
if (start === 0) {
// Show the user some messages while we wait for the data stream to start
statusMessages(stream, log)
}
*/
return stream
};
}
} catch(err) {
if (stream && stream.destroy) {
stream.destroy()
}
console.log(`p_f_createReadStream failed on ${url} ${err.message}`);
throw(err);
};
}
static async p_test(opts, verbose) { static async p_test(opts, verbose) {
if (verbose) {console.log("TransportIPFS.test")} if (verbose) {console.log("TransportIPFS.test")}

View File

@ -32,7 +32,7 @@ class TransportWEBTORRENT extends Transport {
this.options = options; // Dictionary of options this.options = options; // Dictionary of options
this.name = "WEBTORRENT"; // For console log etc this.name = "WEBTORRENT"; // For console log etc
this.supportURLs = ['magnet']; this.supportURLs = ['magnet'];
this.supportFunctions = ['fetch', 'createReadStream']; this.supportFunctions = ['fetch']; //TODO-STREAM reenable video on WT 'createReadStream'];
this.status = Transport.STATUS_LOADED; this.status = Transport.STATUS_LOADED;
} }
@ -277,6 +277,7 @@ class TransportWEBTORRENT extends Transport {
async p_createReadableStream(url, opts, verbose) { async p_createReadableStream(url, opts, verbose) {
//Return a readable stream (suitable for a HTTP response) from a node type stream from webtorrent. //Return a readable stream (suitable for a HTTP response) from a node type stream from webtorrent.
// This is used by dweb-serviceworker for WebTorrent only
let filet = await this._p_fileTorrentFromUrl(url); let filet = await this._p_fileTorrentFromUrl(url);
return new ReadableStream({ return new ReadableStream({
start (controller) { start (controller) {