http-api init

This commit is contained in:
folex 2019-06-15 10:52:52 +02:00
parent e34714773c
commit efc419f3d5
23 changed files with 1594 additions and 0 deletions

20
.gitignore vendored
View File

@ -1,2 +1,22 @@
*.class *.class
*.log *.log
# Scala compiled
target/
# Python compiled
*.pyc
# IntelliJ
.idea
.vscode
*.iml
# MacOS folder metadata
.DS_Store
# Text writing
*.scriv
# Temp
docs/checklist.md

55
build.sbt Normal file
View File

@ -0,0 +1,55 @@
name := "http-scala-api"
version := "0.1"
scalaVersion := "2.12.8"
val fs2Version = "1.0.4"
val fs2 = "co.fs2" %% "fs2-core" % fs2Version
val fs2rx = "co.fs2" %% "fs2-reactive-streams" % fs2Version
val fs2io = "co.fs2" %% "fs2-io" % fs2Version
val sttpVersion = "1.5.17"
val sttp = "com.softwaremill.sttp" %% "core" % sttpVersion
val sttpCirce = "com.softwaremill.sttp" %% "circe" % sttpVersion
val sttpFs2Backend = "com.softwaremill.sttp" %% "async-http-client-backend-fs2" % sttpVersion
val sttpCatsBackend = "com.softwaremill.sttp" %% "async-http-client-backend-cats" % sttpVersion
val http4sVersion = "0.20.0-M7"
val http4sDsl = "org.http4s" %% "http4s-dsl" % http4sVersion
val http4sServer = "org.http4s" %% "http4s-blaze-server" % http4sVersion
val http4sCirce = "org.http4s" %% "http4s-circe" % http4sVersion
val circeVersion = "0.11.1"
val circeCore = "io.circe" %% "circe-core" % circeVersion
val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
val circeGenericExtras = "io.circe" %% "circe-generic-extras" % circeVersion
val circeParser = "io.circe" %% "circe-parser" % circeVersion
val circeFs2 = "io.circe" %% "circe-fs2" % "0.11.0"
val catsVersion = "1.6.0"
val cats = "org.typelevel" %% "cats-core" % catsVersion
val catsEffect = "org.typelevel" %% "cats-effect" % "1.3.0"
resolvers += Resolver.sonatypeRepo("releases")
addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.10.0")
libraryDependencies ++= Seq(
fs2,
fs2rx,
fs2io,
sttp,
sttpCirce,
sttpFs2Backend,
sttpCatsBackend,
http4sDsl,
http4sServer,
http4sCirce,
circeCore,
circeGeneric,
circeGenericExtras,
circeParser,
circeFs2,
cats,
catsEffect
)

1
project/build.properties Normal file
View File

@ -0,0 +1 @@
sbt.version = 1.2.8

View File

@ -0,0 +1,22 @@
package hackhack
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.functor._
import cats.syntax.flatMap._
/**
* 1. Read docker logs, parse, push to fs2.Stream
* 2. Serve events from that stream to a websocket
*/
object Api extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
WebsocketServer
.make[IO](8080)
.use {
case (_, f) =>
IO(println("Started websocket server")) >>
f.join as ExitCode.Success
}
}
}

View File

@ -0,0 +1,44 @@
package hackhack
import cats.{Applicative, Monad}
import cats.data.EitherT
import cats.effect.Timer
import cats.syntax.flatMap._
import cats.syntax.apply._
import scala.concurrent.duration._
import scala.language.higherKinds
/**
* Exponential backoff delays.
*
* @param delayPeriod will be applied next time
* @param maxDelay upper bound for a single delay
*/
case class Backoff[E](delayPeriod: FiniteDuration, maxDelay: FiniteDuration) {
/**
* Next retry policy with delayPeriod multiplied times two, if maxDelay is not yet reached
*/
def next: Backoff[E] =
if (delayPeriod == maxDelay) this
else {
val nextDelay = delayPeriod * 2
if (nextDelay > maxDelay) copy(delayPeriod = maxDelay) else copy(delayPeriod = nextDelay)
}
def retry[F[_]: Timer: Monad, EE <: E, T](fn: EitherT[F, EE, T], onError: EE F[Unit]): F[T] =
fn.value.flatMap {
case Right(value) Applicative[F].pure(value)
case Left(err)
onError(err) *> Timer[F].sleep(delayPeriod) *> next.retry(fn, onError)
}
def apply[F[_]: Timer: Monad, EE <: E, T](fn: EitherT[F, EE, T]): F[T] =
retry(fn, (_: EE) Applicative[F].unit)
}
object Backoff {
def default[E]: Backoff[E] = Backoff(1.second, 1.minute)
}

View File

@ -0,0 +1,62 @@
package hackhack
import java.nio.file.{Path, StandardOpenOption}
import java.util.concurrent.Executors
import cats.effect.{Concurrent, ContextShift, Resource, Sync}
import cats.syntax.functor._
import fs2.io.Watcher
import fs2.{Pull, text}
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.language.higherKinds
class FileStream[F[_]: Sync](
path: Path,
blocking: ExecutionContextExecutor) {
private val ChunkSize = 4096
def stream: fs2.Stream[F, String] = {
val events = fs2.io.file.watch(path, Seq(Watcher.EventType.Modified))
fs2.Stream
.eval(fileSize)
.flatMap(size =>
events.evalScan(0L -> size) {
case ((_, offset), _) => fileSize.map(offset -> _)
})
.flatMap { case (start, end) => readRange(start, end) }
}
private def readRange(start: Long, end: Long) =
fs2.io.file
.readRange[F](path, blocking, ChunkSize, start, end)
.through(text.utf8Decode)
.through(text.lines)
private def fileSize =
fs2.io.file.pulls
.fromPath[F](path, blocking, Seq(StandardOpenOption.READ))
.flatMap(c => Pull.eval(c.resource.size).flatMap(Pull.output1))
.stream
.compile
.toList
.map(_.head)
}
object FileStream {
def stream[F[_]: Sync: ContextShift: Concurrent](
path: Path): fs2.Stream[F, String] = {
val blocking: Resource[F, ExecutionContextExecutor] =
Resource
.make(
Sync[F].delay(Executors.newCachedThreadPool())
)(tp => Sync[F].delay(tp.shutdown()))
.map(ExecutionContext.fromExecutor)
fs2.Stream
.resource(blocking)
.flatMap(b => new FileStream[F](path, b).stream)
}
}

View File

@ -0,0 +1,11 @@
package hackhack
import scala.language.higherKinds
case class Log(height: Long, hash: String, expectedHash: Option[String], correct: Boolean)
class LogParser[F[_]] {
def stream: fs2.Stream[F, Log] = ???
private def retrieveLogs = ???
}

View File

@ -0,0 +1,71 @@
package hackhack
import java.nio.file.{Path, Paths}
import java.util.concurrent.Executors
import cats.data.EitherT
import cats.effect._
import cats.effect.concurrent.Ref
import cats.syntax.applicativeError._
import cats.syntax.functor._
import cats.{Defer, Monad}
import hackhack.docker.params.{DockerImage, DockerParams}
import hackhack.ipfs.{IpfsError, IpfsStore}
import scodec.bits.ByteVector
import scala.concurrent.ExecutionContext
import scala.language.{higherKinds, postfixOps}
case class Runner[
F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent: ContextShift](
ipfsStore: IpfsStore[F],
lastPort: Ref[F, Short],
blockingCtx: ExecutionContext =
ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
) {
// nsd --log_level "debug" start --moniker stage-02 --address tcp://0.0.0.0:26658 --p2p.laddr tcp://0.0.0.0:26656 --rpc.laddr tcp://0.0.0.0:26657 --p2p.persistent_peers d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656
private def nextPortThousand =
lastPort.modify(p => (p + 1 toShort, p * 1000 toShort))
private def dockerCmd(image: String,
name: String,
peer: String,
binaryPath: Path) =
for {
portThousand <- nextPortThousand
process = DockerParams
.build()
.port(30656 + portThousand toShort, 26656)
.port(30657 + portThousand toShort, 26657)
.port(30658 + portThousand toShort, 26658)
.option("--name", name)
.option("-e", s"PEER=$peer") //TODO: Add $PEER usage to docker script
.volume(binaryPath.toAbsolutePath.toString, "/binary") //TODO: download binary from IPFS
.prepared(DockerImage(image, "latest"))
.daemonRun()
.process
} yield process
def fetchTo(hash: ByteVector, dest: Path): EitherT[F, IpfsError, Unit] = {
ipfsStore
.fetch(hash)
.flatMap(
_.flatMap(bb fs2.Stream.chunk(fs2.Chunk.byteBuffer(bb)))
.through(fs2.io.file.writeAll(dest, blockingCtx))
.compile
.drain
.attemptT
)
}
def run(image: String, name: String, peer: String, ipfsHash: ByteVector) =
for {
path <- EitherT(IO(Paths.get(s"/tmp/$name")).attempt.to[F])
binary <- fetchTo(ipfsHash, path)
container <- EitherT.liftF(
Concurrent[F].start(IO(dockerCmd(image, name, peer, path)).to[F]))
} yield ???
}

View File

@ -0,0 +1,142 @@
package hackhack
import java.nio.file.{Files, Paths}
import java.util.concurrent.Executors
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, Resource, _}
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import cats.syntax.functor._
import fs2.{Pull, Stream}
import fs2.concurrent.SignallingRef
import fs2.io.Watcher
import fs2.io.file.pulls
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.syntax._
import io.circe.{Decoder, Encoder}
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
import org.http4s.implicits._
import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.server.middleware.{CORS, CORSConfig}
import org.http4s.server.websocket.WebSocketBuilder
import org.http4s.websocket.WebSocketFrame
import org.http4s.websocket.WebSocketFrame.Text
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration._
import scala.language.higherKinds
import scala.sys.process.Process
case class Event(line: String)
object Event {
implicit val encodeEvent: Encoder[Event] = deriveEncoder
implicit val decodeEvent: Decoder[Event] = deriveDecoder
}
case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift](
streams: Ref[F, Map[String, fs2.Stream[F, Event]]],
signal: SignallingRef[F, Boolean]
) extends Http4sDsl[F] {
private def randomStream: fs2.Stream[F, Event] =
fs2.Stream
.fromIterator[F, String](
Process("cat /dev/urandom").lineStream_!.iterator)
.map(s => Event(s.take(10)))
private def routes(): HttpRoutes[F] =
HttpRoutes.of[F] {
case GET -> Root / "websocket" / "start" / "random" / key =>
println(s"Starting streaming for random $key")
for {
stream <- streams.modify { map =>
map
.get(key)
.fold {
val s = randomStream
.evalTap(e => Sync[F].delay(println(s"event: $e")))
.interruptWhen(signal)
map.updated(key, s) -> s
}(map -> _)
}
frames = stream.map(e => Text(e.asJson.noSpaces))
ws <- WebSocketBuilder[F].build(
frames,
_.evalMap(e => Sync[F].delay(println(s"from $key: $e"))))
} yield ws
case GET -> Root / "websocket" / "start" / "file" / key =>
streams.get.flatMap { map =>
map.get(key) match {
case None =>
BadRequest(s"There's no stream for $key, create it first")
case Some(stream) =>
WebSocketBuilder[F].build(
stream.map(e => Text(e.asJson.noSpaces)),
_.evalMap(e => Sync[F].delay(println(s"from $key: $e")))
)
}
}
case (GET | POST) -> Root / "create" / key =>
val stream =
fs2.Stream
.eval(Sync[F].delay(Files.createTempFile("websocket", key)))
.evalTap(path => Sync[F].delay(println(s"created $path")))
.flatMap(FileStream.stream[F])
.map(Event(_))
.evalTap(e => Sync[F].delay(println(s"event $key $e")))
Sync[F].delay(println(s"Creating stream for $key")) >>
streams.update(map =>
map.get(key).fold(map.updated(key, stream)) { _ =>
println(s"Stream for $key already exists")
map
}) >>
Ok()
}
def close(): F[Unit] = signal.set(true)
def start(port: Int): Stream[F, ExitCode] =
for {
exitCode <- Stream.eval(Ref[F].of(ExitCode.Success))
server <- BlazeServerBuilder[F]
.bindHttp(port)
.withHttpApp(CORS[F, F](routes().orNotFound, corsConfig))
.serveWhile(signal, exitCode)
} yield server
val corsConfig = CORSConfig(
anyOrigin = true,
anyMethod = true,
allowedMethods = Some(Set("GET", "POST")),
allowCredentials = true,
maxAge = 1.day.toSeconds
)
}
object WebsocketServer {
import cats.syntax.flatMap._
import cats.syntax.functor._
def make[F[_]: Timer: ContextShift](port: Int)(
implicit F: ConcurrentEffect[F])
: Resource[F, (WebsocketServer[F], Fiber[F, Unit])] =
Resource.make(
for {
streams <- Ref.of[F, Map[String, fs2.Stream[F, Event]]](Map.empty)
signal <- SignallingRef[F, Boolean](false)
server = WebsocketServer(streams, signal)
fiber <- Concurrent[F].start(Backoff.default {
server
.start(port)
.compile
.drain
.attemptT
})
} yield (server, fiber)
) { case (s, f) => s.close() >> f.cancel }
}

View File

@ -0,0 +1,24 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.docker
/**
* Docker container wrapper
*
* @param containerId Running Container ID
*/
case class DockerContainer(containerId: String) extends AnyVal

View File

@ -0,0 +1,13 @@
package hackhack.docker
sealed trait DockerError extends Throwable
case class DockerCommandError(cmd: String, cause: Throwable)
extends Exception(s"Failed to execute shell command: `$cmd`", cause)
with DockerError
case class DockerException(message: String, cause: Throwable)
extends Exception(message, cause)
with DockerError
case class DockerContainerStopped(startedAt: Long) extends DockerError

View File

@ -0,0 +1,273 @@
package hackhack.docker
import java.time.Instant
import java.util.concurrent.{ExecutorService, Executors}
import cats.data.EitherT
import cats.effect._
import cats.syntax.applicativeError._
import cats.syntax.functor._
import cats.{Defer, Monad, ~>}
import hackhack.docker.params.DockerParams
import scala.concurrent.ExecutionContext
import scala.language.higherKinds
import scala.sys.process._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
case class DockerContainer(containerId: String) extends AnyVal
class DockerIO[F[_]: Monad: LiftIO: ContextShift: Defer](
ctx: ExecutionContext,
defaultStopTimeout: Int
) {
private val liftCtx: IO ~> F = new (IO ~> F) {
override def apply[A](fa: IO[A]): F[A] =
ContextShift[F].evalOn(ctx)(fa.to[F])
}
/**
* Run shell command
*/
private def runShell(cmd: String): EitherT[IO, DockerError, String] =
IO {
println(s"Running Docker command: `$cmd`")
cmd.!!.trim
}.attemptT.leftMap(DockerCommandError(cmd, _))
/**
* Run shell command and raise error on non-zero exit code. Command response is dropped
*/
private def runShellVoid(cmd: String): IO[Unit] = {
IO(cmd.!).flatMap {
case exit if exit != 0 =>
println(s"`$cmd` exited with code: $exit")
IO.raiseError[Unit](new Exception(s"`$cmd` exited with code: $exit"))
case _ => IO.unit
}
}
private def getNameIO(containerId: String): EitherT[IO, DockerError, String] =
runShell(s"""docker ps -af id=$containerId --format "{{.Names}}" """)
.map(_.trim.replace("\"", ""))
/**
* Get Docker container's name, if it's possible
*
* @param container Docker Container
*/
def getName(container: DockerContainer): EitherT[F, DockerError, String] =
getNameIO(container.containerId)
.mapK(liftCtx)
/**
* Runs a temporary docker container with custom executable. Returns stdout of execution as a string.
* Caller is responsible for container removal.
*
* @param params parameters for Docker container
* @return a string with execution stdout
*/
def exec(params: DockerParams.ExecParams): EitherT[F, DockerError, String] =
IO {
println(s"Executing docker command: ${params.command.mkString(" ")}")
params.process.!!
}.map(_.trim)
.attemptT
.leftMap[DockerError](DockerCommandError(params.command.mkString(" "), _))
.mapK(liftCtx)
/**
* Runs a daemonized docker container, providing a single String with the container ID.
* Calls `docker rm -f` on that ID when stream is over.
*
* @param params parameters for Docker container, must start with `docker run -d`
* @param stopTimeout Container clean up timeout: SIGTERM is sent, and if container is still alive after timeout, SIGKILL produced
* @return a stream that produces a docker container ID
*/
def run(
params: DockerParams.DaemonParams,
stopTimeout: Int = defaultStopTimeout): Resource[F, DockerContainer] = {
val runContainer: IO[Either[Throwable, String]] =
IO {
println(s"Running docker daemon: ${params.command.mkString(" ")}")
params.process.!!
}.map(_.trim)
.attemptT
.leftMap { err
println("Cannot run docker container: " + err, err)
err
}
.value
def tryStopContainer(name: String,
dockerId: String,
exitCase: ExitCase[Throwable]): IO[Try[Int]] =
IO {
println(
s"Going to stop container $name $dockerId, exit case: $exitCase")
val t = Try(s"docker stop -t $stopTimeout $dockerId".!)
// TODO should we `docker kill` if Cancel is triggered while stopping?
println(s"Stop result: $t")
t
}
def rmOnGracefulStop(name: String, dockerId: String): IO[Unit] =
IO {
println(
s"Container $dockerId with name $name stopped gracefully, going to rm -v it")
val containerLogs =
s"docker logs --tail 100 $dockerId".!!.replaceAll("(?m)^", s"$name ")
if (containerLogs.trim.nonEmpty)
println(Console.CYAN + containerLogs + Console.RESET)
else
println(Console.CYAN + s"$name: empty logs." + Console.RESET)
s"docker rm -v $dockerId".!
}.void
def forceRmWhenCannotStop(name: String,
dockerId: String,
err: Throwable): IO[Unit] =
IO {
println(
s"Stopping docker container $name $dockerId errored due to $err, going to rm -v -f it",
err)
s"docker rm -v -f $dockerId".!
}.void
def forceRmWhenStopNonZero(name: String,
dockerId: String,
code: Int): IO[Unit] =
IO {
println(
s"Stopping docker container $name $dockerId failed, exit code = $code, going to rm -v -f it")
s"docker rm -v -f $dockerId".!
}.void
Resource
.makeCase(runContainer) {
case (Right(dockerId), exitCase)
getNameIO(dockerId)
.getOrElse("(name is unknown)")
.flatMap { name
tryStopContainer(name, dockerId, exitCase).flatMap {
case Success(0)
rmOnGracefulStop(name, dockerId)
case Failure(err)
forceRmWhenCannotStop(name, dockerId, err)
case Success(x)
forceRmWhenStopNonZero(name, dockerId, x)
}
}
.handleError { err
println(s"Error cleaning up container $dockerId: $err", err)
}
case (Left(err), _)
println(
s"Cannot cleanup the docker container as it's failed to launch: $err",
err)
IO.unit
}
.flatMap[DockerContainer] {
case Right(dockerId)
Resource.pure(DockerContainer(dockerId))
case Left(err)
println(s"Resource cannot be acquired, error raised $err", err)
Resource.liftF[IO, DockerContainer](IO.raiseError(err))
}
.mapK(liftCtx)
}
/**
* Inspect the docker container to find out its running status
*
* @param container Docker container
* @return DockerRunning or any error found on the way
*/
def checkContainer(
container: DockerContainer): EitherT[F, DockerError, DockerRunning] = {
import java.time.format.DateTimeFormatter
val format = DateTimeFormatter.ISO_DATE_TIME
val dockerId = container.containerId
for {
status runShell(
s"docker inspect -f {{.State.Running}},{{.State.StartedAt}} $dockerId")
timeIsRunning IO {
val running :: started :: Nil = status.trim.split(',').toList
// TODO get any reason of why container is stopped
println(
s"Docker container $dockerId status = [$running], startedAt = [$started]")
Instant.from(format.parse(started)).getEpochSecond running.contains(
"true")
}.attemptT.leftMap(DockerException(
s"Cannot parse container status: $status",
_): DockerError)
} yield timeIsRunning
}.mapK(liftCtx).subflatMap {
case (time, true) Right(DockerRunning(time))
case (time, false) Left(DockerContainerStopped(time))
}
/**
* Create docker network as a resource. Network is deleted after resource is used.
*/
def makeNetwork(name: String): Resource[F, DockerNetwork] =
Resource
.make(
runShellVoid(s"docker network create $name").as(DockerNetwork(name))) {
case DockerNetwork(n) =>
IO(println(s"removing network $n"))
.flatMap(_ => runShellVoid(s"docker network rm $n"))
.handleError {
case NonFatal(err)
println(s"Trying to remove network $n, got error $err", err)
}
}
.mapK(liftCtx)
/**
* Join (connect to) docker network as a resource. Container will be disconnected from network after resource is used.
*/
def joinNetwork(container: DockerContainer,
network: DockerNetwork): Resource[F, Unit] =
Resource
.make(
runShellVoid(
s"docker network connect ${network.name} ${container.containerId}"))(
_ =>
IO(println(
s"disconnecting container ${container.containerId} from network ${network.name} "))
.flatMap(_ =>
runShellVoid(
s"docker network disconnect ${network.name} ${container.containerId}"))
.handleError {
case NonFatal(err)
println(
s"Trying to disconnect container ${container.containerId}" +
s"from network ${network.name}, got error $err"
)
}
)
.mapK(liftCtx)
}
object DockerIO {
def apply[F[_]](implicit dio: DockerIO[F]): DockerIO[F] = dio
def make[F[_]: Monad: LiftIO: ContextShift: Defer](
ex: ExecutorService = Executors.newSingleThreadExecutor(),
defaultStopTimeout: Int = 10
): Resource[F, DockerIO[F]] =
Resource
.make(IO(ExecutionContext.fromExecutorService(ex)).to[F])(
ctx IO(ctx.shutdown()).to[F]
)
.map(ctx new DockerIO[F](ctx, defaultStopTimeout))
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.docker
import cats.effect.Resource
import scala.language.higherKinds
/**
* Represents docker network, defined by it's name
*/
case class DockerNetwork(name: String) extends AnyVal
object DockerNetwork extends slogging.LazyLogging {
/**
* Create docker network as a resource. Network is deleted after resource is used.
*/
def make[F[_]: DockerIO](name: String): Resource[F, DockerNetwork] =
DockerIO[F].makeNetwork(name)
/**
* Join (connect to) docker network as a resource. Container will be disconnected from network after resource is used.
*/
def join[F[_]: DockerIO](container: DockerContainer, network: DockerNetwork): Resource[F, Unit] =
DockerIO[F].joinNetwork(container, network)
}

View File

@ -0,0 +1,19 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.docker
case class DockerRunning(startedAt: Long)

View File

@ -0,0 +1,27 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.docker.params
/**
* Representation of a docker image
*
* @param name Fully qualified name of an image, including a repository and a name. E.g., fluencelabs/worker
* @param tag Tag of the image, will be appended to [[name]] after a colon
*/
case class DockerImage(name: String, tag: String) {
val imageName = s"$name:$tag"
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.docker.params
/**
* Limits on cpu and memory of a container
*
* @see [[DockerParams.cpus]], [[DockerParams.memory]], [[DockerParams.memoryReservation]]
* @param cpus Fraction number of max cores available to a container.
* @param memoryMb A hard limit on maximum amount of memory available to a container
* @param memoryReservationMb Amount of memory guaranteed to be allocated for a container
*/
case class DockerLimits(cpus: Option[Double], memoryMb: Option[Int], memoryReservationMb: Option[Int])

View File

@ -0,0 +1,184 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.docker.params
import scala.collection.immutable.Queue
import scala.sys.process._
/**
* Builder for basic `docker run` command parameters.
*
* @param params Current command' params
*/
case class DockerParams private (params: Queue[String]) {
/**
* Adds a single param to command.
*
* @param param option
*/
def add(param: String): DockerParams =
copy(params.enqueue(param))
/**
* Adds a named option to command.
*
* @param optionName option name
* @param optionValue option value
*/
def option(optionName: String, optionValue: String): DockerParams =
add(optionName).add(optionValue)
def option(optionName: String, optionValue: Option[String]): DockerParams = {
optionValue.fold(this)(v => add(optionName).add(v))
}
/**
* Adds a port mapping.
*
* @param hostPort port number on host
* @param containerPort mapped port number in container
*/
def port(hostPort: Short, containerPort: Short): DockerParams =
option("-p", s"$hostPort:$containerPort")
/**
* Adds a volume mapping.
*
* @param hostVolume volume directory on host
* @param containerVolume mounted volume location in container
*/
def volume(hostVolume: String, containerVolume: String): DockerParams =
option("-v", s"$hostVolume:$containerVolume")
/**
* Specifies a user on whose behalf commands will be run
*
* @param user user login or uid
*/
def user(user: String): DockerParams =
option("--user", user)
/**
* Specifies a hard limit on maximum amount of cpu that can be utilized by a container
*
* @param limit Fraction specifying number of cores. E.g., 0.5 to limit usage to a half of a core.
*/
def cpus(limit: Double): DockerParams =
option("--cpus", limit.toString)
/**
* Specifies a hard limit on maximum amount of memory available to a container
*
* @param limitMb Amount of memory in megabytes
*/
def memory(limitMb: Int): DockerParams =
option("--memory", s"${limitMb}M")
/**
* Guarantees to allocate at lest this much memory to a container
*
* @param megabytes Amount of memory in megabytes
* @return
*/
def memoryReservation(megabytes: Int): DockerParams =
option("--memory-reservation", s"${megabytes}M")
/**
* Sets CPU and memory limits on a docker container
*/
def limits(limits: DockerLimits): DockerParams = {
// TODO: rewrite this with State monad
type Mut = DockerParams => DockerParams
type MutOpt = Option[Mut]
val withCpus: MutOpt = limits.cpus.map(limit => _.cpus(limit))
val withMemory: MutOpt = limits.memoryMb.map(limit => _.memory(limit))
val withMemoryReservation: MutOpt = limits.memoryMb.map(limit => _.memoryReservation(limit))
Seq(withCpus, withMemory, withMemoryReservation).flatten.foldLeft(this) { case (dp, f) => f(dp) }
}
/**
* Builds the current command to a representation ready to pass in [[scala.sys.process.Process]].
*
* @param image Container image
*/
def prepared(image: DockerImage): DockerParams.Prepared =
DockerParams.Prepared(params, image)
}
object DockerParams {
// Represents finalized docker command that's ready to be ran and can't be changed anymore
sealed trait SealedParams {
def command: Seq[String]
def process: ProcessBuilder = Process(command)
}
// Represents a command for daemonized container run, i.e., anything with "docker run -d"
case class DaemonParams(command: Seq[String]) extends SealedParams
// Represents a command for a single command execution, presumably with `--rm` flag
case class ExecParams(command: Seq[String]) extends SealedParams
private val daemonParams = Seq("docker", "run", "-d")
private val runParams = Seq("docker", "run", "--user", "", "--rm", "-i")
// Represents a docker run command with specified image name, ready to be specialized to Daemon or Exec params
case class Prepared(params: Seq[String], image: DockerImage) {
/**
* Builds a command starting with `docker run -d` wrapped in DaemonParams, so
* container will be deleted automatically by [[DockerIO.run]]
*
* @param cmd Command to run inside the container. If not present, the default command will be executed
*/
def daemonRun(cmd: String = null): DaemonParams =
DaemonParams(Option(cmd).foldLeft(daemonParams ++ params :+ image.imageName)(_ :+ _))
/**
* Builds a `docker run` command running custom executable.
*
* `--rm` flag is specified, so container will be removed automatically after executable exit
* Resulting command will be like the following
* `docker run --user "" --rm -i --entrypoint executable imageName execParams`
*
* @param entrypoint An executable to be run in container, must be callable by container (i.e. be in $PATH)
* @param execParams Parameters passed to `executable`
*/
def run(entrypoint: String, execParams: String*): ExecParams =
ExecParams(
(runParams :+ "--entrypoint" :+ entrypoint) ++ params ++ (image.imageName +: execParams)
)
/**
* Builds a `docker run` command running custom executable.
*
* `--rm` flag is specified, so container will be removed automatically after executable exit
* Resulting command will be like the following
* `docker run --user "" --rm -i imageName execParams`
*
* @param execParams Parameters passed to `executable`
*/
def runExec(execParams: String*): ExecParams =
ExecParams(
runParams ++ params ++ (image.imageName +: execParams)
)
}
// Builds an empty docker command, ready for adding options
def build(): DockerParams = DockerParams(Queue())
}

View File

@ -0,0 +1,308 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.ipfs
import java.nio.ByteBuffer
import cats.Traverse.ops._
import cats.data.EitherT
import cats.instances.either._
import cats.instances.list._
import cats.syntax.either._
import cats.{Applicative, Monad}
import com.softwaremill.sttp.Uri.QueryFragment.KeyValue
import com.softwaremill.sttp.circe.asJson
import com.softwaremill.sttp.{Multipart, SttpBackend, Uri, asStream, sttp, _}
import fs2.RaiseThrowable
import io.circe.{Decoder, DecodingFailure}
import scodec.bits.ByteVector
import scala.collection.immutable
import scala.language.higherKinds
// TODO move somewhere else
object ResponseOps {
import cats.data.EitherT
import com.softwaremill.sttp.Response
implicit class RichResponse[F[_], T, EE <: Throwable](
resp: EitherT[F, Throwable, Response[T]])(
implicit F: Monad[F]
) {
val toEitherT: EitherT[F, String, T] =
resp.leftMap(_.getMessage).subflatMap(_.body)
def toEitherT[E](errFunc: String => E): EitherT[F, E, T] =
toEitherT.leftMap(errFunc)
}
}
class IpfsClient[F[_]: Monad](ipfsUri: Uri)(
implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?],
fs2.Stream[F, ByteBuffer]]
) {
import IpfsClient._
import IpfsLsResponse._
import ResponseOps._
object Multihash {
// https://github.com/multiformats/multicodec/blob/master/table.csv
val SHA256 = ByteVector(0x12, 32) // 0x12 => SHA256; 32 = 256 bits in bytes
}
// URI for downloading data
private val CatUri = ipfsUri.path("/api/v0/cat")
// URI for listing data if it has nested resources
private val LsUri = ipfsUri.path("/api/v0/ls")
private val UploadUri = ipfsUri.path("/api/v0/add")
// Converts 256-bits hash to a base58 IPFS address, prepending multihash bytes
private def toAddress(hash: ByteVector): String =
(Multihash.SHA256 ++ hash).toBase58
// Converts base58 IPFS address to a 256-bits hash
private def fromAddress(str: String): Either[String, ByteVector] =
ByteVector.fromBase58Descriptive(str).map(_.drop(2))
private def lsCall(
hash: ByteVector): EitherT[F, IpfsError, IpfsLsResponse] = {
val address = toAddress(hash)
val uri = LsUri.param("arg", address)
for {
_ <- EitherT.pure[F, IpfsError](println(s"IPFS `ls` started $uri"))
response <- sttp
.response(asJson[IpfsLsResponse])
.get(uri)
.send()
.toEitherT { er =>
val errorMessage = s"IPFS 'ls' error $uri: $er"
IpfsError(errorMessage)
}
.subflatMap(_.left.map { er =>
println(s"IPFS 'ls' deserialization error: $er")
IpfsError(s"IPFS 'ls' deserialization error $uri", Some(er.error))
})
.map { r =>
println(s"IPFS 'ls' finished $uri")
r
}
.leftMap(identity[IpfsError])
} yield response
}
/**
* Generates URI for uploading to IPFS.
*
* @param onlyHash If true, IPFS will calculates the hash, without saving a data to IPFS
* @param canBeMultiple If true, IPFS will wrap the list of files with directory and return a hash of this directory
* @return
*/
private def uploadUri(onlyHash: Boolean, canBeMultiple: Boolean) = {
val multipleStr = canBeMultiple.toString
UploadUri
.queryFragment(KeyValue("pin", "true"))
.queryFragment(KeyValue("path", ""))
.queryFragment(KeyValue("only-hash", onlyHash.toString))
.queryFragment(KeyValue("recursive", multipleStr))
.queryFragment(KeyValue("wrap-with-directory", multipleStr))
}
/**
* `add` operation. Wraps files with a directory if there are multiple files.
*
* @param data uploads to IPFS
* @param onlyHash If true, only calculates the hash, without saving a data to IPFS
*/
private def add[A: IpfsData](
data: A,
onlyHash: Boolean
): EitherT[F, IpfsError, ByteVector] = {
val uri = uploadUri(onlyHash, IpfsData[A].wrapInDirectory)
for {
_ <- EitherT.pure[F, IpfsError](println(s"IPFS 'add' started $uri"))
multiparts <- IpfsData[A].toMultipart[F](data)
responses <- addCall(uri, multiparts)
_ <- assert[F](responses.nonEmpty, "IPFS 'add': Empty response")
hash <- EitherT.fromEither[F](getParentHash(responses))
} yield hash
}
/**
* HTTP call to add multiparts to IPFS.
*
*/
private def addCall(uri: Uri, multiparts: immutable.Seq[Multipart])
: EitherT[F, IpfsError, List[UploadResponse]] =
// raw response: {upload-response-object}\n{upload-response-object}...
sttp
.response(asListJson[UploadResponse])
.post(uri)
.multipartBody(multiparts)
.send()
.toEitherT { er =>
val errorMessage = s"IPFS 'add' error $uri: $er"
IpfsError(errorMessage)
}
.subflatMap(_.left.map { er =>
println(s"IPFS 'add' deserialization error: $er")
IpfsError(s"IPFS 'add' deserialization error $uri", Some(er))
})
.map { r =>
println(s"IPFS 'add' finished $uri")
r
}
.leftMap(identity[IpfsError])
/**
* Returns hash of element with empty name. It is a wrapping directory's name.
* If only one file was uploaded, a list has one element and a hash of this element will be returned.
*
* @param responses list of JSON responses from IPFS
*/
private def getParentHash(
responses: List[UploadResponse]): Either[IpfsError, ByteVector] = {
for {
namesWithHashes <- responses
.map(
r =>
fromAddress(r.Hash).map(h => r.Name -> h).leftMap { e =>
println(s"IPFS 'add' hash ${r.Hash} is not correct")
IpfsError(e)
}
)
.sequence[Either[IpfsError, ?], (String, ByteVector)]
hash <- if (namesWithHashes.length == 1) Right(namesWithHashes.head._2)
else {
// if there is more then one JSON objects
// find an object with an empty name - it will be an object with a directory
namesWithHashes
.find(_._1.isEmpty)
.map(_._2)
.toRight(
IpfsError(
s"IPFS 'add' error: incorrect response, expected at least 1 response with empty name, found 0. " +
s"Check 'wrap-with-directory' query flag in URI"
): IpfsError
)
}
} yield hash
}
/**
* Returns hash of files from directory.
* If hash belongs to file, returns the same hash.
*
* @param hash Content's hash
*/
def ls(hash: ByteVector): EitherT[F, IpfsError, List[ByteVector]] =
for {
rawResponse <- lsCall(hash)
_ <- assert[F](
rawResponse.Objects.size == 1,
s"Expected a single object, got ${rawResponse.Objects.size}. Response: $rawResponse"
)
rawHashes = {
val headObject = rawResponse.Objects.head
if (headObject.Links.forall(_.Name.isEmpty)) List(headObject.Hash)
else headObject.Links.map(_.Hash)
}
hashes <- rawHashes
.map { h =>
EitherT
.fromEither[F](fromAddress(h))
.leftMap(err =>
IpfsError(s"Cannot parse '$h' hex: $err"): IpfsError)
}
.sequence[EitherT[F, IpfsError, ?], ByteVector]
} yield {
println(s"IPFS 'ls' hashes: ${hashes.mkString(" ")}")
hashes
}
/**
* Downloads data from IPFS.
*
* @param hash data address in IPFS
* @return
*/
def download(
hash: ByteVector): EitherT[F, IpfsError, fs2.Stream[F, ByteBuffer]] = {
val address = toAddress(hash)
val uri = CatUri.param("arg", address)
for {
_ <- EitherT.pure[F, IpfsError](println(s"IPFS 'download' started $uri"))
response <- sttp
.response(asStream[fs2.Stream[F, ByteBuffer]])
.get(uri)
.send()
.toEitherT { er =>
val errorMessage = s"IPFS 'download' error $uri: $er"
IpfsError(errorMessage)
}
.map { r =>
println(s"IPFS 'download' finished $uri")
r
}
.leftMap(identity[IpfsError])
} yield response
}
/**
* Only calculates hash - no data will be persisted on IPFS.
*
* @return hash of data
*/
def calculateHash[A: IpfsData](data: A): EitherT[F, IpfsError, ByteVector] =
add(data, onlyHash = true)
/**
* Uploads data to IPFS
*
* @return hash of data
*/
def upload[A: IpfsData](data: A): EitherT[F, IpfsError, ByteVector] =
add(data, onlyHash = false)
}
object IpfsClient {
import io.circe.fs2.stringStreamParser
private[ipfs] def assert[F[_]: Applicative](
test: Boolean,
errorMessage: String): EitherT[F, IpfsError, Unit] =
EitherT.fromEither[F](Either.cond(test, (), IpfsError(errorMessage)))
// parses application/json+stream like {object1}\n{object2}...
private[ipfs] def asListJson[B: Decoder: IsOption]
: ResponseAs[Decoder.Result[List[B]], Nothing] = {
implicit val rt = new RaiseThrowable[fs2.Pure] {}
asString
.map(fs2.Stream.emit)
.map(
_.through(stringStreamParser[fs2.Pure]).attempt
.map(_.leftMap {
case e: DecodingFailure => e
case e: Throwable => DecodingFailure(e.getLocalizedMessage, Nil)
})
.toList
.map(_.map(_.as[B]).flatMap(identity))
.sequence
)
}
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.ipfs
import java.nio.file.{Files, Path}
import cats.Monad
import cats.data.EitherT
import com.softwaremill.sttp.{Multipart, _}
import fluence.effects.ipfs.IpfsClient.assert
import scodec.bits.ByteVector
import scala.collection.immutable
import scala.language.higherKinds
trait IpfsData[A] {
def toMultipart[F[_]: Monad](data: A): EitherT[F, IpfsError, immutable.Seq[Multipart]]
val wrapInDirectory: Boolean
}
object IpfsData {
implicit val chunksData: IpfsData[List[ByteVector]] = new IpfsData[List[ByteVector]] {
def toMultipart[F[_]: Monad](data: List[ByteVector]): EitherT[F, IpfsError, immutable.Seq[Multipart]] = {
EitherT.pure(data.zipWithIndex.map {
case (chunk, idx) =>
multipart(idx.toString, ByteArrayBody(chunk.toArray))
})
}
override val wrapInDirectory: Boolean = true
}
implicit val bytesData: IpfsData[ByteVector] = new IpfsData[ByteVector] {
override def toMultipart[F[_]: Monad](data: ByteVector): EitherT[F, IpfsError, immutable.Seq[Multipart]] =
EitherT.pure(immutable.Seq(multipart("", ByteArrayBody(data.toArray))))
override val wrapInDirectory: Boolean = false
}
/**
* Uploads files to IPFS node. Supports only one file or files in one directory, without nested directories.
*/
implicit val pathData: IpfsData[Path] = new IpfsData[Path] {
/**
* Returns incoming path if it is a file, return a list of files, if the incoming path is a directory.
* Validates if the directory doesn't have nested directories.
*/
private def listPaths[F[_]: Monad](path: Path): EitherT[F, IpfsError, immutable.Seq[Path]] = {
import scala.collection.JavaConverters._
if (Files.isDirectory(path)) {
val allFiles = Files.list(path).iterator().asScala.to[immutable.Seq]
val allFilesIsRegular = allFiles.forall(p => Files.isRegularFile(p))
assert(
allFilesIsRegular,
s"IPFS 'listPaths' error: expected flat directory, found nested directories in ${path.getFileName}"
).map(_ => allFiles)
} else EitherT.pure(immutable.Seq(path))
}
override def toMultipart[F[_]: Monad](path: Path): EitherT[F, IpfsError, immutable.Seq[Multipart]] = {
for {
_ <- assert(Files.exists(path), s"IPFS 'add' error: file '${path.getFileName}' does not exist")
pathsList <- listPaths(path)
parts = pathsList.map(p => multipartFile("", p))
_ <- assert(parts.nonEmpty, s"IPFS 'add' error: directory ${path.getFileName} is empty")
} yield parts
}
override val wrapInDirectory: Boolean = true
}
def apply[A](implicit id: IpfsData[A]): IpfsData[A] = id
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.ipfs
case class IpfsError(message: String, causedBy: Option[Throwable] = None) extends Throwable {
override def getMessage: String = message
override def getCause: Throwable = causedBy getOrElse super.getCause
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.ipfs
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
case class UploadResponse(Name: String, Hash: String, Size: Int)
object UploadResponse {
implicit val encodeUploadResponse: Encoder[UploadResponse] = deriveEncoder
implicit val decodeUploadResponse: Decoder[UploadResponse] = deriveDecoder
}
/**
* File in IPFS.
*/
case class FileManifest(Name: String, Hash: String, Size: Int, Type: Int)
/**
* IPFS object. Represents hash of object with links to contained files.
*
*/
case class IpfsObject(Hash: String, Links: List[FileManifest])
/**
* `ls` response from IPFS
*/
case class IpfsLsResponse(Objects: List[IpfsObject])
object IpfsLsResponse {
implicit val encodeFileManifest: Encoder[FileManifest] = deriveEncoder
implicit val decodeFileManifest: Decoder[FileManifest] = deriveDecoder
implicit val encodeIpfsObject: Encoder[IpfsObject] = deriveEncoder
implicit val decodeIpfsObject: Decoder[IpfsObject] = deriveDecoder
implicit val encodeIpfsLs: Encoder[IpfsLsResponse] = deriveEncoder
implicit val decodeIpfsLs: Decoder[IpfsLsResponse] = deriveDecoder
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2018 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hackhack.ipfs
import java.nio.ByteBuffer
import cats.Monad
import cats.data.EitherT
import com.softwaremill.sttp.{SttpBackend, Uri}
import scodec.bits.ByteVector
import scala.language.higherKinds
/**
* Implementation of IPFS downloading mechanism
*
* @param client to interact with IPFS nodes
*/
class IpfsStore[F[_]](client: IpfsClient[F]) {
def fetch(
hash: ByteVector): EitherT[F, IpfsError, fs2.Stream[F, ByteBuffer]] =
client.download(hash)
/**
* Returns hash of files from directory.
* If hash belongs to file, returns the same hash.
*
* @param hash Content's hash
*/
def ls(hash: ByteVector): EitherT[F, IpfsError, List[ByteVector]] =
client.ls(hash)
}
object IpfsStore {
def apply[F[_]](
address: Uri
)(implicit F: Monad[F],
sttpBackend: SttpBackend[EitherT[F, Throwable, ?],
fs2.Stream[F, ByteBuffer]]): IpfsStore[F] =
new IpfsStore(new IpfsClient[F](address))
}

View File

@ -0,0 +1,29 @@
package hackhack.utils
import cats.effect.concurrent.Deferred
import cats.effect.{Concurrent, Fiber, Resource}
import cats.syntax.flatMap._
import cats.syntax.functor._
import scala.language.higherKinds
object MakeResource {
/**
* Uses the resource concurrently in a separate fiber, until the given F[Unit] resolves.
*
* @param resource release use => resource
* @tparam F Effect
* @return Delayed action of using the resource
*/
def useConcurrently[F[_]: Concurrent](resource: F[Unit] Resource[F, _]): F[Unit] =
for {
completeDef Deferred[F, Unit]
fiberDef Deferred[F, Fiber[F, Unit]]
fiber Concurrent[F].start(
resource(
completeDef.complete(()) >> fiberDef.get.flatMap(_.join)
).use(_ completeDef.get)
)
_ fiberDef.complete(fiber)
} yield ()
}