最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

scala - Why is my actor ask getting dead letters? - Stack Overflow

programmeradmin4浏览0评论

Background:

I have the following flow process:

  • Peer Actor on one instance generated and sends a PeerRequest proto (via http) to another instance
  • The PeerServer receives and process the request
  • It asks (using ?) the PeerManager actor to process the request
  • The PeerManager asks (?) the Peer Actor (via its ID, a field of PeerRequest) to process the PeerRequest.PeerChat message
  • The Peer does some stuff (decryption, adding the chat to the DB, encrypting a response) and responds with a PeerChatResponse

This process looks like this:

PeerServer handling:

      else if (peerRequest.request.isPeerChat) {
        // forward the PeerChat to PeerManager
        val future: Future[PeerResponse] = (peerManager ? peerRequest.getPeerChat).mapTo[PeerResponse]
        
        try {
          val result = Await.result(future, timeout.duration)
          resultingRoute = complete(result)
        } catch {
          case e: TimeoutException =>
            resultingRoute = complete(PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse))
        }
      }

PeerManager case for PeerChat:

    case peerChat@PeerChat(_, serverID, _, _) =>
      val peerEntry = peerMap.get(serverID)

      if (peerEntry.isDefined) {
        try {
          val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)

          if (state != null && state.name == "WaitMessages") {
            val peerResponse = Await.result((peerEntry.get ? (self, peerChat)).mapTo[PeerChatResponse], timeout.duration)
            sender() ! PeerResponse().withPeerChatResponse(peerResponse)
          }
        } catch {
          case e: TimeoutException =>
            sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
        }
      } else {
        sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }

Peer case for handling (ActorRef, PeerChat):

    case (sender: ActorRef, peerChat@PeerChat(chatID, _, encryptedByteString, _)) =>
      system.log.info("processing PeerChat")
      try {
        // decrypt and store the chat in the database
        val decodedData = Base64.getDecoder.decode(encryptedByteString)
        val unencryptedBytes = peer.getEncryptionManager.decrypt(decodedData)
        // re-form the bytes back into a ChatRequest
        val chatRequest: ChatRequest = ChatRequest.parseFrom(unencryptedBytes)
        // verify we don't already have this chat in the database

        val database = DatabaseUtil.getInstance
        val chats = database.queryByFields(classOf[Chat], Map(
          "serverID" -> Seq(chatRequest.serverID),
          "timestamp" -> Seq(chatRequest.timestamp),
          "text" -> Seq(chatRequest.chatText)
        ))

        if (chats.isEmpty) {
          // store the chat
          val chat = new Chat(chatRequest)
          chat.sent = true
          chat.confirmed = true
          database.addWithGeneratedID(chat)
        }
        // tell the sender it was successful
        val encryptedResponse = peer.getEncryptionManager.encrypt(ByteBuffer.allocate(4).putInt(chatID).array())
        val encodedData = Base64.getEncoder.encodeToString(encryptedResponse)
        val peerChatResponse = PeerChatResponse(encodedData)

        system.log.info(s"Replying with PeerChatResponse:${peerChatResponse.toProtoString}")

        sender ! peerChatResponse
      } catch {
        case e: Exception =>
          system.log.error(e, "Failed processing PeerChat")
      }

I would like to point out, I am passing the PeerManager self because when the Peer receives this message, the receive behaviour is defined in a State class and therefore does not have direct access to the normal sender().

The Problem:

When the Peer receives the (ActorRef, PeerChat) from the PeerManager, I get the error:

Message [peer.chat.PeerChatResponse.PeerChatResponse] to Actor[akka://system-actor/user/peer_manager#1099549428] was unhandled.

I would also add that this process works for other states, with the exact same flow. When this happens with the other messages, the Await.result seems to handle the response from the Peer. However, for this specific process with PeerChat, it seems the response is being sent directly to PeerManager and not handled by the Await.result future. The process is identical except the other process passes down a SessionRequest message.

Is there any suggestions as to why or if I am doing this wrong?

Background:

I have the following flow process:

  • Peer Actor on one instance generated and sends a PeerRequest proto (via http) to another instance
  • The PeerServer receives and process the request
  • It asks (using ?) the PeerManager actor to process the request
  • The PeerManager asks (?) the Peer Actor (via its ID, a field of PeerRequest) to process the PeerRequest.PeerChat message
  • The Peer does some stuff (decryption, adding the chat to the DB, encrypting a response) and responds with a PeerChatResponse

This process looks like this:

PeerServer handling:

      else if (peerRequest.request.isPeerChat) {
        // forward the PeerChat to PeerManager
        val future: Future[PeerResponse] = (peerManager ? peerRequest.getPeerChat).mapTo[PeerResponse]
        
        try {
          val result = Await.result(future, timeout.duration)
          resultingRoute = complete(result)
        } catch {
          case e: TimeoutException =>
            resultingRoute = complete(PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse))
        }
      }

PeerManager case for PeerChat:

    case peerChat@PeerChat(_, serverID, _, _) =>
      val peerEntry = peerMap.get(serverID)

      if (peerEntry.isDefined) {
        try {
          val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)

          if (state != null && state.name == "WaitMessages") {
            val peerResponse = Await.result((peerEntry.get ? (self, peerChat)).mapTo[PeerChatResponse], timeout.duration)
            sender() ! PeerResponse().withPeerChatResponse(peerResponse)
          }
        } catch {
          case e: TimeoutException =>
            sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
        }
      } else {
        sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }

Peer case for handling (ActorRef, PeerChat):

    case (sender: ActorRef, peerChat@PeerChat(chatID, _, encryptedByteString, _)) =>
      system.log.info("processing PeerChat")
      try {
        // decrypt and store the chat in the database
        val decodedData = Base64.getDecoder.decode(encryptedByteString)
        val unencryptedBytes = peer.getEncryptionManager.decrypt(decodedData)
        // re-form the bytes back into a ChatRequest
        val chatRequest: ChatRequest = ChatRequest.parseFrom(unencryptedBytes)
        // verify we don't already have this chat in the database

        val database = DatabaseUtil.getInstance
        val chats = database.queryByFields(classOf[Chat], Map(
          "serverID" -> Seq(chatRequest.serverID),
          "timestamp" -> Seq(chatRequest.timestamp),
          "text" -> Seq(chatRequest.chatText)
        ))

        if (chats.isEmpty) {
          // store the chat
          val chat = new Chat(chatRequest)
          chat.sent = true
          chat.confirmed = true
          database.addWithGeneratedID(chat)
        }
        // tell the sender it was successful
        val encryptedResponse = peer.getEncryptionManager.encrypt(ByteBuffer.allocate(4).putInt(chatID).array())
        val encodedData = Base64.getEncoder.encodeToString(encryptedResponse)
        val peerChatResponse = PeerChatResponse(encodedData)

        system.log.info(s"Replying with PeerChatResponse:${peerChatResponse.toProtoString}")

        sender ! peerChatResponse
      } catch {
        case e: Exception =>
          system.log.error(e, "Failed processing PeerChat")
      }

I would like to point out, I am passing the PeerManager self because when the Peer receives this message, the receive behaviour is defined in a State class and therefore does not have direct access to the normal sender().

The Problem:

When the Peer receives the (ActorRef, PeerChat) from the PeerManager, I get the error:

Message [peer.chat.PeerChatResponse.PeerChatResponse] to Actor[akka://system-actor/user/peer_manager#1099549428] was unhandled.

I would also add that this process works for other states, with the exact same flow. When this happens with the other messages, the Await.result seems to handle the response from the Peer. However, for this specific process with PeerChat, it seems the response is being sent directly to PeerManager and not handled by the Await.result future. The process is identical except the other process passes down a SessionRequest message.

Is there any suggestions as to why or if I am doing this wrong?

Share Improve this question asked Feb 17 at 0:12 Kris RiceKris Rice 1,0991 gold badge11 silver badges28 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 1

I have solved this issue by using the Actor forward functionality.

The updated PeerManager:

      val peerEntry = peerMap.get(serverID)
      val savedSender = sender()

      if (peerEntry.isDefined) {
        try {
          val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)

          if (state != null && state.name == "WaitMessages") {
            peerEntry.get.forward((savedSender, peerChat))
          }
        } catch {
          case e: TimeoutException =>
            savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
        }
      } else {
        savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }

Then in the Peer case for managing the PeerChat, I respond with a full ApiResponse, as the future in the server expects.

This ensures that the server Future receives the PeerResponse by saving the original sender and reserving it down the process chain.

发布评论

评论列表(0)

  1. 暂无评论