
neurodoc
구독자 69명 · 구독중 35명
훌륭한 의사이면서
좋은 투자자가 되길 꿈꿉니다

경량화 해서 트레이딩뷰에서 돌리려고 숫자 줄여봤던 부분이 남아있어서 full LPPL로 코드 수정해서 올립니다.
이전에 가져가신 분들은 500이랑 20으로 서치하셔서 500 -> 750, 20 -> 5로 바꾸시면 됩니다.
이전 글에 수정해서 코드 올려뒀으니 이전 글 다시 확인하셔도 됩니다
2025-03-03
LPPL 하락 분석 업데이트
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
plt.rcParams['font.family'] ='Malgun Gothic'
plt.rcParams['axes.unicode_minus'] =False
from scipy.optimize import minimize
from scipy.linalg import lstsq
from datetime import datetime, timedelta
import warnings
import os
import time
import random
warnings.filterwarnings('ignore')
class LPPL:
def __init__(self, observations):
"""
LPPL 모델 초기화
observations: DatetimeIndex와 'Adj Close' 열을 포함한 DataFrame.
"""
self.observations = observations.sort_index()
self.t = self.get_days_since_start()
self.price = self.observations['Adj Close'].values
self.log_price = np.log(self.price)
self.ts = self.t / self.t[-1] # [0, 1] 범위로 스케일링
# 파라미터 bounds 설정
self.bounds = {
'm': (0.1, 0.9),
'omega': (2, 25),
'phi': (0, 2 * np.pi)
}
# 유전 알고리즘 설정
self.population_size = 200
self.generations = 100
self.crossover_rate = 0.7
self.mutation_rate = 0.1
self.elitism_rate = 0.1
def get_days_since_start(self):
"""
DatetimeIndex를 시작 날짜로부터 경과한 일수 배열로 변환.
"""
start_date = self.observations.index[0]
return (self.observations.index - start_date).days.values
def lppl_matrix(self, t, tc, m, omega, phi):
    """
    Build the LPPL design matrix for the given non-linear parameters.

    The columns are: a constant 1, (tc - t) ** m, and
    (tc - t) ** m * cos(omega * log(tc - t) - phi).
    """
    times = np.array(t)
    # Floor the remaining time at a tiny positive value so the power and
    # the logarithm stay defined at and beyond tc.
    remaining = np.clip(tc - times, 0.00001, None)
    power_term = remaining ** m
    phase = omega * np.log(remaining) - phi
    columns = (np.ones_like(times), power_term, power_term * np.cos(phase))
    return np.vstack(columns).T
def _func_(self, params, t, log_price):
"""
비선형 최적화를 위한 목적 함수.
"""
m, omega, phi, tc = params
try:
lppl_m = self.lppl_matrix(t, tc, m, omega, phi)
coefs = lstsq(lppl_m, log_price)[0]
return np.sum((log_price - lppl_m @ coefs) ** 2)
except (np.linalg.LinAlgError, ValueError):
return 1e10
def simple_model_fit(self):
"""
단순 모델 피팅(C=0, beta=1)으로 초기 tc 추정.
"""
t_end = self.ts[-1]
def simple_obj(tc_val, t, log_price):
dt = tc_val - t
X = np.vstack((np.ones_like(t), dt)).T
try:
coefs = lstsq(X, log_price)[0]
return np.sum((log_price - X @ coefs) ** 2)
except np.linalg.LinAlgError:
return 1e10
tc_candidates = np.linspace(t_end + 0.01, t_end + 0.1, 100)
errors = [simple_obj(tc_val, self.ts, self.log_price) for tc_val in tc_candidates]
best_tc = tc_candidates[np.argmin(errors)]
dt = best_tc - self.ts
X = np.vstack((np.ones_like(self.ts), dt)).T
try:
A, B = lstsq(X, self.log_price)[0]
except np.linalg.LinAlgError:
A = np.mean(self.log_price)
B = -0.01
return best_tc, float(A), float(B)
def initialize_population(self, tc_seed):
"""
유전 알고리즘 초기 개체군 생성.
"""
population = []
t_end = self.ts[-1]
tc_low, tc_high = t_end + 0.01, t_end + 0.1
initial_individual = np.array([
np.random.uniform(*self.bounds['m']),
np.random.uniform(*self.bounds['omega']),
np.random.uniform(*self.bounds['phi']),
tc_seed
])
fitness = self._func_(initial_individual, self.ts, self.log_price)
population.append((initial_individual, fitness))
for _ in range(self.population_size - 1):
individual = np.array([
np.random.uniform(*self.bounds['m']),
np.random.uniform(*self.bounds['omega']),
np.random.uniform(*self.bounds['phi']),
np.random.uniform(tc_low, tc_high)
])
fitness = self._func_(individual, self.ts, self.log_price)
population.append((individual, fitness))
return population
def tournament_selection(self, population, tournament_size=3):
    """
    Sample up to tournament_size entries from the population without
    replacement and return the one with the lowest fitness (entry[1]).
    """
    draw_count = min(tournament_size, len(population))
    contenders = random.sample(population, draw_count)
    return min(contenders, key=lambda entry: entry[1])
def crossover(self, parent1, parent2):
    """
    Blend two parents into a child using one random weight in [0, 1):
    weight * parent1 + (1 - weight) * parent2.
    """
    weight = np.random.random()
    complement = 1 - weight
    return weight * parent1 + complement * parent2
def mutate(self, individual):
m_strength = (self.bounds['m'][1] - self.bounds['m'][0]) * 0.1
omega_strength = (self.bounds['omega'][1] - self.bounds['omega'][0]) * 0.1
phi_strength = (self.bounds['phi'][1] - self.bounds['phi'][0]) * 0.1
tc_strength = 0.005
result = individual.copy()
if np.random.random() < 0.3:
result[0] = np.clip(result[0] + np.random.normal(0, m_strength), *self.bounds['m'])
if np.random.random() < 0.3:
result[1] = np.clip(result[1] + np.random.normal(0, omega_strength), *self.bounds['omega'])
if np.random.random() < 0.3:
result[2] = np.clip(result[2] + np.random.normal(0, phi_strength), *self.bounds['phi'])
if np.random.random() < 0.3:
t_end = self.ts[-1]
tc_low, tc_high = t_end + 0.01, t_end + 0.1
result[3] = np.clip(result[3] + np.random.normal(0, tc_strength), tc_low, tc_high)
return result
def genetic_algorithm_fit(self, tc_seed):
"""
유전 알고리즘을 이용한 LPPL 파라미터 최적화.
"""
population = self.initialize_population(tc_seed)
best_params, best_fitness = min(population, key=lambda x: x[1])
for generation in range(self.generations):
population.sort(key=lambda x: x[1])
elite_count = max(1, int(self.population_size * self.elitism_rate))
new_population = population[:elite_count].copy()
while len(new_population) < self.population_size:
if np.random.random() < self.crossover_rate:
parent1 = self.tournament_selection(population)[0]
parent2 = self.tournament_selection(population)[0]
child = self.crossover(parent1, parent2)
if np.random.random() < self.mutation_rate:
child = self.mutate(child)
fitness = self._func_(child, self.ts, self.log_price)
new_population.append((child, fitness))
else:
new_population.append(self.tournament_selection(population))
population = new_population[:self.population_size]
current_best, current_fitness = min(population, key=lambda x: x[1])
if current_fitness < best_fitness:
best_params, best_fitness = current_best, current_fitness
# 개선이 없으면 조기 종료
if generation > 10 and abs(current_fitness - best_fitness) < 1e-6:
break
return best_params, best_fitness
def multi_run_genetic_algorithm(self, tc_seed, runs=3):
best_overall_params = None
best_overall_fitness = float('inf')
original_mutation_rate = self.mutation_rate
original_population_size = self.population_size
for run in range(runs):
print(f"유전 알고리즘 실행 {run+1}/{runs}...")
self.mutation_rate = original_mutation_rate + (run * 0.05)
self.population_size = original_population_size + (run * 50)
params, fitness = self.genetic_algorithm_fit(tc_seed)
if fitness < best_overall_fitness:
best_overall_fitness = fitness
best_overall_params = params
self.mutation_rate = original_mutation_rate
self.population_size = original_population_size
return best_overall_params, best_overall_fitness
def fine_tune_parameters(self, params):
"""
지역 최적화를 통한 파라미터 미세 조정.
"""
tuned_params = params.copy()
param_bounds = [
self.bounds['m'],
self.bounds['omega'],
self.bounds['phi'],
(self.ts[-1] + 0.01, self.ts[-1] + 0.1)
]
for idx in range(4):
def objective(x):
params_copy = tuned_params.copy()
params_copy[idx] = x[0]
return self._func_(params_copy, self.ts, self.log_price)
result = minimize(objective, x0=[tuned_params[idx]],
bounds=[param_bounds[idx]], method='L-BFGS-B')
if result.success:
tuned_params[idx] = result.x[0]
return tuned_params
def verify_oscillations(self, params):
"""
로그 주기적 진동의 유의미성을 확인.
"""
try:
num_oscillations = float(params['num_oscillations'])
damping_factor = float(params['damping_factor'])
if num_oscillations >= 2.0:
return True
if damping_factor >= 0.8:
return True
tc, m, omega, phi = params['tc'], params['m'], params['omega'], params['phi']
...