From 5ea70dfe322836347e190c7daac2fde817eccac5 Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Sat, 15 Jun 2019 19:39:17 +0200 Subject: [PATCH] wip --- src/main/scala/hackhack/AppRegistry.scala | 14 ++++---- src/main/scala/hackhack/Main.scala | 2 +- src/main/scala/hackhack/Runner.scala | 42 +++++++++++++++-------- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/src/main/scala/hackhack/AppRegistry.scala b/src/main/scala/hackhack/AppRegistry.scala index 13d37b1..b685271 100644 --- a/src/main/scala/hackhack/AppRegistry.scala +++ b/src/main/scala/hackhack/AppRegistry.scala @@ -79,9 +79,9 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( .get(System.getProperty("user.home"), s".salmon/$name") .toAbsolutePath).attempt.to[F]) _ <- EitherT(IO(Files.createDirectories(baseDir)).attempt.to[F]) - path = baseDir.resolve("genesis.json") - _ <- EitherT(IO(Files.write(path, genesis.getBytes())).attempt.to[F]) - _ <- log(s"$name saved genesis -> $path") + genesisPath = baseDir.resolve("genesis.json") + _ <- EitherT(IO(Files.write(genesisPath, genesis.getBytes())).attempt.to[F]) + _ <- log(s"$name saved genesis -> $genesisPath") binaryHash <- EitherT.fromEither( ByteVector @@ -96,7 +96,7 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( status <- status(name, peer) _ <- log(s"$name got peer status") - containerId <- runner.run(name, p2pPeer(status), binaryPath) + containerId <- runner.run(name, p2pPeer(peer, status), binaryPath, genesisPath) _ <- log(s"$name container started $containerId") app = App(name, containerId, peer, binaryHash, binaryPath) @@ -123,10 +123,10 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( ) } - private def p2pPeer(status: TendermintStatus) = { + private def p2pPeer(peer: Peer, status: TendermintStatus) = { val id = status.node_info.id - val endpoint = status.node_info.listen_addr.replace("tcp://", "") - s"$id@$endpoint" + val port = status.node_info.listen_addr.replace("tcp://", "").split(":").tail.head + s"$id@${peer.host}:$port" } private def dumpGenesis(appName: String, diff --git a/src/main/scala/hackhack/Main.scala b/src/main/scala/hackhack/Main.scala index e11e448..4ed852b 100644 --- a/src/main/scala/hackhack/Main.scala +++ b/src/main/scala/hackhack/Main.scala @@ -26,7 +26,7 @@ object Main extends IOApp { override def run(args: List[String]): IO[ExitCode] = { - val ipfsUri = Uri("ipfs.fluence.one", 5001) + val ipfsUri = Uri("ipfs2.fluence.one", 5001) sttpResource.use { implicit sttp => val ipfsStore = IpfsStore[IO](ipfsUri) diff --git a/src/main/scala/hackhack/Runner.scala b/src/main/scala/hackhack/Runner.scala index 12184bb..8d1c672 100644 --- a/src/main/scala/hackhack/Runner.scala +++ b/src/main/scala/hackhack/Runner.scala @@ -27,48 +27,59 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( // nsd --log_level "debug" start --moniker stage-02 --address tcp://0.0.0.0:26658 --p2p.laddr tcp://0.0.0.0:26656 --rpc.laddr tcp://0.0.0.0:26657 --p2p.persistent_peers d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656 - val ContainerImage = "cosmos-runner" + val ContainerImage = "folexflu/cosmos-runner" private def nextPortThousand = lastPort.modify(p => (p + 1 toShort, p * 1000 toShort)) - private def dockerCmd(name: String, peer: String, binaryPath: Path) = + private def dockerCmd(name: String, peer: String, binaryPath: Path, genesisPath: Path) = for { portThousand <- nextPortThousand - process = DockerParams + + cmd = DockerParams .build() .port(30656 + portThousand toShort, 26656) .port(30657 + portThousand toShort, 26657) .port(30658 + portThousand toShort, 26658) .option("--name", name) - .option("-e", s"PEER=$peer") //TODO: Add $PEER usage to docker script - .volume(binaryPath.toAbsolutePath.toString, "/binary") //TODO: download binary from IPFS - .prepared(DockerImage(ContainerImage, "latest")) + .option("-e", s"PEER=$peer") + .option("-e", s"MONIKER=$name") + .volume(binaryPath.toAbsolutePath.toString, "/binary") + .volume(genesisPath.toAbsolutePath.toString, "/root/genesis.json") + .prepared(DockerImage(ContainerImage, "ubuntu")) .daemonRun() - .process + _ = println(s"Running docker ${cmd.command.mkString(" ")}") + process = cmd.process } yield process private def getLogPath(containerId: String): EitherT[F, Throwable, Path] = EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F]) - .subflatMap(inspect => parse(inspect)) - .subflatMap( - _.asArray + .subflatMap { inspect => + parse(inspect) + } + .subflatMap { json => + json.asArray .flatMap(_.headOption) .fold( new Exception(s"Can't parse array from docker inspect $containerId") .asLeft[Json] )(_.asRight) - ) - .subflatMap(_.as[String]) - .flatMap(p => EitherT(IO(Paths.get(p)).attempt.to[F])) + } + .subflatMap { json => + json.hcursor.get[String]("LogPath") + } + .flatMap { p => + EitherT(IO(Paths.get(p)).attempt.to[F]) + } private def log(str: String) = IO(println(str)).to[F] def run(name: String, peer: String, - binaryPath: Path): EitherT[F, Throwable, String] = { + binaryPath: Path, + genesisPath: Path): EitherT[F, Throwable, String] = { val containerId = for { - cmd <- dockerCmd(name, peer, binaryPath) + cmd <- dockerCmd(name, peer, binaryPath, genesisPath) _ <- log(s"$name got dockerCmd") idPromise <- Deferred[F, Either[Throwable, String]] _ <- Concurrent[F].start( @@ -101,6 +112,7 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] = for { logPath <- getLogPath(containerId) + _ = println(s"$containerId logPath: $logPath") stream = FileStream.stream[F](logPath) } yield stream }