[실습] 웹에서 라즈베리파이 온습도 센서 모니터링
필기자
2025-05-18 19:22
1,119
0
본문
웹에서 라즈베리파이 온습도 센서 모니터링
목적
목차
1. 파이썬 쓰레드

2. 온습도 센서 모니터링 프론트엔드 및 서버프로그램 개발

[응용 문제: 온도 센서와 RGB LED 자동 제어 실습]
목적
- 파이썬 쓰레드에 대해 이해하고 이론을 정립한다.
- 내 PC에서 웹 프라우져(크롬)으로 라즈베리파이 온습도 센서를 모니터링한다.
목차
1. 파이썬 쓰레드
2. 온습도 센서 모니터링 프론트엔드 및 서버프로그램 개발
2. 온습도 센서 모니터링 프론트엔드 및 서버프로그램 개발
1. 파이썬 쓰레드
- 파이썬 쓰레드 개요
- 하나의 프로그램 안에서 동시에 여러 작업을 수행할 수 있도록 해주는 실행 단위
- 프로그램은 한 번에 하나의 작업만 수행하지만, 쓰레드 사용시 백그라운드에서 센서 값을 병렬로 읽으면서 웹 요청을 처리할 수 있음
- threading 모듈을 사용하여 구현

- 파이썬에서 멀티 쓰레드
- Processus: 하나의 프로세스(프로그램 실행 단위)를 의미함
- 하나의 프로세스 안에 여러 Thread(쓰레드)가 존재
- 쓰레드들은 공통된 전역 변수 공간(Variable globale)을 공유함
- 각 쓰레드는 자신만의 지역 변수(Variable locale)와 코드(Code) 영역을 가짐
- GIL(Global Interpreter Lock) 개념 : 병렬성 제한
- 전역 변수 공유 방식으로 구조 단순화 이해가 쉬움
- GIL
항목 | 내용 |
---|---|
풀네임 | Global Interpreter Lock |
개념 | 파이썬 인터프리터가 한 번에 하나의 쓰레드만 실행하도록 강제하는 전역 락 |
이유 | 메모리 안정성 보장을 위해 참조 카운트 충돌 방지 |
실행 방식 | 진짜 병렬 실행이 아닌, 매우 빠른 시분할(Scheduling)로 쓰레드 전환 |
적용 대상 | CPython (기본 파이썬 구현체)만 해당. Jython, IronPython 등은 GIL 없음 |
- threading vs multiprocessing
항목 | threading | multiprocessing |
---|---|---|
실행 단위 | 하나의 프로세스 내부 쓰레드 | 별도의 독립된 프로세스 |
메모리 | 공유 메모리 공간 사용 | 프로세스마다 메모리 분리 |
GIL 영향 | 있음 (병렬 X) | 없음 (CPU 병렬 O) |
병렬성 | 시분할 유사 병렬 (I/O에 강함) | 진짜 병렬 처리 (CPU 연산에 강함) |
자원 공유 | 전역 변수, 큐 등으로 간단 | Queue, Pipe, Manager 등 명시적 공유 필요 |
속도 | 경량, 빠른 전환 | 무겁고 프로세스 생성 비용 큼 |
안정성 | 하나가 죽으면 전체 영향 | 프로세스 분리로 안정성 높음 |
적합한 작업 | 센서, 네트워크, 이벤트 기반 등 I/O 중심 | 영상 처리, 연산, 대용량 데이터 병렬 연산 등 CPU 중심 |
- threading vs multiprocessing : 데이터 공유 측면 비교
항목 | threading | multiprocessing |
---|---|---|
메모리 공간 | 공유됨 (전역 변수 참조 가능) | 완전히 분리됨 (전역 변수 공유 안 됨) |
공유 방식 | 전역 변수, 리스트, 딕셔너리 직접 사용 가능 | multiprocessing.Queue, Manager, Pipe 등 별도 공유 방식 필요 |
동기화 | Lock, Event등 간단 | Manager().dict(), Value, Array 등에 Lock 지정 필요 |
구현 난이도 | 상대적으로 단순 | 복잡하고 명시적 설계 필요 |
개발 속도 | 빠름 빠른 개발 + 센서 모니터링 + 웹 연동 |
느림 고성능 병렬 계산 + CPU 연산 최적화 |
- threading 모듈 주요 함수 및 메서드
함수/메서드 | 설명 |
---|---|
threading.Thread() | 새로운 쓰레드 객체를 생성 파라미터: target=함수, args=(인자,) |
start() | 쓰레드 실행 시작 (내부적으로 run() 호출) |
run() | 쓰레드가 실행할 함수 내용 (직접 호출 X, 오버라이드용) |
join() | 해당 쓰레드가 끝날 때까지 대기 |
is_alive() | 쓰레드가 실행 중인지 여부 반환 |
daemon = True | 데몬 쓰레드로 설정하면 메인 프로그램 종료 시 함께 종료됨 |
current_thread() | 현재 실행 중인 쓰레드 객체 반환 |
active_count() | 현재 실행 중인 쓰레드 수 반환 |
enumerate() | 모든 활성 쓰레드 리스트 반환 |
Lock() | 쓰레드 간 공유 자원 접근 시 동기화를 위한 락 객체 생성 |
- threading.Thread()
- 사용방식 : threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
인자명 | 자료형 | 설명 |
---|---|---|
group | None 또는 ThreadGroup | 현재는 사용되지 않으며 None으로 설정해야 함 |
target | callable | 스레드가 실행할 함수 |
name | str | 스레드 이름 (생략 시 자동 할당됨) |
args | tuple | target 함수에 전달할 인자 튜플 |
kwargs | dict | target 함수에 전달할 키워드 인자 딕셔너리 |
daemon | bool | True일 경우 데몬 스레드로 실행되어 메인 스레드 종료 시 함께 종료됨 |
- 파이썬 쓰레드 실습
- threading_test.py 생성
import threading
import time
# 전역 변수와 락 생성
counter = 0
lock = threading.Lock() # 쓰레드 간 자원 동기화를 위한 락
# 쓰레드에서 실행할 함수 정의
def worker(name):
global counter
for i in range(10):
with lock: # 락을 사용하여 공유 자원(counter) 접근 보호
counter += 1
print(f"[{threading.current_thread().name}] {name} 작업 중... (카운터={counter})")
time.sleep(1)
# 쓰레드 객체 생성 (target은 함수, args는 인자 튜플)
t1 = threading.Thread(target=worker, args=("스레드1",), name="Thread-1")
t2 = threading.Thread(target=worker, args=("스레드2",), name="Thread-2")
# 데몬 설정 (True로 설정 시 메인 종료와 함께 쓰레드도 종료됨)
t1.daemon = False
t2.daemon = False
# 쓰레드 실행
t1.start()
t2.start()
# 현재 실행 중인 쓰레드 정보 출력
print(f"현재 실행 중인 쓰레드 수: {threading.active_count()}")
print(f"현재 실행 중인 쓰레드 목록: {threading.enumerate()}")
# is_alive()로 쓰레드 실행 여부 확인
print(f"{t1.name} 살아있나? {t1.is_alive()}")
print(f"{t2.name} 살아있나? {t2.is_alive()}")
# join()으로 쓰레드가 종료될 때까지 대기
t1.join()
t2.join()
print("메인 쓰레드 종료됨.")
print(f"최종 카운터 값: {counter}")

2. 온습도 센서 모니터링 프론트엔드 및 서버프로그램 개발
- apaceh2 home 폴더에 프론트프로그램 개발
- /var/www/html 폴더안에 index.html 파일 수정
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>LED 제어 및 온습도 모니터링</title>
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
</head>
<body>
<h3>LED 제어</h3>
<label><input type="radio" name="led" value="on"> ON</label>
<label><input type="radio" name="led" value="off"> OFF</label>
<p id="status">LED 상태: 알 수 없음</p>
<h3>온습도 모니터링</h3>
<p id="th">온도: --℃ / 습도: --%</p>
<script>
const socket = io(location.protocol + '//' + location.hostname + ':5000');
//const socket = io("http://192.168.137.196:5000"); // Flask-SocketIO 서버 주소
// 접속 시 LED 상태 요청 및 온습도 요청
socket.on("connect", function() {
socket.emit("get_led_status");
socket.emit("get_temperature_humidity");
});
// LED 상태 수신
socket.on("led_status", function(data) {
console.log("LED 상태:", data);
$("#status").text("LED 상태: " + data.state);
$("input[name='led'][value='" + data.state + "']").prop("checked", true);
});
// LED 상태 변경 요청
$("input[name='led']").change(function() {
const state = $(this).val();
socket.emit("led_control", { state: state });
});
// 온습도 요청
setInterval(function(){
socket.emit("get_temperature_humidity_status");
}, 2000);
// 온습도 수신
socket.on("temperature_humidity_status", function(data) {
console.log("온습도 수신:", data);
$("#th").text("온도: " + data.temp + "℃ / 습도: " + data.hum + "%");
});
</script>
</body>
</html>
- 라즈베리파이에 온습도 센서 배선


- DHT22 또는 DHT11 라이브러리 설치(기존 설치 안된 경우)
#adafruit-circuitpython-dht 라이브러리 설치
pip install adafruit-circuitpython-dht
- iot 가상환경에 socket.io서버 프로그램 수정
- home/iot/iot_socket.py 파일 수정
- 온습도 센서 측정(쓰레드) & 온습도 센서 전송 메소드 추가
import time
import threading
from flask import Flask, request
from flask_socketio import SocketIO
import RPi.GPIO as GPIO
import board
import adafruit_dht
# --- 글로벌 변수 ---
latest_temp = None
latest_hum = None
# --- GPIO 설정 ---
LED_PIN = 17
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT)
# --- Flask-SocketIO 설정 ---
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")
# --- DHT22 센서 설정 ---
dht_device = adafruit_dht.DHT22(board.D18)
# --- LED 상태 반환 함수 ---
def get_led_state():
return "on" if GPIO.input(LED_PIN) else "off"
# --- LED 제어 처리 ---
@socketio.on("led_control")
def control_led(data):
state = data.get("state")
if state == "on":
GPIO.output(LED_PIN, GPIO.HIGH)
elif state == "off":
GPIO.output(LED_PIN, GPIO.LOW)
state = get_led_state()
print(f"led 상태 : {state}")
socketio.emit("led_status", {"state": state})
# --- LED 상태 요청 처리 ---
@socketio.on("get_led_status")
def handle_status_request():
state = get_led_state()
print(f"led 상태 : {state}")
socketio.emit("led_status", {"state": state}, room=request.sid)
# --- 온습도 상태 요청 처리 (유니캐스트) ---
@socketio.on("get_temperature_humidity_status")
def send_temperature_humidity_status():
if latest_temp is not None and latest_hum is not None:
socketio.emit("temperature_humidity_status", {
"temp": latest_temp,
"hum": latest_hum
}, room=request.sid)
else:
socketio.emit("temperature_humidity_status", {
"temp": "N/A",
"hum": "N/A"
}, room=request.sid)
# --- 센서 측정 스레드 ---
def temperature_monitor_thread():
global latest_temp, latest_hum
while True:
try:
temp = dht_device.temperature
hum = dht_device.humidity
if hum is not None and temp is not None:
latest_temp = round(temp, 1)
latest_hum = round(hum, 1)
print(f"센서 측정: {latest_temp}℃ / {latest_hum}%")
else:
print("센서 데이터 없음")
except RuntimeError as e:
print("센서 에러:", e.args[0])
except Exception as e:
dht_device.exit()
raise e
time.sleep(2)
# --- 센서 스레드 시작 ---
def start_sensor_thread():
t = threading.Thread(target=temperature_monitor_thread)
t.daemon = True
t.start()
# --- 메인 ---
if __name__ == "__main__":
print("서버 시작")
start_sensor_thread()
socketio.run(app, host="0.0.0.0", port=5000)
- 클라이언트 실행 화면

- 서버 실행화면

[응용 문제: 온도 센서와 RGB LED 자동 제어 실습]
- 온습도 센서로 온도를 측정한다.
- 파랑(B): GPIO 17, 빨강(R): GPIO 24 핀에 연결된 RGB LED를 제어한다.
- 웹에서 LED를 ON 해야 LED가 동작하도록 한다.
- LED가 ON 상태일 때만 다음 조건에 따라 자동 제어된다
- 온도가 27도 이상이면 빨간색 LED ON
- 온도가 27도 미만이면 파란색 LED ON
- 항상 주변 온도를 주기적으로 확인하여 이 동작을 반복한다.
import time
import threading
from flask import Flask, request
from flask_socketio import SocketIO
import RPi.GPIO as GPIO
import board
import adafruit_dht
# --- 글로벌 변수 ---
latest_temp = None
latest_hum = None
led_enabled = False # 웹에서 LED ON 시에만 제어 활성
# --- GPIO 설정 ---
GPIO.setmode(GPIO.BCM)
RED_PIN = 24
BLUE_PIN = 17
GPIO.setup(RED_PIN, GPIO.OUT)
GPIO.setup(BLUE_PIN, GPIO.OUT)
# --- Flask 및 SocketIO 설정 ---
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*", async_mode="eventlet")
# --- 온습도 센서 설정 ---
dht_device = adafruit_dht.DHT22(board.D18)
# --- RGB LED 제어 함수 ---
def set_rgb(red_on, blue_on):
GPIO.output(RED_PIN, GPIO.HIGH if red_on else GPIO.LOW)
GPIO.output(BLUE_PIN, GPIO.HIGH if blue_on else GPIO.LOW)
# --- 현재 온도 기준으로 RGB 상태 갱신 ---
def update_rgb_by_temperature():
if not led_enabled:
set_rgb(False, False)
return
if latest_temp is None:
print("온도 정보 없음, RGB 유지")
return
if latest_temp >= 27:
set_rgb(True, False) # 빨강 ON
else:
set_rgb(False, True) # 파랑 ON
# --- LED 제어 요청 처리 ---
@socketio.on("led_control")
def control_led(data):
global led_enabled
state = data.get("state")
if state == "on":
led_enabled = True
elif state == "off":
led_enabled = False
print(f"LED 제어 상태: {state}")
socketio.emit("led_status", {"state": state})
update_rgb_by_temperature() # 즉시 RGB 반영
# --- LED 상태 요청 처리 ---
@socketio.on("get_led_status")
def handle_status_request():
state = "on" if led_enabled else "off"
print(f"LED 상태 요청 응답: {state}")
socketio.emit("led_status", {"state": state}, room=request.sid)
# --- 온습도 상태 요청 처리 (유니캐스트) ---
@socketio.on("get_temperature_humidity_status")
def send_temperature_humidity_status():
if latest_temp is not None and latest_hum is not None:
socketio.emit("temperature_humidity_status", {
"temp": latest_temp,
"hum": latest_hum
}, room=request.sid)
else:
socketio.emit("temperature_humidity_status", {
"temp": "N/A",
"hum": "N/A"
}, room=request.sid)
# --- 센서 측정 스레드 ---
def temperature_monitor_thread():
global latest_temp, latest_hum
while True:
try:
temp = dht_device.temperature
hum = dht_device.humidity
if temp is not None and hum is not None:
latest_temp = round(temp, 1)
latest_hum = round(hum, 1)
print(f"센서 측정: {latest_temp}℃ / {latest_hum}%")
update_rgb_by_temperature()
else:
print("센서 데이터 없음")
except RuntimeError as e:
print("센서 에러:", e.args[0])
except Exception as e:
dht_device.exit()
raise e
time.sleep(2)
# --- 센서 스레드 시작 ---
def start_sensor_thread():
t = threading.Thread(target=temperature_monitor_thread)
t.daemon = True
t.start()
# --- 메인 ---
if __name__ == "__main__":
print("서버 시작")
start_sensor_thread()
socketio.run(app, host="0.0.0.0", port=5000)
댓글목록0