私がやりたいことの網羅的な指示書にしましょう
# Option Volume Imbalance (OVI) 戦略 完全実装指示書
## プロジェクト概要
**目的**: "Option Volume Imbalance as a predictor for equity market returns"論文[1]の3つの主要発見を再現実装し、市場予測戦略を構築する
**主要発見**:
1. **高IV契約**: 予測力が最も強い[1]
2. **プットオプション**: コールオプションより情報価値が高い[1]
3. **Market Makerボリューム**: 最強のシグナル[1]
## Phase 1: 環境構築
### **必要ライブラリのインストール**
```bash
pip install pandas numpy scipy matplotlib seaborn
pip install yfinance pandas-datareader
pip install scikit-learn statsmodels
pip install jupyter notebook
```
### **フォルダ構造作成**
```
ovi_project/
├── data/
│   ├── raw/
│   ├── processed/
│   └── historical/
├── src/
│   ├── data_collection.py
│   ├── black_scholes.py
│   ├── ovi_calculator.py
│   ├── backtester.py
│   └── analysis.py
├── notebooks/
│   └── analysis.ipynb
├── results/
│   ├── plots/
│   └── reports/
└── config/
    └── settings.py
```
## Phase 2: データ収集システム
### **2.1 Yahoo Financeデータ取得**
```python
# src/data_collection.py
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
class OptionDataCollector:
    """Collect option-chain snapshots through yfinance and save them as CSV.

    Attributes:
        tickers: list of underlying symbols to poll.
        data_path: path prefix (including trailing slash) for the CSV files.
    """

    def __init__(self, tickers=None):
        """Create a collector.

        Args:
            tickers: iterable of ticker symbols; defaults to SPY, QQQ and IWM.
        """
        # A None sentinel replaces a mutable list default, which would have
        # been shared (and mutated) across every instance.
        self.tickers = ['SPY', 'QQQ', 'IWM'] if tickers is None else list(tickers)
        self.data_path = '../data/raw/'

    def collect_daily_options(self, ticker):
        """Fetch calls and puts for the first four listed expirations.

        Args:
            ticker: underlying symbol passed to ``yf.Ticker``.

        Returns:
            One DataFrame holding every call and put row, tagged with
            ``option_type``, ``expiration``, ``underlying_price``,
            ``collection_date`` and ``ticker``; ``None`` when anything fails
            or no chain is available.
        """
        stock = yf.Ticker(ticker)
        try:
            # Underlying price: use the quoted market price, and only fall
            # back to the latest daily close when the quote is missing.
            current_price = stock.info.get('regularMarketPrice')
            if current_price is None:
                current_price = stock.history(period='1d')['Close'].iloc[-1]

            # One timestamp for the whole snapshot. Stamping calls and puts
            # separately gives them different collection_date values, which
            # splits them apart in any later group-by on that column.
            collected_at = pd.Timestamp.now()

            all_options = []
            for exp_date in stock.options[:4]:  # first four expirations
                option_chain = stock.option_chain(exp_date)
                sides = (('call', option_chain.calls), ('put', option_chain.puts))
                for option_type, frame in sides:
                    tagged = frame.copy()
                    tagged['option_type'] = option_type
                    tagged['expiration'] = exp_date
                    tagged['underlying_price'] = current_price
                    tagged['collection_date'] = collected_at
                    tagged['ticker'] = ticker
                    all_options.append(tagged)

            if not all_options:
                print(f"Error collecting data for {ticker}: no option chains available")
                return None
            return pd.concat(all_options, ignore_index=True)
        except Exception as e:
            # Best-effort collection: report the failure and let the caller
            # carry on with the remaining tickers.
            print(f"Error collecting data for {ticker}: {e}")
            return None

    def daily_collection_routine(self):
        """Collect every configured ticker and write one CSV per ticker.

        Files are named ``<data_path><ticker>_options_<YYYYMMDD>.csv``.
        """
        today = pd.Timestamp.now().strftime('%Y%m%d')
        for ticker in self.tickers:
            print(f"Collecting data for {ticker}...")
            option_data = self.collect_daily_options(ticker)
            if option_data is not None:
                filename = f"{self.data_path}{ticker}_options_{today}.csv"
                option_data.to_csv(filename, index=False)
                print(f"Saved {len(option_data)} records to {filename}")
            # Rate limiting between tickers
            time.sleep(2)
```
### **2.2 履歴データ構築**
```python
def build_historical_database(days_back=30):
    """Run ``days_back`` collection passes and save the combined rows as CSV.

    Every pass calls ``OptionDataCollector.collect_daily_options`` for each
    configured ticker. All collected frames are concatenated and written to
    ``../data/historical/combined_options_data.csv``; nothing is written when
    no pass produced data.

    NOTE(review): nothing here requests a past date, so each pass appears to
    read the chain as it is right now rather than a distinct historical day.
    Confirm whether this should instead be scheduled once per trading day.

    Args:
        days_back: number of collection passes to run.
    """
    collector = OptionDataCollector()
    historical_data = []
    for i in range(days_back):
        print(f"Collecting historical data: Day {i+1}/{days_back}")
        for ticker in collector.tickers:
            try:
                daily_data = collector.collect_daily_options(ticker)
                if daily_data is not None:
                    historical_data.append(daily_data)
            except Exception as e:
                # Keep going on failure, but say what went wrong instead of
                # swallowing it (a bare except would also trap Ctrl-C).
                print(f"Error collecting data for {ticker}: {e}")
                continue
            time.sleep(1)  # Rate limiting
    if historical_data:
        combined_data = pd.concat(historical_data, ignore_index=True)
        combined_data.to_csv('../data/historical/combined_options_data.csv', index=False)
        print(f"Historical database built with {len(combined_data)} records")
```
## Phase 3: Black-Scholes実装
### **3.1 価格計算エンジン**
```python
# src/black_scholes.py
import numpy as np
from scipy.stats import norm
from scipy.optimize import minimize_scalar
class BlackScholesEngine:
    """Black-Scholes price, Greeks and implied volatility (no dividends).

    Shared argument meaning for every method:
        S: underlying price.
        K: strike price.
        T: time to expiry in years.
        r: risk-free rate (continuously compounded).
        sigma: annualised volatility.
        option_type: ``'call'``; any other value is priced as a put.
    """

    @staticmethod
    def calculate_price(S, K, T, r, sigma, option_type='call'):
        """Return the Black-Scholes price of a European option.

        At or past expiry (``T <= 0``) the intrinsic value is returned. With
        ``sigma <= 0`` the zero-volatility limit (intrinsic value against the
        discounted strike) is returned instead of dividing by zero.
        """
        is_call = option_type == 'call'
        if T <= 0:
            return max(S - K, 0) if is_call else max(K - S, 0)
        if sigma <= 0:
            forward_gap = S - K * np.exp(-r * T)
            return max(forward_gap, 0) if is_call else max(-forward_gap, 0)

        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
        d2 = d1 - sigma * np.sqrt(T)
        if is_call:
            return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
        return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)

    @staticmethod
    def calculate_greeks(S, K, T, r, sigma, option_type='call'):
        """Return a dict with ``delta``, ``gamma``, ``theta`` and ``vega``.

        Theta is expressed per calendar day (annual theta / 365) and vega per
        one volatility point (annual vega / 100). All four are 0 when
        ``T <= 0`` and NaN when ``sigma <= 0`` (the formulas are undefined).
        """
        if T <= 0:
            return {'delta': 0, 'gamma': 0, 'theta': 0, 'vega': 0}
        if sigma <= 0:
            return {'delta': np.nan, 'gamma': np.nan, 'theta': np.nan, 'vega': np.nan}

        sqrt_T = np.sqrt(T)
        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * sqrt_T)
        d2 = d1 - sigma * sqrt_T
        discounted_strike = K * np.exp(-r * T)
        # Time-decay term shared by calls and puts.
        decay = -S * norm.pdf(d1) * sigma / (2 * sqrt_T)

        if option_type == 'call':
            delta = norm.cdf(d1)
            theta = (decay - r * discounted_strike * norm.cdf(d2)) / 365
        else:
            delta = norm.cdf(d1) - 1
            theta = (decay + r * discounted_strike * norm.cdf(-d2)) / 365

        gamma = norm.pdf(d1) / (S * sigma * sqrt_T)
        vega = S * norm.pdf(d1) * sqrt_T / 100
        return {'delta': delta, 'gamma': gamma, 'theta': theta, 'vega': vega}

    @staticmethod
    def implied_volatility(market_price, S, K, T, r, option_type='call'):
        """Return the volatility in [0.001, 5.0] that reproduces ``market_price``.

        Returns NaN when the inputs are not all positive, when the optimiser
        fails, or when no volatility in the search range matches the price to
        within 0.001 (for example a quote below intrinsic value). Without the
        last check the search would quietly return a boundary value.
        """
        def pricing_error(sigma):
            model_price = BlackScholesEngine.calculate_price(S, K, T, r, sigma, option_type)
            return model_price - market_price

        try:
            # Written as `not (... > 0)` so NaN inputs are rejected too.
            if not (market_price > 0 and S > 0 and K > 0 and T > 0):
                return np.nan
            result = minimize_scalar(
                lambda sigma: pricing_error(sigma) ** 2,
                bounds=(0.001, 5.0),
                method='bounded',
                options={'xatol': 1e-8},
            )
            if not result.success:
                return np.nan
            if abs(pricing_error(result.x)) > 1e-3:
                return np.nan
            return result.x
        except (TypeError, ValueError, ArithmeticError):
            return np.nan
```
### **3.2 データ拡張処理**
```python
def enhance_option_data(option_df, risk_free_rate=0.05):
    """Add implied volatility, Greeks and derived fields to option rows.

    For each input row the market price is the bid/ask midpoint when both are
    positive, otherwise ``lastPrice``. Rows without a positive market price
    are dropped. Rows that raise during processing are reported and skipped.

    Args:
        option_df: DataFrame with at least ``expiration``, ``collection_date``,
            ``underlying_price``, ``strike`` and ``option_type`` columns.
        risk_free_rate: rate passed to the Black-Scholes calculations.

    Returns:
        DataFrame with the original columns plus ``implied_volatility``,
        ``theoretical_price``, ``time_to_expiry``, ``moneyness``, ``delta``,
        ``gamma``, ``theta`` and ``vega``.
    """
    bs_engine = BlackScholesEngine()
    enhanced_data = []
    for _, row in option_df.iterrows():
        try:
            # Time to expiry from calendar days: dropping the time of day
            # first keeps an intraday collection timestamp from flooring the
            # difference one day short.
            exp_date = pd.to_datetime(row['expiration']).normalize()
            collection_date = pd.to_datetime(row['collection_date']).normalize()
            days_to_exp = (exp_date - collection_date).days
            T = max(days_to_exp / 365, 1 / 365)  # never less than one day

            # Market price
            bid = row.get('bid', 0)
            ask = row.get('ask', 0)
            market_price = (bid + ask) / 2 if (bid > 0 and ask > 0) else row.get('lastPrice', 0)
            # NaN compares False against 0, so it needs its own check.
            if pd.isna(market_price) or market_price <= 0:
                continue

            iv = bs_engine.implied_volatility(
                market_price, row['underlying_price'], row['strike'],
                T, risk_free_rate, row['option_type']
            )

            if not np.isnan(iv) and iv > 0:
                greeks = bs_engine.calculate_greeks(
                    row['underlying_price'], row['strike'],
                    T, risk_free_rate, iv, row['option_type']
                )
                theoretical_price = bs_engine.calculate_price(
                    row['underlying_price'], row['strike'],
                    T, risk_free_rate, iv, row['option_type']
                )
            else:
                greeks = {'delta': np.nan, 'gamma': np.nan, 'theta': np.nan, 'vega': np.nan}
                theoretical_price = np.nan

            enhanced_data.append({
                **row.to_dict(),
                'implied_volatility': iv,
                'theoretical_price': theoretical_price,
                'time_to_expiry': T,
                'moneyness': row['strike'] / row['underlying_price'],
                **greeks
            })
        except Exception as e:
            print(f"Error processing row: {e}")
            continue
    return pd.DataFrame(enhanced_data)
```
## Phase 4: OVIシグナル計算
### **4.1 核心OVI計算ロジック**
```python
# src/ovi_calculator.py
import pandas as pd
import numpy as np
class OVICalculator:
    """Compute Option Volume Imbalance (OVI) signals from option-chain rows.

    Every ``calculate_*`` method expects a DataFrame with ``ticker``,
    ``collection_date``, ``expiration``, ``option_type`` (``'call'`` or
    ``'put'``) and ``volume`` columns, and returns one row per
    (ticker, collection_date, expiration) group. The high-IV variants also
    need ``implied_volatility``. The input frame is never modified.
    """

    _GROUP_KEYS = ['ticker', 'collection_date', 'expiration']

    def __init__(self):
        self.high_iv_percentile = 80  # high-IV cut-off (top 20%)
        self.put_weight = 3.0  # put weight (chosen from the paper's finding)
        self.call_weight = 1.0

    @staticmethod
    def _call_put_volumes(group):
        """Return (call_volume, put_volume) summed over one group's rows."""
        call_volume = group.loc[group['option_type'] == 'call', 'volume'].sum()
        put_volume = group.loc[group['option_type'] == 'put', 'volume'].sum()
        return call_volume, put_volume

    def _high_iv_subset(self, option_data):
        """Return the rows whose IV percentile rank meets the cut-off.

        The rank is taken per (ticker, collection_date). The percentile is
        attached to a copy so the caller's frame is left untouched.
        """
        iv_percentile = option_data.groupby(['ticker', 'collection_date'])['implied_volatility'].transform(
            lambda x: x.rank(pct=True) * 100
        )
        ranked = option_data.assign(iv_percentile=iv_percentile)
        return ranked[ranked['iv_percentile'] >= self.high_iv_percentile]

    def calculate_basic_ovi(self, option_data):
        """Basic OVI: (call volume - put volume) / total volume, 0 if no volume."""
        results = []
        for (ticker, date, exp), group in option_data.groupby(self._GROUP_KEYS):
            call_volume, put_volume = self._call_put_volumes(group)
            total_volume = call_volume + put_volume
            basic_ovi = (call_volume - put_volume) / total_volume if total_volume > 0 else 0
            results.append({
                'ticker': ticker,
                'date': date,
                'expiration': exp,
                'basic_ovi': basic_ovi,
                'total_volume': total_volume
            })
        return pd.DataFrame(results)

    def calculate_high_iv_ovi(self, option_data):
        """Finding 1: OVI restricted to high-IV contracts.

        ``confidence_score`` grows linearly with volume and saturates at 1.0
        once the group has traded 1000 contracts.
        """
        results = []
        high_iv_data = self._high_iv_subset(option_data)
        for (ticker, date, exp), group in high_iv_data.groupby(self._GROUP_KEYS):
            call_volume, put_volume = self._call_put_volumes(group)
            total_volume = call_volume + put_volume
            if total_volume > 0:
                high_iv_ovi = (call_volume - put_volume) / total_volume
                confidence_score = min(1.0, total_volume / 1000)
            else:
                high_iv_ovi = 0
                confidence_score = 0
            results.append({
                'ticker': ticker,
                'date': date,
                'expiration': exp,
                'high_iv_ovi': high_iv_ovi,
                'high_iv_volume': total_volume,
                'confidence_score': confidence_score
            })
        return pd.DataFrame(results)

    def calculate_put_focused_ovi(self, option_data):
        """Finding 2: OVI with put volume weighted more heavily than calls.

        The imbalance is (weighted calls - weighted puts) divided by the total
        weighted volume. Normalising by |bullish| + |bearish|, where bearish
        is just the negated bullish value, would make every result +-0.5.
        """
        results = []
        for (ticker, date, exp), group in option_data.groupby(self._GROUP_KEYS):
            call_volume, put_volume = self._call_put_volumes(group)

            weighted_calls = call_volume * self.call_weight
            weighted_puts = put_volume * self.put_weight
            total_weighted = weighted_calls + weighted_puts
            if total_weighted > 0:
                put_focused_ovi = (weighted_calls - weighted_puts) / total_weighted
            else:
                put_focused_ovi = 0

            # Share of raw (unweighted) volume traded in puts
            raw_total = call_volume + put_volume
            put_info_ratio = put_volume / raw_total if raw_total > 0 else 0
            results.append({
                'ticker': ticker,
                'date': date,
                'expiration': exp,
                'put_focused_ovi': put_focused_ovi,
                'put_info_ratio': put_info_ratio,
                'put_dominance': put_info_ratio > 0.6
            })
        return pd.DataFrame(results)

    @staticmethod
    def _signal_strength(ovi_value):
        """Label |OVI| as 'strong' (> 0.3), 'moderate' (> 0.1) or 'weak'."""
        magnitude = abs(ovi_value)
        if magnitude > 0.3:
            return 'strong'
        if magnitude > 0.1:
            return 'moderate'
        return 'weak'

    def calculate_optimized_ovi(self, option_data):
        """Combined OVI: high-IV contracts only, with put-weighted volume.

        ``confidence_score`` averages a volume term (weighted volume / 5000,
        capped at 1.0) and the fraction of the group's contracts that passed
        the high-IV filter.
        """
        high_iv_data = self._high_iv_subset(option_data)
        # Contract count per group before filtering, computed once up front
        # rather than by rescanning the whole frame for every group.
        contracts_per_group = option_data.groupby(self._GROUP_KEYS).size().to_dict()

        results = []
        for (ticker, date, exp), group in high_iv_data.groupby(self._GROUP_KEYS):
            call_volume, put_volume = self._call_put_volumes(group)

            optimized_bullish = call_volume * self.call_weight
            optimized_bearish = put_volume * self.put_weight
            total_optimized = optimized_bullish + optimized_bearish
            if total_optimized > 0:
                optimized_ovi = (optimized_bullish - optimized_bearish) / total_optimized
            else:
                optimized_ovi = 0

            # Confidence score
            volume_confidence = min(1.0, total_optimized / 5000)
            iv_confidence = len(group) / max(1, contracts_per_group.get((ticker, date, exp), 0))
            overall_confidence = (volume_confidence + iv_confidence) / 2

            results.append({
                'ticker': ticker,
                'date': date,
                'expiration': exp,
                'optimized_ovi': optimized_ovi,
                'signal_strength': self._signal_strength(optimized_ovi),
                'confidence_score': overall_confidence,
                'volume_used': total_optimized,
                'components': {
                    'high_iv': True,
                    'put_focused': True,
                    'market_maker': False  # not available from Yahoo Finance data
                }
            })
        return pd.DataFrame(results)
```
## Phase 5: バックテスト実装
### **5.1 戦略バックテスター**
```python
# src/backtester.py
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
class OVIBacktester:
def __init__(self, holding_period=5, transaction_cost=0.001):
    """Store the back-test parameters.

    Args:
        holding_period: number of price rows ahead used by
            ``calculate_forward_returns`` to pick the exit price.
        transaction_cost: amount subtracted from each trade's gross return
            in ``backtest_ovi_signals``.
    """
    self.holding_period = holding_period
    self.transaction_cost = transaction_cost
def prepare_price_data(self, tickers, start_date, end_date):
    """Download closing prices for each ticker through yfinance.

    Args:
        tickers: iterable of ticker symbols.
        start_date: passed to ``Ticker.history`` as ``start``.
        end_date: passed to ``Ticker.history`` as ``end``.

    Returns:
        DataFrame with ``ticker``, ``date`` (taken from the history index)
        and ``Close`` columns, all tickers stacked. Empty, with the same
        columns, when ``tickers`` is empty.
    """
    columns = ['ticker', 'date', 'Close']
    tickers = list(tickers)
    if not tickers:
        # pd.concat raises on an empty list, so return an empty frame with
        # the expected columns instead.
        return pd.DataFrame(columns=columns)

    import yfinance as yf

    price_data = []
    for ticker in tickers:
        hist = yf.Ticker(ticker).history(start=start_date, end=end_date)
        hist['ticker'] = ticker
        hist['date'] = hist.index
        hist.reset_index(drop=True, inplace=True)
        price_data.append(hist[columns])
    return pd.concat(price_data, ignore_index=True)
def calculate_forward_returns(self, price_data):
    """Compute the return from each row to ``holding_period`` rows later.

    Rows are ordered by ticker and date. For each ticker, every row that has
    a row ``self.holding_period`` positions after it yields one output row;
    the last ``holding_period`` rows of each ticker yield nothing.

    Args:
        price_data: DataFrame with ``ticker``, ``date`` and ``Close`` columns.

    Returns:
        DataFrame with ``ticker``, ``date``, ``current_price``,
        ``future_price`` and ``forward_return`` columns, where
        ``forward_return = (future_price - current_price) / current_price``.
    """
    horizon = self.holding_period
    ordered = price_data.sort_values(['ticker', 'date']).reset_index(drop=True)
    by_ticker = ordered.groupby('ticker')

    current_price = ordered['Close']
    future_price = by_ticker['Close'].shift(-horizon)
    # Select by position within the ticker, not by future_price being
    # non-null, so a missing Close value does not silently drop a row.
    position = by_ticker.cumcount()
    rows_in_ticker = by_ticker['Close'].transform('size')
    has_future = position < (rows_in_ticker - horizon)

    returns = pd.DataFrame({
        'ticker': ordered['ticker'],
        'date': ordered['date'],
        'current_price': current_price,
        'future_price': future_price,
        'forward_return': (future_price - current_price) / current_price,
    })
    return returns[has_future].reset_index(drop=True)
def backtest_ovi_signals(self, ovi_signals, price_returns):
    """Score OVI signals against realised forward returns.

    Signals and returns are matched on ticker and calendar day. Matching on
    the raw ``date`` values fails whenever the signal carries a time of day
    or the price dates carry a time zone, so both are reduced to a
    timezone-naive day for the join only; the output keeps the signal's
    original ``date``.

    A signal above 0.1 is traded long (+1), below -0.1 short (-1); anything
    in between is skipped.

    Args:
        ovi_signals: DataFrame with ``ticker``, ``date`` and
            ``optimized_ovi`` (optionally ``confidence_score``).
        price_returns: DataFrame with ``ticker``, ``date`` and
            ``forward_return``.

    Returns:
        DataFrame with one row per traded signal, or ``None`` when no signal
        matches any price row.
    """
    def _calendar_day(values):
        stamps = pd.to_datetime(values)
        if stamps.dt.tz is not None:
            stamps = stamps.dt.tz_localize(None)
        return stamps.dt.normalize()

    # Merge the data on ticker + calendar day
    signals = ovi_signals.assign(_merge_day=_calendar_day(ovi_signals['date']))
    returns = price_returns.drop(columns=['date']).assign(
        _merge_day=_calendar_day(price_returns['date'])
    )
    merged_data = signals.merge(returns, on=['ticker', '_merge_day'], how='inner')
    if len(merged_data) == 0:
        print("No matching data found for backtesting")
        return None

    ovi_value = merged_data['optimized_ovi']
    actual_return = merged_data['forward_return']
    # Signal direction: +1 bullish, -1 bearish, 0 neutral (skipped)
    direction = np.select([ovi_value > 0.1, ovi_value < -0.1], [1, -1], default=0)
    gross_return = direction * actual_return

    results = pd.DataFrame({
        'ticker': merged_data['ticker'],
        'date': merged_data['date'],
        'ovi_signal': ovi_value,
        'predicted_direction': direction,
        'actual_return': actual_return,
        'gross_return': gross_return,
        # Net of transaction cost
        'net_return': gross_return - self.transaction_cost,
        # Correct when the traded direction made money before costs
        'correct_prediction': gross_return > 0,
        'confidence_score': merged_data.get('confidence_score', 0),
    })
    return results[direction != 0].reset_index(drop=True)
def calculate_performance_metrics(self, backtest_results):
"""パフォーマンス指標計算"""
if len(backtest_results) == 0:
return {}
returns = backtest_results['net_return']
# 基本統計
total_trades = len(returns)
accuracy = backtest_results['correct_prediction'].mean()
avg_return = returns.mean()
std_return = returns.std()
# リスク調整リターン