From 795689cc928b1924ed5fa31ac254fc88d5696a32 Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Sun, 16 Jun 2019 10:25:27 +0200 Subject: [PATCH] /apps, /file & /block endpoints --- src/main/scala/hackhack/AppRegistry.scala | 49 ++++++++++++++----- src/main/scala/hackhack/WebsocketServer.scala | 38 +++++++++----- 2 files changed, 63 insertions(+), 24 deletions(-) diff --git a/src/main/scala/hackhack/AppRegistry.scala b/src/main/scala/hackhack/AppRegistry.scala index b812132..58bb3dc 100644 --- a/src/main/scala/hackhack/AppRegistry.scala +++ b/src/main/scala/hackhack/AppRegistry.scala @@ -4,23 +4,23 @@ import java.nio.ByteBuffer import java.nio.file.{Files, Path, Paths} import java.util.concurrent.Executors -import cats.{Monad, Traverse} import cats.data.EitherT import cats.effect._ -import cats.syntax.functor._ -import cats.syntax.applicative._ -import cats.instances.list._ -import cats.syntax.flatMap._ -import cats.syntax.applicative._ import cats.effect.concurrent.{Deferred, Ref} +import cats.instances.list._ +import cats.instances.option._ +import cats.syntax.applicative._ import cats.syntax.applicativeError._ import cats.syntax.either._ +import cats.syntax.flatMap._ +import cats.syntax.functor._ import cats.syntax.option._ +import cats.{Monad, Traverse} import com.softwaremill.sttp.{SttpBackend, Uri, sttp} import hackhack.ipfs.{IpfsError, IpfsStore} -import io.circe.{Decoder, Encoder, Json, ObjectEncoder} import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.parser.parse +import io.circe.{Decoder, Encoder, Json, ObjectEncoder} import scodec.bits.ByteVector import scala.concurrent.ExecutionContext @@ -45,11 +45,24 @@ case class App(name: String, binaryPath: Path) object App { - private implicit val encbc: Encoder[ByteVector] = Encoder.encodeString.contramap(_.toHex) - private implicit val encPath: Encoder[Path] = Encoder.encodeString.contramap(_.toString) + private implicit val encbc: Encoder[ByteVector] = + Encoder.encodeString.contramap(_.toHex) + private implicit val encPath: Encoder[Path] = + Encoder.encodeString.contramap(_.toString) implicit val encodeApp: ObjectEncoder[App] = deriveEncoder } +case class AppInfo(name: String, + network: String, + binaryHash: ByteVector, + consensusHeight: Long) + +object AppInfo { + private implicit val encbc: Encoder[ByteVector] = + Encoder.encodeString.contramap(_.toHex) + implicit val encodeAppInfo: Encoder[AppInfo] = deriveEncoder +} + class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( ipfsStore: IpfsStore[F], runner: Runner[F], @@ -150,7 +163,7 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( _ <- EitherT.liftF(deferred.complete(app)) } yield status.sync_info.latest_block_height - def getAllApps: EitherT[F, Throwable, List[(App, Long)]] = { + def getAllApps: EitherT[F, Throwable, List[AppInfo]] = { for { appList <- EitherT.right( apps.get.flatMap(map => @@ -160,10 +173,24 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( status(app.name, app.peer))) } yield appList.zip(statuses).map { - case (app, status) => (app, status.sync_info.latest_block_height) + case (app, status) => + AppInfo(app.name, + status.node_info.network, + app.binaryHash, + status.sync_info.latest_block_height) } } + def getBlock(name: String, height: Long) = + for { + appOpt <- EitherT.right(apps.get.flatMap(map => + Traverse[Option].sequence(map.get(name).map(_.get)))) + app <- appOpt + .fold(new Exception(s"There is no app $name").asLeft[App])(_.asRight) + .toEitherT[F] + block <- rpc(name, app.peer, s"/block?height=$height") + } yield block.spaces2 + private def log(str: String) = EitherT(IO(println(str)).attempt.to[F]) private def getApp(name: String): F[Either[Throwable, App]] = diff --git a/src/main/scala/hackhack/WebsocketServer.scala b/src/main/scala/hackhack/WebsocketServer.scala index 44ac739..b8ff3dd 100644 --- a/src/main/scala/hackhack/WebsocketServer.scala +++ b/src/main/scala/hackhack/WebsocketServer.scala @@ -1,12 +1,13 @@ package hackhack +import java.nio.file.Paths + import cats.effect.concurrent.Ref import cats.effect.{ExitCode, Resource, _} import cats.syntax.applicativeError._ import cats.syntax.flatMap._ import fs2.Stream import fs2.concurrent.SignallingRef -import io.circe.JsonLong import io.circe.syntax._ import org.http4s.HttpRoutes import org.http4s.dsl.Http4sDsl @@ -26,6 +27,19 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift]( private def routes(): HttpRoutes[F] = HttpRoutes.of[F] { + case GET -> Root / "file" => + val stream = FileStream + .stream[F](Paths.get("/tmp/stream.log")) + .evalTap(line => Sync[F].delay(println(s"line $line"))) + .map(Log("file", _)) + .unNone + .evalTap(log => Sync[F].delay(println(s"log $log"))) + + WebSocketBuilder[F].build( + stream.map(e => Text(e.asJson.noSpaces)), + _.evalMap(e => Sync[F].delay(println(s"from file: $e"))) + ) + case GET -> Root / "websocket" / appName => appRegistry.stream(appName).value.flatMap { case Left(e) => @@ -61,21 +75,19 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift]( case GET -> Root / "apps" => (for { apps <- appRegistry.getAllApps - json = apps - .map { - case (app, height) => - app.asJsonObject - .add("consensusHeight", height.asJson) - .toMap - .asJson - } - .asJson - .spaces2 + json = apps.asJson.spaces2 } yield json).value.flatMap { - case Left(e) => InternalServerError(s"Error on /apps: $e") + case Left(e) => InternalServerError(s"Error on /apps: $e") case Right(json) => Ok(json) } - // TODO: list of registered apps + + case GET -> Root / "block" / name / LongVar(height) => + appRegistry.getBlock(name, height).value.flatMap { + case Left(e) => + InternalServerError( + s"Error while getting block $height for $name: $e") + case Right(block) => Ok(block) + } } def close(): F[Unit] = signal.set(true)