Антон Ларичев

Введение
Event Sourcing — архитектурный паттерн, при котором состояние системы хранится не как текущий снимок, а как последовательность событий, приведших к этому состоянию. Вместо UPDATE users SET balance = 100 мы пишем событие MoneyDeposited { amount: 50 }, и текущий баланс получаем, применив все события подряд.
NestJS даёт удобную инфраструктуру для реализации этого подхода: модуль @nestjs/cqrs предоставляет агрегаты, шину команд, событий и запросов. В этой статье разберём, как собрать рабочий Event Sourcing-стек: от агрегата до проекций и снапшотов.
Зачем нужен Event Sourcing
Классический CRUD теряет историю: вы знаете, что у пользователя баланс 100, но не знаете, как он туда попал. Event Sourcing решает несколько задач:
- Полный аудит из коробки — каждое изменение это явное событие.
- Возможность «перемотать» состояние на любой момент времени.
- Естественная интеграция с CQRS и асинхронными проекциями.
- Простое построение новых read-моделей задним числом.
Плата за это — сложность: нужно версионировать события, делать снапшоты, аккуратно обрабатывать конкурентность.
Базовая установка
npm install @nestjs/cqrs
Подключаем модуль и регистрируем агрегат и обработчики команд.
// account.module.ts
import { Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { OpenAccountHandler } from './commands/open-account.handler';
import { DepositHandler } from './commands/deposit.handler';
import { AccountRepository } from './account.repository';
@Module({
imports: [CqrsModule],
providers: [OpenAccountHandler, DepositHandler, AccountRepository],
})
export class AccountModule {}
Агрегат и события
Агрегат — корень консистентности. Он принимает команды, валидирует их и порождает события. Применение событий меняет внутреннее состояние.
// events/account.events.ts
export class AccountOpenedEvent {
constructor(
public readonly accountId: string,
public readonly owner: string,
) {}
}
export class MoneyDepositedEvent {
constructor(
public readonly accountId: string,
public readonly amount: number,
) {}
}
// account.aggregate.ts
import { AggregateRoot } from '@nestjs/cqrs';
import { AccountOpenedEvent, MoneyDepositedEvent } from './events/account.events';
export class AccountAggregate extends AggregateRoot {
private balance = 0;
private opened = false;
constructor(private readonly id: string) {
super();
}
open(owner: string) {
if (this.opened) throw new Error('Аккаунт уже открыт');
this.apply(new AccountOpenedEvent(this.id, owner));
}
deposit(amount: number) {
if (!this.opened) throw new Error('Аккаунт не открыт');
if (amount <= 0) throw new Error('Сумма должна быть положительной');
this.apply(new MoneyDepositedEvent(this.id, amount));
}
// обработчики событий — меняют состояние, не порождают сайд-эффектов
onAccountOpenedEvent(_: AccountOpenedEvent) {
this.opened = true;
}
onMoneyDepositedEvent(event: MoneyDepositedEvent) {
this.balance += event.amount;
}
}
Ключ в том, что apply() и сохраняет событие в очередь uncommitted, и вызывает метод on<EventName>. Это позволяет восстанавливать состояние при загрузке из event store.
Event Store и репозиторий
Репозиторий отвечает за загрузку агрегата по истории событий и сохранение новых событий.
// account.repository.ts
import { Injectable } from '@nestjs/common';
import { EventBus, EventPublisher } from '@nestjs/cqrs';
import { AccountAggregate } from './account.aggregate';
import { EventStore } from './event-store';
@Injectable()
export class AccountRepository {
constructor(
private readonly store: EventStore,
private readonly publisher: EventPublisher,
private readonly eventBus: EventBus,
) {}
async load(id: string): Promise<AccountAggregate> {
const history = await this.store.readStream(`account-${id}`);
const aggregate = this.publisher.mergeObjectContext(new AccountAggregate(id));
aggregate.loadFromHistory(history);
return aggregate;
}
async save(aggregate: AccountAggregate, expectedVersion: number): Promise<void> {
const events = aggregate.getUncommittedEvents();
await this.store.appendToStream(`account-${aggregate['id']}`, events, expectedVersion);
aggregate.commit();
}
}
EventStore — это абстракция над реальным хранилищем. В проде это может быть EventStoreDB, Postgres с таблицей events или Kafka-лог. Минимально нужно две операции: appendToStream с проверкой версии и readStream.
Команды и проекции
Командный обработчик загружает агрегат, выполняет операцию и сохраняет события.
// commands/deposit.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { AccountRepository } from '../account.repository';
export class DepositCommand {
constructor(
public readonly accountId: string,
public readonly amount: number,
public readonly expectedVersion: number,
) {}
}
@CommandHandler(DepositCommand)
export class DepositHandler implements ICommandHandler<DepositCommand> {
constructor(private readonly repo: AccountRepository) {}
async execute(command: DepositCommand) {
const account = await this.repo.load(command.accountId);
account.deposit(command.amount);
await this.repo.save(account, command.expectedVersion);
}
}
Проекция строит read-модель из потока событий. Она подписывается на шину и обновляет таблицу, оптимизированную под чтение.
// projections/balance.projection.ts
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { MoneyDepositedEvent } from '../events/account.events';
import { BalanceReadModel } from './balance.read-model';
@EventsHandler(MoneyDepositedEvent)
export class BalanceProjection implements IEventHandler<MoneyDepositedEvent> {
constructor(private readonly readModel: BalanceReadModel) {}
async handle(event: MoneyDepositedEvent) {
await this.readModel.increment(event.accountId, event.amount);
}
}
Снапшоты
Если по агрегату накопились тысячи событий, восстановление каждый раз станет дорогим. Решение — снапшоты: периодически сохранять полное состояние и применять только события после него.
async load(id: string): Promise<AccountAggregate> {
const snapshot = await this.store.readSnapshot(`account-${id}`);
const aggregate = this.publisher.mergeObjectContext(new AccountAggregate(id));
if (snapshot) {
aggregate.restoreFromSnapshot(snapshot.state);
}
const history = await this.store.readStream(`account-${id}`, snapshot?.version ?? 0);
aggregate.loadFromHistory(history);
return aggregate;
}
Практическое правило — снапшот каждые 50–100 событий.
Частые ошибки
- Сайд-эффекты в обработчиках событий внутри агрегата. Методы
on<Event>должны быть чистыми: они вызываются и при восстановлении состояния. HTTP-запрос или отправка письма приведут к дублированию. - Изменение схемы события без версионирования. Старые события в стриме никуда не денутся. Используйте поле
versionи апкастеры для миграции на лету. - Игнорирование оптимистичной блокировки. Без проверки
expectedVersionвappendToStreamдве параллельные команды затрут друг друга. Это сердце консистентности. - Огромные агрегаты. Если в одном стриме десятки тысяч событий и нет снапшотов, загрузка превратится в минуты. Делите домен на более мелкие границы консистентности.
- Синхронные проекции в одной транзакции с записью события. Это убивает основное преимущество — отвязку записи от чтения. Проекции должны быть асинхронными и идемпотентными.
- Использование Event Sourcing там, где он не нужен. Простой CRUD-сервис без аудита и сложной бизнес-логики ничего не выиграет, а сложность вырастет на порядок.
Заключение
Event Sourcing на NestJS строится из четырёх блоков: агрегат с AggregateRoot, event store с оптимистичной блокировкой, репозиторий, восстанавливающий состояние из истории, и проекции, поднимающие read-модели через EventBus. Модуль @nestjs/cqrs закрывает большую часть инфраструктуры, остаётся правильно спроектировать границы агрегатов, версионирование событий и снапшоты. Применяйте паттерн осознанно — там, где история изменений и сложная доменная логика дают реальную ценность.






Комментарии
0