본문 바로가기
IT - 코딩/AI, 예측모델

시계열 예측 모델 _ LSTM 환율 예측

by 조기정 2023. 10. 24.

차익거래 논문을 쓰기 위해 환율을 3가지로 나누어 비교했었다.

실질적 환율인 비공식환율, 그리고 그냥 일반 우리가 아는 국가가정해주는 공식환율, 그리고 마지막 LSTM으로 예측하는 환율. 

본 글에서는 LSTM으로 환율을 예측하는 코드를 짜보려고 한다.

# 사용할 모든 파일 리스트로 읽어오기
import os
import numpy as np
import pandas as pd
os_file_list = os.listdir('C:/Users/Happy/Desktop/논문용/krwusdtdata/전처리된파일')

read_path = "C:\\Users\\Happy\\Desktop\\논문용\\krwusdtdata\\전처리된파일\\"


# 딕셔너리 안에 모든 데이터 프레임 저장
All_data = {}

for files in os_file_list:
    print(files)
    df= pd.read_csv(read_path+files, encoding = "cp949")
    All_data[files[:-4]] = df

key_list = list(All_data.keys())
key_list = list(set(key_list) - {"BTC_KRW","bin_data"})
key_list

데이터들은 이렇게 되며 사전 연구된 논문에서 유효성이 높은 변수들을 추려서 데이터로 쓴다.

 

import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib as mpl
mpl.rcParams['axes.unicode_minus'] = False

# 그래프에서 한글 폰트 깨지는 문제에 대한 대처(전역 글꼴 설정)
from matplotlib import font_manager, rc
font_name = font_manager.FontProperties(fname='c:/Windows/Fonts/malgun.ttf').get_name()

# import seaborn as sns

    # 데이터 프레임 모든 컬럼 그리기
    # 함수명으로도 정의하기
def columns_plot(data): # y좌표
    
#     col_list_plt = list(data.select_dtypes(exclude = "object").columns)
    col_list_plt = list(data.select_dtypes(include = ["float","int"]).columns)
    x_plt_n = 3 # 한번에 그릴 plt x 축 수
    y_plt_n = 4 # 한번에 그릴 plt y 축 수
    w_space = 0.2 # float("0."+str(x_plt_n)
    h_space = 0.2 # float("0."+str(y_plt_n)
    
    if (x_plt_n < math.ceil(len(col_list_plt) / y_plt_n)):
        x_plt_n = math.ceil(len(col_list_plt) / y_plt_n)

    #plot 이쁘게 
    f, axes = plt.subplots(x_plt_n, y_plt_n)
    f.set_size_inches((20, 15))
    plt.subplots_adjust(wspace = w_space, hspace = h_space)

    for i in range(len(col_list_plt)):

        bb = math.ceil(((i )// y_plt_n))
        axes[bb][i - math.ceil(((i )// y_plt_n)) * y_plt_n].plot(data[col_list_plt[i]],label = col_list_plt[i] )#, color = 'blue', marker = 'o')
        axes[bb][i - math.ceil(((i )// y_plt_n)) * y_plt_n].set_title(col_list_plt[i])
    plt.rc('font', family='Malgun Gothic')
    plt.show()



for i in range(len(key_list)):
    print(key_list[i])
    columns_plot(All_data[key_list[i]])

해당 코드로 각각의 데이터들을 한번 확인해주었고, 

 

def set_index(df):
    df['time'] = pd.to_datetime(df['time'], format='%Y-%m-%d', errors='raise')
    df = df.set_index('time',drop=False)
    return df
    
Use_data = {}
# All_data , key_list
for i in range(len(key_list)):
#     print(key_list[i])
#     print(All_data[key_list[i]].info())
    Use_data[key_list[i]] = All_data[key_list[i]].copy()[["time","Open"]]
#     All_data[key_list[i]] = All_data[key_list[i]]

for i in range(len(key_list)):
    print(key_list[i],i)
    Use_data[key_list[i]] = set_index(Use_data[key_list[i]])

데이터 프레임의 time인덱스를 전부 날짜 형식으로 읽을 수 있게 조정한다.

# 기간 조정
pariod_min_list = [Use_data[key_list[i]].index.min() for i in range(len(key_list)) ]
pariod_max_list = [Use_data[key_list[i]].index.max() for i in range(len(key_list)) ]
pariod_start = max(pariod_min_list)
pariod_end = min(pariod_max_list)
print("시작일 교집합 : ",pariod_start)
print("최종일 교집합 : ",pariod_end)

#날짜 조정.(모든 사용할 데이터들의 날짜를 같게 조정함.)
for i in range(len(key_list)):
    Use_data[key_list[i]] = Use_data[key_list[i]].loc[Use_data[key_list[i]]["time"].between(pariod_start,pariod_end)]

각각의 기간이 다 다르니 기간을 조정해준다. 2001~2022년이다.

# 모든 open 컬럼명을 데이터 프레임 이름으로 바꿈
for i in range(len(key_list)):
    Use_data[key_list[i]] = Use_data[key_list[i]].rename(columns={'Open':key_list[i]})
ALL_DATA_BY_USE = pd.DataFrame()
for i in range(len(key_list)):
#     Use_data[key_list[i]]
    print(len(Use_data[key_list[i]]),key_list[i])
    if i == 0: # 처음시작만 @ 또는 가장 길이가 긴 값!!!!(날짜)
        ALL_DATA_BY_USE = Use_data[key_list[i]]
    else:
        ALL_DATA_BY_USE = pd.concat([ALL_DATA_BY_USE,Use_data[key_list[i]]],axis = 1)
#     try:
#         del ALL_DATA_BY_USE["time"]
#     except:
#         pass
ALL_DATA_BY_USE = ALL_DATA_BY_USE.fillna(method="ffill")
 
if (len(ALL_DATA_BY_USE) == len(ALL_DATA_BY_USE.drop_duplicates())): # 중복데이터가 더 없는지 확인
    pass
else:
    print("길이가 다름")
    
# try:
#     del ALL_DATA_BY_USE["time"]
# except:
#     pass

try:
    del ALL_DATA_BY_USE["time"]
except:
    pass

use_columns = list(ALL_DATA_BY_USE.columns)
target = "KRW_USDT"

 

 

# index가 time의 경우 안좋기 때문에 인덱스 변환.
ALL_DATA_BY_USE = ALL_DATA_BY_USE.reset_index(drop = True)
ALL_DATA_BY_USE

 

from statsmodels.tsa.seasonal import seasonal_decompose


open_col = seasonal_decompose(ALL_DATA_BY_USE[target], model = 'additive' ,period = 500, extrapolate_trend = 1)

fig = plt.figure()
fig = open_col.plot()
fig.set_size_inches(15,12)

ALL_DATA_BY_USE[target+'col_trend'] = open_col.trend
ALL_DATA_BY_USE[target+'col_seasonal'] = open_col.seasonal
ALL_DATA_BY_USE[target+'col_resid'] = open_col.resid

추세와 다양성을 보기 위해서 시계열 분해를 시행한다. 어느정도 시즌성이 보이긴 하나 추세의 영향이 큰 것을 볼 수 있다.

시계열 분해

#### 머신러닝 or 딥러닝을 위한 전처리

여기서 부터 중요하다고 볼 수 있는데 일단, 데이터 전처리가 끝났다고 가정하면 아래 코드는 그냥 쓸 수 있다

X_data = ALL_DATA_BY_USE[list(set(use_columns) - {target})]
# X_data = ALL_DATA_BY_USE.drop([target],axis = 1)
Y_data = ALL_DATA_BY_USE[[target]]
ALL_DATA_BY_USE = pd.concat([X_data,Y_data],axis = 1)

 

 

from sklearn.preprocessing import MinMaxScaler,PowerTransformer,LabelEncoder,StandardScaler

# 이부분 class로 변환할까? 나중에 정해진 범위까지만 데이터 스케일링 할 수도 있음.
# def MinMaxScaler_fun(df): #맞춤 변환 함수와 그에 맞는 데이터를 돌려줌
#     col = list(df.columns)
#     Scaler = MinMaxScaler()
#     Scaler.fit(df)
#     df = Scaler.transform(df)
# #     df = Scaler_X.fit_transform(df)
#     df = pd.DataFrame(df, columns = X_col)
#     return df, Scaler
# X_data ,Scaler = MinMaxScaler_fun(X_data)
# Y_data , Y_data_Scaler = MinMaxScaler_fun(Y_data)

X_col = list(X_data.columns)
Scaler = MinMaxScaler()
Scaler.fit(X_data)
X_data = Scaler.transform(X_data)
#     df = Scaler_X.fit_transform(df)
X_data = pd.DataFrame(X_data, columns = X_col)
    
# 모데링 잘나오게만 할거면 이렇게 하면 됨



# @@ 나중에 여기에 검증셋으로 나누어서 하는 모델도 추가.
# 최대최소 정규화
Y_col = list(Y_data.columns)

MAX_val = float(max(Y_data.values))
MIN_val = float(min(Y_data.values))
# 변환(man,max 변환을 또해주면 기존값을 잃어버림.)
Y_data[Y_col] = Y_data[Y_col].apply(lambda x : (x - MIN_val)/(MAX_val - MIN_val))
# 역변환
# Y_data[Y_col] = Y_data[Y_col].apply(lambda x : (x * (MAX_val - MIN_val) + MIN_val))

자 이 부분까지 변환을 해준다.

 

################ LSTM을 위한 전처리 ################

# 지정해야할 초기변수들 
WINDOW_SIZE=64 # 64 #1024 # n일간 데이터를 기반으로 내일 데이터 예측 @@ 더 장기간 데이터 예측가능?
BATCH_SIZE=16 # 32
day_offset = 0 # 예측할 미래 날짜 day_offset = 0 이면 하루 뒤


def make_window_size_dataset(data, label, window_size, day_offset=0): # x값 , y값 , 윈도우 사이즈, 예측할 미래 날짜
    feature_list = []
    label_list = []
    for i in range(len(data) - window_size - day_offset):
        feature_list.append(np.array(data.iloc[i:i+window_size]))
        label_list.append(np.array(label.iloc[i+window_size + day_offset]))
    return np.array(feature_list), np.array(label_list)

x_WINDOW, y_WINDOW = make_window_size_dataset(X_data, Y_data, WINDOW_SIZE, day_offset)
x_WINDOW.shape,y_WINDOW.shape

LSTM을 위한 전처리다. batch size 와 window size를 조절하면서 최적을 찾아도 된다. day_offset이 0 이면 window size만큼의 전 데이터로 다음 데이터를 예측한다. 만약 1이면 다다음 데이터를 예측한다.

a = math.floor(y_WINDOW.shape[0]*0.1)
b = math.floor(y_WINDOW.shape[0]*0.2)

x_train_WINDOW = x_WINDOW[:-b]
x_validation_WINDOW = x_WINDOW[-b:-a]
x_test_WINDOW = x_WINDOW[-a:]

y_train_WINDOW = y_WINDOW[:-b]
y_validation_WINDOW = y_WINDOW[-b:-a]
y_test_WINDOW = y_WINDOW[-a:]

벨류데이션 셋과 트레인 셋을 나눈다.

from keras.models import Sequential
from keras.layers import Dense, LSTM, Conv1D, Lambda
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import LSTM

from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from keras.layers import Input, LSTM, Dense

from keras.layers import Flatten
from keras.layers import Dropout
from tensorflow.keras import optimizers
from keras import layers
import tensorflow as tf
from keras import losses

# relu 은닉 층으로 학습
# sigmoid 이진 분류 문제
# softmax 클래스 분류 문제

leaky_relu = tf.nn.leaky_relu # leaky_relu 
activation = leaky_relu #leaky_relu#"relu"#"tanh" #"relu"=>0~1 시그모이드랑 함꼐

model = Sequential() # x_train_feature.shape[1] == WINDOW_SIZE
#activation 을 tanh 로 주는 부분도 고려
model.add(LSTM(128, activation=activation, input_shape=(x_train_WINDOW.shape[1] , x_train_WINDOW.shape[2]), return_sequences = True))
model.add(Dropout(0.8)) # 드롭아웃 추가. 비율은 50%
#@@
model.add(LSTM(64, activation=activation,return_sequences = False))#LSTM
# model.add(LSTM(64, activation=activation))#LSTM
model.add(Dropout(0.5)) # 드롭아웃 추가. 비율은 50%
# model.add(Flatten()) # Flastten 함수는 다차원 모델을 평탄화 하는 것.
# model.add(Dense(8, activation=activation)) 
# model.add(Dropout(0.1)) # 드롭아웃 추가. 비율은 50%
model.add(Dense(1, activation=leaky_relu))#DNN


# model.compile(loss= Huber(), optimizer='adam', metrics=['mse']) # Sequence 학습에 비교적 좋은 퍼포먼스를 내는 Huber()를 사용합니다.
model.compile(loss=Huber(), optimizer=Adam(0.00001), metrics=['mse']) #0.001이 기본 러닝 레이트
# model.compile(loss='mse', optimizer='adam', metrics=['mse'])

# LSTM(64, activation=activation, input_shape=(x_train_WINDOW.shape[1] , x_train_WINDOW.shape[2]), return_sequences = True)
# LSTM(64, activation=activation,return_sequences = False)
# Dense(1, activation=leaky_relu)

시계열에 예측에 좀 더 좋다는 Huber를 loss로 지정하였고, mse로 판단하게 하였다.

import os
from keras.callbacks import EarlyStopping, ModelCheckpoint
# earlystopping은 (patience 수)n번 epoch통안 val_loss 개선이 없다면 학습을 멈춥니다.
early_stop = EarlyStopping(monitor='val_loss', patience=10)

model_path = 'C:\\Users\\Happy\\Desktop\논문용\\krwusdtdata'
filename = os.path.join(model_path,'USDT_LSTM.h5')

checkpoint = ModelCheckpoint(filename, #filepath
                             monitor='val_loss',#모델 저장시 기준이 되는 값 => val_loss는 loss가 가장 적을 때 저장
                             verbose=1, # 이게 1 이면 저장되었다고 표시됨
                             save_best_only=True, # True의 경우 학습 중 현 시점 가장 좋은 모델로 저장됨
                             save_weights_only=True, # True의 경우 모델 레이어 및 가중치도 저장됨
#                              save_freq = BATCH_SIZE, # 'epoch'을 사용할 경우, 매 에폭마다 모델이 저장됩니다. integer을 사용할 경우, 숫자만큼의 배치를 진행되면 모델이 저장됩니다.
                             mode='auto'# val_acc 인 경우, 정확도이기 때문에 클수록 좋습니다. 따라서 이때는 max를 입력해줘야합니다. 만약 val_loss 인 경우, loss 값이기 때문에 값이 작을수록 좋습니다. 따라서 이때는 min을 입력해줘야합니다. auto로 할 경우, 모델이 알아서 min, max를 판단하여 모델을 저장합니다.
                            )

early stop을 이용하여 에포크에 모델 개선이 없으면 멈추게 하였다. 

history = model.fit(x_train_WINDOW, y_train_WINDOW, 
                                    epochs=1000, 
                                    batch_size=BATCH_SIZE,
                                    validation_data=(x_validation_WINDOW, y_validation_WINDOW),
                                    callbacks=[early_stop, checkpoint]) # 여기에 얼리스타핑 ,드롭아웃, L1,L2,엘라 규제 등 추가 가능
# model.load_weights(filename)

이제 학습을 시작한다.

모델은 매번 학습할 필요 없이 저장했다가

model.load_weights(filename) #저장된 최적 모델 불러옴 끄면 그냥 최종 모델 사용가능

를 이용해서 쉽게 불러오면 된다.

pred = model.predict(x_test_WINDOW) #예측값
actual = np.asarray(y_test_WINDOW)

pred = pd.DataFrame(pred,columns = ["예측값"]).apply(lambda x : (x * (MAX_val - MIN_val) + MIN_val)) # 원래 값 복원
actual = pd.DataFrame(actual,columns = ["실제값"]).apply(lambda x : (x * (MAX_val - MIN_val) + MIN_val)) # 예측 값 복원

print(pred.shape, actual.shape)

i = 0
j = len(pred)

plt.figure(figsize=(12, 9))
plt.title('test_data')
plt.plot(actual[i:j], label='actual')
plt.plot(pred[i:j], label='prediction')
plt.legend()
plt.show()

해당 코드를 사용하여 비교한다.

 

https://github.com/GiJungCho/AI_project_/tree/main_/%EC%9E%90%EA%B2%A9%EC%A6%9D_%EB%B9%85%EB%B6%84%EA%B8%B0%20%ED%86%A0%EC%8A%A4%20_AICE