Push Notifications with coroutines

Автор: Ivan Osipov
Дата публикации: 2019-11-07
В категориях: Development, Backend, Kotlin
Тэги: development, backend, kotlin

В определенный момент жизненного цикла любого более менее серьезного веб приложения появляется потребность в отправке push нотификаций в браузер. Исторически такие нотификации можно получить на стороне браузера разными способами. Это может быть широко известный websocket, который у всех на слуху, это может быть реализовано на основе long или short polling, когда клиент либо подвешивает запрос или непрерывно отпрашивает сервер короткоживущими запросами. Ну и на конец, либо это Server Sent Event о котором мы сегодня и поговорим, а затем реализуем при помощи корутин и Kotlin.

Сама по себе технология Server Sent Event не нова и напоминает реактивный обмен даннными. Клиент подвешивает соединение в которое со стороны сервера стримится информация по кусочкам.

Создаем проект

Как обычно, заходи на start.spring.io и генерируй spring boot проект со следующией конфигурацией.

  • Gradle
  • Kotlin
  • Spring Boot 2.2.1+
  • Dependencies: Reactive

Обрати внимание, что уже при такой минимальной конфигурации в зависимостях проекта окажется одна, интересная нам - org.jetbrains.kotlinx:kotlinx-coroutines-reactor

Прежде чем что-либо делать

  • Разархивируй проект
  • Открой в любимой IDE
  • Добавь в файл src/main/resource/application.preperties server.port=9090 для того, чтобы не законфликтовать с каким-нибудь уже запущенным локально приложением

Клиент

Создать клиент довольно просто, с него мы и начнем.

Создай где угодно файл index.html со следующим содержимым

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Sse Test</title>
</head>
<body>
<div id="content"></div>
<script>
    const eventSource = new EventSource("http://localhost:9090/events/stream");
    eventSource.onmessage = (ev) => {
        document.getElementById("content").innerHTML += ev.data + '</br>';
    }
</script>
</body>
</html>

Всё что делает этот кусочек html + js - создает event source и пытается подписаться на события по указанному URL http://localhost:9090/events/stream (ты же читал внимательно и поменял порт у spring boot приложения, да?)

Если ты посмотришь в Network в своём браузере, то увидишь, что такой эндпоинт не достижим. Давай его создадим

Сервер

Настала пора наполнить пустой сгенерированный проект чем-то полезным.

Создаём рядом с Application классом контроллер, можешь назвать его SseController или EventsController, или PushNotificationController, в общем, придумай крутое название для этого кусочка кода

@CrossOrigin
@RestController
@RequestMapping("/events")
class EventsController {
    @RequestMapping("/stream")
    fun eventStream() = Flux.just("a", "b", "c")
}

Обрати внимание, что Сors никто не отменял, так что есть аннотация @CrossOrigin. Реактивные стримы тоже никуда не уходят, так как всё работает на их основе. Для начала мы напишем знакомую реализацию на основе Flux и понаблюдаем за поведением клиента.

Фактически нам доступны два вида клиентов. Первый - тот, что внутри index.html. Второй - браузер, когда мы заходим напрямую по ссылке http://localhost:9090/events/stream. Понаблюдаем за их поведением.

Как работает прямой запрос

Если мы открываем в браузере эндпоинт http://localhost:9090/events/stream, то всё что мы увидим - это ‘abc’ как результат запроса, т.е. стрим вернул все значения которые в нём были и закрылся.

Как работает клиент

Если мы откроем написанный выше клиент в браузере, то увидим, что строчки a b и c появляются многократно, буквально каждую секунду. Довольно странное поведение, но вполне логичное. Одно из важных преимуществ SSE перед Websocket - SSE прямо из коробки умеет востанавливать соединение, т.е. если по какой-то причине сервер завершил стриминг, то клиент пересоздаст соединение. Это и происходит. В нашем случае сервер посылает a, b и c, а затем завершает поток информации, на что голодный клиент, просит ещё и получет те же самые данные. На этом примере становится ясно, что либо ты должен никогда не завершать стриминг со стороны сервера, либо не посылать дублирующие события при реконнекте, если конечно, это не часть твоей бизнес логики.

Корутины

Flow

Начиная со Spring Boot 2.2 фреймворк полноценно поддерживает корутины, а если у тебя уже есть код, который работает в reactive стиле, то не составит труда сынтегрировать два подхода. Перепишем метод eventStream() сразу на корутины. В библиотеке поддерживается тип аналогичный Flux, который называется Flow. Обеспечить переход от одного типа к другому довольно просто есть extension функция asFlow() на типе Flux. Пускай наша реализация стримит текущее время, выглядеть она может вот так

@GetMapping("/stream")
fun eventStream() = flow {
  repeat(times = 1000) {
    delay(1000)
    emit(Date())
  }
}

Теперь открывая index.html видим вновь и вновь поступающие события, которые оповещают нас текущим временем.

Launch

Добавим аннотацию @EnableScheduling на тот же класс, что помечен как @SpringBootApplication или на любую другую конфигурацию, для того, чтобы иметь возможность имитировать ивенты, которые приходят к нам из вне, например, из очереди.

Для простоты реализации добавим прямо в контроллер Scheduled Job

@Scheduled(fixedDelay = 1000)
fun onEvent() {
  GlobalScope.launch {
    //...
  }
}

Каждую секунду будет запускаться запланированный вызов и благодаря функции launch будет запускаться корутина в контексте которой и будет происходить фактическое выполнение. В этой статье мы не будем говорить ни о видах скоупов, ни о том что такое Structural Concurrency, всё это в другой раз.

Интересно здесь, что ранее у нас появился метод, который возвращает Flow и теперь есть метод, который имитирует события, которые надо в этот Flow перебросить. Очевидно требуется какой-то способ коммуникации, самое время поговорить о каналах.

Channel

Давайте добавим немного коммуникации к ивентам и представим, что они приходят к нам из какого-то внешнего источника. Внешний источник мы уже иммитируем с помощью Scheduled Job. Для того, чтобы абстрагироваться от него и построить нативный способ коммуникации, воспользуемся каналами.

На каждом запросе на /v1/events/stream будем создавать канал, а затем при разрыве соединения (при закрытии браузера) будем очищать ресурсы.

//...
class EventsController {
  val channels: MutableSet<Channel<String>> = ConcurrentHashMap.newKeySet()
  
  @GetMapping("/stream")
  fun eventStream(): Flow<String> {
      val channel = Channel<String>()
      channel.invokeOnClose {
        channels.remove(channel)
      }
      channels.add(channel)
      return channel.consumeAsFlow()
  }
  
  @Scheduled(fixedDelay = 1000)
  fun onEvent() {
    GlobalScope.launch {
      channels.forEach {
        it.send(Date().toString())
      }
    }
  }
}

Теперь если снова открыть index.html, то каждую секунду будет прилетать событие с указанной датой.

Итого

В ходе статьи ты научился не только нотифицировать пользователей событиями которые происходят внутри системы, но и разобрался как это сделать без взаимодействия с reactive кодом, только на инструментах которые дают корутины (напомню, под капотом reactive никуда не делся).

Обязательно посмотри исходники вот здесь

comments powered by Disqus