以下の内容はhttps://kafkafinancialgroup.hatenablog.com/entry/2025/08/22/121758より取得しました。


ボラティリティ不均衡

私がやりたいことの網羅的な指示書にしましょう

 

# Option Volume Imbalance (OVI) 戦略 完全実装指示書

 

## プロジェクト概要

 

**目的**: "Option Volume Imbalance as a predictor for equity market returns"論文[1]の3つの主要発見を再現実装し、市場予測戦略を構築する

 

**主要発見**:

1. **高IV契約**: 予測力が最も強い[1]

2. **プットオプション**: コールオプションより情報価値が高い[1]

3. **Market Makerボリューム**: 最強のシグナル[1]

 

## Phase 1: 環境構築

 

### **必要ライブラリのインストール**

```bash

pip install pandas numpy scipy matplotlib seaborn

pip install yfinance pandas-datareader

pip install scikit-learn statsmodels

pip install jupyter notebook

```

 

### **フォルダ構造作成**

```

ovi_project/

├── data/

│ ├── raw/

│ ├── processed/

│ └── historical/

├── src/

│ ├── data_collection.py

│ ├── black_scholes.py

│ ├── ovi_calculator.py

│ ├── backtester.py

│ └── analysis.py

├── notebooks/

│ └── analysis.ipynb

├── results/

│ ├── plots/

│ └── reports/

└── config/

    └── settings.py

```

 

## Phase 2: データ収集システム

 

### **2.1 Yahoo Financeデータ取得**

```python

# src/data_collection.py

import yfinance as yf

import pandas as pd

import numpy as np

from datetime import datetime, timedelta

import time

 

class OptionDataCollector:

    def __init__(self, tickers=['SPY', 'QQQ', 'IWM']):

        self.tickers = tickers

        self.data_path = '../data/raw/'

        

    def collect_daily_options(self, ticker):

        """日次オプションデータ収集"""

        stock = yf.Ticker(ticker)

        

        try:

            # 基本株価情報

            info = stock.info

            current_price = info.get('regularMarketPrice', stock.history(period='1d')['Close'].iloc[-1])

            

            # オプションチェーン取得

            expiration_dates = stock.options[:4] # 最初の4つの満期日

            

            all_options =

            

            for exp_date in expiration_dates:

                option_chain = stock.option_chain(exp_date)

                

                # コールオプション

                calls = option_chain.calls.copy()

                calls['option_type'] = 'call'

                calls['expiration'] = exp_date

                calls['underlying_price'] = current_price

                calls['collection_date'] = pd.Timestamp.now()

                calls['ticker'] = ticker

                

                # プットオプション

                puts = option_chain.puts.copy()

                puts['option_type'] = 'put'

                puts['expiration'] = exp_date  

                puts['underlying_price'] = current_price

                puts['collection_date'] = pd.Timestamp.now()

                puts['ticker'] = ticker

                

                all_options.extend([calls, puts])

            

            return pd.concat(all_options, ignore_index=True)

            

        except Exception as e:

            print(f"Error collecting data for {ticker}: {e}")

            return None

    

    def daily_collection_routine(self):

        """日次データ収集ルーチン"""

        today = pd.Timestamp.now().strftime('%Y%m%d')

        

        for ticker in self.tickers:

            print(f"Collecting data for {ticker}...")

            

            option_data = self.collect_daily_options(ticker)

            

            if option_data is not None:

                filename = f"{self.data_path}{ticker}_options_{today}.csv"

                option_data.to_csv(filename, index=False)

                print(f"Saved {len(option_data)} records to {filename}")

                

                # Rate limiting

                time.sleep(2)

```

 

### **2.2 履歴データ構築**

```python

def build_historical_database(days_back=30):

    """過去データを遡って収集"""

    collector = OptionDataCollector()

    

    historical_data =

    

    for i in range(days_back):

        print(f"Collecting historical data: Day {i+1}/{days_back}")

        

        for ticker in collector.tickers:

            try:

                daily_data = collector.collect_daily_options(ticker)

                if daily_data is not None:

                    historical_data.append(daily_data)

            except:

                continue

        

        time.sleep(1) # Rate limiting

    

    if historical_data:

        combined_data = pd.concat(historical_data, ignore_index=True)

        combined_data.to_csv('../data/historical/combined_options_data.csv', index=False)

        print(f"Historical database built with {len(combined_data)} records")

```

 

## Phase 3: Black-Scholes実装

 

### **3.1 価格計算エンジン**

```python

# src/black_scholes.py

import numpy as np

from scipy.stats import norm

from scipy.optimize import minimize_scalar

 

class BlackScholesEngine:

    @staticmethod

    def calculate_price(S, K, T, r, sigma, option_type='call'):

        """Black-Scholes価格計算"""

        if T <= 0:

            if option_type == 'call':

                return max(S - K, 0)

            else:

                return max(K - S, 0)

        

        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))

        d2 = d1 - sigma * np.sqrt(T)

        

        if option_type == 'call':

            price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

        else: # put

            price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)

        

        return price

    

    @staticmethod

    def calculate_greeks(S, K, T, r, sigma, option_type='call'):

        """グリーク計算"""

        if T <= 0:

            return {'delta': 0, 'gamma': 0, 'theta': 0, 'vega': 0}

            

        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))

        d2 = d1 - sigma * np.sqrt(T)

        

        # Delta

        if option_type == 'call':

            delta = norm.cdf(d1)

        else:

            delta = norm.cdf(d1) - 1

        

        # Gamma

        gamma = norm.pdf(d1) / (S * sigma * np.sqrt(T))

        

        # Theta (per day)

        if option_type == 'call':

            theta = (-S * norm.pdf(d1) * sigma / (2 * np.sqrt(T)) 

                    - r * K * np.exp(-r * T) * norm.cdf(d2)) / 365

        else:

            theta = (-S * norm.pdf(d1) * sigma / (2 * np.sqrt(T)) 

                    + r * K * np.exp(-r * T) * norm.cdf(-d2)) / 365

        

        # Vega (per 1% volatility change)

        vega = S * norm.pdf(d1) * np.sqrt(T) / 100

        

        return {'delta': delta, 'gamma': gamma, 'theta': theta, 'vega': vega}

    

    @staticmethod

    def implied_volatility(market_price, S, K, T, r, option_type='call'):

        """インプライドボラティリティ計算"""

        def objective(sigma):

            model_price = BlackScholesEngine.calculate_price(S, K, T, r, sigma, option_type)

            return (model_price - market_price)**2

        

        try:

            result = minimize_scalar(objective, bounds=(0.001, 5.0), method='bounded')

            return result.x if result.success else np.nan

        except:

            return np.nan

```

 

### **3.2 データ拡張処理**

```python

def enhance_option_data(option_df, risk_free_rate=0.05):

    """オプションデータにIVとグリークを追加"""

    bs_engine = BlackScholesEngine()

    enhanced_data =

    

    for _, row in option_df.iterrows():

        try:

            # 満期までの時間計算

            exp_date = pd.to_datetime(row['expiration'])

            collection_date = pd.to_datetime(row['collection_date'])

            days_to_exp = (exp_date - collection_date).days

            T = max(days_to_exp / 365, 1/365)

            

            # 市場価格

            bid = row.get('bid', 0)

            ask = row.get('ask', 0)

            market_price = (bid + ask) / 2 if (bid > 0 and ask > 0) else row.get('lastPrice', 0)

            

            if market_price <= 0:

                continue

                

            # インプライドボラティリティ

            iv = bs_engine.implied_volatility(

                market_price, row['underlying_price'], row['strike'],

                T, risk_free_rate, row['option_type']

            )

            

            # グリーク計算

            if not np.isnan(iv) and iv > 0:

                greeks = bs_engine.calculate_greeks(

                    row['underlying_price'], row['strike'],

                    T, risk_free_rate, iv, row['option_type']

                )

                

                # 理論価格

                theoretical_price = bs_engine.calculate_price(

                    row['underlying_price'], row['strike'],

                    T, risk_free_rate, iv, row['option_type']

                )

            else:

                greeks = {'delta': np.nan, 'gamma': np.nan, 'theta': np.nan, 'vega': np.nan}

                theoretical_price = np.nan

            

            enhanced_row = {

                **row.to_dict(),

                'implied_volatility': iv,

                'theoretical_price': theoretical_price,

                'time_to_expiry': T,

                'moneyness': row['strike'] / row['underlying_price'],

                **greeks

            }

            

            enhanced_data.append(enhanced_row)

            

        except Exception as e:

            print(f"Error processing row: {e}")

            continue

    

    return pd.DataFrame(enhanced_data)

```

 

## Phase 4: OVIシグナル計算

 

### **4.1 核心OVI計算ロジック**

```python

# src/ovi_calculator.py

import pandas as pd

import numpy as np

 

class OVICalculator:

    def __init__(self):

        self.high_iv_percentile = 80 # 高IV基準(上位20%)

        self.put_weight = 3.0 # プット重み(論文発見に基づく)

        self.call_weight = 1.0

    

    def calculate_basic_ovi(self, option_data):

        """基本OVI計算"""

        results =

        

        grouped = option_data.groupby(['ticker', 'collection_date', 'expiration'])

        

        for (ticker, date, exp), group in grouped:

            calls = group[group['option_type'] == 'call']

            puts = group[group['option_type'] == 'put']

            

            call_volume = calls['volume'].sum()

            put_volume = puts['volume'].sum()

            

            total_volume = call_volume + put_volume

            

            if total_volume > 0:

                basic_ovi = (call_volume - put_volume) / total_volume

            else:

                basic_ovi = 0

                

            results.append({

                'ticker': ticker,

                'date': date,

                'expiration': exp,

                'basic_ovi': basic_ovi,

                'total_volume': total_volume

            })

        

        return pd.DataFrame(results)

    

    def calculate_high_iv_ovi(self, option_data):

        """発見1: 高IV契約に基づくOVI"""

        results =

        

        # 日次でIVパーセンタイル計算

        option_data['iv_percentile'] = option_data.groupby(['ticker', 'collection_date'])['implied_volatility'].transform(

            lambda x: x.rank(pct=True) * 100

        )

        

        # 高IV契約のフィルタリング

        high_iv_data = option_data[option_data['iv_percentile'] >= self.high_iv_percentile]

        

        grouped = high_iv_data.groupby(['ticker', 'collection_date', 'expiration'])

        

        for (ticker, date, exp), group in grouped:

            if len(group) == 0:

                continue

                

            calls = group[group['option_type'] == 'call']

            puts = group[group['option_type'] == 'put']

            

            call_volume = calls['volume'].sum()

            put_volume = puts['volume'].sum()

            

            total_volume = call_volume + put_volume

            

            if total_volume > 0:

                high_iv_ovi = (call_volume - put_volume) / total_volume

                confidence_score = min(1.0, total_volume / 1000) # Volume-based confidence

            else:

                high_iv_ovi = 0

                confidence_score = 0

            

            results.append({

                'ticker': ticker,

                'date': date,

                'expiration': exp,

                'high_iv_ovi': high_iv_ovi,

                'high_iv_volume': total_volume,

                'confidence_score': confidence_score

            })

        

        return pd.DataFrame(results)

    

    def calculate_put_focused_ovi(self, option_data):

        """発見2: プット重視OVI"""

        results =

        

        grouped = option_data.groupby(['ticker', 'collection_date', 'expiration'])

        

        for (ticker, date, exp), group in grouped:

            calls = group[group['option_type'] == 'call']

            puts = group[group['option_type'] == 'put']

            

            call_volume = calls['volume'].sum()

            put_volume = puts['volume'].sum()

            

            # プット重視の加重計算

            weighted_bullish = (call_volume * self.call_weight) + (put_volume * self.put_weight * -1)

            weighted_bearish = (call_volume * self.call_weight * -1) + (put_volume * self.put_weight)

            

            total_weighted = abs(weighted_bullish) + abs(weighted_bearish)

            

            if total_weighted > 0:

                put_focused_ovi = weighted_bullish / total_weighted

            else:

                put_focused_ovi = 0

            

            # プット情報比率

            put_info_ratio = put_volume / (call_volume + put_volume) if (call_volume + put_volume) > 0 else 0

            

            results.append({

                'ticker': ticker,

                'date': date,

                'expiration': exp,

                'put_focused_ovi': put_focused_ovi,

                'put_info_ratio': put_info_ratio,

                'put_dominance': put_info_ratio > 0.6

            })

        

        return pd.DataFrame(results)

    

    def calculate_optimized_ovi(self, option_data):

        """統合最適化OVI: 3つの発見を結合"""

        # 高IV契約フィルタリング

        option_data['iv_percentile'] = option_data.groupby(['ticker', 'collection_date'])['implied_volatility'].transform(

            lambda x: x.rank(pct=True) * 100

        )

        

        high_iv_data = option_data[option_data['iv_percentile'] >= self.high_iv_percentile]

        

        results =

        

        grouped = high_iv_data.groupby(['ticker', 'collection_date', 'expiration'])

        

        for (ticker, date, exp), group in grouped:

            if len(group) == 0:

                continue

                

            calls = group[group['option_type'] == 'call']

            puts = group[group['option_type'] == 'put']

            

            call_volume = calls['volume'].sum()

            put_volume = puts['volume'].sum()

            

            # 最適化計算: 高IV + プット重視

            optimized_bullish = call_volume * self.call_weight

            optimized_bearish = put_volume * self.put_weight

            

            total_optimized = optimized_bullish + optimized_bearish

            

            if total_optimized > 0:

                optimized_ovi = (optimized_bullish - optimized_bearish) / total_optimized

            else:

                optimized_ovi = 0

            

            # 信頼度スコア

            volume_confidence = min(1.0, total_optimized / 5000)

            iv_confidence = len(group) / max(1, len(option_data[(option_data['ticker'] == ticker) & 

                                                                (option_data['collection_date'] == date) & 

                                                                (option_data['expiration'] == exp)]))

            

            overall_confidence = (volume_confidence + iv_confidence) / 2

            

            results.append({

                'ticker': ticker,

                'date': date,

                'expiration': exp,

                'optimized_ovi': optimized_ovi,

                'signal_strength': 'strong' if abs(optimized_ovi) > 0.3 else 'moderate' if abs(optimized_ovi) > 0.1 else 'weak',

                'confidence_score': overall_confidence,

                'volume_used': total_optimized,

                'components': {

                    'high_iv': True,

                    'put_focused': True,

                    'market_maker': False # Yahoo Financeデータでは不可

                }

            })

        

        return pd.DataFrame(results)

```

 

## Phase 5: バックテスト実装

 

### **5.1 戦略バックテスター**

```python

# src/backtester.py

import pandas as pd

import numpy as np

from sklearn.metrics import classification_report

 

class OVIBacktester:

    def __init__(self, holding_period=5, transaction_cost=0.001):

        self.holding_period = holding_period

        self.transaction_cost = transaction_cost

        

    def prepare_price_data(self, tickers, start_date, end_date):

        """株価データを準備"""

        import yfinance as yf

        

        price_data =

        

        for ticker in tickers:

            stock = yf.Ticker(ticker)

            hist = stock.history(start=start_date, end=end_date)

            hist['ticker'] = ticker

            hist['date'] = hist.index

            hist.reset_index(drop=True, inplace=True)

            price_data.append(hist'ticker', 'date', 'Close')

        

        return pd.concat(price_data, ignore_index=True)

    

    def calculate_forward_returns(self, price_data):

        """将来リターン計算"""

        price_data = price_data.sort_values(['ticker', 'date'])

        

        returns =

        

        for ticker in price_data['ticker'].unique():

            ticker_data = price_data[price_data['ticker'] == ticker].copy()

            ticker_data = ticker_data.sort_values('date')

            

            for i in range(len(ticker_data) - self.holding_period):

                current_price = ticker_data.iloc[i]['Close']

                future_price = ticker_data.iloc[i + self.holding_period]['Close']

                

                forward_return = (future_price - current_price) / current_price

                

                returns.append({

                    'ticker': ticker,

                    'date': ticker_data.iloc[i]['date'],

                    'current_price': current_price,

                    'future_price': future_price,

                    'forward_return': forward_return

                })

        

        return pd.DataFrame(returns)

    

    def backtest_ovi_signals(self, ovi_signals, price_returns):

        """OVIシグナルのバックテスト"""

        # データのマージ

        merged_data = ovi_signals.merge(

            price_returns, 

            left_on=['ticker', 'date'], 

            right_on=['ticker', 'date'],

            how='inner'

        )

        

        if len(merged_data) == 0:

            print("No matching data found for backtesting")

            return None

        

        results =

        

        for _, row in merged_data.iterrows():

            ovi_value = row['optimized_ovi']

            actual_return = row['forward_return']

            

            # シグナル方向の決定

            if ovi_value > 0.1:

                signal_direction = 1 # Bullish

            elif ovi_value < -0.1:

                signal_direction = -1 # Bearish

            else:

                signal_direction = 0 # Neutral (skip)

            

            if signal_direction != 0:

                # 取引コスト考慮

                gross_return = signal_direction * actual_return

                net_return = gross_return - self.transaction_cost

                

                # 予測正確性

                correct_prediction = (signal_direction * actual_return) > 0

                

                results.append({

                    'ticker': row['ticker'],

                    'date': row['date'],

                    'ovi_signal': ovi_value,

                    'predicted_direction': signal_direction,

                    'actual_return': actual_return,

                    'gross_return': gross_return,

                    'net_return': net_return,

                    'correct_prediction': correct_prediction,

                    'confidence_score': row.get('confidence_score', 0)

                })

        

        return pd.DataFrame(results)

    

    def calculate_performance_metrics(self, backtest_results):

        """パフォーマンス指標計算"""

        if len(backtest_results) == 0:

            return {}

        

        returns = backtest_results['net_return']

        

        # 基本統計

        total_trades = len(returns)

        accuracy = backtest_results['correct_prediction'].mean()

        avg_return = returns.mean()

        std_return = returns.std()

        

        # リスク調整リターン

        sharpe_ratio = (avg_return / std_return) * np.sqrt(252 / se




以上の内容はhttps://kafkafinancialgroup.hatenablog.com/entry/2025/08/22/121758より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14