/apps, /file & /block endpoints

This commit is contained in:
folex 2019-06-16 10:25:27 +02:00
parent 1055669d75
commit 795689cc92
2 changed files with 63 additions and 24 deletions

View File

@ -4,23 +4,23 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.Executors
import cats.{Monad, Traverse}
import cats.data.EitherT
import cats.effect._
import cats.syntax.functor._
import cats.syntax.applicative._
import cats.instances.list._
import cats.syntax.flatMap._
import cats.syntax.applicative._
import cats.effect.concurrent.{Deferred, Ref}
import cats.instances.list._
import cats.instances.option._
import cats.syntax.applicative._
import cats.syntax.applicativeError._
import cats.syntax.either._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.option._
import cats.{Monad, Traverse}
import com.softwaremill.sttp.{SttpBackend, Uri, sttp}
import hackhack.ipfs.{IpfsError, IpfsStore}
import io.circe.{Decoder, Encoder, Json, ObjectEncoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.parser.parse
import io.circe.{Decoder, Encoder, Json, ObjectEncoder}
import scodec.bits.ByteVector
import scala.concurrent.ExecutionContext
@ -45,11 +45,24 @@ case class App(name: String,
binaryPath: Path)
object App {
private implicit val encbc: Encoder[ByteVector] = Encoder.encodeString.contramap(_.toHex)
private implicit val encPath: Encoder[Path] = Encoder.encodeString.contramap(_.toString)
private implicit val encbc: Encoder[ByteVector] =
Encoder.encodeString.contramap(_.toHex)
private implicit val encPath: Encoder[Path] =
Encoder.encodeString.contramap(_.toString)
implicit val encodeApp: ObjectEncoder[App] = deriveEncoder
}
case class AppInfo(name: String,
network: String,
binaryHash: ByteVector,
consensusHeight: Long)
object AppInfo {
private implicit val encbc: Encoder[ByteVector] =
Encoder.encodeString.contramap(_.toHex)
implicit val encodeAppInfo: Encoder[AppInfo] = deriveEncoder
}
class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
ipfsStore: IpfsStore[F],
runner: Runner[F],
@ -150,7 +163,7 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
_ <- EitherT.liftF(deferred.complete(app))
} yield status.sync_info.latest_block_height
def getAllApps: EitherT[F, Throwable, List[(App, Long)]] = {
def getAllApps: EitherT[F, Throwable, List[AppInfo]] = {
for {
appList <- EitherT.right(
apps.get.flatMap(map =>
@ -160,10 +173,24 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO](
status(app.name, app.peer)))
} yield
appList.zip(statuses).map {
case (app, status) => (app, status.sync_info.latest_block_height)
case (app, status) =>
AppInfo(app.name,
status.node_info.network,
app.binaryHash,
status.sync_info.latest_block_height)
}
}
def getBlock(name: String, height: Long) =
for {
appOpt <- EitherT.right(apps.get.flatMap(map =>
Traverse[Option].sequence(map.get(name).map(_.get))))
app <- appOpt
.fold(new Exception(s"There is no app $name").asLeft[App])(_.asRight)
.toEitherT[F]
block <- rpc(name, app.peer, s"/block?height=$height")
} yield block.spaces2
private def log(str: String) = EitherT(IO(println(str)).attempt.to[F])
private def getApp(name: String): F[Either[Throwable, App]] =

View File

@ -1,12 +1,13 @@
package hackhack
import java.nio.file.Paths
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, Resource, _}
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import fs2.Stream
import fs2.concurrent.SignallingRef
import io.circe.JsonLong
import io.circe.syntax._
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl
@ -26,6 +27,19 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift](
private def routes(): HttpRoutes[F] =
HttpRoutes.of[F] {
case GET -> Root / "file" =>
val stream = FileStream
.stream[F](Paths.get("/tmp/stream.log"))
.evalTap(line => Sync[F].delay(println(s"line $line")))
.map(Log("file", _))
.unNone
.evalTap(log => Sync[F].delay(println(s"log $log")))
WebSocketBuilder[F].build(
stream.map(e => Text(e.asJson.noSpaces)),
_.evalMap(e => Sync[F].delay(println(s"from file: $e")))
)
case GET -> Root / "websocket" / appName =>
appRegistry.stream(appName).value.flatMap {
case Left(e) =>
@ -61,21 +75,19 @@ case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift](
case GET -> Root / "apps" =>
(for {
apps <- appRegistry.getAllApps
json = apps
.map {
case (app, height) =>
app.asJsonObject
.add("consensusHeight", height.asJson)
.toMap
.asJson
}
.asJson
.spaces2
json = apps.asJson.spaces2
} yield json).value.flatMap {
case Left(e) => InternalServerError(s"Error on /apps: $e")
case Left(e) => InternalServerError(s"Error on /apps: $e")
case Right(json) => Ok(json)
}
// TODO: list of registered apps
case GET -> Root / "block" / name / LongVar(height) =>
appRegistry.getBlock(name, height).value.flatMap {
case Left(e) =>
InternalServerError(
s"Error while getting block $height for $name: $e")
case Right(block) => Ok(block)
}
}
def close(): F[Unit] = signal.set(true)