diff --git a/src/main/scala/hackhack/AppRegistry.scala b/src/main/scala/hackhack/AppRegistry.scala index b685271..33c4b02 100644 --- a/src/main/scala/hackhack/AppRegistry.scala +++ b/src/main/scala/hackhack/AppRegistry.scala @@ -4,10 +4,12 @@ import java.nio.ByteBuffer import java.nio.file.{Files, Path, Paths} import java.util.concurrent.Executors -import cats.Monad +import cats.{Monad, Traverse} import cats.data.EitherT import cats.effect._ import cats.syntax.functor._ +import cats.syntax.applicative._ +import cats.instances.list._ import cats.syntax.flatMap._ import cats.syntax.applicative._ import cats.effect.concurrent.{Deferred, Ref} @@ -45,6 +47,32 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?], fs2.Stream[F, ByteBuffer]]) { + private def putApp(app: App) = + for { + d <- Deferred[F, App] + _ <- d.complete(app) + _ <- apps.update( + map => + map + .get(app.name) + .fold(map.updated(app.name, d))(_ => map)) + } yield () + + def loadExistingContainers(): EitherT[F, Throwable, Unit] = { + (for { + existingApps <- runner.listFishermenContainers + _ = existingApps.foreach(a => println(s"Existing app: $a")) + _ <- EitherT.liftF[F, Throwable, Unit] { + Traverse[List] + .sequence(existingApps.map(putApp)) + .void + } + } yield ()).leftMap { e => + println(s"Error loadExistingContainers: $e") + new Exception(s"Error on loading existing containers: $e", e) + } + } + def stream(name: String): EitherT[F, Throwable, fs2.Stream[F, Log]] = for { app <- EitherT(getApp(name)) @@ -80,12 +108,14 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( .toAbsolutePath).attempt.to[F]) _ <- EitherT(IO(Files.createDirectories(baseDir)).attempt.to[F]) genesisPath = baseDir.resolve("genesis.json") - _ <- EitherT(IO(Files.write(genesisPath, genesis.getBytes())).attempt.to[F]) + _ <- EitherT( + IO(Files.write(genesisPath, genesis.getBytes())).attempt.to[F]) _ <- log(s"$name saved genesis -> $genesisPath") - binaryHash <- EitherT.fromEither( + binaryHash <- EitherT.fromEither[F]( ByteVector - .fromBase58Descriptive(hash).map(_.drop(2)) + .fromBase58Descriptive(hash) + .map(_.drop(2)) .leftMap(e => new Exception(s"Failed to decode binary hash from base64: $e"): Throwable)) @@ -96,7 +126,12 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( status <- status(name, peer) _ <- log(s"$name got peer status") - containerId <- runner.run(name, p2pPeer(peer, status), binaryPath, genesisPath) + containerId <- runner.run(name, + p2pPeer(peer, status), + peer.rpcPort, + binaryHash, + binaryPath, + genesisPath) _ <- log(s"$name container started $containerId") app = App(name, containerId, peer, binaryHash, binaryPath) @@ -125,7 +160,8 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( private def p2pPeer(peer: Peer, status: TendermintStatus) = { val id = status.node_info.id - val port = status.node_info.listen_addr.replace("tcp://", "").split(":").tail.head + val port = + status.node_info.listen_addr.replace("tcp://", "").split(":").tail.head s"$id@${peer.host}:$port" } diff --git a/src/main/scala/hackhack/Log.scala b/src/main/scala/hackhack/Log.scala index dfd6f25..4757329 100644 --- a/src/main/scala/hackhack/Log.scala +++ b/src/main/scala/hackhack/Log.scala @@ -16,18 +16,23 @@ object Log { /* I[2019-06-13|01:07:31.417] Committed state module=state height=7096 txs=1 appHash=4A936D7C00A37D66C7CE38A1118E0BC31C97BEE68C70BA8A9365EE0126079DAE E[2019-06-13|01:07:33.678] CONSENSUS FAILURE!!! module=consensus err="Panicked on a Consensus Failure: +2/3 committed an invalid block: Wrong Block.Header.AppHash. Expected 4A936D7C00A37D66C7CE38A1118E0BC31C97BEE68C70BA8A9365EE0126079DAE, got 0113492DE4944A90AB6AB3B64D04EAB2D9257D8613FABCACB3CB341CAF79D490" +panic: Failed to process committed block (7078:EF9EDA6F850812E04B8DB99BA70351084A613F3E3C30CCEB0C9913C736AE789D): Wrong Block.Header.AppHash. Expected 193246169176A238ADB9BD29130879B3B105B62C1B0DA7AC8D3E420FD1C3D959, got AEACE20159424C49BDD1FE769278D78CE8B412F04A34099ADF0016D5D7636CB4 */ private val commitRegex = """(?s).*?Committed state\s+module=state height=(\d+) txs=\d+ appHash=(\S+).*""".r private val consensusFailedRegex = """(?s).*?CONSENSUS FAILURE.*?Expected ([A-Z0-9]+), got ([A-Z0-9]+).*""".r + private val syncAppHashFailedRegex = + """(?s).*Failed to process committed block \((\d+):.*?Expected (\S+), got (\S+)""".r def apply(appName: String, line: String): Option[Log] = line match { case commitRegex(height, hash) => new Log(appName, Try(height.toLong).toOption, hash, None, true).some case consensusFailedRegex(expected, got) => - new Log(appName, None, got, Some(expected), false).some + new Log(appName, None, got, expected.some, false).some + case syncAppHashFailedRegex(height, expected, got) => + new Log(appName, height.toLong.some, got, Some(expected), false).some case str => println(s"skipping $str") None diff --git a/src/main/scala/hackhack/Main.scala b/src/main/scala/hackhack/Main.scala index 4ed852b..805d039 100644 --- a/src/main/scala/hackhack/Main.scala +++ b/src/main/scala/hackhack/Main.scala @@ -33,6 +33,7 @@ object Main extends IOApp { for { runner <- Runner.make[IO] appRegistry <- AppRegistry.make[IO](ipfsStore, runner) + _ <- appRegistry.loadExistingContainers().value.map(IO.fromEither) _ <- WebsocketServer .make[IO](8080, appRegistry) .use { diff --git a/src/main/scala/hackhack/Runner.scala b/src/main/scala/hackhack/Runner.scala index 8d1c672..8af4d39 100644 --- a/src/main/scala/hackhack/Runner.scala +++ b/src/main/scala/hackhack/Runner.scala @@ -6,18 +6,21 @@ import java.util.concurrent.Executors import cats.data.EitherT import cats.effect._ import cats.effect.concurrent.{Deferred, Ref} +import cats.instances.list._ import cats.syntax.applicativeError._ import cats.syntax.either._ import cats.syntax.flatMap._ import cats.syntax.functor._ -import cats.{Defer, Monad} +import cats.{Defer, Monad, Traverse} import hackhack.docker.params.{DockerImage, DockerParams} import io.circe.Json import io.circe.parser.parse +import scodec.bits.ByteVector import scala.concurrent.ExecutionContext import scala.language.{higherKinds, postfixOps} import scala.sys.process._ +import scala.util.Try case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( lastPort: Ref[F, Short], @@ -32,7 +35,12 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( private def nextPortThousand = lastPort.modify(p => (p + 1 toShort, p * 1000 toShort)) - private def dockerCmd(name: String, peer: String, binaryPath: Path, genesisPath: Path) = + private def dockerCmd(name: String, + peer: String, + rpcPort: Short, + binaryHash: ByteVector, + binaryPath: Path, + genesisPath: Path) = for { portThousand <- nextPortThousand @@ -41,9 +49,12 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( .port(30656 + portThousand toShort, 26656) .port(30657 + portThousand toShort, 26657) .port(30658 + portThousand toShort, 26658) - .option("--name", name) + .option("--name", s"fisherman-$name") .option("-e", s"PEER=$peer") + .option("-e", s"RPCPORT=$rpcPort") .option("-e", s"MONIKER=$name") + .option("-e", s"BINARY_HASH=${binaryHash.toHex}") + .option("-e", s"BINARY_PATH=${binaryPath.toAbsolutePath.toString}") .volume(binaryPath.toAbsolutePath.toString, "/binary") .volume(genesisPath.toAbsolutePath.toString, "/root/genesis.json") .prepared(DockerImage(ContainerImage, "ubuntu")) @@ -52,7 +63,7 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( process = cmd.process } yield process - private def getLogPath(containerId: String): EitherT[F, Throwable, Path] = + private def inspect(containerId: String): EitherT[F, Throwable, Json] = EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F]) .subflatMap { inspect => parse(inspect) @@ -65,6 +76,9 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( .asLeft[Json] )(_.asRight) } + + private def getLogPath(containerId: String): EitherT[F, Throwable, Path] = + inspect(containerId) .subflatMap { json => json.hcursor.get[String]("LogPath") } @@ -72,14 +86,60 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( EitherT(IO(Paths.get(p)).attempt.to[F]) } + def listFishermenContainers: EitherT[F, Throwable, List[App]] = + for { + ps <- EitherT( + IO( + Seq("docker", + "ps", + "-a", + "-f", + "name=fisherman", + "--format", + "{{.Names}} {{.ID}}").!!).attempt.to[F]) + (names, ids) = ps + .split("\n") + .filter(_.nonEmpty) + .toList + .map(_.split(" ")) + .map(a => a.head -> a.last) + .unzip + inspects <- Traverse[List].sequence(ids.map(inspect)) + envs <- Traverse[List] + .sequence(inspects.map( + _.hcursor.downField("Config").get[List[String]]("Env").toEitherT[F])) + .leftMap(identity[Throwable]) + + envMap <- Try( + envs.map(_.map(_.split("=")).map(a => a.head -> a.last).toMap)).toEither + .toEitherT[F] + + apps <- Try { + ids.zip(names).zip(envMap).map { + case ((id, name), env) => + val peer = env + .get("PEER") + .map(_.split(Array('@', ':'))) + .map(a => Peer(a(1), a(2).toShort)) + .get + val binaryHash = ByteVector.fromValidHex(env("BINARY_HASH")) + val binaryPath = Paths.get(env("BINARY_PATH")) + val cleanName = name.replace("fisherman-", "") + App(cleanName, id, peer, binaryHash, binaryPath) + } + }.toEither.toEitherT[F] + } yield apps + private def log(str: String) = IO(println(str)).to[F] def run(name: String, peer: String, + rpcPort: Short, + binaryHash: ByteVector, binaryPath: Path, genesisPath: Path): EitherT[F, Throwable, String] = { val containerId = for { - cmd <- dockerCmd(name, peer, binaryPath, genesisPath) + cmd <- dockerCmd(name, peer, rpcPort, binaryHash, binaryPath, genesisPath) _ <- log(s"$name got dockerCmd") idPromise <- Deferred[F, Either[Throwable, String]] _ <- Concurrent[F].start( @@ -108,13 +168,30 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( EitherT(containerId) } - def streamLog( + def streamFileLog( containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] = for { logPath <- getLogPath(containerId) _ = println(s"$containerId logPath: $logPath") stream = FileStream.stream[F](logPath) } yield stream + + def streamLog( + containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] = { + val errors = scala.collection.mutable.ArrayBuffer.empty[String] + for { + lines <- EitherT( + IO(s"docker logs -f $containerId".lineStream(new ProcessLogger { + override def out(s: => String): Unit = {} + + override def err(s: => String): Unit = errors += s + + override def buffer[T](f: => T): T = f + })).attempt.to[F]) + stream = fs2.Stream.fromIterator(lines.iterator) ++ fs2.Stream.emits( + errors) + } yield stream + } } object Runner { diff --git a/src/main/scala/hackhack/WebsocketServer.scala b/src/main/scala/hackhack/WebsocketServer.scala index 1161bbe..448b8fa 100644 --- a/src/main/scala/hackhack/WebsocketServer.scala +++ b/src/main/scala/hackhack/WebsocketServer.scala @@ -1,14 +1,11 @@ package hackhack -import java.nio.file.Paths - import cats.effect.concurrent.Ref import cats.effect.{ExitCode, Resource, _} import cats.syntax.applicativeError._ import cats.syntax.flatMap._ import fs2.Stream import fs2.concurrent.SignallingRef -import hackhack.ipfs.{IpfsClient, IpfsStore} import io.circe.syntax._ import org.http4s.HttpRoutes import org.http4s.dsl.Http4sDsl @@ -17,7 +14,6 @@ import org.http4s.server.blaze.BlazeServerBuilder import org.http4s.server.middleware.{CORS, CORSConfig} import org.http4s.server.websocket.WebSocketBuilder import org.http4s.websocket.WebSocketFrame.Text -import io.circe.parser.parse import scala.concurrent.duration._ import scala.language.higherKinds @@ -32,6 +28,7 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift]( case GET -> Root / "websocket" / appName => appRegistry.stream(appName).value.flatMap { case Left(e) => + println(s"Error while getting stream for $appName: $e") InternalServerError(s"Error while getting stream for $appName: $e") case Right(stream) => @@ -48,6 +45,7 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift]( .value .flatMap { case Left(e) => + println(s"Error while running app $appName: $e") InternalServerError(s"Error while running app $appName: $e") case Right(height) => @@ -59,7 +57,6 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift]( } // TODO: list of registered apps - // TODO: endpoint for consensusHeight } def close(): F[Unit] = signal.set(true)