I am able to connect to a gRPC server from Scala and receive a FetchResponse by calling the Subscribe API with a gRPC StreamObserver. When I try the same configuration with the ZIO framework (zio-grpc), I do not receive any response.
I have the following configuration: host: api.salesforce.pubsub, port: 7443, tenantId: "xxx", accessToken: "yyy", instanceUrl: "<instance URL>".
With this configuration I build a ManagedChannel and call the Subscribe API, passing FetchRequest(topicName), and I successfully get the FetchResponse through the StreamObserver.
With the same configuration in ZIO, I do not receive the FetchResponse, and I do not even get an error. Below is the zio-grpc implementation.
/** Entry point that subscribes to a Pub/Sub topic through the zio-grpc generated
  * `PubSubClient` and drains the stream of fetch responses, re-subscribing every
  * two seconds after the stream ends.
  *
  * `host`, `port`, `tenantId`, `accessToken` and `instanceUrl` are not defined in
  * this snippet and are expected to be in scope from elsewhere.
  */
object Main extends ZIOAppDefault {

  /** Builds the gRPC channel for `host:port` with an interceptor that replaces the
    * outgoing headers with the three authentication entries on every call.
    *
    * @return the channel description consumed by `PubSubClient.live`
    */
  def managedChannel(): ZManagedChannel = {
    val builder: ManagedChannelBuilder[_] = ManagedChannelBuilder.forAddress(host, port)
    val authHeaders = ZClientInterceptor.headersReplacer { (_, _) =>
      SafeMetadata.make(
        ("tenantid", tenantId),
        ("accesstoken", accessToken),
        // Only the root of the instance URL is sent, whatever path was configured.
        ("instanceurl", URI(instanceUrl).resolve("/").toString)
      )
    }
    ZManagedChannel(builder, Seq(authHeaders))
  }

  /** Request side of the bidirectional `subscribe` call: a single `FetchRequest`,
    * after which the stream is kept open instead of completing.
    *
    * A request stream that ends causes the client to half-close the call, and the
    * server may then complete the subscription without delivering anything and
    * without reporting an error. Appending `ZStream.never` keeps the call open so
    * responses can arrive.
    *
    * NOTE(review): no `numRequested` is set, so the generated default is sent.
    * Confirm against the Pub/Sub API whether events are delivered without a
    * positive requested count.
    *
    * @return a never-ending stream that emits exactly one `FetchRequest`
    */
  def requestBody(): zio.stream.Stream[StatusException, FetchRequest] = {
    // `replayPreset` matches the proto field `replay_preset`; the previous
    // `replayPresent` spelling does not name a field of FetchRequest.
    val initialRequest = FetchRequest(topicName = "topic", replayPreset = EARLIEST)
    ZStream.succeed(initialRequest) ++ ZStream.never
  }

  /** Subscribes and emits every response wrapped in `Some`. If the call fails, the
    * status is logged and a single `None` is emitted in place of the failure.
    *
    * @return stream of `Some(response)` values, or one `None` after a logged failure
    */
  def subcribeAndFetch(): ZStream[PubSubClient, StatusException, Option[FetchResponse]] =
    for {
      _ <- ZStream.logInfo(s"fetching..")
      response <- PubSubClient
        .subscribe(requestBody())
        .map(response => Some(response))
        .catchAll(ex =>
          ZStream.logError(s"the subscribe request failed: ${ex.getStatus}").as(None)
        )
    } yield response

  /** Drains the subscription and restarts it two seconds after each completion. */
  override def run = {
    // The method is declared with an empty parameter list, so it must be applied
    // with `()`; Scala 3 does not insert the parentheses automatically.
    subcribeAndFetch()
      .runDrain
      .repeat(Schedule.spaced(2.second))
      .provideLayer(PubSubClient.live(managedChannel()))
  }
}
Thanks in advance.