↓ かたい きじ だけど この アプリ を つくった おはなし だよ ↓
ひらいて みてね!!
1. はじめに
本記事ではイベントソーシングを用いたクライアントサイドでのデータ管理手法を提案します。
本提案は、以下の要求に対応することを目的としています:
- データベースサーバーの維持コストを削減し、初期段階ではアプリケーション単体でデータを保持すること
- 将来的なサーバーとの同期ニーズに柔軟に対応できる設計であること
- データ集計要件の変化に柔軟に対応できる設計であること
- Reactで実装されたクライアントアプリケーションから簡単に使用可能であること
本稿では提案する実装が上記の要求を満たし、最終的に次のような形でReactで作成されたクライアントアプリケーションから使用可能なことを示します。
イベントを記録する
const { addEvent } = useListenEvent(); addEvent({ itemId });
集計されたデータを使用する
const { totals } = useListenEvent(); const total = totals[itemId];
2. 使用する技術
本提案の実現のためにイベントソーシングおよびCQRSを採用しました。
イベントソーシングはアプリケーションの状態変更をイベントとして記録し、これらのイベントを再生することで現在の状態を再構築する手法です。この手法には以下の利点があります:
- 完全な監査履歴の維持
- 状態の任意の時点への巻き戻しが可能
- イベントを基にした柔軟な分析や派生モデルの構築
CQRSはデータの更新(コマンド)と読み取り(クエリ)の責務を分離するアーキテクチャパターンです。このパターンには以下のような利点があります:
3. システム設計
提案するシステムは、以下のコンポーネントで構成されています:
- EventStore: イベントの保存と取得を担当
- EventAggregator: イベントの集計と状態の管理を担当
- React Hooks: UIとデータ層の連携を担当
これらのコンポーネントを組み合わせることで、クライアントサイドでイベントソーシングとCQRSを実現します。
4. 実装の詳細
4.1 データ構造
基本となるデータ構造は以下の通りです:
\\ 集計可能なイベントの基本となる型 interface Aggregatable {} \\ ローカルデータベースに記録されたデータの型 interface StoredEvent<V extends Aggregatable> { localId: number; // Auto-increment primary key globalId: string; // UUIDv7 for global primary key value: V; } \\ 集計されたデータの基本となる型 interface AggregatedValue<T> { aggregationKey: T; }
Aggregatableとしてイベントを定義し、ローカルデータベースに保存された段階でStoredEventとなり、その後の集計処理を終えてAggregatedValueとなります。
Aggregatable、AggregatedValue には継承するinterfaceで自由にプロパティを設定できます。
StoredEventはローカルで自動採番されるlocalIdのほか、UUIDv7形式のglobalIdを持ち、これらの値から将来サーバーへデータが送信され処理される場合もある程度の正確性で時系列順で処理が行われることを担保しています。
これらのインターフェースをもとに、具体的なイベントと集計値を定義します。
4.2 EventStore
EventStoreクラスは、IndexedDBを使用してイベントを保存し、効率的に取得する機能を提供します。主な機能は以下の通りです:
- イベントの追加
- 指定したIDの前後のイベントの取得
- イベントの購読機能
import { Aggregatable, StoredEvent } from "./types"; import { v7 as uuidv7 } from 'uuid'; export interface GetEventsOptions { limit?: number; } const DatabaseVersion = 1; const DataStoreName = 'Events'; export class EventStore<V extends Aggregatable> { private db: IDBDatabase | null = null; private listeners: Set<(msg: StoredEvent<V>) => Promise<void>> = new Set(); constructor(public databaseName: string) {} async initialize(): Promise<void> { await this.initializeDatabase(); } async add(value: V): Promise<void> { if (!this.db) throw new Error('Database not initialized'); return new Promise((resolve, reject) => { const trx = this.db!.transaction([DataStoreName], 'readwrite'); const store = trx.objectStore(DataStoreName); const globalId = uuidv7(); const request = store.add({ globalId, value }); request.onerror = () => reject(new Error(`Add error: ${request.error?.message || 'Unknown error'}`)); request.onsuccess = () => { const localId = request.result as number; const storedEvent: StoredEvent<V> = { localId, globalId, value }; this.broadcastAddEvent(storedEvent); resolve(); }; trx.onerror = () => reject(new Error(`Transaction error: ${trx.error?.message || 'Unknown error'}`)); trx.onabort = () => reject(new Error('Transaction aborted')); }); } async getEventsAfter(localId: number, options?: GetEventsOptions): Promise<StoredEvent<V>[]> { return this.getEvents( 'next', IDBKeyRange.lowerBound(localId, true), options); } async getEventsBefore(localId: number, options?: GetEventsOptions): Promise<StoredEvent<V>[]> { return this.getEvents( 'prev', IDBKeyRange.upperBound(localId, true), options); } private async getEvents( direction: IDBCursorDirection, range: IDBKeyRange, options?: GetEventsOptions ): Promise<StoredEvent<V>[]> { if (!this.db) throw new Error('Database not initialized'); return new Promise((resolve, reject) => { const trx = this.db!.transaction([DataStoreName], 'readonly'); const store = trx.objectStore(DataStoreName); const results: StoredEvent<V>[] = []; const request = store.openCursor(range, direction); request.onerror = () => reject(request.error); request.onsuccess = (event) => { const cursor = (event.target as IDBRequest<IDBCursorWithValue>).result; if (cursor) { const storedEvent: StoredEvent<V> = { localId: cursor.key as number, globalId: cursor.value.globalId, value: cursor.value.value }; results.push(storedEvent); if (!options?.limit || results.length < options.limit) { cursor.continue(); } else { resolve(results); } } else { resolve(results); } }; }); } private async initializeDatabase(): Promise<void> { ... snip ... } async hasAnyRecord(): Promise<boolean> { ... snip ... } private broadcastAddEvent(event: StoredEvent<V>) { ... snip ... } subscribe(listener: (msg: StoredEvent<V>) => Promise<void>): () => void { ... snip... } dispose() { ...snip... } }
4.3 EventAggregator
EventAggregatorクラスは、イベントを集計し、現在の状態を管理します。 抽象クラスであり、継承する具象クラスでabstractとなっている関数を実装する必要があります。 主な機能は以下の通りです:
- 新しいイベントの処理
- バッチ処理によるイベントの集計
- 処理済み範囲の管理
- 集計結果の提供
import {AggregatedValue, Aggregatable, StoredEvent} from "./types"; import { EventStore, GetEventsOptions } from "./eventStore"; interface ProcessedRange { start: number; end: number; } const MetadataStoreName = 'Metadata'; const ProcessedRangesKey = 'ProcessedRanges'; export type AggregatorChangeListener<V> = (changedItem: V) => void; export abstract class EventAggregator<V extends Aggregatable, A extends AggregatedValue<string>> { protected db: IDBDatabase | null = null; private processedRanges: ProcessedRange[] = []; private processingIntervalId: number | null = null; private listeners: Set<AggregatorChangeListener<A>> = new Set(); constructor( protected eventStore: EventStore<V>, protected databaseName: string, protected databaseVersion: number = 1, protected batchSize: number = 100, protected processingInterval: number = 1000, ) {} async initialize(): Promise<void> { ...snip... } // サブクラスで実装する protected abstract applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void; protected abstract processEvent(trx: IDBTransaction, event: StoredEvent<V>): Promise<A>; private applyMetadataMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void { if (oldVersion < 1) { const store = db.createObjectStore(MetadataStoreName, {keyPath: 'key'}); } } private async handleNewEvent(ev: StoredEvent<V>): Promise<void> { if (!this.db) throw new Error('Database not initialized'); const tx = this.db.transaction(this.db.objectStoreNames, 'readwrite'); try { const changedItem = await this.processEvent(tx, ev); this.updateProcessedRanges(ev.localId, ev.localId); await this.saveProcessedRanges(tx); if (changedItem) { this.notifyListeners([changedItem]); } return new Promise<void>((resolve, reject) => { tx.oncomplete = () => resolve(); tx.onerror = () => reject(tx.error); }); } catch (err) { console.error("Error processing new event:", err); tx.abort(); } } private async processEvents(): Promise<void> { if (!this.db) throw new Error('Database not initialized'); if (this.isFullyCovered()) { this.stopProcessing(); return; } const range = await this.findRangeToProcess(); if (!range) { return; } const options: GetEventsOptions = { limit: Math.min(this.batchSize, range.end - range.start - 1) }; const eventsBefore = await this.eventStore.getEventsBefore(range.end, options); if (eventsBefore.length === 0) { return; } const maxId = eventsBefore[0].localId; const minId = eventsBefore[eventsBefore.length-1].localId; const changedItems: A[] = []; const tx = this.db.transaction(this.db.objectStoreNames, 'readwrite'); try { for (const ev of eventsBefore) { const changedItem = await this.processEvent(tx, ev); if (changedItem) { changedItems.push(changedItem); } } if (eventsBefore.length < this.batchSize) { this.updateProcessedRanges(1, maxId); } else { this.updateProcessedRanges(minId, maxId); } await this.saveProcessedRanges(tx); return new Promise<void>((resolve, reject) => { tx.oncomplete = () => resolve(); tx.onerror = () => reject(tx.error); }); } catch (err) { console.error("Error processing events:", err); tx.abort(); } this.notifyListeners(changedItems); } private async findRangeToProcess(): Promise<ProcessedRange | null> { const size = this.processedRanges.length; if (size === 0) { return { start: 0, end: Number.MAX_SAFE_INTEGER } } const rangeEnd = this.processedRanges[size-1].start; if (rangeEnd === 1) { return null; } if (rangeEnd === 0) { throw new Error('Unexpected value'); } if (1 < size) { const rangeStart = this.processedRanges[size-2].end; return { start: rangeStart, end: rangeEnd }; } // size === 1 return { start: 0, end: rangeEnd }; } private isFullyCovered(): boolean { if (this.processedRanges.length === 0) { this.eventStore.hasAnyRecord().then((hasAnyRecord) => { return !hasAnyRecord; }); return false; } return this.processedRanges.length === 1 && this.processedRanges[0].start === 0; } private updateProcessedRanges(start: number, end: number): void { const newRange: ProcessedRange = { start, end }; const allRanges = [...this.processedRanges, newRange]; allRanges.sort((a, b) => a.start - b.start); const mergedRanges: ProcessedRange[] = []; let currentRange = allRanges[0]; for (let i = 1; i < allRanges.length; i++) { const nextRange = allRanges[i]; if (currentRange.end + 1 >= nextRange.start) { currentRange.end = Math.max(currentRange.end, nextRange.end); } else { mergedRanges.push(currentRange); currentRange = nextRange; } } mergedRanges.push(currentRange); this.processedRanges = mergedRanges; } private async loadProcessedRanges(): Promise<void> { ...snip... } private async saveProcessedRanges(tx: IDBTransaction): Promise<void> { ...snip... } startProcessing(): void { ...snip... } stopProcessing(): void { ...snip... } private notifyListeners(changes: A[]): void { ...snip... } subscribe(listener: AggregatorChangeListener<A>): () => void { ...snip... } private async initializeDatabase(): Promise<void> { ...snip... } dispose() { ... snip... } }
5. 具体的なユースケース
本システムの有効性を示すために音楽再生アプリケーションでの使用例を提示します。
5.1 音楽再生イベントの記録と集計
MusicListenEventStore は音楽が再生されたというイベントを保存します。 MusicListenEventAggregator は楽曲ごとの再生回数の総数を集計します。
MusicListenEventStore および MusicListenEventAggregator クラス
import {Aggregatable, AggregatedValue, StoredEvent} from "../../types"; import {EventStore} from "../../eventStore"; import {EventAggregator} from "../../eventAggregator"; const TotalStoreName = 'Total'; export interface MusicListenEvent extends Aggregatable { itemId: string; } export interface MusicListenAggregationValue extends AggregatedValue<string> { total: number; } export class MusicListenEventStore extends EventStore<MusicListenEvent> {} export class MusicListenEventAggregator extends EventAggregator<MusicListenEvent, MusicListenAggregationValue> { constructor( protected eventStore: EventStore<MusicListenEvent>, protected databaseName: string, protected databaseVersion: number = 1, protected batchSize: number = 100, protected processingInterval: number = 1000, ) { ...snip... } protected applyMigrations(db: IDBDatabase, oldVersion: number, newVersion: number): void { if (oldVersion < 1) { db.createObjectStore(TotalStoreName, {keyPath: 'aggregationKey'}); } } protected async processEvent(trx: IDBTransaction, event: StoredEvent<MusicListenEvent>): Promise<MusicListenAggregationValue> { return new Promise((resolve, reject) => { const store = trx.objectStore(TotalStoreName); const aggregationKey = event.value.itemId; const getReq = store.get(aggregationKey); getReq.onerror = (error) => reject(getReq.error); getReq.onsuccess = () => { const data = getReq.result as MusicListenAggregationValue | undefined; const total = (data?.total ?? 0) + 1; const updated: MusicListenAggregationValue = { aggregationKey: aggregationKey, total }; const putReq = store.put(updated); putReq.onerror = () => reject(putReq.error); putReq.onsuccess = () => resolve(updated); }; }); } async getAggregatedTotal(itemIds: string[]): Promise<{ [key: string]: number }> { if (!this.db) throw new Error('Database not initialized'); return new Promise((resolve, reject) => { const trx = this.db!.transaction([TotalStoreName], 'readonly'); const store = trx.objectStore(TotalStoreName); const getItemData = (itemId: string): Promise<[string, number]> => new Promise((resolveItem, rejectItem) => { const request = store.get(itemId); request.onerror = () => rejectItem(new Error(`Error fetching data for item ${itemId}: ${request.error}`)); request.onsuccess = () => { const data = request.result as MusicListenAggregationValue; resolveItem([itemId, data ? data.total : 0]); }; }); Promise.all(itemIds.map(getItemData)) .then(entries => entries.reduce((acc, [key, value]) => ({ ...acc, [key]: value }), {}) ) .then(resolve) .catch(reject); trx.onerror = () => { reject(new Error(`Transaction error: ${trx.error}`)); }; }); } }
リリース後に集計ロジックの変更や集計対象の追加が必要になった場合は、新たにAggregatorを作成することができます。
例えば新しく「最初に再生された時刻」を集計対象に追加した MusicListenEventAggregatorV2 を作成した場合を考えます。
その場合、MusicListenEventAggregatorV2 のコンストラクタに MusicListenEventAggregator と同様に MusicListenEventStore のインスタンスを指定すれば、MusicListenEventAggregatorV2 も MusicListenEventStore に蓄積された全てのイベントを集計の対象とすることができます。
useListenEvent フック
次のようなReact カスタムフックを使用することで、Reactコンポーネントから容易に上記クラスを使用したイベントの追加と集計結果の取得が可能になります。
'use client'; import React, {createContext, useContext, useState, useEffect, ReactNode, useRef, useMemo} from 'react'; import {MusicListenAggregationValue, MusicListenEvent, MusicListenEventAggregator} from './listenEvent'; import {EventStore} from "../../eventStore"; type ListenEventContextType = { addEvent: (event: MusicListenEvent) => Promise<void>; totals: { [key: string]: number }; isInitializing: boolean; isSyncing: boolean; error: Error | null; }; export const ListenEventContext = createContext<ListenEventContextType | undefined>(undefined); export type ListenEventContextProps = { keys: string[]; children: ReactNode; }; export const ListenEventProvider: React.FC<ListenEventContextProps> = ({ keys, children }) => { const [totals, setTotals] = useState<{ [key: string]: number }>({}); const [isInitializing, setIsInitializing] = useState(true); const [isSyncing, setIsSyncing] = useState(false); const [error, setError] = useState<Error | null>(null); const { eventStore, aggregator } = useMemo(() => { const eventStore = new EventStore<MusicListenEvent>('MusicListenEvents'); const aggregator = new MusicListenEventAggregator(eventStore, 'MusicListenAggregator_V2'); (async() => { await eventStore.initialize(); await aggregator.initialize(); aggregator.startProcessing(); })().then(() => { setIsInitializing(false); setIsSyncing(true); }).catch((err) => { setError(new Error(`Failed to initialize EventStore/Aggregator: ${err}`)); setIsInitializing(false); setIsSyncing(false); }); return { eventStore, aggregator }; }, []); useEffect(() => { const handleUpdate = (updated: MusicListenAggregationValue) => { setTotals(prevTotals => ({ ...prevTotals, [updated.aggregationKey]: updated.total })); }; const unsubscribe = aggregator.subscribe(handleUpdate); return () => { if (typeof unsubscribe === 'function') { unsubscribe(); } }; }, [eventStore, aggregator]); useEffect(() => { if (isInitializing || error != null) return; const fetchTotals = async () => { try { const result = await aggregator.getAggregatedTotal(keys); if (result != null) { setTotals(result); } } catch (err) { setError(err instanceof Error ? err : new Error('Failed to fetch totals')); } setIsSyncing(false); }; fetchTotals(); }, [keys, isInitializing]); return ( <ListenEventContext.Provider value={{ addEvent: eventStore.add.bind(eventStore), totals, isInitializing, isSyncing, error }}> {children} </ListenEventContext.Provider> ); }; export const useListenEvent = () => { const context = useContext(ListenEventContext); if (context === undefined) { throw new Error('useListenTotal must be used within a ListenEventContext'); } return context; };
使用方法は冒頭で示した通り次のような形です。
イベントを記録する
const { addEvent } = useListenEvent(); addEvent({ itemId });
集計されたデータを使用する
const { totals } = useListenEvent(); const total = totals[itemId];
ただしReact.Contextを使用しているため、上記の処理は次のListenEventProviderで囲まれた中で記述する必要があります。
<ListenEventProvider keys={listenKeys}> //この中で実行する必要がある </ListenEventProvider>
6. 課題
本提案による実装がサーバーサイドと効果的に統合するためには次のような課題があります。
- クライアントで記録されたイベントをサーバーへアップロードする仕組みの実装
- サーバー上でのイベントデータの効率的な処理と集計
- クライアント側の集計結果とサーバー側の集計結果の整合性確保
7. おわりに
本稿ではクライアントサイドでのイベントソーシングとCQRSの実装方法を提案しました。
本提案により、初期段階でのデータベースサーバー維持コストの削減と、将来的な拡張性の確保を両立することが可能になります。
本提案がモバイルアプリケーションやSPAの開発者にとって効率的なデータ管理手法の一助となることを期待しています。