最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

scala - Pass a wildcard class as part of actor message with Akka actors? - Stack Overflow

programmeradmin5浏览0评论

I am trying to simplify the handling of messages to actors in my application. It has become quite large and messy, so I'm refactoring how I pass around actor references.

The plan is to:

  • Have a master actor that you "register" a spawned actor with
  • Pass commands to this master actor to pass onto the appropriate registered actor

This would mean I would only need to pass around a single actor reference in my application.

So far, I have this (it is unfinished):

package core

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill}
import akka.pattern.ask
import akka.util.Timeout
import core.ActorManager.{AskTimeout, Forward, RegisterActor}

import scala.concurrent.Await

/** Message protocol understood by the [[ActorManager]] actor defined below. */
object ActorManager {
  // NOTE(review): Command is declared as a case object but is used below as a
  // parameter type (command: Command). The type of an object is Command.type,
  // so this presumably needs to be a trait or class to compile - confirm.
  case object Command

  /** Asks the manager to relay `command` to the actor registered under `actorName`. */
  case class Forward(actorName: String, command: Command)

  /** Asks the manager to `ask` the actor registered under `actorName`.
    * `classRef` names the expected reply class; `blocking` selects the awaiting branch
    * of the handler. */
  case class AskTimeout(actorName: String, command: Command, timeout: Timeout, classRef: Class[?], blocking: Boolean = false)

  /** Registers `actorRef` with the manager under `name`. */
  case class RegisterActor(actorRef: ActorRef, name: String)
}

/** Registry actor: keeps a name -> ActorRef map and relays commands to the registered actors. */
class ActorManager extends Actor {

  private val system: ActorSystem = context.system

  // Registered actors, keyed by the name supplied in RegisterActor.
  private var classicActors: Map[String, ActorRef] = Map.empty

  override def receive: Receive = {
    // Register an actor under `name`, replacing any existing entry with that name.
    case RegisterActor(actorRef, name) =>
      if (classicActors.contains(name)) {
        // NOTE(review): the message says "ActorSystem" although the entry is an ActorRef.
        system.log.warning(s"ActorSystem with name $name already registered. It will be overwritten.")
      }
      classicActors += name -> actorRef

    // Relay the command to the named actor via `forward`; log an error if the name is unknown.
    case ft@Forward(name, command) =>
      val classicActor = classicActors.get(name)
      if (classicActor.isDefined) {
        classicActor.get.forward(command)
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${ft.toString}] was not processed.")
      }

    // Ask the named actor and, when `blocking` is set, wait for its reply.
    case at@AskTimeout(name, command, timeout, classRef, blocking) =>
      val classicActor = classicActors.get(name)

      // Placed in implicit scope for the `?` (ask) call below.
      implicit val actorTimeout: Timeout = timeout

      if (classicActor.isDefined) {
        if (blocking) {
          // NOTE(review): classRef is a runtime value, but mapTo[...] takes a type
          // argument - presumably the compile problem the question describes.
          // `result` is also never used after this line.
          val result = Await.result((classicActor.get ? command).mapTo[classRef], timeout.duration)
        } else {
          // Non-blocking branch is not implemented yet.
        }
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }

    case PoisonPill =>
      // shutdown all actors
      // NOTE(review): no body yet. Akka documents PoisonPill as handled automatically
      // rather than delivered to receive, so this case is presumably never hit - confirm.

  }

}

I seem to have a problem with the classRef field. The ask pattern needs to know what type of class to map the response into, which should be configurable at the point of creating the message to pass to this actor.

I tried:

  // Attempted variant: the reply type is a type parameter instead of a Class field.
  // NOTE(review): no field of type T (and no Class/ClassTag value) is carried in the
  // message, so T is presumably unavailable to the receiver at runtime - confirm.
  case class AskTimeout[T](actorName: String, command: Command, timeout: Timeout, blocking: Boolean = false)

...

  // Attempted handler for the generic AskTimeout[T]; only the blocking branch has code.
  override def receive: Receive = {
    // NOTE(review): T is not bound anywhere in this scope, and an extractor pattern
    // is written here with a type argument - presumably why this attempt fails.
    case at@AskTimeout[T](name, command, timeout, blocking) =>
      val classicActor = classicActors.get(name)

      // Placed in implicit scope for the `?` (ask) call below.
      implicit val actorTimeout: Timeout = timeout

      if (classicActor.isDefined) {
        if (blocking) {
          // `result` is computed but never used or sent anywhere.
          val result = Await.result((classicActor.get ? command).mapTo[T], timeout.duration)
        } else {
          // Non-blocking branch is not implemented yet.
        }
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }

  }

}

But it seems I cannot pass the class reference this way either.

Is my approach completely wrong here or is there a correct way of achieving this?

I am trying to simplify the handling of messages to actors in my application. It has become quite large and messy, so I'm refactoring how I pass around actor references.

The plan is to:

  • Have a master actor that you "register" a spawned actor with
  • Pass commands to this master actor to pass onto the appropriate registered actor

This would mean I would only need to pass around a single actor reference in my application.

So far, I have this (it is unfinished):

package core

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill}
import akka.pattern.ask
import akka.util.Timeout
import core.ActorManager.{AskTimeout, Forward, RegisterActor}

import scala.concurrent.Await

/** Message protocol understood by the [[ActorManager]] actor defined below. */
object ActorManager {
  // NOTE(review): Command is declared as a case object but is used below as a
  // parameter type (command: Command). The type of an object is Command.type,
  // so this presumably needs to be a trait or class to compile - confirm.
  case object Command

  /** Asks the manager to relay `command` to the actor registered under `actorName`. */
  case class Forward(actorName: String, command: Command)

  /** Asks the manager to `ask` the actor registered under `actorName`.
    * `classRef` names the expected reply class; `blocking` selects the awaiting branch
    * of the handler. */
  case class AskTimeout(actorName: String, command: Command, timeout: Timeout, classRef: Class[?], blocking: Boolean = false)

  /** Registers `actorRef` with the manager under `name`. */
  case class RegisterActor(actorRef: ActorRef, name: String)
}

/** Registry actor: keeps a name -> ActorRef map and relays commands to the registered actors. */
class ActorManager extends Actor {

  private val system: ActorSystem = context.system

  // Registered actors, keyed by the name supplied in RegisterActor.
  private var classicActors: Map[String, ActorRef] = Map.empty

  override def receive: Receive = {
    // Register an actor under `name`, replacing any existing entry with that name.
    case RegisterActor(actorRef, name) =>
      if (classicActors.contains(name)) {
        // NOTE(review): the message says "ActorSystem" although the entry is an ActorRef.
        system.log.warning(s"ActorSystem with name $name already registered. It will be overwritten.")
      }
      classicActors += name -> actorRef

    // Relay the command to the named actor via `forward`; log an error if the name is unknown.
    case ft@Forward(name, command) =>
      val classicActor = classicActors.get(name)
      if (classicActor.isDefined) {
        classicActor.get.forward(command)
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${ft.toString}] was not processed.")
      }

    // Ask the named actor and, when `blocking` is set, wait for its reply.
    case at@AskTimeout(name, command, timeout, classRef, blocking) =>
      val classicActor = classicActors.get(name)

      // Placed in implicit scope for the `?` (ask) call below.
      implicit val actorTimeout: Timeout = timeout

      if (classicActor.isDefined) {
        if (blocking) {
          // NOTE(review): classRef is a runtime value, but mapTo[...] takes a type
          // argument - presumably the compile problem the question describes.
          // `result` is also never used after this line.
          val result = Await.result((classicActor.get ? command).mapTo[classRef], timeout.duration)
        } else {
          // Non-blocking branch is not implemented yet.
        }
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }

    case PoisonPill =>
      // shutdown all actors
      // NOTE(review): no body yet. Akka documents PoisonPill as handled automatically
      // rather than delivered to receive, so this case is presumably never hit - confirm.

  }

}

I seem to have a problem with the classRef field. The ask pattern needs to know what type of class to map the response into, which should be configurable at the point of creating the message to pass to this actor.

I tried:

  // Attempted variant: the reply type is a type parameter instead of a Class field.
  // NOTE(review): no field of type T (and no Class/ClassTag value) is carried in the
  // message, so T is presumably unavailable to the receiver at runtime - confirm.
  case class AskTimeout[T](actorName: String, command: Command, timeout: Timeout, blocking: Boolean = false)

...

  // Attempted handler for the generic AskTimeout[T]; only the blocking branch has code.
  override def receive: Receive = {
    // NOTE(review): T is not bound anywhere in this scope, and an extractor pattern
    // is written here with a type argument - presumably why this attempt fails.
    case at@AskTimeout[T](name, command, timeout, blocking) =>
      val classicActor = classicActors.get(name)

      // Placed in implicit scope for the `?` (ask) call below.
      implicit val actorTimeout: Timeout = timeout

      if (classicActor.isDefined) {
        if (blocking) {
          // `result` is computed but never used or sent anywhere.
          val result = Await.result((classicActor.get ? command).mapTo[T], timeout.duration)
        } else {
          // Non-blocking branch is not implemented yet.
        }
      } else {
        system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }

  }

}

But it seems I cannot pass the class reference this way either.

Is my approach completely wrong here or is there a correct way of achieving this?

Share | Improve this question | asked Mar 20 at 11:30 by Kris Rice (879 reputation; 1 gold badge, 11 silver badges, 29 bronze badges)
Add a comment  | 

1 Answer 1

Reset to default 0

I have found a solution.

One of the issues was the use of Class[?], which I have replaced with Class[_].

Now, when the actor receives an AskTimeout, it explicitly casts the result to the provided class type:

    // Ask the actor registered under `name` and relay its reply to the original requester,
    // but only when the reply is an instance of `classRef`.
    case at@AskTimeout(name, command, timeout, classRef, blocking) =>
      // Placed in implicit scope for the `?` (ask) call below.
      implicit val actorTimeout: Timeout = timeout

      // Capture the requester now: sender() is only meaningful while this message is being
      // processed and must not be called from a Future callback running on another thread.
      val replyTo = sender()

      classicActors.get(name) match {
        case Some(target) =>
          val future = target ? command
          if (blocking) {
            // Blocks this actor's thread until the reply arrives. A timeout or a type
            // mismatch still throws from here, exactly as before.
            val result = Await.result(future, timeout.duration)
            if (!classRef.isInstance(result)) {
              throw new ClassCastException(s"Response is not of type ${classRef.getName}")
            }
            replyTo ! classRef.cast(result)
          } else {
            future.onComplete {
              case scala.util.Success(result) if classRef.isInstance(result) =>
                replyTo ! classRef.cast(result)
              case scala.util.Success(_) =>
                system.log.error(s"Received response not matching type ${classRef.getName}")
              case scala.util.Failure(cause) =>
                // A failed or timed-out ask used to be dropped silently by foreach.
                system.log.error(cause, s"Ask to actor $name for command [${at.toString}] failed.")
            }(context.dispatcher)
          }

        case None =>
          system.log.error(s"Actor by the name of: $name is not registered. Command [${at.toString}] was not processed.")
      }
发布评论

评论列表(0)

  1. 暂无评论