diff --git a/src/main/scala/hackhack/AppRegistry.scala b/src/main/scala/hackhack/AppRegistry.scala new file mode 100644 index 0000000..833faed --- /dev/null +++ b/src/main/scala/hackhack/AppRegistry.scala @@ -0,0 +1,118 @@ +package hackhack + +import java.nio.ByteBuffer +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.Executors + +import cats.Monad +import cats.data.EitherT +import cats.effect._ +import cats.effect.concurrent.Ref +import cats.syntax.applicativeError._ +import cats.syntax.either._ +import cats.syntax.option._ +import com.softwaremill.sttp.{SttpBackend, Uri, sttp} +import hackhack.ipfs.{IpfsError, IpfsStore} +import io.circe.Json +import io.circe.parser.parse +import scodec.bits.ByteVector + +import scala.concurrent.ExecutionContext +import scala.language.higherKinds + +case class Peer( + host: String, + p2pPort: Short, + rpcPort: Short, + p2pKey: String +) { + val RpcUri = Uri(s"http://$host:$rpcPort") + val P2pSeed = s"$p2pKey@$host:$p2pPort" +} + +case class App(name: String, + containerId: String, + rpcPort: Short, + seed: String, + binaryHash: ByteVector, + binaryPath: Path) + +class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( + ipfsStore: IpfsStore[F], + runner: Runner[F], + apps: Ref[F, Map[String, App]], + blockingCtx: ExecutionContext = + ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()))( + implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?], + fs2.Stream[F, ByteBuffer]]) { + + def log(str: String) = EitherT(IO(println(str)).attempt.to[F]) + + // Returns consensusHeight + def run(name: String, + peer: Peer, + hash: ByteVector): EitherT[F, Throwable, Long] = + for { + genesis <- dumpGenesis(name, peer) + _ <- log(s"$name dumped genesis") + baseDir <- EitherT( + IO( + Paths + .get(System.getProperty("user.home"), s".salmon/$name") + .toAbsolutePath).attempt.to[F]) + _ <- EitherT(IO(baseDir.toFile.mkdir()).attempt.to[F]) + path = baseDir.resolve("genesis.json") + _ <- EitherT(IO(Files.write(path, genesis.getBytes())).attempt.to[F]) + _ <- log(s"$name saved genesis") + binaryPath = baseDir.resolve("binary") + _ <- fetchTo(hash, binaryPath).leftMap(identity[Throwable]) + _ <- log(s"$name binary downloaded $binaryPath") + height <- consensusHeight(name, peer) + } yield height + + def consensusHeight(appName: String, + peer: Peer): EitherT[F, Throwable, Long] = { + rpc(appName, peer, "/status").subflatMap( + _.hcursor + .downField("result") + .downField("sync_info") + .get[Long]("latest_block_height") + ) + } + + def dumpGenesis(appName: String, + peer: Peer): EitherT[F, Throwable, String] = { + rpc(appName, peer, "/genesis").subflatMap( + _.hcursor.downField("result").get[String]("genesis")) + } + + def rpc(appName: String, + peer: Peer, + path: String): EitherT[F, Throwable, Json] = Backoff.default.retry( + sttp + .get(peer.RpcUri.path(path)) + .send[EitherT[F, Throwable, ?]] + .subflatMap( + _.body + .leftMap(e => + new Exception(s"Error RPC $path $appName: ${peer.RpcUri}: $e")) + .flatMap(s => parse(s)) + ), + (e: Throwable) => + Monad[F].pure(println(s"Error RPC $path $appName: ${peer.RpcUri}: $e")), + max = 10 + ) + + def fetchTo(hash: ByteVector, dest: Path): EitherT[F, IpfsError, Unit] = { + ipfsStore + .fetch(hash) + .flatMap( + _.flatMap(bb ⇒ fs2.Stream.chunk(fs2.Chunk.byteBuffer(bb))) + .through(fs2.io.file.writeAll[F](dest, blockingCtx)) + .compile + .drain + .attemptT + .leftMap(e => IpfsError("fetchTo", e.some)) + ) + } +} diff --git a/src/main/scala/hackhack/Backoff.scala b/src/main/scala/hackhack/Backoff.scala index 37bcdc1..ee51f5b 100644 --- a/src/main/scala/hackhack/Backoff.scala +++ b/src/main/scala/hackhack/Backoff.scala @@ -1,41 +1,64 @@ package hackhack -import cats.{Applicative, Monad} +import cats.{Applicative, Functor, Monad} import cats.data.EitherT import cats.effect.Timer import cats.syntax.flatMap._ import cats.syntax.apply._ +import cats.syntax.applicative._ +import cats.syntax.either._ +import cats.instances.either._ +import cats.effect.syntax.all._ import scala.concurrent.duration._ import scala.language.higherKinds /** - * Exponential backoff delays. - * - * @param delayPeriod will be applied next time - * @param maxDelay upper bound for a single delay - */ + * Exponential backoff delays. + * + * @param delayPeriod will be applied next time + * @param maxDelay upper bound for a single delay + */ case class Backoff[E](delayPeriod: FiniteDuration, maxDelay: FiniteDuration) { /** - * Next retry policy with delayPeriod multiplied times two, if maxDelay is not yet reached - */ + * Next retry policy with delayPeriod multiplied times two, if maxDelay is not yet reached + */ def next: Backoff[E] = if (delayPeriod == maxDelay) this else { val nextDelay = delayPeriod * 2 - if (nextDelay > maxDelay) copy(delayPeriod = maxDelay) else copy(delayPeriod = nextDelay) + if (nextDelay > maxDelay) copy(delayPeriod = maxDelay) + else copy(delayPeriod = nextDelay) } - def retry[F[_]: Timer: Monad, EE <: E, T](fn: EitherT[F, EE, T], onError: EE ⇒ F[Unit]): F[T] = + def retry[F[_]: Timer: Monad: Functor, EE <: E, T]( + fn: EitherT[F, EE, T], + onError: EE ⇒ F[Unit], + max: Int): EitherT[F, EE, T] = + fn.biflatMap( + err => + if (max == 0) EitherT(err.asLeft.pure[F]) + else + EitherT + .right[EE](onError(err) *> Timer[F].sleep(delayPeriod)) + .flatMap(_ => next.retry(fn, onError, max - 1)), + EitherT.pure(_) + ) + + def retryForever[F[_]: Timer: Monad, EE <: E, T]( + fn: EitherT[F, EE, T], + onError: EE ⇒ F[Unit]): F[T] = fn.value.flatMap { case Right(value) ⇒ Applicative[F].pure(value) case Left(err) ⇒ - onError(err) *> Timer[F].sleep(delayPeriod) *> next.retry(fn, onError) + onError(err) *> Timer[F].sleep(delayPeriod) *> next.retryForever( + fn, + onError) } def apply[F[_]: Timer: Monad, EE <: E, T](fn: EitherT[F, EE, T]): F[T] = - retry(fn, (_: EE) ⇒ Applicative[F].unit) + retryForever(fn, (_: EE) ⇒ Applicative[F].unit) } diff --git a/src/main/scala/hackhack/Runner.scala b/src/main/scala/hackhack/Runner.scala index 9e920ae..e26ef25 100644 --- a/src/main/scala/hackhack/Runner.scala +++ b/src/main/scala/hackhack/Runner.scala @@ -5,20 +5,21 @@ import java.util.concurrent.Executors import cats.data.EitherT import cats.effect._ -import cats.effect.concurrent.Ref +import cats.effect.concurrent.{Deferred, Ref} import cats.syntax.applicativeError._ -import cats.syntax.option._ +import cats.syntax.either._ +import cats.syntax.flatMap._ import cats.syntax.functor._ -import cats.{Defer, Monad} +import cats.{Applicative, Defer, Monad} import hackhack.docker.params.{DockerImage, DockerParams} -import hackhack.ipfs.{IpfsError, IpfsStore} -import scodec.bits.ByteVector +import io.circe.Json +import io.circe.parser.parse import scala.concurrent.ExecutionContext import scala.language.{higherKinds, postfixOps} +import scala.sys.process._ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( - ipfsStore: IpfsStore[F], lastPort: Ref[F, Short], blockingCtx: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) @@ -26,13 +27,12 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( // nsd --log_level "debug" start --moniker stage-02 --address tcp://0.0.0.0:26658 --p2p.laddr tcp://0.0.0.0:26656 --rpc.laddr tcp://0.0.0.0:26657 --p2p.persistent_peers d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656 + val ContainerImage = "cosmos-runner" + private def nextPortThousand = lastPort.modify(p => (p + 1 toShort, p * 1000 toShort)) - private def dockerCmd(image: String, - name: String, - peer: String, - binaryPath: Path) = + private def dockerCmd(name: String, peer: String, binaryPath: Path) = for { portThousand <- nextPortThousand process = DockerParams @@ -43,31 +43,51 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( .option("--name", name) .option("-e", s"PEER=$peer") //TODO: Add $PEER usage to docker script .volume(binaryPath.toAbsolutePath.toString, "/binary") //TODO: download binary from IPFS - .prepared(DockerImage(image, "latest")) + .prepared(DockerImage(ContainerImage, "latest")) .daemonRun() .process } yield process - def fetchTo(hash: ByteVector, dest: Path): EitherT[F, IpfsError, Unit] = { - ipfsStore - .fetch(hash) - .flatMap( - _.flatMap(bb ⇒ fs2.Stream.chunk(fs2.Chunk.byteBuffer(bb))) - .through(fs2.io.file.writeAll[F](dest, blockingCtx)) - .compile - .drain - .attemptT - .leftMap(e => IpfsError("fetchTo", e.some)) + private def getLogPath(containerId: String): EitherT[F, Throwable, Path] = + EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F]) + .subflatMap(inspect => parse(inspect)) + .subflatMap( + _.asArray + .flatMap(_.headOption) + .fold( + new Exception(s"Can't parse array from docker inspect $containerId") + .asLeft[Json] + )(_.asRight) ) + .subflatMap(_.as[String]) + .flatMap(p => EitherT(IO(Paths.get(p)).attempt.to[F])) + + private def log(str: String) = IO(println(str)).to[F] + + def run(name: String, + peer: String, + binaryPath: Path): EitherT[F, Throwable, fs2.Stream[F, String]] = { + val container = for { + cmd <- dockerCmd(name, peer, binaryPath) + _ <- log(s"$name got dockerCmd") + idPromise <- Deferred[F, String] + _ <- Concurrent[F].start( + IO(cmd.!!).attempt + .onError { + case e => IO(println(s"Failed to run container $name: $e")) + } + .to[F] + .flatMap(_.fold(_ => Applicative[F].unit, idPromise.complete))) + _ <- log(s"$name signaled docker container to start") + containerId <- idPromise.get + _ <- log(s"$name docker container started $containerId") + } yield containerId + + for { + containerId <- EitherT.liftF(container) + logPath <- getLogPath(containerId) + stream = FileStream.stream[F](logPath) + } yield stream } - def run(image: String, name: String, peer: String, ipfsHash: ByteVector) = -// for { -// path <- EitherT(IO(Paths.get(s"/tmp/$name")).attempt.to[F]) -// binary <- fetchTo(ipfsHash, path) -// container <- EitherT.liftF( -// Concurrent[F].start(IO(dockerCmd(image, name, peer, path)).to[F])) -// } yield ??? - ??? - }