AI
LSTM 모델로 이상 징후를 감지
david100gom
2025. 2. 27. 14:05
약품 냉장고 온도 데이터를 사용하여 LSTM 모델로 이상 징후를 감지하고 예측하는 종합적인 시스템입니다.
- 데이터 처리 및 특성 엔지니어링:
- 실제 데이터 또는 샘플 데이터 생성
- 다양한 이상 패턴 모델링 (급격한 온도 상승, 냉각 실패, 과냉각)
- 시간 기반 특성 및 통계적 특성 생성
- LSTM 모델 구축:
- 2층 LSTM 네트워크로 시계열 패턴 학습
- 드롭아웃으로 과적합 방지
- 클래스 불균형 처리 (이상치는 소수이므로)
- 이상 감지 및 평가:
- 혼동 행렬 및 분류 보고서 생성
- 시각화를 통한 결과 검증
- 미래 예측 기능:
- 24시간 선행 예측으로 문제 사전 감지
- 점진적인 온도 변화 패턴 반영
- 실시간 모니터링 설계:
- 알림 체계 구성 제안
- 실시간 데이터 처리 프레임워크
이 시스템의 장점:
- 약품 안전성 보장
- 설비 오작동 조기 발견
- 규제 준수 지원 (약품 보관 온도 규정)
- 예방적 유지보수 가능
실제 구현 시 센서 데이터 수집 시스템과 연동하고, 이상 감지 임계값을 실제 환경에 맞게 조정해야 합니다. 또한 알림 시스템을 SMS, 이메일 또는 모바일 앱과 연동하면 실제 운영 환경에서 유용하게 활용할 수 있습니다.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import seaborn as sns
import os
from datetime import datetime, timedelta
# 파일 경로 설정
DATA_PATH = "drug_refrigerator_temperature.csv" # 약품 냉장고 온도 데이터 파일
MODEL_PATH = "drug_refrigerator_anomaly_model.h5" # 모델 저장 경로
RESULTS_PATH = "results" # 결과 저장 디렉토리
# 결과 저장 디렉토리 생성
if not os.path.exists(RESULTS_PATH):
os.makedirs(RESULTS_PATH)
# 1. 데이터 로드 및 전처리
def load_data(file_path):
"""
약품 냉장고 온도 데이터를 로드하는 함수
Parameters:
file_path (str): 데이터 파일 경로
Returns:
DataFrame: 처리된 데이터프레임
"""
try:
# 실제 데이터 파일이 있을 경우
if os.path.exists(file_path):
# CSV 파일 로드 (날짜/시간 및 온도 데이터 포함)
df = pd.read_csv(file_path, parse_dates=['timestamp'])
df.set_index('timestamp', inplace=True)
print(f"데이터 로드 완료: {len(df)} 개의 레코드")
# 샘플 데이터 생성 (데이터 파일이 없는 경우)
else:
print(f"파일을 찾을 수 없습니다: {file_path}")
print("샘플 데이터를 생성합니다...")
# 날짜 범위 생성 (5분 간격으로 30일)
date_range = pd.date_range(
start='2023-01-01',
end='2023-01-31',
freq='5min'
)
# 기본 온도 패턴 생성 (약품 냉장고 권장 온도: 2-8°C)
base_temp = 5.0 # 기본 온도 (°C)
# 일간 주기 변동 (냉장고 문 열림, 냉각 사이클 등으로 인한 변동)
hourly_pattern = 0.5 * np.sin(np.linspace(0, 2*np.pi*24*30, len(date_range)))
# 냉각 시스템 사이클 (약 3시간 주기)
cooling_cycle = 0.3 * np.sin(np.linspace(0, 2*np.pi*240*30, len(date_range)))
# 정상 노이즈
normal_noise = 0.2 * np.random.randn(len(date_range))
# 온도 데이터 생성
temperature = base_temp + hourly_pattern + cooling_cycle + normal_noise
# 이상 온도 데이터 삽입 (임의의 날짜에 비정상적인 온도 스파이크)
# 1. 갑작스러운 온도 상승 (문이 열려있는 경우)
anomaly_indices_1 = np.random.choice(
range(len(date_range)),
size=20,
replace=False
)
temperature[anomaly_indices_1] += np.random.uniform(1.5, 3.0, size=20)
# 2. 냉각 실패 (온도가 점진적으로 상승)
anomaly_start = np.random.randint(0, len(date_range) - 288) # 1일(288 포인트) 확보
for i in range(288): # 24시간(5분 간격 = 288 포인트) 동안 점진적 상승
temperature[anomaly_start + i] += 0.01 * i
# 3. 냉각 과도 (온도가 0도 이하로 떨어짐)
anomaly_indices_3 = np.random.choice(
range(len(date_range)),
size=15,
replace=False
)
temperature[anomaly_indices_3] -= np.random.uniform(5.0, 7.0, size=15)
# 데이터프레임 생성
df = pd.DataFrame({
'temperature': temperature
}, index=date_range)
print(f"샘플 데이터 생성 완료: {len(df)} 개의 레코드")
# 기본적인 데이터 전처리
# 1. 결측치 처리
if df['temperature'].isna().sum() > 0:
print(f"결측치 감지: {df['temperature'].isna().sum()} 개")
df['temperature'] = df['temperature'].interpolate(method='time')
print("결측치 보간 완료")
# 2. 이상치 확인 (온도가 약품 보관 범위를 벗어난 경우)
normal_range = (2, 8) # 약품 냉장고 권장 온도 범위: 2-8°C
anomalies = (df['temperature'] < normal_range[0]) | (df['temperature'] > normal_range[1])
print(f"정상 범위를 벗어난 온도: {anomalies.sum()} 개 ({anomalies.sum()/len(df)*100:.2f}%)")
# 3. 추가 특성 생성
# 시간 관련 특성
df['hour'] = df.index.hour
df['day'] = df.index.day
df['dayofweek'] = df.index.dayofweek
# 온도 변화율 (°C/분)
df['temp_change'] = df['temperature'].diff() / 5 # 5분 간격 가정
# 이동 평균 및 표준편차 (과거 1시간)
window_size = 12 # 5분 간격 데이터에서 1시간 = 12개 포인트
df['temp_ma'] = df['temperature'].rolling(window=window_size).mean()
df['temp_std'] = df['temperature'].rolling(window=window_size).std()
# 첫 window_size 행의 NaN 값 처리
df = df.fillna(method='bfill')
# 4. 레이블 생성 (온도가 정상 범위를 벗어나면 1, 아니면 0)
df['anomaly'] = anomalies.astype(int)
# 약품 냉장고 온도 시각화
plt.figure(figsize=(14, 7))
plt.plot(df.index, df['temperature'], 'b-', label='온도')
plt.scatter(
df[df['anomaly'] == 1].index,
df[df['anomaly'] == 1]['temperature'],
c='red',
s=30,
label='이상치'
)
plt.axhline(y=normal_range[0], color='g', linestyle='--', label='최소 온도 (2°C)')
plt.axhline(y=normal_range[1], color='r', linestyle='--', label='최대 온도 (8°C)')
plt.title('약품 냉장고 온도 데이터')
plt.xlabel('시간')
plt.ylabel('온도 (°C)')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(RESULTS_PATH, 'temperature_data.png'))
return df
except Exception as e:
print(f"데이터 로드 중 오류 발생: {e}")
return None
# 2. 시퀀스 데이터 생성
def create_sequences(df, target_col, sequence_length=288):
"""
LSTM 모델을 위한 시퀀스 데이터 생성
Parameters:
df (DataFrame): 입력 데이터프레임
target_col (str): 타겟 변수 열 이름
sequence_length (int): 시퀀스 길이 (기본값: 288 = 1일)
Returns:
tuple: (X, y) - 입력 시퀀스와 타겟 값
"""
# 정규화를 위한 특성 선택
features = ['temperature', 'temp_change', 'temp_ma', 'temp_std', 'hour', 'day', 'dayofweek']
# 특성 정규화
scaler = MinMaxScaler()
scaled_features = scaler.fit_transform(df[features])
# 타겟 변수
targets = df[target_col].values
X, y = [], []
# 시퀀스 생성
for i in range(len(df) - sequence_length):
X.append(scaled_features[i:i + sequence_length])
y.append(targets[i + sequence_length])
return np.array(X), np.array(y), scaler, features
# 3. LSTM 모델 구축
def build_lstm_model(input_shape, dropout_rate=0.2):
"""
이상 징후 탐지를 위한 LSTM 모델 생성
Parameters:
input_shape (tuple): 입력 데이터 형태 (시퀀스 길이, 특성 수)
dropout_rate (float): 드롭아웃 비율
Returns:
Model: 컴파일된 Keras 모델
"""
model = Sequential([
LSTM(64, activation='relu', return_sequences=True, input_shape=input_shape),
Dropout(dropout_rate),
LSTM(32, activation='relu'),
Dropout(dropout_rate),
Dense(16, activation='relu'),
Dense(1, activation='sigmoid') # 이진 분류 (0: 정상, 1: 이상)
])
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
return model
# 4. 모델 학습 및 평가
def train_and_evaluate(X_train, y_train, X_test, y_test, input_shape, epochs=50, batch_size=32):
"""
LSTM 모델 학습 및 평가
Parameters:
X_train, y_train: 학습 데이터
X_test, y_test: 테스트 데이터
input_shape: 입력 데이터 형태
epochs: 학습 에포크 수
batch_size: 배치 크기
Returns:
Model: 학습된 모델
History: 학습 기록
"""
# 모델 생성
model = build_lstm_model(input_shape)
# 콜백 설정
callbacks = [
EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
),
ModelCheckpoint(
MODEL_PATH,
monitor='val_loss',
save_best_only=True
)
]
# 클래스 불균형 처리
# 이상 데이터(1)의 가중치를 더 높게 설정
class_weights = {
0: 1.0,
1: np.sum(y_train == 0) / np.sum(y_train == 1) # 정상:이상 비율
}
# 모델 학습
history = model.fit(
X_train,
y_train,
validation_split=0.2,
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
class_weight=class_weights,
verbose=1
)
# 모델 평가
loss, accuracy = model.evaluate(X_test, y_test)
print(f"테스트 손실: {loss:.4f}")
print(f"테스트 정확도: {accuracy:.4f}")
# 예측
y_pred_prob = model.predict(X_test)
y_pred = (y_pred_prob > 0.5).astype(int)
# 혼동 행렬
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('혼동 행렬')
plt.ylabel('실제 레이블')
plt.xlabel('예측 레이블')
plt.savefig(os.path.join(RESULTS_PATH, 'confusion_matrix.png'))
# 분류 보고서
print("\n분류 보고서:")
report = classification_report(y_test, y_pred)
print(report)
# 학습 과정 시각화
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='훈련 손실')
plt.plot(history.history['val_loss'], label='검증 손실')
plt.title('모델 손실')
plt.xlabel('에포크')
plt.ylabel('손실')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='훈련 정확도')
plt.plot(history.history['val_accuracy'], label='검증 정확도')
plt.title('모델 정확도')
plt.xlabel('에포크')
plt.ylabel('정확도')
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(RESULTS_PATH, 'training_history.png'))
return model, history
# 5. 이상 징후 예측 및 알림 시스템
def predict_anomalies(model, df, scaler, features, sequence_length, threshold=0.7):
"""
새로운 데이터에 대한 이상 징후 예측
Parameters:
model: 학습된 LSTM 모델
df: 입력 데이터프레임
scaler: 학습에 사용된 스케일러
features: 특성 목록
sequence_length: 시퀀스 길이
threshold: 이상 감지 임계값 (기본값: 0.7)
Returns:
DataFrame: 예측 결과가 포함된 데이터프레임
"""
# 데이터프레임 복사
result_df = df.copy()
# 스케일링
scaled_data = scaler.transform(df[features])
# 예측 결과 저장
predictions = []
anomaly_probs = []
# 각 시점에 대한 예측 (시작점 + 시퀀스 길이부터)
for i in range(len(df) - sequence_length):
# 현재 시퀀스 추출
seq = scaled_data[i:i + sequence_length].reshape(1, sequence_length, len(features))
# 이상 확률 예측
prob = model.predict(seq, verbose=0)[0][0]
# 이전 시점은 NaN으로 채움
if i == 0:
predictions.extend([np.nan] * sequence_length)
anomaly_probs.extend([np.nan] * sequence_length)
# 현재 시점의 예측 값 추가
predictions.append(1 if prob > threshold else 0)
anomaly_probs.append(prob)
# 결과 데이터프레임에 예측 결과 추가
result_df['predicted_anomaly'] = predictions
result_df['anomaly_probability'] = anomaly_probs
# 예측 시각화
plt.figure(figsize=(14, 10))
# 온도 및 이상 징후 플롯
plt.subplot(2, 1, 1)
plt.plot(result_df.index, result_df['temperature'], 'b-', label='온도')
plt.scatter(
result_df[result_df['anomaly'] == 1].index,
result_df[result_df['anomaly'] == 1]['temperature'],
c='red',
s=30,
label='실제 이상'
)
plt.scatter(
result_df[result_df['predicted_anomaly'] == 1].index,
result_df[result_df['predicted_anomaly'] == 1]['temperature'],
c='orange',
marker='x',
s=50,
label='예측 이상'
)
plt.axhline(y=2, color='g', linestyle='--', label='최소 온도 (2°C)')
plt.axhline(y=8, color='r', linestyle='--', label='최대 온도 (8°C)')
plt.title('약품 냉장고 온도 이상 징후 예측')
plt.ylabel('온도 (°C)')
plt.legend()
plt.grid(True)
# 이상 확률 플롯
plt.subplot(2, 1, 2)
plt.plot(result_df.index, result_df['anomaly_probability'], 'g-', label='이상 확률')
plt.axhline(y=threshold, color='r', linestyle='--', label=f'임계값 ({threshold})')
plt.title('이상 징후 확률')
plt.xlabel('시간')
plt.ylabel('확률')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(RESULTS_PATH, 'anomaly_predictions.png'))
return result_df
# 6. 미래 온도 및 이상 징후 예측
def predict_future(model, last_sequence, scaler, features, n_steps=288, threshold=0.7):
"""
미래 온도 및 이상 징후 예측
Parameters:
model: 학습된 LSTM 모델
last_sequence: 마지막 시퀀스 데이터
scaler: 학습에 사용된 스케일러
features: 특성 목록
n_steps: 예측할 미래 스텝 수
threshold: 이상 감지 임계값
Returns:
DataFrame: 미래 예측 결과
"""
# 마지막 시점 가져오기
last_time = pd.to_datetime(last_sequence.index[-1])
# 현재 시퀀스 저장
current_sequence = last_sequence[features].values
# 미래 예측 결과 저장
future_times = []
future_temperatures = []
future_anomaly_probs = []
future_anomalies = []
# 미래 시점별 예측
for i in range(n_steps):
# 다음 시간 계산
next_time = last_time + timedelta(minutes=(i+1)*5)
future_times.append(next_time)
# 미래 시간 정보
hour = next_time.hour
day = next_time.day
dayofweek = next_time.dayofweek
# 현재 시퀀스로 예측
scaled_seq = scaler.transform(current_sequence)
seq_reshaped = scaled_seq.reshape(1, len(current_sequence), len(features))
anomaly_prob = float(model.predict(seq_reshaped, verbose=0)[0][0])
# 예측 결과 저장
future_anomaly_probs.append(anomaly_prob)
future_anomalies.append(1 if anomaly_prob > threshold else 0)
# 다음 온도값 추정 (간단한 예측 - 실제로는 별도의 온도 예측 모델이 필요할 수 있음)
# 여기서는 최근 패턴을 기반으로 다음 온도를 추정
last_temps = current_sequence[-24:, 0] # 최근 2시간의 온도
temp_change_rate = np.mean(np.diff(last_temps))
next_temp = last_temps[-1] + temp_change_rate
# 온도 범위 보정
next_temp = max(min(next_temp, 10), 0) # 0-10°C 범위로 제한
future_temperatures.append(next_temp)
# 다음 시퀀스를 위한 새 데이터 포인트 생성
temp_change = next_temp - current_sequence[-1, 0]
temp_ma = np.mean(np.append(current_sequence[-11:, 0], next_temp))
temp_std = np.std(np.append(current_sequence[-11:, 0], next_temp))
# 새 데이터 포인트
new_point = np.array([[
next_temp, temp_change, temp_ma, temp_std, hour, day, dayofweek
]])
# 시퀀스 업데이트 (가장 오래된 값 제거, 새 값 추가)
current_sequence = np.vstack((current_sequence[1:], new_point))
# 결과 데이터프레임 생성
future_df = pd.DataFrame({
'temperature': future_temperatures,
'anomaly_probability': future_anomaly_probs,
'predicted_anomaly': future_anomalies
}, index=future_times)
# 미래 예측 시각화
plt.figure(figsize=(14, 7))
# 과거 데이터와 미래 예측 함께 표시
plt.plot(
last_sequence.index[-288:],
last_sequence['temperature'][-288:],
'b-',
label='과거 온도'
)
plt.plot(
future_df.index,
future_df['temperature'],
'g--',
label='예측 온도'
)
# 예측된 이상 징후 표시
plt.scatter(
future_df[future_df['predicted_anomaly'] == 1].index,
future_df[future_df['predicted_anomaly'] == 1]['temperature'],
c='red',
marker='X',
s=70,
label='예측된 이상 징후'
)
plt.axhline(y=2, color='g', linestyle='--', label='최소 온도 (2°C)')
plt.axhline(y=8, color='r', linestyle='--', label='최대 온도 (8°C)')
plt.title('약품 냉장고 미래 온도 및 이상 징후 예측')
plt.xlabel('시간')
plt.ylabel('온도 (°C)')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(RESULTS_PATH, 'future_predictions.png'))
# 경고 메시지 생성
if future_df['predicted_anomaly'].sum() > 0:
first_anomaly = future_df[future_df['predicted_anomaly'] == 1].index[0]
time_to_anomaly = (first_anomaly - last_time).total_seconds() / 60
print(f"\n⚠️ 경고: {time_to_anomaly:.0f}분 후 이상 징후 발생 예측!")
print(f"예상 시간: {first_anomaly}")
print(f"예상 온도: {future_df.loc[first_anomaly, 'temperature']:.2f}°C")
print(f"이상 확률: {future_df.loc[first_anomaly, 'anomaly_probability']:.4f}")
else:
print("\n✅ 향후 24시간 동안 이상 징후가 예측되지 않았습니다.")
return future_df
# 7. 실시간 모니터링 및 알림 함수 (실제 사용 시 구현)
def setup_monitoring(model, scaler, features, threshold=0.7):
"""
실시간 모니터링 및 알림 설정 (실제 구현 시 활용)
Parameters:
model: 학습된 LSTM 모델
scaler: 정규화에 사용된 스케일러
features: 특성 목록
threshold: 이상 감지 임계값
"""
print("\n===== 실시간 모니터링 시스템 설정 =====")
print("※ 이 함수는 실제 모니터링 시스템 구현의 예시입니다.")
print("※ 실제 구현 시 센서 데이터 수집 및 알림 시스템과 연동해야 합니다.\n")
# 실제 구현 시 필요한 단계
print("1. 온도 센서와 연결 설정")
print("2. 데이터 수집 간격 설정 (권장: 5분)")
print("3. 알림 채널 설정:")
print(" - SMS 알림")
print(" - 이메일 알림")
print(" - 모바일 앱 푸시 알림")
print("4. 이상 징후 감지 시 자동 조치 설정:")
print(" - 백업 냉각 시스템 활성화")
print(" - 담당자 자동 호출")
print("\n알림 임계값 설정 완료: {:.2f}".format(threshold))
print("모니터링 시스템 준비 완료!")
# 8. 메인 함수
def main():
"""
메인 함수 - 전체 워크플로우 실행
"""
print("===== 약품 냉장고 온도 이상 징후 예측 시스템 =====")
# 1. 데이터 로드
df = load_data(DATA_PATH)
if df is None:
print("프로그램 종료: 데이터를 로드할 수 없습니다.")
return
# 2. 시퀀스 데이터 생성
sequence_length = 288 # 1일치 데이터 (5분 간격)
X, y, scaler, features = create_sequences(df, 'anomaly', sequence_length)
# 3. 학습/테스트 데이터 분할
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
print(f"학습 데이터 크기: {X_train.shape}")
print(f"테스트 데이터 크기: {X_test.shape}")
# 4. 모델 학습 및 평가
input_shape = (X_train.shape[1], X_train.shape[2])
# 저장된 모델이 있는지 확인
if os.path.exists(MODEL_PATH):
print(f"저장된 모델을 불러옵니다: {MODEL_PATH}")
model = load_model(MODEL_PATH)
else:
print("새 모델을 학습합니다...")
model, _ = train_and_evaluate(X_train, y_train, X_test, y_test, input_shape)
# 5. 이상 징후 예측
result_df = predict_anomalies(model, df, scaler, features, sequence_length)
# 6. 미래 예측
# 마지막 시퀀스 데이터 선택
last_sequence = df.iloc[-sequence_length:]
# 미래 24시간 예측 (5분 간격, 24시간 = 288 포인트)
future_df = predict_future(model, last_sequence, scaler, features)
# 7. 실시간 모니터링 시스템 설정 (데모)
setup_monitoring(model, scaler, features)
print("\n===== 프로그램 완료 =====")
print(f"결과가 {RESULTS_PATH} 디렉토리에 저장되었습니다.")
# 프로그램 실행
if __name__ == "__main__":
main()
728x90