AI

LSTM 모델로 이상 징후를 감지

david100gom 2025. 2. 27. 14:05

약품 냉장고 온도 데이터를 사용하여 LSTM 모델로 이상 징후를 감지하고 예측하는 종합적인 시스템입니다. 

  1. 데이터 처리 및 특성 엔지니어링:
    • 실제 데이터 또는 샘플 데이터 생성
    • 다양한 이상 패턴 모델링 (급격한 온도 상승, 냉각 실패, 과냉각)
    • 시간 기반 특성 및 통계적 특성 생성
  2. LSTM 모델 구축:
    • 2층 LSTM 네트워크로 시계열 패턴 학습
    • 드롭아웃으로 과적합 방지
    • 클래스 불균형 처리 (이상치는 소수이므로)
  3. 이상 감지 및 평가:
    • 혼동 행렬 및 분류 보고서 생성
    • 시각화를 통한 결과 검증
  4. 미래 예측 기능:
    • 24시간 선행 예측으로 문제 사전 감지
    • 점진적인 온도 변화 패턴 반영
  5. 실시간 모니터링 설계:
    • 알림 체계 구성 제안
    • 실시간 데이터 처리 프레임워크

이 시스템의 장점:

  • 약품 안전성 보장
  • 설비 오작동 조기 발견
  • 규제 준수 지원 (약품 보관 온도 규정)
  • 예방적 유지보수 가능

실제 구현 시 센서 데이터 수집 시스템과 연동하고, 이상 감지 임계값을 실제 환경에 맞게 조정해야 합니다. 또한 알림 시스템을 SMS, 이메일 또는 모바일 앱과 연동하면 실제 운영 환경에서 유용하게 활용할 수 있습니다.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import confusion_matrix, classification_report
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import seaborn as sns
import os
from datetime import datetime, timedelta

# 파일 경로 설정
DATA_PATH = "drug_refrigerator_temperature.csv"  # 약품 냉장고 온도 데이터 파일
MODEL_PATH = "drug_refrigerator_anomaly_model.h5"  # 모델 저장 경로
RESULTS_PATH = "results"  # 결과 저장 디렉토리

# 결과 저장 디렉토리 생성
if not os.path.exists(RESULTS_PATH):
    os.makedirs(RESULTS_PATH)

# 1. 데이터 로드 및 전처리
def load_data(file_path):
    """
    약품 냉장고 온도 데이터를 로드하는 함수
    
    Parameters:
    file_path (str): 데이터 파일 경로
    
    Returns:
    DataFrame: 처리된 데이터프레임
    """
    try:
        # 실제 데이터 파일이 있을 경우
        if os.path.exists(file_path):
            # CSV 파일 로드 (날짜/시간 및 온도 데이터 포함)
            df = pd.read_csv(file_path, parse_dates=['timestamp'])
            df.set_index('timestamp', inplace=True)
            print(f"데이터 로드 완료: {len(df)} 개의 레코드")
            
        # 샘플 데이터 생성 (데이터 파일이 없는 경우)
        else:
            print(f"파일을 찾을 수 없습니다: {file_path}")
            print("샘플 데이터를 생성합니다...")
            
            # 날짜 범위 생성 (5분 간격으로 30일)
            date_range = pd.date_range(
                start='2023-01-01', 
                end='2023-01-31', 
                freq='5min'
            )
            
            # 기본 온도 패턴 생성 (약품 냉장고 권장 온도: 2-8°C)
            base_temp = 5.0  # 기본 온도 (°C)
            
            # 일간 주기 변동 (냉장고 문 열림, 냉각 사이클 등으로 인한 변동)
            hourly_pattern = 0.5 * np.sin(np.linspace(0, 2*np.pi*24*30, len(date_range)))
            
            # 냉각 시스템 사이클 (약 3시간 주기)
            cooling_cycle = 0.3 * np.sin(np.linspace(0, 2*np.pi*240*30, len(date_range)))
            
            # 정상 노이즈
            normal_noise = 0.2 * np.random.randn(len(date_range))
            
            # 온도 데이터 생성
            temperature = base_temp + hourly_pattern + cooling_cycle + normal_noise
            
            # 이상 온도 데이터 삽입 (임의의 날짜에 비정상적인 온도 스파이크)
            # 1. 갑작스러운 온도 상승 (문이 열려있는 경우)
            anomaly_indices_1 = np.random.choice(
                range(len(date_range)), 
                size=20, 
                replace=False
            )
            temperature[anomaly_indices_1] += np.random.uniform(1.5, 3.0, size=20)
            
            # 2. 냉각 실패 (온도가 점진적으로 상승)
            anomaly_start = np.random.randint(0, len(date_range) - 288)  # 1일(288 포인트) 확보
            for i in range(288):  # 24시간(5분 간격 = 288 포인트) 동안 점진적 상승
                temperature[anomaly_start + i] += 0.01 * i
            
            # 3. 냉각 과도 (온도가 0도 이하로 떨어짐)
            anomaly_indices_3 = np.random.choice(
                range(len(date_range)), 
                size=15, 
                replace=False
            )
            temperature[anomaly_indices_3] -= np.random.uniform(5.0, 7.0, size=15)
            
            # 데이터프레임 생성
            df = pd.DataFrame({
                'temperature': temperature
            }, index=date_range)
            
            print(f"샘플 데이터 생성 완료: {len(df)} 개의 레코드")
        
        # 기본적인 데이터 전처리
        # 1. 결측치 처리
        if df['temperature'].isna().sum() > 0:
            print(f"결측치 감지: {df['temperature'].isna().sum()} 개")
            df['temperature'] = df['temperature'].interpolate(method='time')
            print("결측치 보간 완료")
        
        # 2. 이상치 확인 (온도가 약품 보관 범위를 벗어난 경우)
        normal_range = (2, 8)  # 약품 냉장고 권장 온도 범위: 2-8°C
        anomalies = (df['temperature'] < normal_range[0]) | (df['temperature'] > normal_range[1])
        print(f"정상 범위를 벗어난 온도: {anomalies.sum()} 개 ({anomalies.sum()/len(df)*100:.2f}%)")
        
        # 3. 추가 특성 생성
        # 시간 관련 특성
        df['hour'] = df.index.hour
        df['day'] = df.index.day
        df['dayofweek'] = df.index.dayofweek
        
        # 온도 변화율 (°C/분)
        df['temp_change'] = df['temperature'].diff() / 5  # 5분 간격 가정
        
        # 이동 평균 및 표준편차 (과거 1시간)
        window_size = 12  # 5분 간격 데이터에서 1시간 = 12개 포인트
        df['temp_ma'] = df['temperature'].rolling(window=window_size).mean()
        df['temp_std'] = df['temperature'].rolling(window=window_size).std()
        
        # 첫 window_size 행의 NaN 값 처리
        df = df.fillna(method='bfill')
        
        # 4. 레이블 생성 (온도가 정상 범위를 벗어나면 1, 아니면 0)
        df['anomaly'] = anomalies.astype(int)
        
        # 약품 냉장고 온도 시각화
        plt.figure(figsize=(14, 7))
        plt.plot(df.index, df['temperature'], 'b-', label='온도')
        plt.scatter(
            df[df['anomaly'] == 1].index, 
            df[df['anomaly'] == 1]['temperature'], 
            c='red', 
            s=30, 
            label='이상치'
        )
        plt.axhline(y=normal_range[0], color='g', linestyle='--', label='최소 온도 (2°C)')
        plt.axhline(y=normal_range[1], color='r', linestyle='--', label='최대 온도 (8°C)')
        plt.title('약품 냉장고 온도 데이터')
        plt.xlabel('시간')
        plt.ylabel('온도 (°C)')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(os.path.join(RESULTS_PATH, 'temperature_data.png'))
        
        return df
    
    except Exception as e:
        print(f"데이터 로드 중 오류 발생: {e}")
        return None

# 2. 시퀀스 데이터 생성
def create_sequences(df, target_col, sequence_length=288):
    """
    LSTM 모델을 위한 시퀀스 데이터 생성
    
    Parameters:
    df (DataFrame): 입력 데이터프레임
    target_col (str): 타겟 변수 열 이름
    sequence_length (int): 시퀀스 길이 (기본값: 288 = 1일)
    
    Returns:
    tuple: (X, y) - 입력 시퀀스와 타겟 값
    """
    # 정규화를 위한 특성 선택
    features = ['temperature', 'temp_change', 'temp_ma', 'temp_std', 'hour', 'day', 'dayofweek']
    
    # 특성 정규화
    scaler = MinMaxScaler()
    scaled_features = scaler.fit_transform(df[features])
    
    # 타겟 변수
    targets = df[target_col].values
    
    X, y = [], []
    
    # 시퀀스 생성
    for i in range(len(df) - sequence_length):
        X.append(scaled_features[i:i + sequence_length])
        y.append(targets[i + sequence_length])
    
    return np.array(X), np.array(y), scaler, features

# 3. LSTM 모델 구축
def build_lstm_model(input_shape, dropout_rate=0.2):
    """
    이상 징후 탐지를 위한 LSTM 모델 생성
    
    Parameters:
    input_shape (tuple): 입력 데이터 형태 (시퀀스 길이, 특성 수)
    dropout_rate (float): 드롭아웃 비율
    
    Returns:
    Model: 컴파일된 Keras 모델
    """
    model = Sequential([
        LSTM(64, activation='relu', return_sequences=True, input_shape=input_shape),
        Dropout(dropout_rate),
        LSTM(32, activation='relu'),
        Dropout(dropout_rate),
        Dense(16, activation='relu'),
        Dense(1, activation='sigmoid')  # 이진 분류 (0: 정상, 1: 이상)
    ])
    
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    return model

# 4. 모델 학습 및 평가
def train_and_evaluate(X_train, y_train, X_test, y_test, input_shape, epochs=50, batch_size=32):
    """
    LSTM 모델 학습 및 평가
    
    Parameters:
    X_train, y_train: 학습 데이터
    X_test, y_test: 테스트 데이터
    input_shape: 입력 데이터 형태
    epochs: 학습 에포크 수
    batch_size: 배치 크기
    
    Returns:
    Model: 학습된 모델
    History: 학습 기록
    """
    # 모델 생성
    model = build_lstm_model(input_shape)
    
    # 콜백 설정
    callbacks = [
        EarlyStopping(
            monitor='val_loss', 
            patience=10, 
            restore_best_weights=True
        ),
        ModelCheckpoint(
            MODEL_PATH, 
            monitor='val_loss', 
            save_best_only=True
        )
    ]
    
    # 클래스 불균형 처리
    # 이상 데이터(1)의 가중치를 더 높게 설정
    class_weights = {
        0: 1.0,
        1: np.sum(y_train == 0) / np.sum(y_train == 1)  # 정상:이상 비율
    }
    
    # 모델 학습
    history = model.fit(
        X_train, 
        y_train,
        validation_split=0.2,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        class_weight=class_weights,
        verbose=1
    )
    
    # 모델 평가
    loss, accuracy = model.evaluate(X_test, y_test)
    print(f"테스트 손실: {loss:.4f}")
    print(f"테스트 정확도: {accuracy:.4f}")
    
    # 예측
    y_pred_prob = model.predict(X_test)
    y_pred = (y_pred_prob > 0.5).astype(int)
    
    # 혼동 행렬
    cm = confusion_matrix(y_test, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('혼동 행렬')
    plt.ylabel('실제 레이블')
    plt.xlabel('예측 레이블')
    plt.savefig(os.path.join(RESULTS_PATH, 'confusion_matrix.png'))
    
    # 분류 보고서
    print("\n분류 보고서:")
    report = classification_report(y_test, y_pred)
    print(report)
    
    # 학습 과정 시각화
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='훈련 손실')
    plt.plot(history.history['val_loss'], label='검증 손실')
    plt.title('모델 손실')
    plt.xlabel('에포크')
    plt.ylabel('손실')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='훈련 정확도')
    plt.plot(history.history['val_accuracy'], label='검증 정확도')
    plt.title('모델 정확도')
    plt.xlabel('에포크')
    plt.ylabel('정확도')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_PATH, 'training_history.png'))
    
    return model, history

# 5. 이상 징후 예측 및 알림 시스템
def predict_anomalies(model, df, scaler, features, sequence_length, threshold=0.7):
    """
    새로운 데이터에 대한 이상 징후 예측
    
    Parameters:
    model: 학습된 LSTM 모델
    df: 입력 데이터프레임
    scaler: 학습에 사용된 스케일러
    features: 특성 목록
    sequence_length: 시퀀스 길이
    threshold: 이상 감지 임계값 (기본값: 0.7)
    
    Returns:
    DataFrame: 예측 결과가 포함된 데이터프레임
    """
    # 데이터프레임 복사
    result_df = df.copy()
    
    # 스케일링
    scaled_data = scaler.transform(df[features])
    
    # 예측 결과 저장
    predictions = []
    anomaly_probs = []
    
    # 각 시점에 대한 예측 (시작점 + 시퀀스 길이부터)
    for i in range(len(df) - sequence_length):
        # 현재 시퀀스 추출
        seq = scaled_data[i:i + sequence_length].reshape(1, sequence_length, len(features))
        
        # 이상 확률 예측
        prob = model.predict(seq, verbose=0)[0][0]
        
        # 이전 시점은 NaN으로 채움
        if i == 0:
            predictions.extend([np.nan] * sequence_length)
            anomaly_probs.extend([np.nan] * sequence_length)
        
        # 현재 시점의 예측 값 추가
        predictions.append(1 if prob > threshold else 0)
        anomaly_probs.append(prob)
    
    # 결과 데이터프레임에 예측 결과 추가
    result_df['predicted_anomaly'] = predictions
    result_df['anomaly_probability'] = anomaly_probs
    
    # 예측 시각화
    plt.figure(figsize=(14, 10))
    
    # 온도 및 이상 징후 플롯
    plt.subplot(2, 1, 1)
    plt.plot(result_df.index, result_df['temperature'], 'b-', label='온도')
    plt.scatter(
        result_df[result_df['anomaly'] == 1].index, 
        result_df[result_df['anomaly'] == 1]['temperature'], 
        c='red', 
        s=30, 
        label='실제 이상'
    )
    plt.scatter(
        result_df[result_df['predicted_anomaly'] == 1].index, 
        result_df[result_df['predicted_anomaly'] == 1]['temperature'], 
        c='orange', 
        marker='x', 
        s=50, 
        label='예측 이상'
    )
    plt.axhline(y=2, color='g', linestyle='--', label='최소 온도 (2°C)')
    plt.axhline(y=8, color='r', linestyle='--', label='최대 온도 (8°C)')
    plt.title('약품 냉장고 온도 이상 징후 예측')
    plt.ylabel('온도 (°C)')
    plt.legend()
    plt.grid(True)
    
    # 이상 확률 플롯
    plt.subplot(2, 1, 2)
    plt.plot(result_df.index, result_df['anomaly_probability'], 'g-', label='이상 확률')
    plt.axhline(y=threshold, color='r', linestyle='--', label=f'임계값 ({threshold})')
    plt.title('이상 징후 확률')
    plt.xlabel('시간')
    plt.ylabel('확률')
    plt.legend()
    plt.grid(True)
    
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_PATH, 'anomaly_predictions.png'))
    
    return result_df

# 6. 미래 온도 및 이상 징후 예측
def predict_future(model, last_sequence, scaler, features, n_steps=288, threshold=0.7):
    """
    미래 온도 및 이상 징후 예측
    
    Parameters:
    model: 학습된 LSTM 모델
    last_sequence: 마지막 시퀀스 데이터
    scaler: 학습에 사용된 스케일러
    features: 특성 목록
    n_steps: 예측할 미래 스텝 수
    threshold: 이상 감지 임계값
    
    Returns:
    DataFrame: 미래 예측 결과
    """
    # 마지막 시점 가져오기
    last_time = pd.to_datetime(last_sequence.index[-1])
    
    # 현재 시퀀스 저장
    current_sequence = last_sequence[features].values
    
    # 미래 예측 결과 저장
    future_times = []
    future_temperatures = []
    future_anomaly_probs = []
    future_anomalies = []
    
    # 미래 시점별 예측
    for i in range(n_steps):
        # 다음 시간 계산
        next_time = last_time + timedelta(minutes=(i+1)*5)
        future_times.append(next_time)
        
        # 미래 시간 정보
        hour = next_time.hour
        day = next_time.day
        dayofweek = next_time.dayofweek
        
        # 현재 시퀀스로 예측
        scaled_seq = scaler.transform(current_sequence)
        seq_reshaped = scaled_seq.reshape(1, len(current_sequence), len(features))
        anomaly_prob = float(model.predict(seq_reshaped, verbose=0)[0][0])
        
        # 예측 결과 저장
        future_anomaly_probs.append(anomaly_prob)
        future_anomalies.append(1 if anomaly_prob > threshold else 0)
        
        # 다음 온도값 추정 (간단한 예측 - 실제로는 별도의 온도 예측 모델이 필요할 수 있음)
        # 여기서는 최근 패턴을 기반으로 다음 온도를 추정
        last_temps = current_sequence[-24:, 0]  # 최근 2시간의 온도
        temp_change_rate = np.mean(np.diff(last_temps))
        next_temp = last_temps[-1] + temp_change_rate
        
        # 온도 범위 보정
        next_temp = max(min(next_temp, 10), 0)  # 0-10°C 범위로 제한
        future_temperatures.append(next_temp)
        
        # 다음 시퀀스를 위한 새 데이터 포인트 생성
        temp_change = next_temp - current_sequence[-1, 0]
        temp_ma = np.mean(np.append(current_sequence[-11:, 0], next_temp))
        temp_std = np.std(np.append(current_sequence[-11:, 0], next_temp))
        
        # 새 데이터 포인트
        new_point = np.array([[
            next_temp, temp_change, temp_ma, temp_std, hour, day, dayofweek
        ]])
        
        # 시퀀스 업데이트 (가장 오래된 값 제거, 새 값 추가)
        current_sequence = np.vstack((current_sequence[1:], new_point))
    
    # 결과 데이터프레임 생성
    future_df = pd.DataFrame({
        'temperature': future_temperatures,
        'anomaly_probability': future_anomaly_probs,
        'predicted_anomaly': future_anomalies
    }, index=future_times)
    
    # 미래 예측 시각화
    plt.figure(figsize=(14, 7))
    
    # 과거 데이터와 미래 예측 함께 표시
    plt.plot(
        last_sequence.index[-288:], 
        last_sequence['temperature'][-288:], 
        'b-', 
        label='과거 온도'
    )
    plt.plot(
        future_df.index, 
        future_df['temperature'], 
        'g--', 
        label='예측 온도'
    )
    
    # 예측된 이상 징후 표시
    plt.scatter(
        future_df[future_df['predicted_anomaly'] == 1].index,
        future_df[future_df['predicted_anomaly'] == 1]['temperature'],
        c='red',
        marker='X',
        s=70,
        label='예측된 이상 징후'
    )
    
    plt.axhline(y=2, color='g', linestyle='--', label='최소 온도 (2°C)')
    plt.axhline(y=8, color='r', linestyle='--', label='최대 온도 (8°C)')
    plt.title('약품 냉장고 미래 온도 및 이상 징후 예측')
    plt.xlabel('시간')
    plt.ylabel('온도 (°C)')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_PATH, 'future_predictions.png'))
    
    # 경고 메시지 생성
    if future_df['predicted_anomaly'].sum() > 0:
        first_anomaly = future_df[future_df['predicted_anomaly'] == 1].index[0]
        time_to_anomaly = (first_anomaly - last_time).total_seconds() / 60
        print(f"\n⚠️ 경고: {time_to_anomaly:.0f}분 후 이상 징후 발생 예측!")
        print(f"예상 시간: {first_anomaly}")
        print(f"예상 온도: {future_df.loc[first_anomaly, 'temperature']:.2f}°C")
        print(f"이상 확률: {future_df.loc[first_anomaly, 'anomaly_probability']:.4f}")
    else:
        print("\n✅ 향후 24시간 동안 이상 징후가 예측되지 않았습니다.")
    
    return future_df

# 7. 실시간 모니터링 및 알림 함수 (실제 사용 시 구현)
def setup_monitoring(model, scaler, features, threshold=0.7):
    """
    실시간 모니터링 및 알림 설정 (실제 구현 시 활용)
    
    Parameters:
    model: 학습된 LSTM 모델
    scaler: 정규화에 사용된 스케일러
    features: 특성 목록
    threshold: 이상 감지 임계값
    """
    print("\n===== 실시간 모니터링 시스템 설정 =====")
    print("※ 이 함수는 실제 모니터링 시스템 구현의 예시입니다.")
    print("※ 실제 구현 시 센서 데이터 수집 및 알림 시스템과 연동해야 합니다.\n")
    
    # 실제 구현 시 필요한 단계
    print("1. 온도 센서와 연결 설정")
    print("2. 데이터 수집 간격 설정 (권장: 5분)")
    print("3. 알림 채널 설정:")
    print("   - SMS 알림")
    print("   - 이메일 알림")
    print("   - 모바일 앱 푸시 알림")
    print("4. 이상 징후 감지 시 자동 조치 설정:")
    print("   - 백업 냉각 시스템 활성화")
    print("   - 담당자 자동 호출")
    print("\n알림 임계값 설정 완료: {:.2f}".format(threshold))
    print("모니터링 시스템 준비 완료!")

# 8. 메인 함수
def main():
    """
    메인 함수 - 전체 워크플로우 실행
    """
    print("===== 약품 냉장고 온도 이상 징후 예측 시스템 =====")
    
    # 1. 데이터 로드
    df = load_data(DATA_PATH)
    if df is None:
        print("프로그램 종료: 데이터를 로드할 수 없습니다.")
        return
    
    # 2. 시퀀스 데이터 생성
    sequence_length = 288  # 1일치 데이터 (5분 간격)
    X, y, scaler, features = create_sequences(df, 'anomaly', sequence_length)
    
    # 3. 학습/테스트 데이터 분할
    train_size = int(0.8 * len(X))
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    
    print(f"학습 데이터 크기: {X_train.shape}")
    print(f"테스트 데이터 크기: {X_test.shape}")
    
    # 4. 모델 학습 및 평가
    input_shape = (X_train.shape[1], X_train.shape[2])
    
    # 저장된 모델이 있는지 확인
    if os.path.exists(MODEL_PATH):
        print(f"저장된 모델을 불러옵니다: {MODEL_PATH}")
        model = load_model(MODEL_PATH)
    else:
        print("새 모델을 학습합니다...")
        model, _ = train_and_evaluate(X_train, y_train, X_test, y_test, input_shape)
    
    # 5. 이상 징후 예측
    result_df = predict_anomalies(model, df, scaler, features, sequence_length)
    
    # 6. 미래 예측
    # 마지막 시퀀스 데이터 선택
    last_sequence = df.iloc[-sequence_length:]
    
    # 미래 24시간 예측 (5분 간격, 24시간 = 288 포인트)
    future_df = predict_future(model, last_sequence, scaler, features)
    
    # 7. 실시간 모니터링 시스템 설정 (데모)
    setup_monitoring(model, scaler, features)
    
    print("\n===== 프로그램 완료 =====")
    print(f"결과가 {RESULTS_PATH} 디렉토리에 저장되었습니다.")

# 프로그램 실행
if __name__ == "__main__":
    main()
728x90