我试图实现一个服务器发送的事件控制器,用最新的数据更新我的Web浏览器客户端。
这是我当前的控制器,每5秒发送一次我的数据列表。我想在每次将数据保存在其他服务中时发送 SSE。我阅读了有关使用通道的信息,但是如何将其与 Flux 一起使用?
@GetMapping("/images-sse")
fun getImagesAsSSE(
request: HttpServletRequest
): Flux<ServerSentEvent<MutableList<Image>>> {
val subdomain = request.serverName.split(".").first()
return Flux.interval(Duration.ofSeconds(5))
.map {
ServerSentEvent.builder<MutableList<Image>>()
.event("periodic-event")
.data(weddingService.getBySubdomain(subdomain)?.pictures).build()
}
}
控制器的示例代码:
package sk.qpp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.concurrent.atomic.AtomicLong;
@Controller
@Slf4j
public class ReactiveController {
record SomeDTO(String name, String address) {
}
private final Sinks.Many<SomeDTO> eventSink = Sinks.many().multicast().directBestEffort();
@RequestMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<SomeDTO>> sse() {
final AtomicLong counter = new AtomicLong(0);
return eventSink.asFlux()
.map(e -> ServerSentEvent.builder(e)
.id(counter.incrementAndGet() + "")
//.event(e.getClass().getName())
.build());
}
// note, when you want this to work in production, ensure, that http request is not being cached on its way, using POST method for example.
@ResponseStatus(HttpStatus.OK)
@ResponseBody
@GetMapping(path = "/sendSomething", produces = MediaType.TEXT_PLAIN_VALUE)
public String sendSomething() {
this.eventSink.emitNext(
new SomeDTO("name", "address"),
(signalType, emitResult) -> {
log.warn("Some event is being not send to all subscribers. It will vanish...");
// returning false, to not retry emitting given data again.
return false;
}
);
return "Have a look at /sse endpoint (using \"curl http://localhost/sse\" for example), to see events in realtime.";
}
}
Sink被用作一些“自定义通量”,您可以在其中放置任何内容(使用emitNext),并从中获取(使用asFlux()方法)。
在设置了示例控制器之后,在浏览器中打开http://localhost:9091/send something(即,对其执行get请求),并在控制台发出命令< code > curl http://localhost:9091/sse 来查看SSE事件(在每个GET请求之后,应该会出现新的事件)。也可以在chromium浏览器中直接看到sse事件。Firefox尝试下载并保存到文件系统作为文件(也可以)。
我终于成功了。我还使用cookie添加了特定于用户的更新。
这是我的SSE控制器
@RestController
@RequestMapping("/api/sse")
class SSEController {
val imageUpdateSink : Sinks.Many<Wedding> = Sinks.many().multicast().directBestEffort()
@GetMapping("/images")
fun getImagesAsSSE(
request: HttpServletRequest
): Flux<ServerSentEvent<MutableList<Image>>> {
val counter: AtomicLong = AtomicLong(0)
return imageUpdateSink.asFlux()
.filter { wedding ->
val folderId = request.cookies.find {cookie ->
cookie.name == "folderId"
}?.value
folderId == wedding.folderId
}.map { wedding ->
ServerSentEvent.builder<MutableList<Image>>()
.event("new-image")
.data(
wedding.pictures
).id(counter.incrementAndGet().toString())
.build()
}
}
}
在我的服务中,我的数据被更新:
val updatedImageList = weddingRepository.findByFolderId(imageDTO.folderId)
sseController.imageUpdateSink.tryEmitNext(
updatedImageList
)
我的Javascript看起来像这样:
document.cookie = "folderId=" + [[${wedding.folderId}]]
const evtSource = new EventSource("/api/sse/images")
evtSource.addEventListener("new-image", function(alpineContext){
return function (event) {
console.log(event.data)
alpineContext.images = JSON.parse(event.data)
};
}(this))