mirror of https://github.com/fluencelabs/cosmos-hackathon-backend synced 2025-03-31 19:21:05 +00:00
folex 2019-06-15 15:54:19 +02:00
parent 17abc4f045
commit 4755e16437
3 changed files with 203 additions and 42 deletions

@ -0,0 +1,118 @@
package hackhack
import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.Executors
import cats.Monad
import cats.data.EitherT
import cats.effect._
import cats.effect.concurrent.Ref
import cats.syntax.applicativeError._
import cats.syntax.either._
import cats.syntax.option._
import com.softwaremill.sttp.{SttpBackend, Uri, sttp}
import hackhack.ipfs.{IpfsError, IpfsStore}
import io.circe.Json
import io.circe.parser.parse
import scodec.bits.ByteVector
import scala.concurrent.ExecutionContext
import scala.language.higherKinds
case class Peer(
host: String,
p2pPort: Short,
rpcPort: Short,
p2pKey: String
) {
val RpcUri = Uri(s"http://$host:$rpcPort")
val P2pSeed = s"$p2pKey@$host:$p2pPort"
}
case class App(name: String,
containerId: String,
rpcPort: Short,
seed: String,
binaryHash: ByteVector,
binaryPath: Path)
class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
ipfsStore: IpfsStore[F],
runner: Runner[F],
apps: Ref[F, Map[String, App]],
blockingCtx: ExecutionContext =
ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()))(
implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?],
fs2.Stream[F, ByteBuffer]]) {
def log(str: String) = EitherT(IO(println(str)).attempt.to[F])
// Returns consensusHeight
def run(name: String,
peer: Peer,
hash: ByteVector): EitherT[F, Throwable, Long] =
for {
genesis <- dumpGenesis(name, peer)
_ <- log(s"$name dumped genesis")
baseDir <- EitherT(
IO(
Paths
.get(System.getProperty("user.home"), s".salmon/$name")
.toAbsolutePath).attempt.to[F])
_ <- EitherT(IO(baseDir.toFile.mkdir()).attempt.to[F])
path = baseDir.resolve("genesis.json")
_ <- EitherT(IO(Files.write(path, genesis.getBytes())).attempt.to[F])
_ <- log(s"$name saved genesis")
binaryPath = baseDir.resolve("binary")
_ <- fetchTo(hash, binaryPath).leftMap(identity[Throwable])
_ <- log(s"$name binary downloaded $binaryPath")
height <- consensusHeight(name, peer)
} yield height
def consensusHeight(appName: String,
peer: Peer): EitherT[F, Throwable, Long] = {
rpc(appName, peer, "/status").subflatMap(
_.hcursor
.downField("result")
.downField("sync_info")
.get[Long]("latest_block_height")
)
}
def dumpGenesis(appName: String,
peer: Peer): EitherT[F, Throwable, String] = {
rpc(appName, peer, "/genesis").subflatMap(
_.hcursor.downField("result").get[String]("genesis"))
}
def rpc(appName: String,
peer: Peer,
path: String): EitherT[F, Throwable, Json] = Backoff.default.retry(
sttp
.get(peer.RpcUri.path(path))
.send[EitherT[F, Throwable, ?]]
.subflatMap(
_.body
.leftMap(e =>
new Exception(s"Error RPC $path $appName: ${peer.RpcUri}: $e"))
.flatMap(s => parse(s))
),
(e: Throwable) =>
Monad[F].pure(println(s"Error RPC $path $appName: ${peer.RpcUri}: $e")),
max = 10
)
def fetchTo(hash: ByteVector, dest: Path): EitherT[F, IpfsError, Unit] = {
ipfsStore
.fetch(hash)
.flatMap(
_.flatMap(bb => fs2.Stream.chunk(fs2.Chunk.byteBuffer(bb)))
.through(fs2.io.file.writeAll[F](dest, blockingCtx))
.compile
.drain
.attemptT
.leftMap(e => IpfsError("fetchTo", e.some))
)
}
}
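For orientation, a minimal sketch of how the new registry might be driven from an IOApp, assuming an already-constructed AppRegistry[IO] (its wiring of IpfsStore, Runner and the apps Ref is elided as ???). The app name and peer coordinates are taken from the persistent_peers example in Runner.scala; the binary hash is a placeholder, not a real IPFS hash:

package hackhack

import cats.effect.{ExitCode, IO, IOApp}
import scodec.bits.ByteVector

object AppRegistryExample extends IOApp {
  // Assumed to be wired elsewhere with an IpfsStore[IO], a Runner[IO] and a Ref of apps
  def registry: AppRegistry[IO] = ???

  override def run(args: List[String]): IO[ExitCode] = {
    val peer = Peer("207.154.210.151", 26656, 26657, "d53cf2cb91514edb41441503e8a11f004023f2ee")
    val hash = ByteVector.fromValidHex("deadbeef") // placeholder hash

    registry
      .run("nsd", peer, hash) // EitherT[IO, Throwable, Long]
      .value
      .flatMap {
        case Right(height) => IO(println(s"consensus height: $height")).map(_ => ExitCode.Success)
        case Left(err)     => IO(println(s"failed to run app: $err")).map(_ => ExitCode.Error)
      }
  }
}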

@ -1,41 +1,64 @@
package hackhack

import cats.{Applicative, Functor, Monad}
import cats.data.EitherT
import cats.effect.Timer
import cats.syntax.flatMap._
import cats.syntax.apply._
import cats.syntax.applicative._
import cats.syntax.either._
import cats.instances.either._
import cats.effect.syntax.all._

import scala.concurrent.duration._
import scala.language.higherKinds

/**
  * Exponential backoff delays.
  *
  * @param delayPeriod will be applied next time
  * @param maxDelay upper bound for a single delay
  */
case class Backoff[E](delayPeriod: FiniteDuration, maxDelay: FiniteDuration) {

  /**
    * Next retry policy with delayPeriod multiplied times two, if maxDelay is not yet reached
    */
  def next: Backoff[E] =
    if (delayPeriod == maxDelay) this
    else {
      val nextDelay = delayPeriod * 2
      if (nextDelay > maxDelay) copy(delayPeriod = maxDelay)
      else copy(delayPeriod = nextDelay)
    }

  def retry[F[_]: Timer: Monad: Functor, EE <: E, T](
      fn: EitherT[F, EE, T],
      onError: EE => F[Unit],
      max: Int): EitherT[F, EE, T] =
    fn.biflatMap(
      err =>
        if (max == 0) EitherT(err.asLeft.pure[F])
        else
          EitherT
            .right[EE](onError(err) *> Timer[F].sleep(delayPeriod))
            .flatMap(_ => next.retry(fn, onError, max - 1)),
      EitherT.pure(_)
    )

  def retryForever[F[_]: Timer: Monad, EE <: E, T](
      fn: EitherT[F, EE, T],
      onError: EE => F[Unit]): F[T] =
    fn.value.flatMap {
      case Right(value) => Applicative[F].pure(value)
      case Left(err) =>
        onError(err) *> Timer[F].sleep(delayPeriod) *> next.retryForever(
          fn,
          onError)
    }

  def apply[F[_]: Timer: Monad, EE <: E, T](fn: EitherT[F, EE, T]): F[T] =
    retryForever(fn, (_: EE) => Applicative[F].unit)
}
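A small sketch of the new bounded retry in isolation, assuming cats-effect IO with a Timer in scope; the flaky action, delays and attempt counts below are illustrative only:

package hackhack

import cats.data.EitherT
import cats.effect.concurrent.Ref
import cats.effect.{IO, Timer}
import cats.syntax.either._

import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._

object BackoffExample extends App {
  implicit val timer: Timer[IO] = IO.timer(global)

  val program = for {
    attempts <- Ref.of[IO, Int](0)
    // Fails on the first two attempts, then succeeds
    flaky = EitherT(attempts.modify(n => (n + 1, n + 1)).map { n =>
      if (n < 3) s"boom #$n".asLeft[Int] else n.asRight[String]
    })
    result <- Backoff[String](100.millis, 1.second)
      .retry(flaky, (e: String) => IO(println(s"retrying after: $e")), max = 5)
      .value
  } yield result

  println(program.unsafeRunSync()) // Right(3) after two retried failures
}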

@ -5,20 +5,21 @@ import java.util.concurrent.Executors
import cats.data.EitherT
import cats.effect._
import cats.effect.concurrent.{Deferred, Ref}
import cats.syntax.applicativeError._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.{Applicative, Defer, Monad}
import hackhack.docker.params.{DockerImage, DockerParams}
import io.circe.Json
import io.circe.parser.parse

import scala.concurrent.ExecutionContext
import scala.language.{higherKinds, postfixOps}
import scala.sys.process._

case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
    lastPort: Ref[F, Short],
    blockingCtx: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
@ -26,13 +27,12 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
  // nsd --log_level "debug" start --moniker stage-02 --address tcp://0.0.0.0:26658 --p2p.laddr tcp://0.0.0.0:26656 --rpc.laddr tcp://0.0.0.0:26657 --p2p.persistent_peers d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656

  val ContainerImage = "cosmos-runner"

  private def nextPortThousand =
    lastPort.modify(p => (p + 1 toShort, p * 1000 toShort))

  private def dockerCmd(name: String, peer: String, binaryPath: Path) =
    for {
      portThousand <- nextPortThousand
      process = DockerParams
@ -43,31 +43,51 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
.option("--name", name) .option("--name", name)
.option("-e", s"PEER=$peer") //TODO: Add $PEER usage to docker script .option("-e", s"PEER=$peer") //TODO: Add $PEER usage to docker script
.volume(binaryPath.toAbsolutePath.toString, "/binary") //TODO: download binary from IPFS .volume(binaryPath.toAbsolutePath.toString, "/binary") //TODO: download binary from IPFS
.prepared(DockerImage(image, "latest")) .prepared(DockerImage(ContainerImage, "latest"))
.daemonRun() .daemonRun()
.process .process
} yield process } yield process
def fetchTo(hash: ByteVector, dest: Path): EitherT[F, IpfsError, Unit] = { private def getLogPath(containerId: String): EitherT[F, Throwable, Path] =
ipfsStore EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F])
.fetch(hash) .subflatMap(inspect => parse(inspect))
.flatMap( .subflatMap(
_.flatMap(bb fs2.Stream.chunk(fs2.Chunk.byteBuffer(bb))) _.asArray
.through(fs2.io.file.writeAll[F](dest, blockingCtx)) .flatMap(_.headOption)
.compile .fold(
.drain new Exception(s"Can't parse array from docker inspect $containerId")
.attemptT .asLeft[Json]
.leftMap(e => IpfsError("fetchTo", e.some)) )(_.asRight)
) )
.subflatMap(_.as[String])
.flatMap(p => EitherT(IO(Paths.get(p)).attempt.to[F]))
private def log(str: String) = IO(println(str)).to[F]
def run(name: String,
peer: String,
binaryPath: Path): EitherT[F, Throwable, fs2.Stream[F, String]] = {
val container = for {
cmd <- dockerCmd(name, peer, binaryPath)
_ <- log(s"$name got dockerCmd")
idPromise <- Deferred[F, String]
_ <- Concurrent[F].start(
IO(cmd.!!).attempt
.onError {
case e => IO(println(s"Failed to run container $name: $e"))
}
.to[F]
.flatMap(_.fold(_ => Applicative[F].unit, idPromise.complete)))
_ <- log(s"$name signaled docker container to start")
containerId <- idPromise.get
_ <- log(s"$name docker container started $containerId")
} yield containerId
for {
containerId <- EitherT.liftF(container)
logPath <- getLogPath(containerId)
stream = FileStream.stream[F](logPath)
} yield stream
} }
def run(image: String, name: String, peer: String, ipfsHash: ByteVector) =
// for {
// path <- EitherT(IO(Paths.get(s"/tmp/$name")).attempt.to[F])
// binary <- fetchTo(ipfsHash, path)
// container <- EitherT.liftF(
// Concurrent[F].start(IO(dockerCmd(image, name, peer, path)).to[F]))
// } yield ???
???
} }
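Finally, a sketch of how the reworked Runner.run might be exercised end to end, assuming Docker and a local cosmos-runner image are available on the host; the app name, peer string, starting port counter and binary path are placeholders:

package hackhack

import java.nio.file.Paths

import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}

object RunnerExample extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    for {
      lastPort <- Ref.of[IO, Short](26) // placeholder port counter
      runner = Runner[IO](lastPort)
      peer = "d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656"
      result <- runner.run("nsd", peer, Paths.get("/tmp/nsd/binary")).value
      code <- result match {
        case Right(logLines) =>
          // Print container log lines to stdout until the stream ends
          logLines.evalMap(line => IO(println(line))).compile.drain.map(_ => ExitCode.Success)
        case Left(err) =>
          IO(println(s"failed to start container: $err")).map(_ => ExitCode.Error)
      }
    } yield code
}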