我想向一个包含参与者的服务器发出一个异步请求。假设我有两个演员:
class SessionRetriever extends Actor {
import SessionRetriever._
def receiver = {
Get =>
val s = getSessionIdFromServer() // 1
sender ! Result(s) // 2
}
def getSessionIdFromServer(): String = { ... } // 3
}
object SessionRetriever {
object Get
object Result(s: String)
}
和
class RequestSender extends Actor {
val sActor = context actorOf Props[SessionRetriever]
def receiver = {
// get session id
val sesId = sActor ! SessionRetriever.Get
val res = sendRequestToServer(sesId)
logToFile(res)
context shutdown sActor
}
def sendRequestToServer(sessionId: String): String = { .... }
}
我的问题是:
val s = getSessionIdFromServer() // 1
sender ! Result(s) // 2
1) getSessionIdFromServer()向服务器发出同步请求。我认为异步请求会好得多,对吗?所以它将返回Future[String]而不是一个普通的字符串。
2)我如何使异步:通过使用AsyncHttpClient(如果我没记错的话)或将其同步体包装到Future { } 中?
3)我应该使用阻塞{}块吗?如果是,那么确切的位置:在它的主体内部还是这里val s=阻塞{getSessionIdFromServer()}
?
附言:在这一点上,我不想使用异步{}
和等待{},
因为它们是相当高级的函数,毕竟它们是建立在Futures之上的
。
你可以试试这种非阻塞的方式
def receive = {
Get =>
//assume getSessionIdFromServer() run aysnchronize
val f: Future[String] = getSessionIdFromServer()
val client = sender //keep it local to use when future back
f onComplete {
case Success(rep) => client ! Result(rep)
case Failure(ex) => client ! Failed(ex)
}
}
1) 如果getSessionIdFromServer()
被阻塞,那么您应该从接收
函数异步执行它,否则您的参与者将在每次接收新请求时阻塞,并将始终等到接收到新会话后再处理下一个请求。
2) 使用 Future 会将阻塞操作“移动”到不同的线程。因此,您的Actor不会被阻止,并且能够继续处理传入的请求 - 这很好 - 但是您仍然阻塞了线程 - 不是那么好。使用 AsyncHttpClient 是一个好主意。您可以探索其他非阻塞httpClient,例如PlayWebService。
3)我对< code>blocking不太熟悉,所以不确定我是否应该在这里给出任何建议。据我所知,它将告诉线程池操作正在阻塞,它应该产生一个临时的新线程来处理它——这避免了所有的工作线程都被阻塞。同样,如果您这样做,您的actor将不会被阻塞,但是您在从服务器获取会话时仍然阻塞了一个线程。
总结一下:如果可能的话,只需在getSessionIdFromServer
中使用异步http客户端。否则,请使用 Future{}
或 block
。
要使用AsyncHttpClient进行异步调用,您可以通过scalapromise处理java的未来。
import scala.concurrent.Future
import com.ning.http.client.AsyncHttpClient
import scala.concurrent.Promise
import java.util.concurrent.Executor
object WebClient {
private val client = new AsyncHttpClient
case class BadStatus(status: Int) extends RuntimeException
def get(url: String)(implicit exec: Executor): Future[String] = {
val f = client.prepareGet(url).execute();
val p = Promise[String]()
f.addListener(new Runnable {
def run = {
val response = f.get
if (response.getStatusCode / 100 < 4)
p.success(response.getResponseBodyExcerpt(131072))
else p.failure(BadStatus(response.getStatusCode))
}
}, exec)
p.future
}
def shutdown(): Unit = client.close()
}
object WebClientTest extends App {
import scala.concurrent.ExecutionContext.Implicits.global
WebClient get "http://www.google.com/" map println foreach (_ => WebClient.shutdown())
}
然后通过回调处理未来的完成。
代码归功于Coursera令人敬畏的反应式编程课程。