Publishing
Let’s publish some Blobs.
import co.upvest.google4s.gpubsub.Publisher
import co.upvest.google4s.gpubsub.Messageable
// Some instances of Messageble
import co.upvest.google4s.gpubsub.instances._
import co.upvest.google4s.core._
import akka.stream.scaladsl.{Source, Sink}
case class Blob(s: String)
// Here the decoder/encoder typeclass.
implicit val blobConverter = Messageable.of[Blob](
b => fromBytes.asMsg(b.blub.getBytes),
pm => Blob(pm.getData.toStringUtf8)
)
// Config
val c: Publisher.Config = Publisher.Config(...)
// Create a Flow
val pFlow = Publisher.flow[Blob](c)
// or a Client
val client = Publisher(c)(IdLift.liftId)
// the flow constructor finds the implicit in the context
// to convert the Blobs.
Source(Blob("a") :: Blob("b") :: Nil).via(pFlow).to(Sink.seq)
// or publish directly from client in F of your choice.
client.publish(Blob("bolb"), IdLift.liftId)
Subscribing
import co.upvest.google4s.gpubsub.Subscriber
import akka.stream.scaladsl.Sink
val config: Subscriber.Config = Subscriber.Config(...)
// Create a source
val source = Subscriber(config)
// Run the source into a flow/sink of choice.
source.runWith(Sink.foreach(println))