логотип PurpleSchool
логотип PurpleSchool

Настройка шины событий NATS NSQ в Go

Автор

Олег Марков

Введение

Современные распределённые системы часто требуют быстрой и надёжной доставки сообщений между сервисами. Для этого применяют брокеры сообщений или event-bus — шины событий, которые обеспечивают асинхронную коммуникацию, масштабируемость и отказоустойчивость. Среди наиболее популярных брокеров — NATS и NSQ. Оба решения активно используются в экосистеме Go, предоставляя высокую скорость обмена информацией.

В этой статье я расскажу, как на практике можно настроить шину событий на базе NATS или NSQ в вашем Go-проекте. Вы узнаете, как устанавливать соединение, отправлять сообщения (publish), получать их (subscribe/consume), обрабатывать ошибки и правильно организовывать код с этими библиотеками. Мы разберём сценарии продвинутого применения, дадим рекомендации по безопасности и производительности, а также рассмотрим плюсы и минусы каждого подхода.

Основы событийных шин

Прежде чем переходить к практической части, важно понимать, что такое событийная шина. Это компонент архитектуры, который позволяет распределённым приложениям обмениваться событиями (сообщениями) без прямой зависимости друг от друга. Принцип понятен: одним сервисом сообщение отправляется в шину, а другой сервис его получает и обрабатывает.

NATS и NSQ — это легковесные, быстрые и простые в использовании решения. Оба они позволяют реализовать publish-subscribe и queue-подписку, обе имеют простые библиотеки для Go.

Следующие разделы помогут понять, как выбрать брокер под свои задачи и внедрить его в реальный проект.

NATS: подключение и базовые операции

Почему NATS

NATS — это open-source система обмена сообщениями с минимальной задержкой. Она поддерживает как point-to-point, так и pub/sub коммуникацию. Сильные стороны NATS — простота, высокая скорость, автомасштабирование и поддержка кластеризации.

Установка NATS-сервера

Для тестов NATS-сервер можно поднять у себя локально. Установка проста: скачайте бинарник nats-io/nats-server, либо используйте Docker.

Пример запуска через Docker:

docker run -p 4222:4222 nats

Порт 4222 — стандартный порт сервера NATS.

Установка Go-клиента

Для Go-проектов используется официальный клиент:

go get github.com/nats-io/nats.go

Пример публикации события

Вот минимальный пример публикации события в NATS из Go:

package main

import (
    "log"
    "github.com/nats-io/nats.go"
)

func main() {
    // Подключаемся к серверу
    nc, err := nats.Connect(nats.DefaultURL) // nats://127.0.0.1:4222
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Publish - отправляем сообщение в топик "updates"
    err = nc.Publish("updates", []byte("Привет! Это сообщение от сервиса А."))
    if err != nil {
        log.Fatal(err)
    }

    // Обеспечиваем flush, чтобы гарантировать доставку
    nc.Flush()
}

Пример подписки на события

Теперь пример, как принимать события:

package main

import (
    "log"
    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Подписываемся на события в топике "updates"
    _, err = nc.Subscribe("updates", func(m *nats.Msg) {
        log.Printf("Получено сообщение: %s", string(m.Data))
    })
    if err != nil {
        log.Fatal(err)
    }

    // Ожидаем сообщений, не даём завершиться приложению
    select {}
}

В этом примере подписка бессрочная, программа завершится только принудительно.

Работа с очередями

NATS позволяет реализовать логику очередей для воркеров (worker groups). То есть только один из подписчиков группы получит сообщение — полезно для распределения нагрузки.

// Подписка с привязкой к очереди "workers"
nc.QueueSubscribe("updates", "workers", func(m *nats.Msg) {
    log.Printf("Рабочий получил: %s", string(m.Data))
})

Дополнительные возможности NATS

  • Автоматическое переподключение: клиент пытается восстановить соединение сам.
  • Сообщения с подтверждением: NATS Streaming (JetStream) добавляет надежность (но это отдельная тема).
  • Cluster Mode: простое масштабирование в production.
  • TLS/аутентификация: поддержка сертификатов, JWT, паролей.
  • Request-Reply паттерн: запрос-ответ из коробки.

Обработка ошибок

NATS-client предоставляет хендлеры событий подключения:

opts := []nats.Option{
    nats.Name("MyGoService"),
    nats.ReconnectHandler(func(_ *nats.Conn) { log.Println("Соединение восстановлено") }),
    nats.DisconnectHandler(func(_ *nats.Conn) { log.Println("Отключение!") }),
}

nc, err := nats.Connect(nats.DefaultURL, opts...)

Это полезно для продвинутого мониторинга состояния канала.

NSQ: архитектура, настройка, работа из Go

Почему NSQ

NSQ — брокер сообщений, специально спроектированный для горизонтального масштабирования и обеспечения высокой доступности. Отличается высокой надёжностью доставки, минимальной задержкой и быстрой обработкой больших потоков событий. Легко наблюдать очередь, настраивать ретраи, паузы и др.

Установка NSQ компонентов

NSQ состоит из трёх компонентов:

  • nsqd — сервер, принимающий и отдающий сообщения.
  • nsqlookupd — дискавери-сервис для автоматического поиска nsqd.
  • nsqadmin — web-интерфейс для управления.

Обычный порядок запуска:

  1. Запускаем nsqlookupd:
docker run -p 4160:4160 -p 4161:4161 nsqio/nsq nsqlookupd
  1. Запускаем nsqd, указывая url nsqlookupd:
docker run -p 4150:4150 -p 4151:4151 nsqio/nsq nsqd --lookupd-tcp-address=host.docker.internal:4160
  1. Опционально запускаем nsqadmin (web):
docker run -p 4171:4171 nsqio/nsq nsqadmin --lookupd-http-address=host.docker.internal:4161

lokalhost может отличаться; используйте docker networking для удобства.

Добавление Go-клиентов

Устанавливаем библиотеки:

go get github.com/nsqio/go-nsq

Отправка сообщений из Go в NSQ

package main

import (
    "github.com/nsqio/go-nsq"
    "log"
)

func main() {
    // Конфиг по умолчанию
    cfg := nsq.NewConfig()
    
    // Создаём продьюсера
    producer, err := nsq.NewProducer("127.0.0.1:4150", cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Функция публикации сообщения
    err = producer.Publish("events", []byte("Cобытие для слушателей NSQ"))
    if err != nil {
        log.Fatal(err)
    }

    producer.Stop()
}

Подписка / получение сообщений (consumer)

В NSQ можно использовать каналы (channel), чтобы каждый обработчик в рамках канала получал своё сообщение, а не все сразу (load balancing).

package main

import (
    "github.com/nsqio/go-nsq"
    "log"
)

// Обработчик сообщений
type MyHandler struct{}

func (h *MyHandler) HandleMessage(m *nsq.Message) error {
    log.Printf("NSQ Consumer получил: %s", string(m.Body))
    m.Finish() // Сообщаем NSQ, что сообщение обработано
    return nil
}

func main() {
    cfg := nsq.NewConfig()
    // Создаём consumer на топик "events", канал "worker1"
    consumer, err := nsq.NewConsumer("events", "worker1", cfg)
    if err != nil {
        log.Fatal(err)
    }

    // Привязываем обработчик
    consumer.AddHandler(&MyHandler{})

    // Подключаемся к nsqlookupd для получения адресов nsqd
    err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
    if err != nil {
        log.Fatal(err)
    }

    // Ожидаем, чтобы приложение не завершилось
    select {}
}

Повторная доставка и "dead letter queue"

Если HandleMessage возвращает ошибку, NSQ повторит доставку. Количество ретраев и поведение можно конфигурировать через MaxAttempts и другие параметры.

Особенности и фишки NSQ

  • Поддержка распределённых очередей, каналов и динамического масштабирования.
  • Легко отслеживать метрики через интерфейс nsqadmin.
  • Отлично подходит для горизонтального выстраивания воркеров (обработка фоновых заданий).
  • Обрабатывает большие объёмы данных без задержек.
  • Прост в установке и эксплуатации.

Особенности интеграции и архитектурные рекомендации

Что выбрать — NATS или NSQ

  • NATS: отлично подходит для легковесных систем, real-time коммуникаций, когда нужна минимальная задержка и простая pub/sub архитектура. Лучше подходит для коротких быстрых сообщений.
  • NSQ: идеален для построения надёжных очередей заданий, требует гарантированной обработки и поддержки большого количества воркеров. Удобен при тяжёлых задачах (background jobs), обеспечивает ат-лизд деливери и позволяет тонко настраивать поведение очереди.

Принципы проектирования event-bus компонентов

  • Используйте уникальные топики под разные типы событий.
  • Рекомендуется явно логировать все сбои доставки.
  • Следите за отказами подключения, позволяйте клиента автоматически переподключаться.
  • Не держите слишком большие сообщения в шине событий, лучше проксировать только идентификаторы.
  • Вынесите конфиги эндпоинтов и названия топиков в отдельные параметры, не захардкодивайте в коде.

Настройка безопасности и TLS

Обе системы поддерживают TLS-соединения и аутентификацию. Для NATS достаточно указать nats.Secure() и пути к сертификатам в опциях, для NSQ реализуйте TLS через опции tls_v1, больше информации — в документации к брокерам.

Как не потерять сообщения

Для NATS рекомендуем использовать JetStream для долговременного хранения и подтверждения доставки. В NSQ загрузка и обработка чётко отделены: сообщения хранятся в очереди, пока не будут обработаны.

Масштабирование

  • Для NATS просто запускайте несколько инстансов, настройте кластер (кластеризация настраивается сервером).
  • В NSQ масштабирование реализуется путём добавления новых nsqd и динамического обновления lookupd.

Пример реактивного микросервиса

Вот базовый пример архитектуры:

  • Микросервис А публикует события через NATS.
  • Микросервис В подписан на них и реагирует (например, записывает в базу или уведомляет клиента).
  • Микросервис С — подписчик очереди NSQ, который обрабатывает тяжёлые задачи (фоновая обработка), остальные воркеры берут задачи при необходимости.

Каждый микросервис изолирован, связь реализуется через шину событий. Даже если сервис одновременнно подписан на разные брокеры, код остаётся высоко независимым.

Заключение

NATS и NSQ — надёжные, простые и эффективные event-bus решения для Go. Они закрывают разные классы задач и легко интегрируются в микросервисную архитектуру. Используя простые клиенты для Go, вы сможете реализовать быструю доставку/получение событий, организовать балансировку нагрузки, масштабируемую обработку, а также дополнительно обезопасить систему через TLS и аутентификацию. Надеюсь, практические примеры, коды и рекомендации помогут вам быстро внедрить событийную шину в ваш проект.

Частозадаваемые технические вопросы по теме статьи и ответы на них

Как подключить авторизацию по JWT для NATS?

В файле конфигурации NATS-сервера задайте секцию authorization и добавьте ваши JWT-токены. Для клиента в Go используйте опции подключения, указав токен через nats.UserJWT.

Почему NSQ consumer не видит новые сообщения сразу?

Убедитесь, что consumer подключён к актуальному nsqlookupd, топик создан и id канала уникален для каждой группы воркеров. Проверьте порты между контейнерами — часто проблема во внутренней сети Docker.

Как реализовать отложенную доставку сообщений в NSQ?

Используйте параметр DeferredPublish у producer: producer.DeferredPublish(topic, время, сообщение). Это позволит отложить доставку на заданный интервал.

Как работать с TLS-соединениями в NATS и NSQ в Go?

Для NATS добавьте в опции подключения nats.Secure() и укажите сертификаты. Для NSQ в конфиге установите tls_v1 = true, а клиентам задайте соответствующие параметры TLS в nsq.NewConfig().

Как ограничить количество parallel-обработчиков в NSQ consumer?

Используйте метод consumer.ChangeMaxInFlight(n) для регулировки одновременных сообщений, которые consumer может обрабатывать. Например, consumer.ChangeMaxInFlight(10) — будет обрабатывать не больше 10 сообщений одновременно.

Стрелочка влевоТрейсинг запросов с OpenTelemetry в GoМиграции базы данных в GolangСтрелочка вправо

Постройте личный план изучения Golang до уровня Middle — бесплатно!

Golang — часть карты развития Backend

  • step100+ шагов развития
  • lessons30 бесплатных лекций
  • lessons300 бонусных рублей на счет

Бесплатные лекции

Все гайды по Golang

Работа с YAML в GolangПреобразование типов в GolangКонвертация структур в JSON в GolangStrconv в GolangИспользование пакета SQLx для работы с базами данных в GolangРазбираемся с SQL в GolangРазделение строк с помощью функции split в GolangSort в GoПоиск и замена строк в Go - GolangИспользование пакета reflect в GolangРабота с PostgreSQL в GoPointers в GolangПарсинг в GoРабота со списками (list) в GolangПреобразование int в string в GolangРабота с числами с плавающей точкой в GolangРабота с полями в GolangИспользование enum в GolangОбработка JSON в GoЧтение и запись CSV-файлов в GolangРабота с cookie в GolangРегистры в GoКэширование данных в GolangПреобразование byte в string в GolangByte в GoИспользование bufio для работы с потоками данных в GolangДобавление данных и элементов (add) в Go
Логирование в Golang. Zap, Logrus, Loki, GrafanaРабота с Docker-контейнерами в GoИспользование pprof в GolangМеханизмы синхронизации в GolangРабота с пакетом S3 в GolangМониторинг Golang приложений с помощью PrometheusОптимизация проектов на GoПаттерны проектирования в GolangТрейсинг запросов с OpenTelemetry в GoНастройка шины событий NATS NSQ в GoМиграции базы данных в GolangНастройка уровней логирования log levels в GoОркестрация контейнеров Go с Kubernetes + DockerGjGo Playground и компилятор GolangИспользование go mod init для создания модулей GolangРабота с переменными окружения (env) в GolangКоманда go build в GolangАвтоматизация Golang проектов — CI/CD с GitLab CI и JenkinsРуководство по embed в GoОтладка кода в GolangЧтение и использование конфигурации в приложениях на GolangКомпиляция в GolangКак развернуть Go-приложение на облаке AWSАутентификация в Golang
Сетевые протоколы в GoПеременные в GolangЗначения в GolangДженерик %T и его применение в GolangТипы данных в GolangИспользование tls в GolangИспользование tag в структурах GolangSwitch в GoСтроки в GolangРабота с потоками (stream) в GolangSelect в GoРуны в GoРабота с пакетом params в GolangКонвертация строк в числа в GolangNull, Nil, None, 0 в GoНаименования переменных, функций и структур в GoInt в GolangУстановка GolangЧтение и установка HTTP заголовков в GolangMethods в GolangGoLand — IDE для разработки на Golang от JetBrainsОбработка «not found» в GolangFloat в GolangФлаги командной строки в Go (Golang)Запуск внешних команд в GolangОбработка ошибок в GoИспользование defer в GolangЗначения default в GolangГенерация кода в GoФорматирование кода в GolangЧистая архитектура в GolangКаналы (channels) в GolangПолучение body из HTTP запроса в Golang
Открыть базу знаний

Лучшие курсы по теме

изображение курса

Основы Golang

Антон Ларичев
AI-тренажеры
Практика в студии
Гарантия
Бонусы
иконка звёздочки рейтинга4.7
3 999 ₽ 6 990 ₽
Подробнее
изображение курса

Nest.js с нуля

Антон Ларичев
AI-тренажеры
Практика в студии
Гарантия
Бонусы
иконка звёздочки рейтинга4.6
3 999 ₽ 6 990 ₽
Подробнее
изображение курса

Docker и Ansible

Антон Ларичев
AI-тренажеры
Гарантия
Бонусы
иконка звёздочки рейтинга4.8
3 999 ₽ 6 990 ₽
Подробнее