직행과의 산학협력을 진행하면서, 기업 뉴스를 조회하는 새로운 기능을 제안하게 되었다.
특히 제안한 기능은 해당 기업의 PR 부서가 내는 뉴스나 글에 대해서 보여주는 것이 목적이었고, PR 부서의 뉴스들만을 모아서 제공하는 서비스는 따로 존재하지 않았기 때문에 직접 뉴스나 글을 수집하는 파이프라인을 구축해야했다.
우선 최종 구축하게 된 크롤링 파이프라인의 흐름은 아래와 같다.
전체 프로세스를 크롤링, 데이터 적재로 구분하여 구현하게 되었고, 크롤러는 크롤링의 역할을 수행하게 되고, 로더는 정보를 DB에 저장하는 역할을 수행하게 된다.
<크롤러>

<로더>

# 크롤러와 로더로의 분리
우선 가상 면접 사례로 배우는 대규모 시스템 설계 기초 책을 참고했을 때, 크롤러를 설계하는 로직에서 한 곳에서 모든 걸 처리하기보단 큐를 넣고 DB에 적재하는 순간에도 크롤링이 돌아가게 설계하는 것이 좋다라는 것을 보고, DB적재와 크롤링의 역할을 분리해서 구현해야겠다고 생각했다.
또한 개인적인 생각으로도 크롤러와 로더의 역할을 분리하여 CronJob으로 띄워지는 팟의 목적을 달리 두는 것이 팟을 안정적으로 운영하는 데에도 옳은 방향성이라고 생각했다.
# Cloud Functions VS CronJob
이후, 그렇다면 NCP의 Cloud Functions를 통해 주기적으로 실행시킬까도 고민했지만, Cloud Functions의 함수와 외부와 통신하려면, NAT GATEWAY를 무조건 하나 클라우드에 띄워야만 가능하다는 설명이 있었고, 현재 상황에서 NAT GATEWAY를 띄우는 건 비용적으로나 프로젝트의 방향성에 부합하지 않다고 판단했다. 따라서 사용하고 있는 k3s를 적극적으로 사용하기로 결정했고, Cronjob을 통해 해당 크롤링 파이프라인을 구축하게 되었다.
# 구현 내용
├── crawlers # 기업별 크롤러를 저장하는 폴더
│ ├── kakao-r.py
│ ├── lgcns.py
│ ├── naver-o.py
│ ├── 등 크롤러들
│ ├── out # 크롤링 결과 csv 파일을 저장
│ │ ├── kakao
│ │ │ ├── kakao_press.csv
│ │ │ ├── kakao_press.tsv
│ │ │ └── uploaded_manifest.tsv
├── run_all.py # crawlers 폴더 안의 파이썬 파일들에 대해서 실행시키는 파일
└── util
├── month_filter.py # 수집한 기업 글 중 redis에 발행한 데이터를 선별하는 모듈
├── redis_pub.py # redis에 퍼블리싱 하는 모듈과
└── s3.py # s3에 csv를 업로드 하는 모듈
# 크롤러
< 1️⃣ 실행 대상 크롤러 찾기 >
def discover_targets(includes, excludes):
cand = []
for p in CRAWLERS_DIR.glob("*.py"):
...
# 우선순위: *-r.py > *-o.py > *.py
def key(p: Path):
n = p.name
return (0 if n.endswith("-r.py") else 1 if n.endswith("-o.py") else 2, n)
return sorted(cand, key=key)
- crawlers/ 디렉터리 안에서 .py 파일을 스캔하고 --include / --exclude 패턴에 맞추어 필터링을 진행한다.
< 2️⃣ 병렬 실행 컨트롤 >
sem = asyncio.Semaphore(args.concurrency)
async def guard_run(p: Path):
async with sem:
env = base_env.copy()
site = p.stem.split("-")[0]
env["NCP_DEFAULT_DIR"] = f"{args.ncp_prefix}/{site}/{date_tag}"
return await run_one(p, env, args.timeout, args.retries)
tasks = [asyncio.create_task(guard_run(p)) for p in targets]
for t in asyncio.as_completed(tasks):
res = await t
...
- asyncio.Semaphore(args.concurrency)를 통해 동시에 몇 개까지 실행할지 제한한다.
- guard_run() → 크롤러 실행 전 환경변수 설정 (특히 NCP_DEFAULT_DIR 경로 분리)한다.
- asyncio.create_task() 로 실행 태스크 생성한다.
- asyncio.as_completed(tasks) 로 완료된 순서대로 결과 수집한다.
< 3️⃣ 결과 요약 >
print("\n[SUMMARY]")
for r in results:
status = "OK" if r["rc"] == 0 else f"FAIL({r['rc']})"
print(f" - {r['name']:20s} : {status} attempts={r['attempts']}")
print(f"\nTotal: {len(results)}, Success: {len(ok)}, Fail: {len(ng)}")
- 각 크롤러의 성공/실패 여부를 재시도 횟수와 함께 출력한다.
- 전체 실행 결과 집계한다.
# 로더
< Redis에서 DB 로더용 채널을 준비 >
def ensure_groups(r: Redis):
r.xgroup_create(STREAM_COMPLETED, GROUP_COMPLETED, id=GROUP_START_COMPLETED, mkstream=True)
r.xgroup_create(STREAM_RECORDS, GROUP_RECORDS, id=GROUP_START_RECORDS, mkstream=True)
- crawl:records, crawl:completed 스트림을 대상으로 Consumer Group을 생성한다.
- 그룹 단위로 읽으면 여러 Loader 인스턴스가 동시에 실행되더라도 메시지가 중복 소비되지 않는다.
- GROUP_START_RECORDS="0-0"이면 과거 데이터까지 다 읽을 수 있고, >는 새로운 것부터만 읽는다.
< 2️⃣ Redis → MySQL 적재의 메인 로직 >
recs = r.xreadgroup(GROUP_RECORDS, CONSUMER, {STREAM_RECORDS: ">"}, count=BATCH, block=BLOCK_MS) or []
if recs:
_, entries = recs[0]
with db.cursor() as cur:
for mid, f in entries:
records = f.get("records")
if isinstance(records, str):
records = json.loads(records) # JSON 파싱
# fallback: 단건 필드
if not records: ...
# 회사 upsert
cur.execute(UPSERT_COMPANY, (company_name, company_name_kr))
cur.execute(SELECT_COMPANY_ID, (company_name,))
company_id = row[0]
# 뉴스 upsert
for rec in records:
cur.execute(UPDATE_NEWS_BY_URL, (...))
if cur.rowcount == 0:
cur.execute(INSERT_NEWS, (...))
db.commit()
- xreadgroup 으로 crawl:records에서 배치 단위로 메시지를 읽는다.
- JSON 파싱 실패 시 단일 필드(url, title) 기반으로 fallback 처리한다.
- 회사 이름을 기준으로 company 테이블을 업서트한다.
- 같은 URL이 있으면 UPDATE, 없으면 INSERT → company_news 테이블을 업서트한다.
- 처리 후 xack 으로 Redis에 "읽음 처리"
< 3️⃣ 종료 조건 관리 >
if all_finished():
if (time.time() - last_activity) >= QUIET_S:
gi_r = r.xinfo_groups(STREAM_RECORDS)
gi_c = r.xinfo_groups(STREAM_COMPLETED)
if pend_r == 0 and pend_c == 0:
log.info("[EXIT] expected==done + QUIET + pending=0 → 종료")
break
- expected(크롤러가 올린 row_count)와 done(DB에 적재 완료된 개수)을 비교한다.
- QUIET 기간(예: 120초) 동안 새로운 데이터가 없고, pending=0이면 종료한다.
- MAX_RUN_S (최대 실행시간, 기본 1800초)에 도달해도 안전 종료한다.
# 전체 코드
https://github.com/X-Kusitms-1/ZIGHANG_ENTERPRISE_PROJECT_BE.git
GitHub - X-Kusitms-1/ZIGHANG_ENTERPRISE_PROJECT_BE
Contribute to X-Kusitms-1/ZIGHANG_ENTERPRISE_PROJECT_BE development by creating an account on GitHub.
github.com
다음시간에는 해당 코드를 cronjob 리소스를 통해 k3s에 배포하여, 주기적으로 크롤링이 일어나게 하는 작업을 해보겠다.
'Infra' 카테고리의 다른 글
| k3s 마스터 노드 변경하기(1) - 사전 점검 및 datastore 모드 확정 (0) | 2026.02.21 |
|---|---|
| k3s 마스터 노드 변경하기(0) - 배경 및 결론 (1) | 2026.02.21 |
| ArgoCD UI Permission Error 해결 (3) | 2025.08.24 |
| EKS 클러스터 및 노드 그룹 생성, Kubectl 설치 및 연결 (0) | 2025.06.24 |
| 쿠버네티스 리소스 관리 pod 과 deployment (2) | 2025.06.22 |
