diff --git a/build.sbt b/build.sbt index ff7248b..78b813d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,9 +1,40 @@ +import org.apache.ivy.core.module.descriptor.License + name := "http-scala-api" version := "0.1" scalaVersion := "2.12.8" +resolvers += Resolver.sonatypeRepo("releases") +addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.10.0") + +licenses += ("Apache-2.0", new URL("https://www.apache.org/licenses/LICENSE-2.0.txt")) +resolvers += Resolver.bintrayRepo("fluencelabs", "releases") +// see good explanation https://gist.github.com/djspiewak/7a81a395c461fd3a09a6941d4cd040f2 +scalacOptions ++= Seq("-Ypartial-unification", "-deprecation") +addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.0") + +libraryDependencies ++= Seq( + fs2, + fs2rx, + fs2io, + sttp, + sttpCirce, + sttpFs2Backend, + sttpCatsBackend, + http4sDsl, + http4sServer, + http4sCirce, + circeCore, + circeGeneric, + circeGenericExtras, + circeParser, + circeFs2, + cats, + catsEffect +) + val fs2Version = "1.0.4" val fs2 = "co.fs2" %% "fs2-core" % fs2Version val fs2rx = "co.fs2" %% "fs2-reactive-streams" % fs2Version @@ -30,26 +61,3 @@ val circeFs2 = "io.circe" %% "circe-fs2" % "0.11.0" val catsVersion = "1.6.0" val cats = "org.typelevel" %% "cats-core" % catsVersion val catsEffect = "org.typelevel" %% "cats-effect" % "1.3.0" - -resolvers += Resolver.sonatypeRepo("releases") -addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.10.0") - -libraryDependencies ++= Seq( - fs2, - fs2rx, - fs2io, - sttp, - sttpCirce, - sttpFs2Backend, - sttpCatsBackend, - http4sDsl, - http4sServer, - http4sCirce, - circeCore, - circeGeneric, - circeGenericExtras, - circeParser, - circeFs2, - cats, - catsEffect -) diff --git a/src/main/scala/hackhack/Api.scala b/src/main/scala/hackhack/Api.scala deleted file mode 100644 index 0fb83dd..0000000 --- a/src/main/scala/hackhack/Api.scala +++ /dev/null @@ -1,22 +0,0 @@ -package hackhack - -import cats.effect.{ExitCode, IO, IOApp} -import cats.syntax.functor._ -import cats.syntax.flatMap._ - -/** - * 1. Read docker logs, parse, push to fs2.Stream - * 2. Serve events from that stream to a websocket - */ -object Api extends IOApp { - override def run(args: List[String]): IO[ExitCode] = { - WebsocketServer - .make[IO](8080) - .use { - case (_, f) => - IO(println("Started websocket server")) >> - f.join as ExitCode.Success - } - } - -} diff --git a/src/main/scala/hackhack/AppRegistry.scala b/src/main/scala/hackhack/AppRegistry.scala index 833faed..13d37b1 100644 --- a/src/main/scala/hackhack/AppRegistry.scala +++ b/src/main/scala/hackhack/AppRegistry.scala @@ -7,7 +7,10 @@ import java.util.concurrent.Executors import cats.Monad import cats.data.EitherT import cats.effect._ -import cats.effect.concurrent.Ref +import cats.syntax.functor._ +import cats.syntax.flatMap._ +import cats.syntax.applicative._ +import cats.effect.concurrent.{Deferred, Ref} import cats.syntax.applicativeError._ import cats.syntax.either._ import cats.syntax.option._ @@ -22,88 +25,137 @@ import scala.language.higherKinds case class Peer( host: String, - p2pPort: Short, rpcPort: Short, - p2pKey: String ) { - val RpcUri = Uri(s"http://$host:$rpcPort") - val P2pSeed = s"$p2pKey@$host:$p2pPort" + val RpcUri = Uri(host, rpcPort) } case class App(name: String, containerId: String, - rpcPort: Short, - seed: String, + peer: Peer, binaryHash: ByteVector, binaryPath: Path) class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( ipfsStore: IpfsStore[F], runner: Runner[F], - apps: Ref[F, Map[String, App]], + apps: Ref[F, Map[String, Deferred[F, App]]], blockingCtx: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()))( implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?], fs2.Stream[F, ByteBuffer]]) { - def log(str: String) = EitherT(IO(println(str)).attempt.to[F]) + def stream(name: String): EitherT[F, Throwable, fs2.Stream[F, Log]] = + for { + app <- EitherT(getApp(name)) + stream <- runner.streamLog(app.containerId) + } yield + stream + .evalTap(line => Sync[F].delay(println(s"line $name $line"))) + .map(Log(name, _)) + .unNone + .evalTap(log => Sync[F].delay(println(s"log $name $log"))) // Returns consensusHeight - def run(name: String, - peer: Peer, - hash: ByteVector): EitherT[F, Throwable, Long] = + def run(name: String, peer: Peer, hash: String): EitherT[F, Throwable, Long] = for { + deferred <- EitherT.liftF(Deferred[F, App]) + _ <- EitherT( + apps.modify( + map => + map + .get(name) + .fold(map.updated(name, deferred) -> ().asRight[Throwable])(_ => + map -> + new Exception(s"app $name was already started").asLeft[Unit])) + ) + genesis <- dumpGenesis(name, peer) _ <- log(s"$name dumped genesis") + baseDir <- EitherT( IO( Paths .get(System.getProperty("user.home"), s".salmon/$name") .toAbsolutePath).attempt.to[F]) - _ <- EitherT(IO(baseDir.toFile.mkdir()).attempt.to[F]) + _ <- EitherT(IO(Files.createDirectories(baseDir)).attempt.to[F]) path = baseDir.resolve("genesis.json") _ <- EitherT(IO(Files.write(path, genesis.getBytes())).attempt.to[F]) - _ <- log(s"$name saved genesis") - binaryPath = baseDir.resolve("binary") - _ <- fetchTo(hash, binaryPath).leftMap(identity[Throwable]) - _ <- log(s"$name binary downloaded $binaryPath") - height <- consensusHeight(name, peer) - } yield height + _ <- log(s"$name saved genesis -> $path") - def consensusHeight(appName: String, - peer: Peer): EitherT[F, Throwable, Long] = { + binaryHash <- EitherT.fromEither( + ByteVector + .fromBase58Descriptive(hash).map(_.drop(2)) + .leftMap(e => + new Exception(s"Failed to decode binary hash from base64: $e"): Throwable)) + + binaryPath = baseDir.resolve("binary") + _ <- fetchTo(binaryHash, binaryPath).leftMap(identity[Throwable]) + _ <- log(s"$name binary downloaded $binaryPath") + + status <- status(name, peer) + _ <- log(s"$name got peer status") + + containerId <- runner.run(name, p2pPeer(status), binaryPath) + _ <- log(s"$name container started $containerId") + + app = App(name, containerId, peer, binaryHash, binaryPath) + _ <- EitherT.liftF(deferred.complete(app)) + } yield status.sync_info.latest_block_height + + private def log(str: String) = EitherT(IO(println(str)).attempt.to[F]) + + private def getApp(name: String): F[Either[Throwable, App]] = + for { + map <- apps.get + appOpt = map.get(name) + app <- appOpt.fold( + new Exception(s"There is no app $name").asLeft[App].pure[F])( + _.get.map(_.asRight)) + } yield app + + private def status(appName: String, + peer: Peer): EitherT[F, Throwable, TendermintStatus] = { rpc(appName, peer, "/status").subflatMap( _.hcursor .downField("result") - .downField("sync_info") - .get[Long]("latest_block_height") + .as[TendermintStatus] ) } - def dumpGenesis(appName: String, - peer: Peer): EitherT[F, Throwable, String] = { - rpc(appName, peer, "/genesis").subflatMap( - _.hcursor.downField("result").get[String]("genesis")) + private def p2pPeer(status: TendermintStatus) = { + val id = status.node_info.id + val endpoint = status.node_info.listen_addr.replace("tcp://", "") + s"$id@$endpoint" } - def rpc(appName: String, - peer: Peer, - path: String): EitherT[F, Throwable, Json] = Backoff.default.retry( - sttp - .get(peer.RpcUri.path(path)) - .send[EitherT[F, Throwable, ?]] - .subflatMap( - _.body - .leftMap(e => - new Exception(s"Error RPC $path $appName: ${peer.RpcUri}: $e")) - .flatMap(s => parse(s)) - ), - (e: Throwable) => - Monad[F].pure(println(s"Error RPC $path $appName: ${peer.RpcUri}: $e")), - max = 10 - ) + private def dumpGenesis(appName: String, + peer: Peer): EitherT[F, Throwable, String] = { + rpc(appName, peer, "/genesis").subflatMap { json => + json.hcursor.downField("result").get[Json]("genesis").map(_.spaces2) + } + } - def fetchTo(hash: ByteVector, dest: Path): EitherT[F, IpfsError, Unit] = { + private def rpc(appName: String, + peer: Peer, + path: String): EitherT[F, Throwable, Json] = + Backoff.default.retry( + sttp + .get(peer.RpcUri.path(path)) + .send[EitherT[F, Throwable, ?]] + .subflatMap( + _.body + .leftMap(e => + new Exception(s"Error RPC $path $appName: ${peer.RpcUri}: $e")) + .flatMap(s => parse(s)) + ), + (e: Throwable) => + Monad[F].pure(println(s"Error RPC $path $appName: ${peer.RpcUri}: $e")), + max = 10 + ) + + private def fetchTo(hash: ByteVector, + dest: Path): EitherT[F, IpfsError, Unit] = { ipfsStore .fetch(hash) .flatMap( @@ -116,3 +168,15 @@ class AppRegistry[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( ) } } + +object AppRegistry { + def make[F[_]: Monad: Concurrent: ContextShift: Timer: LiftIO]( + ipfsStore: IpfsStore[F], + runner: Runner[F])( + implicit sttpBackend: SttpBackend[EitherT[F, Throwable, ?], + fs2.Stream[F, ByteBuffer]]) + : F[AppRegistry[F]] = + for { + ref <- Ref.of[F, Map[String, Deferred[F, App]]](Map.empty) + } yield new AppRegistry[F](ipfsStore, runner, ref) +} diff --git a/src/main/scala/hackhack/Main.scala b/src/main/scala/hackhack/Main.scala new file mode 100644 index 0000000..e11e448 --- /dev/null +++ b/src/main/scala/hackhack/Main.scala @@ -0,0 +1,46 @@ +package hackhack + +import java.nio.ByteBuffer +import java.nio.file.Files + +import cats.data.EitherT +import cats.effect.{ExitCode, IO, IOApp, Resource} +import cats.syntax.functor._ +import cats.syntax.either._ +import cats.syntax.flatMap._ +import com.softwaremill.sttp.{SttpBackend, Uri} +import hackhack.ipfs.IpfsStore +import hackhack.utils.EitherTSttpBackend + +/** + * 1. Read docker logs, parse, push to fs2.Stream + * 2. Serve events from that stream to a websocket + */ +object Main extends IOApp { + type STTP = SttpBackend[EitherT[IO, Throwable, ?], fs2.Stream[IO, ByteBuffer]] + private val sttpResource: Resource[ + IO, + SttpBackend[EitherT[IO, Throwable, ?], fs2.Stream[IO, ByteBuffer]]] = + Resource.make(IO(EitherTSttpBackend[IO]()))(sttpBackend ⇒ + IO(sttpBackend.close())) + + override def run(args: List[String]): IO[ExitCode] = { + + val ipfsUri = Uri("ipfs.fluence.one", 5001) + + sttpResource.use { implicit sttp => + val ipfsStore = IpfsStore[IO](ipfsUri) + for { + runner <- Runner.make[IO] + appRegistry <- AppRegistry.make[IO](ipfsStore, runner) + _ <- WebsocketServer + .make[IO](8080, appRegistry) + .use { + case (_, f) => + IO(println("Started websocket server")) >> f.join + } + } yield ExitCode.Success + } + } + +} diff --git a/src/main/scala/hackhack/Runner.scala b/src/main/scala/hackhack/Runner.scala index e26ef25..12184bb 100644 --- a/src/main/scala/hackhack/Runner.scala +++ b/src/main/scala/hackhack/Runner.scala @@ -10,7 +10,7 @@ import cats.syntax.applicativeError._ import cats.syntax.either._ import cats.syntax.flatMap._ import cats.syntax.functor._ -import cats.{Applicative, Defer, Monad} +import cats.{Defer, Monad} import hackhack.docker.params.{DockerImage, DockerParams} import io.circe.Json import io.circe.parser.parse @@ -66,28 +66,46 @@ case class Runner[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]( def run(name: String, peer: String, - binaryPath: Path): EitherT[F, Throwable, fs2.Stream[F, String]] = { - val container = for { + binaryPath: Path): EitherT[F, Throwable, String] = { + val containerId = for { cmd <- dockerCmd(name, peer, binaryPath) _ <- log(s"$name got dockerCmd") - idPromise <- Deferred[F, String] + idPromise <- Deferred[F, Either[Throwable, String]] _ <- Concurrent[F].start( IO(cmd.!!).attempt .onError { case e => IO(println(s"Failed to run container $name: $e")) } .to[F] - .flatMap(_.fold(_ => Applicative[F].unit, idPromise.complete))) + .flatMap( + _.fold( + e => + idPromise.complete( + new Exception(s"$name failed to start docker container: $e", + e).asLeft), + id => idPromise.complete(id.asRight) + ) + ) + ) _ <- log(s"$name signaled docker container to start") containerId <- idPromise.get - _ <- log(s"$name docker container started $containerId") + _ <- containerId.fold( + e => log(s"$name failed to start docker container: $e"), + id => log(s"$name docker container started $id")) } yield containerId + EitherT(containerId) + } + + def streamLog( + containerId: String): EitherT[F, Throwable, fs2.Stream[F, String]] = for { - containerId <- EitherT.liftF(container) logPath <- getLogPath(containerId) stream = FileStream.stream[F](logPath) } yield stream - } - +} + +object Runner { + def make[F[_]: Monad: LiftIO: ContextShift: Defer: Concurrent]: F[Runner[F]] = + Ref.of[F, Short](0).map(new Runner(_)) } diff --git a/src/main/scala/hackhack/TendermintStatus.scala b/src/main/scala/hackhack/TendermintStatus.scala new file mode 100644 index 0000000..a148d18 --- /dev/null +++ b/src/main/scala/hackhack/TendermintStatus.scala @@ -0,0 +1,59 @@ +package hackhack + +import hackhack.TendermintStatus.{NodeInfo, SyncInfo, ValidatorInfo} +import io.circe.generic.extras.Configuration +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} +import io.circe.{Decoder, Encoder} + +case class TendermintStatus(node_info: NodeInfo, + sync_info: SyncInfo, + validator_info: ValidatorInfo) + +object TendermintStatus { + case class ProtocolVersion(p2p: String, block: String, app: String) + case class OtherInfo(tx_index: String, rpc_address: String) + + case class NodeInfo( + id: String, + listen_addr: String, + network: String, + version: String, + channels: String, + moniker: String, + other: OtherInfo, + protocol_version: ProtocolVersion + ) + + case class SyncInfo( + latest_block_hash: String, + latest_app_hash: String, + latest_block_height: Long, + latest_block_time: String, + catching_up: Boolean + ) + + case class PubKey(`type`: String, value: String) + + case class ValidatorInfo(address: String, + pub_key: PubKey, + voting_power: String) + + private implicit val configuration: Configuration = + Configuration.default.withSnakeCaseMemberNames.withSnakeCaseConstructorNames + + implicit val decodeProtocolVersion: Decoder[ProtocolVersion] = deriveDecoder + implicit val decodeOtherInfo: Decoder[OtherInfo] = deriveDecoder + implicit val decodeNodeInfo: Decoder[NodeInfo] = deriveDecoder + implicit val decodeSyncInfo: Decoder[SyncInfo] = deriveDecoder + implicit val decodePubKey: Decoder[PubKey] = deriveDecoder + implicit val decodeValidatorInfo: Decoder[ValidatorInfo] = deriveDecoder + implicit val decodeCheck: Decoder[TendermintStatus] = deriveDecoder + + implicit val encodeProtocolVersion: Encoder[ProtocolVersion] = deriveEncoder + implicit val encodeOtherInfo: Encoder[OtherInfo] = deriveEncoder + implicit val encodeNodeInfo: Encoder[NodeInfo] = deriveEncoder + implicit val encodeSyncInfo: Encoder[SyncInfo] = deriveEncoder + implicit val encodePubKey: Encoder[PubKey] = deriveEncoder + implicit val encodeValidatorInfo: Encoder[ValidatorInfo] = deriveEncoder + implicit val encodeCheck: Encoder[TendermintStatus] = deriveEncoder +} diff --git a/src/main/scala/hackhack/WebsocketServer.scala b/src/main/scala/hackhack/WebsocketServer.scala index c48b2bc..1161bbe 100644 --- a/src/main/scala/hackhack/WebsocketServer.scala +++ b/src/main/scala/hackhack/WebsocketServer.scala @@ -8,6 +8,7 @@ import cats.syntax.applicativeError._ import cats.syntax.flatMap._ import fs2.Stream import fs2.concurrent.SignallingRef +import hackhack.ipfs.{IpfsClient, IpfsStore} import io.circe.syntax._ import org.http4s.HttpRoutes import org.http4s.dsl.Http4sDsl @@ -22,53 +23,41 @@ import scala.concurrent.duration._ import scala.language.higherKinds case class WebsocketServer[F[_]: ConcurrentEffect: Timer: ContextShift]( - streams: Ref[F, Map[String, fs2.Stream[F, Log]]], + appRegistry: AppRegistry[F], signal: SignallingRef[F, Boolean] ) extends Http4sDsl[F] { private def routes(): HttpRoutes[F] = HttpRoutes.of[F] { - case GET -> Root / "websocket" / "start" / "file" / key => - streams.get.flatMap { map => - map.get(key) match { - case None => - BadRequest(s"There's no stream for $key, create it first") - case Some(stream) => - WebSocketBuilder[F].build( - stream.map(e => Text(e.asJson.noSpaces)), - _.evalMap(e => Sync[F].delay(println(s"from $key: $e"))) - ) - } + case GET -> Root / "websocket" / appName => + appRegistry.stream(appName).value.flatMap { + case Left(e) => + InternalServerError(s"Error while getting stream for $appName: $e") + + case Right(stream) => + WebSocketBuilder[F].build( + stream.map(e => Text(e.asJson.noSpaces)), + _.evalMap(e => Sync[F].delay(println(s"from $appName: $e"))) + ) } - case (GET | POST) -> Root / "create" / appName => - val stream = - fs2.Stream -// .eval(Sync[F].delay(Files.createTempFile("websocket", appName))) - .eval(Sync[F].delay(Paths.get("/tmp/stage-04-ns.log"))) - .evalTap(path => Sync[F].delay(println(s"created $path"))) - .flatMap(FileStream.stream[F]) - .map(parse(_).toOption) - .unNone - .map(_.hcursor.get[String]("log").toOption) - .unNone - .filter(_.nonEmpty) - .evalTap(line => Sync[F].delay(println(s"line $appName $line"))) - .map(Log(appName, _)) - .unNone - .evalTap(log => Sync[F].delay(println(s"log $appName $log"))) + case (GET | POST) -> Root / "create" / appName / seedHost / LongVar( + seedPort) / hash => + appRegistry + .run(appName, Peer(seedHost, seedPort.toShort), hash) + .value + .flatMap { + case Left(e) => + InternalServerError(s"Error while running app $appName: $e") - Sync[F].delay(println(s"Creating stream for $appName")) >> - streams.update(map => - map.get(appName).fold(map.updated(appName, stream)) { _ => - println(s"Stream for $appName already exists") - map - }) >> Ok(""" - |{ - | "consensusHeight": 150 - |} - """.stripMargin) + case Right(height) => + Ok(s""" + |{ + | "consensusHeight": $height + |} + """.stripMargin) + } // TODO: list of registered apps // TODO: endpoint for consensusHeight } @@ -97,14 +86,13 @@ object WebsocketServer { import cats.syntax.flatMap._ import cats.syntax.functor._ - def make[F[_]: Timer: ContextShift](port: Int)( + def make[F[_]: Timer: ContextShift](port: Int, appRegistry: AppRegistry[F])( implicit F: ConcurrentEffect[F]) : Resource[F, (WebsocketServer[F], Fiber[F, Unit])] = Resource.make( for { - streams <- Ref.of[F, Map[String, fs2.Stream[F, Log]]](Map.empty) signal <- SignallingRef[F, Boolean](false) - server = WebsocketServer(streams, signal) + server = WebsocketServer(appRegistry, signal) fiber <- Concurrent[F].start(Backoff.default { server .start(port) diff --git a/src/main/scala/hackhack/ipfs/IpfsStore.scala b/src/main/scala/hackhack/ipfs/IpfsStore.scala index 37c6c29..051176c 100644 --- a/src/main/scala/hackhack/ipfs/IpfsStore.scala +++ b/src/main/scala/hackhack/ipfs/IpfsStore.scala @@ -33,7 +33,7 @@ import scala.language.higherKinds class IpfsStore[F[_]: Functor](client: IpfsClient[F]) { def fetch( hash: ByteVector): EitherT[F, IpfsError, fs2.Stream[F, ByteBuffer]] = - client.download(hash).leftMap(e => IpfsError("fetch", Some(e))) + client.download(hash).leftMap(e => IpfsError(s"Error on fetch: $e", Some(e))) } object IpfsStore { diff --git a/src/main/scala/hackhack/utils/EitherTSttpBackend.scala b/src/main/scala/hackhack/utils/EitherTSttpBackend.scala new file mode 100644 index 0000000..7cec30c --- /dev/null +++ b/src/main/scala/hackhack/utils/EitherTSttpBackend.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2018 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package hackhack.utils + +import java.nio.ByteBuffer + +import cats.arrow.FunctionK +import cats.{~>, Monad} +import cats.data.EitherT +import cats.effect.ConcurrentEffect +import cats.implicits._ +import com.softwaremill.sttp.{MonadError => ME, _} + +import scala.language.{higherKinds, implicitConversions} +import com.softwaremill.sttp.SttpBackend +import com.softwaremill.sttp.asynchttpclient.fs2.AsyncHttpClientFs2Backend +import com.softwaremill.sttp.impl.cats.implicits._ + + +/** + * Async sttp backend that will return EitherT. + */ +object EitherTSttpBackend { + + def apply[F[_]: ConcurrentEffect](): SttpBackend[EitherT[F, Throwable, ?], fs2.Stream[F, ByteBuffer]] = { + val sttp: SttpBackend[F, fs2.Stream[F, ByteBuffer]] = AsyncHttpClientFs2Backend[F]() + + val eitherTArrow: F ~> EitherT[F, Throwable, ?] = new FunctionK[F, EitherT[F, Throwable, ?]] { + override def apply[A](fa: F[A]): EitherT[F, Throwable, A] = { + EitherT(fa.attempt) + } + } + + val eitherTSttp: SttpBackend[EitherT[F, Throwable, ?], fs2.Stream[F, ByteBuffer]] = + sttp.mapK(eitherTArrow) + + eitherTSttp + } +} + +/** + * sttp MonadError for EitherT + */ +class EitherTMonad[F[_]](implicit F: Monad[F]) extends ME[EitherT[F, Throwable, ?]] { + type R[T] = EitherT[F, Throwable, T] + + override def unit[T](t: T): R[T] = + EitherT.right[Throwable](F.pure(t)) + + override def map[T, T2](fa: R[T])(f: T => T2): R[T2] = + fa.map(f) + + override def flatMap[T, T2](fa: R[T])(f: T => R[T2]): R[T2] = + fa.flatMap(f) + + override def error[T](t: Throwable): R[T] = + EitherT.left[T](F.pure(t)) + + override protected def handleWrappedError[T](rt: R[T])(h: PartialFunction[Throwable, R[T]]): R[T] = + rt.handleErrorWith(h) +}