← 블로그로 돌아가기

Observable 패턴과 함수형 파이프라인 — 왜 순수 함수가 동시성에서 자유로운가

문제: 상태를 공유하면 깨진다

동시성 프로그래밍에서 가장 흔한 버그는 뭘까? Race condition이다. 여러 작업이 같은 상태를 동시에 읽고 쓰면 결과를 예측할 수 없게 된다.

간단한 예시를 보자. 주식 가격 스트림을 받아서 이동평균을 계산하는 프로그램이다.

import threading

prices = []
moving_avg = 0.0

def on_price(price):
    global moving_avg
    prices.append(price)
    if len(prices) > 5:
        prices.pop(0)
    moving_avg = sum(prices) / len(prices)

# 여러 스레드에서 동시에 호출되면?
threads = [
    threading.Thread(target=on_price, args=(100,)),
    threading.Thread(target=on_price, args=(102,)),
    threading.Thread(target=on_price, args=(98,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(moving_avg)  # 뭐가 나올지 모른다

prices.append(price) 하고 prices.pop(0) 사이에 다른 스레드가 끼어들 수 있다. len(prices)를 읽는 시점과 sum(prices)를 계산하는 시점 사이에 리스트가 바뀔 수도 있다. 전형적인 shared mutable state 문제다.

해결책? Lock을 건다. 그런데 lock은 또 다른 문제를 낳는다 — deadlock, priority inversion, lock contention… 복잡한 시스템일수록 lock을 올바르게 관리하는 것 자체가 엔지니어링 과제가 된다.

그렇다면 아예 공유하는 상태 자체를 없애면 어떨까?

Observer 패턴: 이벤트를 “밀어넣는” 구조

Observable 패턴을 이해하려면 먼저 Observer 패턴부터 짚고 가자.

전통적인 프로그래밍에서 데이터를 가져오는 방식은 pull 이다.

# pull: 내가 직접 가져온다
data = database.query("SELECT * FROM prices")
for row in data:
    process(row)

하지만 실시간 데이터 스트림은 이렇게 못 한다. 주식 가격이 언제 들어올지 모르는데 계속 polling 할 순 없다. 그래서 push 방식이 필요하다 — 데이터가 오면 알려줘.

# push: 데이터가 오면 콜백이 불린다
price_stream.subscribe(on_price_update)

이게 Observer 패턴의 핵심이다. Subject(데이터 소스)가 Observer(구독자)에게 데이터를 밀어넣는다. 그런데 여기서 문제가 생긴다. 콜백 안에서 외부 상태를 수정하기 시작하면, 아까 봤던 그 shared mutable state 문제가 그대로 돌아온다.

Observable: Observer 패턴 + 함수형 파이프라인

Observable은 Observer 패턴의 진화다. 핵심 아이디어는 이렇다:

콜백 안에서 상태를 직접 바꾸지 말고, 데이터가 흘러가는 파이프라인을 선언적으로 조립하자.

음식점으로 비유하면:

  • Observer 패턴: 주문이 들어오면 주방장이 “알겠습니다!” 하고 자기 머릿속 메모장(=외부 상태)에 적는다. 주문이 동시에 여러 개 들어오면 메모장이 꼬인다.
  • Observable 패턴: 주문서가 컨베이어 벨트를 타고 흘러간다. 각 구간에서 하나의 작업만 수행한다. 주문서 확인 → 재료 준비 → 조리 → 플레이팅. 각 구간은 앞에서 받은 것만 보고 뒤로 넘길 뿐, 별도의 메모장을 공유하지 않는다.

RxPY로 아까 그 이동평균 예시를 다시 짜보자.

import reactivex as rx
from reactivex import operators as ops

price_source = rx.of(100, 102, 98, 105, 97, 103, 99, 101)

price_source.pipe(
    ops.buffer_with_count(count=5, skip=1),  # 최근 5개씩 묶는다
    ops.map(lambda window: sum(window) / len(window)),  # 평균 계산
).subscribe(
    on_next=lambda avg: print(f"이동평균: {avg:.1f}"),
    on_error=lambda e: print(f"에러: {e}"),
    on_completed=lambda: print("완료"),
)
이동평균: 100.4
이동평균: 101.0
이동평균: 100.4
이동평균: 101.0

잠깐, 아까 코드랑 비교해보자.

명령형 (아까) Observable (지금)
상태 prices 리스트, moving_avg 변수 없음
데이터 흐름 콜백이 외부 변수를 직접 수정 파이프라인을 통해 데이터가 변환되며 흘러감
동시성 안전 Lock 필요 파이프라인 내부에 공유 상태가 없으므로 안전

핵심은 외부 상태를 건드리는 코드가 한 줄도 없다는 것이다. buffer_with_count는 입력 스트림에서 윈도우를 잘라서 다음 단계로 넘길 뿐이고, map은 받은 윈도우를 평균값으로 변환해서 넘길 뿐이다. 각 단계가 순수 함수(pure function)다.

순수 함수가 동시성에서 자유로운 이유

순수 함수의 정의는 간단하다:

  1. 같은 입력에는 항상 같은 출력을 낸다.
  2. 외부 상태를 읽지도, 바꾸지도 않는다 (side effect가 없다).

이 두 가지 성질이 동시성에서 왜 중요한지 보자.

# 순수 함수
def calculate_avg(window: list[float]) -> float:
    return sum(window) / len(window)

# 비순수 함수
total = 0
count = 0

def update_avg(price: float) -> float:
    global total, count
    total += price
    count += 1
    return total / count

calculate_avg는 100개의 스레드에서 동시에 호출해도 안전하다. 왜? 각 호출이 자기가 받은 window만 보고 결과를 돌려줄 뿐, 어디에도 쓰지 않으니까. 공유하는 게 없으면 충돌할 일도 없다.

update_avg는? totalcount가 공유 상태다. 두 스레드가 동시에 total += price를 실행하면 값이 씹힌다. 이건 아까 threading 글에서 본 counter += 1 문제와 정확히 같은 구조다.

정리하면:

Race condition이 발생하려면 최소 두 가지가 필요하다: (1) 공유 상태 (2) 그 상태에 대한 동시 쓰기. 순수 함수는 (1)을 원천적으로 제거한다.

실전 예시: 센서 데이터 파이프라인

좀 더 현실적인 예시를 보자. IoT 센서에서 온도 데이터가 들어오는데, 노이즈를 필터링하고, 5초 윈도우로 평균을 내고, 임계값을 넘으면 알림을 보내야 한다.

명령형으로 짜면 이렇게 된다:

import threading
import time

lock = threading.Lock()
readings = []
alert_sent = False

def on_sensor_reading(temp):
    global alert_sent
    with lock:
        readings.append((time.time(), temp))
        # 5초보다 오래된 데이터 제거
        cutoff = time.time() - 5
        while readings and readings[0][0] < cutoff:
            readings.pop(0)
        # 노이즈 필터링 (범위 밖이면 무시)
        valid = [t for _, t in readings if -40 <= t <= 80]
        if not valid:
            return
        avg = sum(valid) / len(valid)
        if avg > 50 and not alert_sent:
            send_alert(f"고온 경고: {avg:.1f}°C")
            alert_sent = True
        elif avg <= 50:
            alert_sent = False

lock도 걸었고, 동작은 한다. 그런데 문제는:

  • readings, alert_sent가 전역 상태다. 함수만 보고는 어떤 상태가 변하는지 추적하기 어렵다.
  • 새로운 요구사항이 추가될 때마다 (예: “최근 1분간 3회 이상 알림이면 에스컬레이션”) lock 안의 코드가 점점 비대해진다.
  • 테스트하려면 전역 상태를 매번 리셋해야 한다.

Observable로 바꾸면:

import reactivex as rx
from reactivex import operators as ops
from reactivex.scheduler import TimeoutScheduler
import time

def create_sensor_pipeline(sensor_source):
    return sensor_source.pipe(
        ops.filter(lambda temp: -40 <= temp <= 80),          # 노이즈 제거
        ops.buffer_with_time(timespan=5.0),                   # 5초 윈도우
        ops.filter(lambda buf: len(buf) > 0),                 # 빈 윈도우 무시
        ops.map(lambda buf: sum(buf) / len(buf)),             # 평균 계산
        ops.distinct_until_changed(lambda avg: avg > 50),     # 상태 변경 시에만
        ops.filter(lambda avg: avg > 50),                     # 임계값 초과만
    )

# 사용
sensor = rx.subject.Subject()
create_sensor_pipeline(sensor).subscribe(
    on_next=lambda avg: send_alert(f"고온 경고: {avg:.1f}°C")
)

# 센서에서 데이터가 들어올 때마다
sensor.on_next(48.5)
sensor.on_next(51.2)
sensor.on_next(52.0)

어떤 차이가 있는지 보자:

  • 전역 변수가 하나도 없다. readings, alert_sent, lock 전부 사라졌다.
  • 각 단계가 독립적인 순수 함수다. filter, map, buffer_with_time 각각이 입력을 받아 출력을 내보낼 뿐이다.
  • 새 요구사항? 파이프라인에 operator 하나만 끼워넣으면 된다.
  • 테스트? 입력 스트림을 만들어서 파이프라인에 넣고 출력을 검증하면 끝이다. 상태 리셋이 필요 없다.
# 에스컬레이션 추가? operator 하나면 된다
create_sensor_pipeline(sensor).pipe(
    ops.buffer_with_time(timespan=60.0),
    ops.filter(lambda alerts: len(alerts) >= 3),
    ops.map(lambda alerts: f"에스컬레이션: 1분간 {len(alerts)}회 고온 경고"),
).subscribe(on_next=escalate)

왜 “파이프라인"인가?

함수형 파이프라인이 동시성에 강한 이유를 한 문장으로 요약하면:

데이터가 파이프를 타고 흐르는 구조에서는, 각 구간이 자기 앞에서 온 데이터만 처리하고 뒤로 넘기면 된다. 옆에서 뭘 하는지 알 필요도 없고, 공유할 것도 없다.

이건 Unix 파이프와 같은 철학이다.

cat sensor.log | grep -v "ERROR" | awk '{print $3}' | sort -n

grepawk의 내부 상태에 접근하는가? 아니다. 각자 stdin에서 읽고 stdout으로 내보낼 뿐이다. 이런 구조에서 race condition이 발생할 여지가 없다.

RxPY의 pipe는 이 Unix 철학을 프로그래밍 언어 수준으로 가져온 것이다:

source.pipe(
    ops.filter(...),    # grep
    ops.map(...),       # awk
    ops.scan(...),      # 상태가 필요하면 scan으로 캡슐화
)

scan: 상태가 꼭 필요할 때

“순수 함수만으로 어떻게 상태를 다루냐"는 당연한 질문이다. 현실의 많은 로직에는 이전 값을 기억해야 하는 경우가 있다. 이럴 때 쓰는 게 scan이다.

# 누적 합계를 계산해야 할 때
# 명령형: 전역 변수
total = 0
def add(x):
    global total
    total += x
    return total

# 함수형: scan
source.pipe(
    ops.scan(lambda acc, x: acc + x, seed=0)
)

scan은 “상태를 파이프라인 안에 가둔다.” 외부에서 접근할 수 없는, 파이프라인 내부의 accumulator다. 상태가 존재하지만, 그 상태의 소유권이 파이프라인에 있기 때문에 동시성 문제가 생기지 않는다. 이전 글에서 Go의 goroutine이 channel을 통해 데이터를 주고받으며 공유 메모리 대신 “메시지 전달"을 하는 것과 비슷한 철학이다:

Don’t communicate by sharing memory; share memory by communicating. — Go Proverb

Observable 파이프라인도 마찬가지다. 상태를 공유하지 말고, 데이터를 흘려보내라.

정리

Observable 패턴의 핵심은 “데이터 스트림을 순수 함수들의 파이프라인으로 처리한다"는 것이다.

이게 동시성에서 강한 이유는 명확하다. Race condition은 공유 상태에 대한 동시 접근에서 발생하는데, 순수 함수 파이프라인에는 공유 상태가 없다. 상태가 필요하면 scan처럼 파이프라인 내부에 캡슐화한다.

명령형 코드에서는 “이 변수를 누가 언제 바꾸는가?“를 추적해야 한다. 함수형 파이프라인에서는 “데이터가 어떤 변환을 거쳐 흘러가는가?“만 보면 된다. lock을 어디에 걸지 고민하는 대신, 파이프라인을 어떻게 조립할지 고민하면 된다.

물론 Observable이 만능은 아니다. 단순한 CRUD 로직에 RxPY를 붙이면 오버엔지니어링이고, 디버깅 시 스택 트레이스가 파이프라인 내부로 들어가면 읽기 까다롭다. 하지만 실시간 데이터 스트림을 다루면서 동시성 안전까지 확보해야 하는 상황이라면, “상태를 공유하지 않는 구조"를 강제하는 Observable 패턴은 확실히 강력한 선택지다.