28. 미니 프로젝트: 로그 분석기
- 고급편에서 배운 것을 하나의 실전 도구에 통합한다.
- 정규식·datetime·자료구조·파일 입출력을 결합한다.
- 실무에서 흔한 "로그 분석"이라는 문제를 직접 푼다.
무엇을 만드나
서버나 프로그램이 남긴 로그 파일을 분석하는 도구를 만듭니다. 로그를 파싱해 레벨별로 집계하고, 오류만 추려내고, 시간대별 분포와 통계를 냅니다. 실무에서 "무슨 일이 있었는지" 로그를 뒤질 때 쓰는 바로 그 작업입니다.
고급편에서 배운 것들이 어떻게 어우러지는지 보세요.
| 기능 | 사용하는 고급 개념 |
|---|---|
| 로그 한 줄 파싱 | 21장 정규표현식 (그룹 추출) |
| 시각 처리·기간 계산 | 23장 날짜와 시간 |
| 레벨별·시간대별 집계 | 26장 자료구조 (Counter, defaultdict) |
| 오류 필터링 | 중급 13장 컴프리헨션 |
| 파일 읽기 | 중급 16장 파일 입출력 |
| 타입 힌트·데이터 클래스 | 중급 18장 타입 힌트 |
분석할 로그 형식
이런 형식의 로그 파일(sample.log)을 분석한다고 합시다. 22장에서 본 로깅 출력과 같은 형태입니다.
2025-12-31 09:15:23 INFO 서버 시작 2025-12-31 09:16:01 INFO 사용자 로그인 user=alice 2025-12-31 09:17:45 WARNING 응답 지연 endpoint=/api/data 2025-12-31 09:18:30 ERROR DB 연결 실패 host=db01 2025-12-31 09:20:55 ERROR 파일 없음 path=/tmp/x.txt 2025-12-31 10:01:15 CRITICAL 시스템 다운
각 줄은 날짜시간 레벨 메시지 구조입니다. 이 패턴을 정규식으로 잡아냅니다.
1단계: 로그 한 줄 파싱
가장 먼저, 로그 한 줄을 구조화된 데이터로 바꿉니다. 21장의 명명 그룹으로 시간·레벨·메시지를 추출합니다.
# log_analyzer.py import re from dataclasses import dataclass from datetime import datetime # 로그 형식을 명명 그룹으로 표현 LOG_PATTERN = re.compile( r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+" r"(?P<level>DEBUG|INFO|WARNING|ERROR|CRITICAL)\s+" r"(?P<message>.*)" ) @dataclass class LogEntry: timestamp: datetime level: str message: str def parse_line(line: str) -> LogEntry | None: m = LOG_PATTERN.match(line.strip()) if not m: return None # 형식 안 맞으면 None ts = datetime.strptime(m.group("timestamp"), "%Y-%m-%d %H:%M:%S") return LogEntry(ts, m.group("level"), m.group("message"))
새 요소 두 가지를 짚어봅시다.
- 정규식 패턴:
(?P<level>DEBUG|INFO|...)처럼 명명 그룹으로 각 부분을 잡고,m.group("level")로 꺼냅니다(21장).|는 "또는"이라 5개 레벨 중 하나를 매칭합니다. @dataclass: 데이터를 담는 클래스를 간단히 만드는 데코레이터입니다.__init__을 자동 생성해줘, 속성만 선언하면 됩니다. 타입 힌트(18장)와 함께 쓰면 "어떤 데이터인지" 명확합니다.
📎@dataclass는 이 책에서 처음 나옵니다.class LogEntry:위에 붙이면,timestamp·level·message를 받는__init__이 자동으로 만들어집니다. "데이터를 담기만 하는 클래스"를 짧게 쓰는 표준 도구입니다. (중급 11장의__init__을 손으로 안 써도 되는 셈입니다.)
매칭이 실패하면(형식이 다른 줄) None을 돌려줍니다(int | None 타입 힌트, 18장).
print(parse_line("2025-01-01 12:00:00 INFO 정상")) # LogEntry(...) print(parse_line("형식 안 맞는 줄")) # None
2단계: 파일 전체 파싱
파일을 줄 단위로 읽어(16장), 파싱 성공한 것만 모읍니다.
def parse_file(path: str) -> list[LogEntry]: entries = [] with open(path, encoding="utf-8") as f: # 16장 with for line in f: # 줄 단위 순회 entry = parse_line(line) if entry: # None이 아니면 (파싱 성공) entries.append(entry) return entries entries = parse_file("sample.log") print(len(entries)) # 10
3단계: 집계와 분석
이제 26장의 자료구조로 통계를 냅니다. Counter는 빈도 세기에 최적입니다(15장 collections).
from collections import Counter def level_counts(entries: list[LogEntry]) -> dict: return dict(Counter(e.level for e in entries)) # 제너레이터 표현식 print(level_counts(entries)) # {'INFO': 4, 'WARNING': 2, 'ERROR': 3, 'CRITICAL': 1}
Counter(e.level for e in entries)는 각 로그의 레벨을 세서 빈도 딕셔너리를 만듭니다(14장 제너레이터 표현식 + 15장 Counter).
오류만 추려내는 것은 컴프리헨션(중급 13장)입니다.
def errors_only(entries: list[LogEntry]) -> list[LogEntry]: return [e for e in entries if e.level in ("ERROR", "CRITICAL")] for e in errors_only(entries): print(f"[{e.level}] {e.message}") # [ERROR] DB 연결 실패 host=db01 # [ERROR] 파일 없음 path=/tmp/x.txt # [ERROR] DB 연결 실패 host=db02 # [CRITICAL] 시스템 다운
e.level in ("ERROR", "CRITICAL")로 두 레벨을 한 번에 거릅니다(26장에서 본 in, 여기선 작은 튜플이라 빠름).
4단계: 시간 분석
23장의 datetime으로 로그 기간을 계산합니다.
def time_span(entries: list[LogEntry]): if not entries: return None times = [e.timestamp for e in entries] return min(times), max(times) start, end = time_span(entries) print(f"기간: {start} ~ {end}") print(f"지속: {end - start}") # timedelta (23장) # 기간: 2025-12-31 09:15:23 ~ 2025-12-31 10:01:15 # 지속: 0:45:52
end - start가 timedelta를 주는 것(23장)을 활용해 전체 지속 시간을 구했습니다. 시간대(hour)별 분포는 defaultdict로 셉니다.
from collections import defaultdict def by_hour(entries: list[LogEntry]) -> dict: counts = defaultdict(int) # 없는 키는 0으로 시작 for e in entries: counts[e.timestamp.hour] += 1 return dict(counts) print(by_hour(entries)) # {9: 9, 10: 1}
📎defaultdict(int)는 "없는 키를 조회하면 자동으로 0을 만드는" 딕셔너리입니다(15장collections). 일반 딕셔너리라면counts[h] = counts.get(h, 0) + 1로 써야 할 것을,defaultdict덕에counts[h] += 1로 간단해집니다. 빈도 집계의 단골 도구입니다.
5단계: 보고서 출력
집계 결과를 보기 좋게 출력합니다. 레벨별 비율을 막대로 표시해 봅시다(초급 f-string·반복문).
def print_report(entries: list[LogEntry]) -> None: counts = level_counts(entries) total = sum(counts.values()) print(f"총 {total}건의 로그") print("-" * 30) for level, cnt in sorted(counts.items()): pct = cnt / total * 100 bar = "#" * int(pct / 5) print(f" {level:8} {cnt:2}건 ({pct:4.1f}%) {bar}") print_report(entries)
실행 결과:
총 10건의 로그 ------------------------------ CRITICAL 1건 (10.0%) ## ERROR 3건 (30.0%) ###### INFO 4건 (40.0%) ######## WARNING 2건 (20.0%) ####
{level:8}(왼쪽 정렬 폭 8), {pct:4.1f}(소수점 1자리) 같은 f-string 서식(초급 4장)으로 표를 가지런히 맞췄습니다.
돌아보기: 무엇을 했나
작은 분석기 하나에 고급편의 핵심이 모두 들어갔습니다.
flowchart TD
File["로그 파일"]:::data --> Parse["정규식 파싱<br/>21장"]:::a
Parse --> Entry["LogEntry 객체들<br/>dataclass·타입힌트"]:::a
Entry --> Time["시간 분석<br/>23장 datetime"]:::a
Entry --> Count["집계<br/>26장 Counter·defaultdict"]:::a
Entry --> Filter["오류 필터<br/>컴프리헨션"]:::a
Time --> Report["📊 분석 보고서"]:::core
Count --> Report
Filter --> Report
classDef data fill:#a8dadc,stroke:#457b9d,color:#1d3557
classDef a fill:#cfe8ee,stroke:#2a9d8f,color:#14532d
classDef core fill:#b8e6c1,stroke:#34a853,color:#14532d
이것이 실전 데이터 처리의 전형입니다. 읽고(파일) → 구조화하고(정규식) → 집계하고(자료구조) → 보고(출력). 도메인만 바뀔 뿐, 이 흐름은 데이터 분석·로그 모니터링·텍스트 처리 어디에나 적용됩니다.
🧪 직접 확장해보기
이 분석기를 발판으로 다음 기능을 추가해 보세요. 정답은 없습니다.
확장 1. messages_by_level(entries, level) 함수를 만들어, 특정 레벨의 메시지만 리스트로 돌려주세요. (컴프리헨션)
확장 2. 가장 자주 등장한 에러 메시지의 첫 단어를 찾으세요. (Counter의 .most_common(1) 활용)
확장 3. 22장 로깅을 결합해, 분석 중 파싱 실패한 줄을 logger.warning으로 기록하세요.
확장 4. user=alice 같은 부분에서 사용자 이름을 정규식으로 추출해, 사용자별 활동 수를 집계하세요. (21장 그룹 + 26장 Counter)
확장 5. 특정 시간 범위(예: 9시~10시)의 로그만 거르는 함수를 만드세요. (23장 datetime 비교)
<details>
<summary>💡 확장 2·4 예시 답안</summary>
from collections import Counter import re # 확장 2: 가장 흔한 에러의 첫 단어 def top_error_keyword(entries): errors = [e for e in entries if e.level in ("ERROR", "CRITICAL")] words = Counter(e.message.split()[0] for e in errors) return words.most_common(1) # [('DB', 2)] # 확장 4: 사용자별 활동 집계 def user_activity(entries): counts = Counter() for e in entries: m = re.search(r"user=(\w+)", e.message) if m: counts[m.group(1)] += 1 return dict(counts) # {'alice': 2, 'bob': 1}
</details>
이 장에서 배운 것
- 실전 데이터 처리는 읽기 → 구조화 → 집계 → 보고의 흐름이다. 고급편 개념이 한 도구에 통합된다.
- 정규식(명명 그룹)으로 로그를 파싱하고,
@dataclass로 구조화하고,Counter·defaultdict로 집계한다. - datetime으로 기간·시간대를 분석하고, 컴프리헨션으로 필터링한다.
- 도메인이 바뀌어도 이 흐름은 로그 분석·데이터 처리·모니터링 어디에나 적용된다.
◀️ 이전 장: 27. 이진 트리 | ▶️ 다음: 98. 고급 용어 보강