Background:
I have the following flow:
- A Peer actor on one instance generates and sends a PeerRequest proto (via HTTP) to another instance.
- The PeerServer receives and processes the request.
- It asks (using ?) the PeerManager actor to process the request.
- The PeerManager asks (?) the Peer actor (via its ID, a field of PeerRequest) to process the PeerRequest.PeerChat message.
- The Peer does some stuff (decryption, adding the chat to the DB, encrypting a response) and responds with a PeerChatResponse.
This process looks like this:
PeerServer handling:
    else if (peerRequest.request.isPeerChat) {
      // forward the PeerChat to PeerManager
      val future: Future[PeerResponse] = (peerManager ? peerRequest.getPeerChat).mapTo[PeerResponse]
      try {
        val result = Await.result(future, timeout.duration)
        resultingRoute = complete(result)
      } catch {
        case e: TimeoutException =>
          resultingRoute = complete(PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse))
      }
    }
PeerManager case for PeerChat:
    case peerChat@PeerChat(_, serverID, _, _) =>
      val peerEntry = peerMap.get(serverID)
      if (peerEntry.isDefined) {
        try {
          val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)
          if (state != null && state.name == "WaitMessages") {
            val peerResponse = Await.result((peerEntry.get ? (self, peerChat)).mapTo[PeerChatResponse], timeout.duration)
            sender() ! PeerResponse().withPeerChatResponse(peerResponse)
          }
        } catch {
          case e: TimeoutException =>
            sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
        }
      } else {
        sender() ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
      }
Peer case for handling (ActorRef, PeerChat):
    case (sender: ActorRef, peerChat@PeerChat(chatID, _, encryptedByteString, _)) =>
      system.log.info("processing PeerChat")
      try {
        // decrypt and store the chat in the database
        val decodedData = Base64.getDecoder.decode(encryptedByteString)
        val unencryptedBytes = peer.getEncryptionManager.decrypt(decodedData)
        // re-form the bytes back into a ChatRequest
        val chatRequest: ChatRequest = ChatRequest.parseFrom(unencryptedBytes)
        // verify we don't already have this chat in the database
        val database = DatabaseUtil.getInstance
        val chats = database.queryByFields(classOf[Chat], Map(
          "serverID" -> Seq(chatRequest.serverID),
          "timestamp" -> Seq(chatRequest.timestamp),
          "text" -> Seq(chatRequest.chatText)
        ))
        if (chats.isEmpty) {
          // store the chat
          val chat = new Chat(chatRequest)
          chat.sent = true
          chat.confirmed = true
          database.addWithGeneratedID(chat)
        }
        // tell the sender it was successful
        val encryptedResponse = peer.getEncryptionManager.encrypt(ByteBuffer.allocate(4).putInt(chatID).array())
        val encodedData = Base64.getEncoder.encodeToString(encryptedResponse)
        val peerChatResponse = PeerChatResponse(encodedData)
        system.log.info(s"Replying with PeerChatResponse:${peerChatResponse.toProtoString}")
        sender ! peerChatResponse
      } catch {
        case e: Exception =>
          system.log.error(e, "Failed processing PeerChat")
      }
I would like to point out that I am passing the PeerManager's self because, when the Peer receives this message, the receive behaviour is defined in a State class and therefore does not have direct access to the normal sender().
The Problem:
When the Peer receives the (ActorRef, PeerChat) from the PeerManager, I get the error:
Message [peer.chat.PeerChatResponse.PeerChatResponse] to Actor[akka://system-actor/user/peer_manager#1099549428] was unhandled.
I would also add that this process works for other states, with the exact same flow. When this happens with the other messages, the Await.result seems to handle the response from the Peer. However, for this specific process with PeerChat, it seems the response is being sent directly to the PeerManager and not handled by the Await.result future. The process is identical except that the other process passes down a SessionRequest message.
Are there any suggestions as to why this happens, or whether I am doing this wrong?
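One reading consistent with the error message: an ask (?) is answered through a temporary actor that the recipient sees as sender(), so a reply sent to any other ActorRef lands in that actor's mailbox and never completes the future. The sketch below is a minimal illustration of that behaviour, not the code from this post. Chat, Reply, Worker, Bystander and AskReplySketch are hypothetical stand-ins, and it assumes Akka classic (akka-actor) on the classpath.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-ins for the PeerChat / PeerChatResponse protos.
final case class Chat(id: Int)
final case class Reply(id: Int)

// Stands in for the Peer: replies to the ActorRef carried in the tuple,
// not to sender().
class Worker extends Actor {
  def receive: Receive = {
    case (carried: ActorRef, Chat(id)) => carried ! Reply(id)
  }
}

// Stands in for the PeerManager's own mailbox: records any Reply sent to it.
class Bystander(seen: Promise[Reply]) extends Actor {
  def receive: Receive = {
    case r: Reply => seen.trySuccess(r)
  }
}

object AskReplySketch {
  // Returns (did the ask future complete?, did the carried ref get the reply?).
  def run(): (Boolean, Boolean) = {
    implicit val timeout: Timeout = Timeout(1.second)
    val system = ActorSystem("ask-sketch")
    try {
      val seen = Promise[Reply]()
      val worker = system.actorOf(Props(new Worker))
      val bystander = system.actorOf(Props(new Bystander(seen)))
      // The tuple carries `bystander`, so Worker never answers the
      // temporary actor that backs this ask.
      val asked = (worker ? ((bystander, Chat(1)))).mapTo[Reply]
      val askCompleted = Try(Await.result(asked, 2.seconds)).isSuccess
      val bystanderGotReply = Try(Await.result(seen.future, 2.seconds)).isSuccess
      (askCompleted, bystanderGotReply)
    } finally system.terminate()
  }
}
```

In this sketch the ask times out while the carried ref receives the Reply. In the flow above the carried ref is the PeerManager itself, which has no case for PeerChatResponse, which would match the "was unhandled" log line.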
Asked Feb 17 at 0:12 by Kris Rice.
Answer:
I have solved this issue by using the Actor forward functionality.
The updated PeerManager:
    val peerEntry = peerMap.get(serverID)
    val savedSender = sender()
    if (peerEntry.isDefined) {
      try {
        val state = Await.result((peerEntry.get ? QueryState()).mapTo[PeerState], timeout.duration)
        if (state != null && state.name == "WaitMessages") {
          peerEntry.get.forward((savedSender, peerChat))
        }
      } catch {
        case e: TimeoutException =>
          savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_MANAGER_TIMEOUT.getStatusResponse)
      }
    } else {
      savedSender ! PeerResponse().withStatusResponse(HydraStatusCodes.PEER_NOT_READY.getStatusResponse)
    }
Then, in the Peer case for managing the PeerChat, I respond with a full ApiResponse, as the future in the server expects.
This ensures that the server Future receives the PeerResponse by saving the original sender and preserving it down the process chain.
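The save-and-forward pattern can be sketched end to end as follows. This is a simplified illustration under assumptions, not the project's actual code: Chat, Reply, Worker, Manager and ForwardSketch are hypothetical stand-ins for PeerChat, PeerResponse, Peer, PeerManager and PeerServer, and Akka classic (akka-actor) is assumed to be on the classpath.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-ins for the PeerChat / PeerResponse protos.
final case class Chat(id: Int)
final case class Reply(id: Int)

// Plays the role of the Peer: replies to the ActorRef carried in the message.
class Worker extends Actor {
  def receive: Receive = {
    case (replyTo: ActorRef, Chat(id)) => replyTo ! Reply(id)
  }
}

// Plays the role of the PeerManager: captures the asker, then forwards.
class Manager(worker: ActorRef) extends Actor {
  def receive: Receive = {
    case chat: Chat =>
      // For an ask, sender() is the temporary actor backing the future.
      val savedSender = sender()
      worker.forward((savedSender, chat))
  }
}

object ForwardSketch {
  // Plays the role of the PeerServer: asks the Manager and waits for the Reply.
  def run(): Reply = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("forward-sketch")
    try {
      val worker = system.actorOf(Props(new Worker), "worker")
      val manager = system.actorOf(Props(new Manager(worker)), "manager")
      // The Worker completes this future directly; the Manager never
      // handles the Reply itself.
      Await.result((manager ? Chat(7)).mapTo[Reply], timeout.duration)
    } finally system.terminate()
  }
}
```

Because the Worker answers the original asker, the message it sends has to be the type the asker's mapTo expects, which is why the Peer replies with the full response type in the answer above. Carrying savedSender in the tuple is only needed when the handler cannot call sender() itself, as with the State class described in the question.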