I am trying to simplify the handling of messages to actors in my application. It has become quite large and messy, so I'm refactoring how I pass around actor references.
The plan is to:
- Have a master actor that you "register" a spawned actor with
- Pass commands to this master actor to pass onto the appropriate registered actor
This would mean I would only need to pass around a single actor reference in my application.
So far, I have this (it is unfinished):
package core
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill}
import akka.pattern.ask
import akka.util.Timeout
import core.ActorManager.{AskTimeout, Forward, RegisterActor}
import scala.concurrent.Await
object ActorManager {
case object Command
case class Forward(actorName: String, command: Command)
case class AskTimeout(actorName: String, command: Command, timeout: Timeout, classRef: Class[?], blocking: Boolean = false)
case class RegisterActor(actorRef: ActorRef, name: String)
}
class ActorManager extends Actor {
private val system: ActorSystem = context.system
private var classicActors: Map[String, ActorRef] = Map.empty
override def receive: Receive = {
case RegisterActor(actorRef, name) =>
if (classicActors.contains(name)) {
system.log.warning(s"ActorSystem with name $name already registered. It will be overwritten.")
}
classicActors += name -> actorRef
case ft@Forward(name, command) =>
val classicActor = classicActors.get(name)
if (classicActor.isDefined) {
classicActor.get.forward(command)
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${ft.toString}] was not processed.")
}
case at@AskTimeout(name, command, timeout, classRef, blocking) =>
val classicActor = classicActors.get(name)
implicit val actorTimeout: Timeout = timeout
if (classicActor.isDefined) {
if (blocking) {
val result = Await.result((classicActor.get ? command).mapTo[classRef], timeout.duration)
} else {
}
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
}
case PoisonPill =>
// shutdown all actors
}
}
I seem to have a problem with the classRef
field. The ask pattern needs to know what type of class to map the response into, which should be configurable at the point of creating the message to pass to this actor.
I tried:
case class AskTimeout[T](actorName: String, command: Command, timeout: Timeout, blocking: Boolean = false)
...
override def receive: Receive = {
case at@AskTimeout[T](name, command, timeout, blocking) =>
val classicActor = classicActors.get(name)
implicit val actorTimeout: Timeout = timeout
if (classicActor.isDefined) {
if (blocking) {
val result = Await.result((classicActor.get ? command).mapTo[T], timeout.duration)
} else {
}
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
}
}
}
But it seems I cannot pass the class reference this way either.
Is my approach completely wrong here or is there a correct way of achieving this?
I am trying to simplify the handling of messages to actors in my application. It has become quite large and messy, so I'm refactoring how I pass around actor references.
The plan is to:
- Have a master actor that you "register" a spawned actor with
- Pass commands to this master actor to pass onto the appropriate registered actor
This would mean I would only need to pass around a single actor reference in my application.
So far, I have this (it is unfinished):
package core
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill}
import akka.pattern.ask
import akka.util.Timeout
import core.ActorManager.{AskTimeout, Forward, RegisterActor}
import scala.concurrent.Await
object ActorManager {
case object Command
case class Forward(actorName: String, command: Command)
case class AskTimeout(actorName: String, command: Command, timeout: Timeout, classRef: Class[?], blocking: Boolean = false)
case class RegisterActor(actorRef: ActorRef, name: String)
}
class ActorManager extends Actor {
private val system: ActorSystem = context.system
private var classicActors: Map[String, ActorRef] = Map.empty
override def receive: Receive = {
case RegisterActor(actorRef, name) =>
if (classicActors.contains(name)) {
system.log.warning(s"ActorSystem with name $name already registered. It will be overwritten.")
}
classicActors += name -> actorRef
case ft@Forward(name, command) =>
val classicActor = classicActors.get(name)
if (classicActor.isDefined) {
classicActor.get.forward(command)
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${ft.toString}] was not processed.")
}
case at@AskTimeout(name, command, timeout, classRef, blocking) =>
val classicActor = classicActors.get(name)
implicit val actorTimeout: Timeout = timeout
if (classicActor.isDefined) {
if (blocking) {
val result = Await.result((classicActor.get ? command).mapTo[classRef], timeout.duration)
} else {
}
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
}
case PoisonPill =>
// shutdown all actors
}
}
I seem to have a problem with the classRef
field. The ask pattern needs to know what type of class to map the response into, which should be configurable at the point of creating the message to pass to this actor.
I tried:
case class AskTimeout[T](actorName: String, command: Command, timeout: Timeout, blocking: Boolean = false)
...
override def receive: Receive = {
case at@AskTimeout[T](name, command, timeout, blocking) =>
val classicActor = classicActors.get(name)
implicit val actorTimeout: Timeout = timeout
if (classicActor.isDefined) {
if (blocking) {
val result = Await.result((classicActor.get ? command).mapTo[T], timeout.duration)
} else {
}
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
}
}
}
But it seems I cannot pass the class reference this way either.
Is my approach completely wrong here or is there a correct way of achieving this?
Share Improve this question asked Mar 20 at 11:30 Kris RiceKris Rice 8791 gold badge11 silver badges29 bronze badges1 Answer
Reset to default 0I have found a solution.
One of the issues was the use of Class[?]
which I have replaced with Class[_]
Now, when the actor receives a AskTimeout
, it explicitly casts the result to the provided class type:
case at@AskTimeout(name, command, timeout, classRef, blocking) =>
val classicActor = classicActors.get(name)
implicit val actorTimeout: Timeout = timeout
if (classicActor.isDefined) {
val future = classicActor.get ? command
if (blocking) {
val result = Await.result(future, timeout.duration)
if (!classRef.isInstance(result)) {
throw new ClassCastException(s"Response is not of type ${classRef.getName}")
}
val casted = classRef.cast(result)
sender() ! casted
} else {
future.foreach { result =>
if (classRef.isInstance(result)) {
val casted = classRef.cast(result)
sender() ! casted
} else {
system.log.error(s"Received response not matching type ${classRef.getName}")
}
}(context.dispatcher)
}
} else {
system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
}