提问者:小点点

向涉及 akka 的异步服务器发送请求


我想向一个包含参与者的服务器发出一个异步请求。假设我有两个演员:

 class SessionRetriever extends Actor {
  import SessionRetriever._

  def receiver = {
    Get => 
      val s = getSessionIdFromServer() // 1
      sender ! Result(s)               // 2
  }

  def getSessionIdFromServer(): String = { ... } // 3
}

object SessionRetriever {
  object Get
  object Result(s: String)
}

class RequestSender extends Actor {
  val sActor = context actorOf Props[SessionRetriever]

  def receiver = {
    // get session id
    val sesId = sActor ! SessionRetriever.Get 
    val res = sendRequestToServer(sesId)      
    logToFile(res)                            

    context shutdown sActor       
  }


  def sendRequestToServer(sessionId: String): String = { .... } 
}

我的问题是:

val s = getSessionIdFromServer() // 1
sender ! Result(s)               // 2

1) getSessionIdFromServer()向服务器发出同步请求。我认为异步请求会好得多,对吗?所以它将返回Future[String]而不是一个普通的字符串。

2)我如何使异步:通过使用AsyncHttpClient(如果我没记错的话)或将其同步体包装到Future { } 中?

3)我应该使用阻塞{}块吗?如果是,那么确切的位置:在它的主体内部还是这里val s=阻塞{getSessionIdFromServer()}

附言:在这一点上,我不想使用异步{}等待{},因为它们是相当高级的函数,毕竟它们是建立在Futures之上的


共3个答案

匿名用户

你可以试试这种非阻塞的方式

def receive = {
    Get => 
      //assume getSessionIdFromServer() run aysnchronize
      val f: Future[String] = getSessionIdFromServer()
      val client = sender //keep it local to use when future back
      f onComplete {
        case Success(rep) => client ! Result(rep)
        case Failure(ex) => client ! Failed(ex)
      }
 }

匿名用户

1) 如果getSessionIdFromServer()被阻塞,那么您应该从接收函数异步执行它,否则您的参与者将在每次接收新请求时阻塞,并将始终等到接收到新会话后再处理下一个请求。

2) 使用 Future 会将阻塞操作“移动”到不同的线程。因此,您的Actor不会被阻止,并且能够继续处理传入的请求 - 这很好 - 但是您仍然阻塞了线程 - 不是那么好。使用 AsyncHttpClient 是一个好主意。您可以探索其他非阻塞httpClient,例如PlayWebService。

3)我对< code>blocking不太熟悉,所以不确定我是否应该在这里给出任何建议。据我所知,它将告诉线程池操作正在阻塞,它应该产生一个临时的新线程来处理它——这避免了所有的工作线程都被阻塞。同样,如果您这样做,您的actor将不会被阻塞,但是您在从服务器获取会话时仍然阻塞了一个线程。

总结一下:如果可能的话,只需在getSessionIdFromServer中使用异步http客户端。否则,请使用 Future{}block

匿名用户

要使用AsyncHttpClient进行异步调用,您可以通过scalapromise处理java的未来。

    import scala.concurrent.Future
    import com.ning.http.client.AsyncHttpClient
    import scala.concurrent.Promise
    import java.util.concurrent.Executor

    object WebClient {

      private val client = new AsyncHttpClient

      case class BadStatus(status: Int) extends RuntimeException

      def get(url: String)(implicit exec: Executor): Future[String] = {
        val f = client.prepareGet(url).execute();
        val p = Promise[String]()
        f.addListener(new Runnable {
          def run = {
            val response = f.get
            if (response.getStatusCode / 100 < 4)
              p.success(response.getResponseBodyExcerpt(131072))
            else p.failure(BadStatus(response.getStatusCode))
          }
        }, exec)
        p.future
      }

      def shutdown(): Unit = client.close()
 }

 object WebClientTest extends App {
   import scala.concurrent.ExecutionContext.Implicits.global
   WebClient get "http://www.google.com/" map println foreach (_ => WebClient.shutdown())
 }

然后通过回调处理未来的完成。

代码归功于Coursera令人敬畏的反应式编程课程。

相关问题