This commit is contained in:
folex 2019-06-15 19:39:17 +02:00
parent b2eee488f7
commit 5ea70dfe32
3 changed files with 35 additions and 23 deletions

View File

@ -79,9 +79,9 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
.get(System.getProperty("user.home"), s".salmon/$name") .get(System.getProperty("user.home"), s".salmon/$name")
.toAbsolutePath).attempt.to[F]) .toAbsolutePath).attempt.to[F])
_ <- EitherT(IO(Files.createDirectories(baseDir)).attempt.to[F]) _ <- EitherT(IO(Files.createDirectories(baseDir)).attempt.to[F])
path = baseDir.resolve("genesis.json") genesisPath = baseDir.resolve("genesis.json")
_ <- EitherT(IO(Files.write(path, genesis.getBytes())).attempt.to[F]) _ <- EitherT(IO(Files.write(genesisPath, genesis.getBytes())).attempt.to[F])
_ <- log(s"$name saved genesis -> $path") _ <- log(s"$name saved genesis -> $genesisPath")
binaryHash <- EitherT.fromEither( binaryHash <- EitherT.fromEither(
ByteVector ByteVector
@ -96,7 +96,7 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
status <- status(name, peer) status <- status(name, peer)
_ <- log(s"$name got peer status") _ <- log(s"$name got peer status")
containerId <- runner.run(name, p2pPeer(status), binaryPath) containerId <- runner.run(name, p2pPeer(peer, status), binaryPath, genesisPath)
_ <- log(s"$name container started $containerId") _ <- log(s"$name container started $containerId")
app = App(name, containerId, peer, binaryHash, binaryPath) app = App(name, containerId, peer, binaryHash, binaryPath)
@ -123,10 +123,10 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
) )
} }
private def p2pPeer(status: TendermintStatus) = { private def p2pPeer(peer: Peer, status: TendermintStatus) = {
val id = status.node_info.id val id = status.node_info.id
val endpoint = status.node_info.listen_addr.replace("tcp://", "") val port = status.node_info.listen_addr.replace("tcp://", "").split(":").tail.head
s"$id@$endpoint" s"$id@${peer.host}:$port"
} }
private def dumpGenesis(appName: String, private def dumpGenesis(appName: String,

View File

@ -26,7 +26,7 @@ object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] = { override def run(args: List[String]): IO[ExitCode] = {
val ipfsUri = Uri("ipfs.fluence.one", 5001) val ipfsUri = Uri("ipfs2.fluence.one", 5001)
sttpResource.use { implicit sttp => sttpResource.use { implicit sttp =>
val ipfsStore = IpfsStore[IO](ipfsUri) val ipfsStore = IpfsStore[IO](ipfsUri)

View File

@ -27,48 +27,59 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
// nsd --log_level "debug" start --moniker stage-02 --address tcp://0.0.0.0:26658 --p2p.laddr tcp://0.0.0.0:26656 --rpc.laddr tcp://0.0.0.0:26657 --p2p.persistent_peers d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656 // nsd --log_level "debug" start --moniker stage-02 --address tcp://0.0.0.0:26658 --p2p.laddr tcp://0.0.0.0:26656 --rpc.laddr tcp://0.0.0.0:26657 --p2p.persistent_peers d53cf2cb91514edb41441503e8a11f004023f2ee@207.154.210.151:26656
val ContainerImage = "cosmos-runner" val ContainerImage = "folexflu/cosmos-runner"
private def nextPortThousand = private def nextPortThousand =
lastPort.modify(p => (p + 1 toShort, p * 1000 toShort)) lastPort.modify(p => (p + 1 toShort, p * 1000 toShort))
private def dockerCmd(name: String, peer: String, binaryPath: Path) = private def dockerCmd(name: String, peer: String, binaryPath: Path, genesisPath: Path) =
for { for {
portThousand <- nextPortThousand portThousand <- nextPortThousand
process = DockerParams
cmd = DockerParams
.build() .build()
.port(30656 + portThousand toShort, 26656) .port(30656 + portThousand toShort, 26656)
.port(30657 + portThousand toShort, 26657) .port(30657 + portThousand toShort, 26657)
.port(30658 + portThousand toShort, 26658) .port(30658 + portThousand toShort, 26658)
.option("--name", name) .option("--name", name)
.option("-e", s"PEER=$peer") //TODO: Add $PEER usage to docker script .option("-e", s"PEER=$peer")
.volume(binaryPath.toAbsolutePath.toString, "/binary") //TODO: download binary from IPFS .option("-e", s"MONIKER=$name")
.prepared(DockerImage(ContainerImage, "latest")) .volume(binaryPath.toAbsolutePath.toString, "/binary")
.volume(genesisPath.toAbsolutePath.toString, "/root/genesis.json")
.prepared(DockerImage(ContainerImage, "ubuntu"))
.daemonRun() .daemonRun()
.process _ = println(s"Running docker ${cmd.command.mkString(" ")}")
process = cmd.process
} yield process } yield process
private def getLogPath(containerId: String): EitherT[F, Throwable, Path] = private def getLogPath(containerId: String): EitherT[F, Throwable, Path] =
EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F]) EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F])
.subflatMap(inspect => parse(inspect)) .subflatMap { inspect =>
.subflatMap( parse(inspect)
_.asArray }
.subflatMap { json =>
json.asArray
.flatMap(_.headOption) .flatMap(_.headOption)
.fold( .fold(
new Exception(s"Can't parse array from docker inspect $containerId") new Exception(s"Can't parse array from docker inspect $containerId")
.asLeft[Json] .asLeft[Json]
)(_.asRight) )(_.asRight)
) }
.subflatMap(_.as[String]) .subflatMap { json =>
.flatMap(p => EitherT(IO(Paths.get(p)).attempt.to[F])) json.hcursor.get[String]("LogPath")
}
.flatMap { p =>
EitherT(IO(Paths.get(p)).attempt.to[F])
}
private def log(str: String) = IO(println(str)).to[F] private def log(str: String) = IO(println(str)).to[F]
def run(name: String, def run(name: String,
peer: String, peer: String,
binaryPath: Path): EitherT[F, Throwable, String] = { binaryPath: Path,
genesisPath: Path): EitherT[F, Throwable, String] = {
val containerId = for { val containerId = for {
cmd <- dockerCmd(name, peer, binaryPath) cmd <- dockerCmd(name, peer, binaryPath, genesisPath)
_ <- log(s"$name got dockerCmd") _ <- log(s"$name got dockerCmd")
idPromise <- Deferred[F, Either[Throwable, String]] idPromise <- Deferred[F, Either[Throwable, String]]
_ <- Concurrent[F].start( _ <- Concurrent[F].start(
@ -101,6 +112,7 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] = containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] =
for { for {
logPath <- getLogPath(containerId) logPath <- getLogPath(containerId)
_ = println(s"$containerId logPath: $logPath")
stream = FileStream.stream[F](logPath) stream = FileStream.stream[F](logPath)
} yield stream } yield stream
} }