提问者:小点点

在使用Flatmap和多个订阅服务器时,将Future block多次调用


我一直在我的应用程序中成功地使用BrightFutures,主要用于异步网络请求。 我决定是时候看看我是否可以迁移到组合。 然而,我发现,当我使用flatMap和两个订户组合两个Future时,我的第二个Future代码块被执行两次。 下面是一些将直接在操场上运行的示例代码:

import Combine
import Foundation

extension Publisher {
    func showActivityIndicatorWhileWaiting(message: String) -> AnyCancellable {
        let cancellable = sink(receiveCompletion: { _ in Swift.print("Hide activity indicator") }, receiveValue: { (_) in })
        Swift.print("Busy: \(message)")
        return cancellable
    }
}

enum ServerErrors: Error {
    case authenticationFailed
    case noConnection
    case timeout
}

func authenticate(username: String, password: String) -> Future<Bool, ServerErrors> {
    Future { promise in
        print("Calling server to authenticate")
        DispatchQueue.main.async {
            promise(.success(true))
        }
    }
}

func downloadUserInfo(username: String) -> Future<String, ServerErrors> {
    Future { promise in
        print("Downloading user info")
        DispatchQueue.main.async {
            promise(.success("decoded user data"))
        }
    }
}

func authenticateAndDownloadUserInfo(username: String, password: String) -> some Publisher {
    return authenticate(username: username, password: password).flatMap { (isAuthenticated) -> Future<String, ServerErrors> in
        guard isAuthenticated else {
            return Future {$0(.failure(.authenticationFailed)) }
        }
        return downloadUserInfo(username: username)
    }
}

let future = authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
let cancellable2 = future.showActivityIndicatorWhileWaiting(message: "Please wait downloading")
let cancellable1 = future.sink(receiveCompletion: { (completion) in
    switch completion {
    case .finished:
        print("Completed without errors.")
    case .failure(let error):
        print("received error: '\(error)'")
    }
}) { (output) in
    print("received userInfo: '\(output)'")
}

代码模拟发出两个网络调用,并将它们作为一个单元(flatmap)一起运行,这两个单元要么成功,要么失败。 结果输出为:

调用服务器进行身份验证
忙:请等待下载
正在下载用户信息
正在下载用户信息
正在下载用户信息
正在下载用户信息






----意外的第二个网络调用
隐藏activity指示器

问题是DownloadUserInfo((username:)似乎被调用了两次。如果我只有一个订阅者,那么DownloadUserInfo((username:)只被调用了一次。我有一个难看的解决方案,它将FlatMap封装在另一个Future中,但我觉得我缺少了一些简单的东西。有什么想法吗?


共1个答案

匿名用户

使用LetFuture创建实际发布服务器时,追加.share运算符,以便两个订阅服务器订阅单个拆分管道。

编辑:正如我在我的评论中所说的,我会在你的管道中做一些其他的改变。 这里有一个建议的重写。 其中一些变化是风格上的/装饰性的,这是我如何编写组合代码的一个例子; 你可以接受也可以离开。 但其他的事情都是必要的。 您需要对您的未来进行延迟包装,以防止过早联网(即在订阅发生之前)。 您需要store您的管道,否则它将在网络启动之前不复存在。 我还将.handleevents替换为您的第二个订阅服务器,但是如果您使用上述解决方案中的.share,如果您确实想使用第二个订阅服务器,那么您仍然可以使用第二个订阅服务器。 这是一个完整的例子; 您可以将其复制并粘贴到一个项目中。

class ViewController: UIViewController {
    enum ServerError: Error {
        case authenticationFailed
        case noConnection
        case timeout
    }
    var storage = Set<AnyCancellable>()
    func authenticate(username: String, password: String) -> AnyPublisher<Bool, ServerError> {
        Deferred {
            Future { promise in
                print("Calling server to authenticate")
                DispatchQueue.main.async {
                    promise(.success(true))
                }
            }
        }.eraseToAnyPublisher()
    }
    func downloadUserInfo(username: String) -> AnyPublisher<String, ServerError> {
        Deferred {
            Future { promise in
                print("Downloading user info")
                DispatchQueue.main.async {
                    promise(.success("decoded user data"))
                }
            }
        }.eraseToAnyPublisher()
    }
    func authenticateAndDownloadUserInfo(username: String, password: String) -> AnyPublisher<String, ServerError> {
        let authenticate = self.authenticate(username: username, password: password)
        let pipeline = authenticate.flatMap { isAuthenticated -> AnyPublisher<String, ServerError> in
            if isAuthenticated {
                return self.downloadUserInfo(username: username)
            } else {
                return Fail<String, ServerError>(error: .authenticationFailed).eraseToAnyPublisher()
            }
        }
        return pipeline.eraseToAnyPublisher()
    }
    override func viewDidLoad() {
        super.viewDidLoad()
        authenticateAndDownloadUserInfo(username: "stack", password: "overflow")
            .handleEvents(
                receiveSubscription: { _ in print("start the spinner!") },
                receiveCompletion: { _ in print("stop the spinner!") }
        ).sink(receiveCompletion: {
            switch $0 {
            case .finished:
                print("Completed without errors.")
            case .failure(let error):
                print("received error: '\(error)'")
            }
        }) {
            print("received userInfo: '\($0)'")
        }.store(in: &self.storage)
    }
}

输出:

start the spinner!
Calling server to authenticate
Downloading user info
received userInfo: 'decoded user data'
stop the spinner!
Completed without errors.