This commit is contained in:
folex 2019-06-15 22:00:38 +02:00
parent 5ea70dfe32
commit d5caf69dfd
5 changed files with 134 additions and 18 deletions

View File

@ -4,10 +4,12 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.Executors
import cats.Monad
import cats.{Monad, Traverse}
import cats.data.EitherT
import cats.effect._
import cats.syntax.functor._
import cats.syntax.applicative._
import cats.instances.list._
import cats.syntax.flatMap._
import cats.syntax.applicative._
import cats.effect.concurrent.{Deferred, Ref}
@ -45,6 +47,32 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?],
fs2.Stream[F, ByteBuffer]]) {
private def putApp(app: App) =
for {
d <- Deferred[F, App]
_ <- d.complete(app)
_ <- apps.update(
map =>
map
.get(app.name)
.fold(map.updated(app.name, d))(_ => map))
} yield ()
def loadExistingContainers(): EitherT[F, Throwable, Unit] = {
(for {
existingApps <- runner.listFishermenContainers
_ = existingApps.foreach(a => println(s"Existing app: $a"))
_ <- EitherT.liftF[F, Throwable, Unit] {
Traverse[List]
.sequence(existingApps.map(putApp))
.void
}
} yield ()).leftMap { e =>
println(s"Error loadExistingContainers: $e")
new Exception(s"Error on loading existing containers: $e", e)
}
}
def stream(name: String): EitherT[F, Throwable, fs2.Stream[F, Log]] =
for {
app <- EitherT(getApp(name))
@ -80,12 +108,14 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
.toAbsolutePath).attempt.to[F])
_ <- EitherT(IO(Files.createDirectories(baseDir)).attempt.to[F])
genesisPath = baseDir.resolve("genesis.json")
_ <- EitherT(IO(Files.write(genesisPath, genesis.getBytes())).attempt.to[F])
_ <- EitherT(
IO(Files.write(genesisPath, genesis.getBytes())).attempt.to[F])
_ <- log(s"$name saved genesis -> $genesisPath")
binaryHash <- EitherT.fromEither(
binaryHash <- EitherT.fromEither[F](
ByteVector
.fromBase58Descriptive(hash).map(_.drop(2))
.fromBase58Descriptive(hash)
.map(_.drop(2))
.leftMap(e =>
new Exception(s"Failed to decode binary hash from base64: $e"): Throwable))
@ -96,7 +126,12 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
status <- status(name, peer)
_ <- log(s"$name got peer status")
containerId <- runner.run(name, p2pPeer(peer, status), binaryPath, genesisPath)
containerId <- runner.run(name,
p2pPeer(peer, status),
peer.rpcPort,
binaryHash,
binaryPath,
genesisPath)
_ <- log(s"$name container started $containerId")
app = App(name, containerId, peer, binaryHash, binaryPath)
@ -125,7 +160,8 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
private def p2pPeer(peer: Peer, status: TendermintStatus) = {
val id = status.node_info.id
val port = status.node_info.listen_addr.replace("tcp://", "").split(":").tail.head
val port =
status.node_info.listen_addr.replace("tcp://", "").split(":").tail.head
s"$id@${peer.host}:$port"
}

View File

@ -16,18 +16,23 @@ object Log {
/*
I[2019-06-13|01:07:31.417] Committed state module=state height=7096 txs=1 appHash=4A936D7C00A37D66C7CE38A1118E0BC31C97BEE68C70BA8A9365EE0126079DAE
E[2019-06-13|01:07:33.678] CONSENSUS FAILURE!!! module=consensus err="Panicked on a Consensus Failure: +2/3 committed an invalid block: Wrong Block.Header.AppHash. Expected 4A936D7C00A37D66C7CE38A1118E0BC31C97BEE68C70BA8A9365EE0126079DAE, got 0113492DE4944A90AB6AB3B64D04EAB2D9257D8613FABCACB3CB341CAF79D490"
panic: Failed to process committed block (7078:EF9EDA6F850812E04B8DB99BA70351084A613F3E3C30CCEB0C9913C736AE789D): Wrong Block.Header.AppHash. Expected 193246169176A238ADB9BD29130879B3B105B62C1B0DA7AC8D3E420FD1C3D959, got AEACE20159424C49BDD1FE769278D78CE8B412F04A34099ADF0016D5D7636CB4
*/
private val commitRegex =
"""(?s).*?Committed state\s+module=state height=(\d+) txs=\d+ appHash=(\S+).*""".r
private val consensusFailedRegex =
"""(?s).*?CONSENSUS FAILURE.*?Expected ([A-Z0-9]+), got ([A-Z0-9]+).*""".r
private val syncAppHashFailedRegex =
"""(?s).*Failed to process committed block \((\d+):.*?Expected (\S+), got (\S+)""".r
def apply(appName: String, line: String): Option[Log] = line match {
case commitRegex(height, hash) =>
new Log(appName, Try(height.toLong).toOption, hash, None, true).some
case consensusFailedRegex(expected, got) =>
new Log(appName, None, got, Some(expected), false).some
new Log(appName, None, got, expected.some, false).some
case syncAppHashFailedRegex(height, expected, got) =>
new Log(appName, height.toLong.some, got, Some(expected), false).some
case str =>
println(s"skipping $str")
None

View File

@ -33,6 +33,7 @@ object Main extends IOApp {
for {
runner <- Runner.make[IO]
appRegistry <- AppRegistry.make[IO](ipfsStore, runner)
_ <- appRegistry.loadExistingContainers().value.map(IO.fromEither)
_ <- WebsocketServer
.make[IO](8080, appRegistry)
.use {

View File

@ -6,18 +6,21 @@ import java.util.concurrent.Executors
import cats.data.EitherT
import cats.effect._
import cats.effect.concurrent.{Deferred, Ref}
import cats.instances.list._
import cats.syntax.applicativeError._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.{Defer, Monad}
import cats.{Defer, Monad, Traverse}
import hackhack.docker.params.{DockerImage, DockerParams}
import io.circe.Json
import io.circe.parser.parse
import scodec.bits.ByteVector
import scala.concurrent.ExecutionContext
import scala.language.{higherKinds, postfixOps}
import scala.sys.process._
import scala.util.Try
case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
lastPort: Ref[F, Short],
@ -32,7 +35,12 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
private def nextPortThousand =
lastPort.modify(p => (p + 1 toShort, p * 1000 toShort))
private def dockerCmd(name: String, peer: String, binaryPath: Path, genesisPath: Path) =
private def dockerCmd(name: String,
peer: String,
rpcPort: Short,
binaryHash: ByteVector,
binaryPath: Path,
genesisPath: Path) =
for {
portThousand <- nextPortThousand
@ -41,9 +49,12 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
.port(30656 + portThousand toShort, 26656)
.port(30657 + portThousand toShort, 26657)
.port(30658 + portThousand toShort, 26658)
.option("--name", name)
.option("--name", s"fisherman-$name")
.option("-e", s"PEER=$peer")
.option("-e", s"RPCPORT=$rpcPort")
.option("-e", s"MONIKER=$name")
.option("-e", s"BINARY_HASH=${binaryHash.toHex}")
.option("-e", s"BINARY_PATH=${binaryPath.toAbsolutePath.toString}")
.volume(binaryPath.toAbsolutePath.toString, "/binary")
.volume(genesisPath.toAbsolutePath.toString, "/root/genesis.json")
.prepared(DockerImage(ContainerImage, "ubuntu"))
@ -52,7 +63,7 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
process = cmd.process
} yield process
private def getLogPath(containerId: String): EitherT[F, Throwable, Path] =
private def inspect(containerId: String): EitherT[F, Throwable, Json] =
EitherT(IO(s"docker inspect $containerId".!!).attempt.to[F])
.subflatMap { inspect =>
parse(inspect)
@ -65,6 +76,9 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
.asLeft[Json]
)(_.asRight)
}
private def getLogPath(containerId: String): EitherT[F, Throwable, Path] =
inspect(containerId)
.subflatMap { json =>
json.hcursor.get[String]("LogPath")
}
@ -72,14 +86,60 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
EitherT(IO(Paths.get(p)).attempt.to[F])
}
def listFishermenContainers: EitherT[F, Throwable, List[App]] =
for {
ps <- EitherT(
IO(
Seq("docker",
"ps",
"-a",
"-f",
"name=fisherman",
"--format",
"{{.Names}} {{.ID}}").!!).attempt.to[F])
(names, ids) = ps
.split("\n")
.filter(_.nonEmpty)
.toList
.map(_.split(" "))
.map(a => a.head -> a.last)
.unzip
inspects <- Traverse[List].sequence(ids.map(inspect))
envs <- Traverse[List]
.sequence(inspects.map(
_.hcursor.downField("Config").get[List[String]]("Env").toEitherT[F]))
.leftMap(identity[Throwable])
envMap <- Try(
envs.map(_.map(_.split("=")).map(a => a.head -> a.last).toMap)).toEither
.toEitherT[F]
apps <- Try {
ids.zip(names).zip(envMap).map {
case ((id, name), env) =>
val peer = env
.get("PEER")
.map(_.split(Array('@', ':')))
.map(a => Peer(a(1), a(2).toShort))
.get
val binaryHash = ByteVector.fromValidHex(env("BINARY_HASH"))
val binaryPath = Paths.get(env("BINARY_PATH"))
val cleanName = name.replace("fisherman-", "")
App(cleanName, id, peer, binaryHash, binaryPath)
}
}.toEither.toEitherT[F]
} yield apps
private def log(str: String) = IO(println(str)).to[F]
def run(name: String,
peer: String,
rpcPort: Short,
binaryHash: ByteVector,
binaryPath: Path,
genesisPath: Path): EitherT[F, Throwable, String] = {
val containerId = for {
cmd <- dockerCmd(name, peer, binaryPath, genesisPath)
cmd <- dockerCmd(name, peer, rpcPort, binaryHash, binaryPath, genesisPath)
_ <- log(s"$name got dockerCmd")
idPromise <- Deferred[F, Either[Throwable, String]]
_ <- Concurrent[F].start(
@ -108,13 +168,30 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent](
EitherT(containerId)
}
def streamLog(
def streamFileLog(
containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] =
for {
logPath <- getLogPath(containerId)
_ = println(s"$containerId logPath: $logPath")
stream = FileStream.stream[F](logPath)
} yield stream
def streamLog(
containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] = {
val errors = scala.collection.mutable.ArrayBuffer.empty[String]
for {
lines <- EitherT(
IO(s"docker logs -f $containerId".lineStream(new ProcessLogger {
override def out(s: => String): Unit = {}
override def err(s: => String): Unit = errors += s
override def buffer[T](f: => T): T = f
})).attempt.to[F])
stream = fs2.Stream.fromIterator(lines.iterator) ++ fs2.Stream.emits(
errors)
} yield stream
}
}
object Runner {

View File

@ -1,14 +1,11 @@
package hackhack
import java.nio.file.Paths
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, Resource, _}
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import fs2.Stream
import fs2.concurrent.SignallingRef
import hackhack.ipfs.{IpfsClient, IpfsStore}
import io.circe.syntax._
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
@ -17,7 +14,6 @@ import org.http4s.server.blaze.BlazeServerBuilder
import org.http4s.server.middleware.{CORS, CORSConfig}
import org.http4s.server.websocket.WebSocketBuilder
import org.http4s.websocket.WebSocketFrame.Text
import io.circe.parser.parse
import scala.concurrent.duration._
import scala.language.higherKinds
@ -32,6 +28,7 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift](
case GET -> Root / "websocket" / appName =>
appRegistry.stream(appName).value.flatMap {
case Left(e) =>
println(s"Error while getting stream for $appName: $e")
InternalServerError(s"Error while getting stream for $appName: $e")
case Right(stream) =>
@ -48,6 +45,7 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift](
.value
.flatMap {
case Left(e) =>
println(s"Error while running app $appName: $e")
InternalServerError(s"Error while running app $appName: $e")
case Right(height) =>
@ -59,7 +57,6 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift](
}
// TODO: list of registered apps
// TODO: endpoint for consensusHeight
}
def close(): F[Unit] = signal.set(true)