업비트 신규 상장 공지를 텔레그램으로 파싱해 자동 감지하는 구조 정리
업비트 신규 상장(또는 신규 거래지원) 공지는 상장 직후 유동성이 집중되기 때문에, 단순 알림 수준을 넘어 전략 진입 트리거로 활용할 수 있다. 본 글에서는 텔레그램 채널 upbit_news를 구독하면서, 신규 상장 공지만 필터링해 코인 심볼/마켓 정보를 구조화하는 파싱 모듈을 정리한다.
1. 전체 구조 개요
구현된 모듈의 역할은 다음 네 가지로 요약된다.
- 텔레그램 클라이언트로 upbit_news 채널 메시지 수신
- 메시지 텍스트에서 신규 상장/신규 거래지원 공지만 필터링
- 공지 본문에서
- 코인 이름
- 심볼
- 상장 마켓(KRW/BTC/USDT)
- 공지 URL
을 정규표현식으로 추출해 구조화
- 추출 결과를 메모리 버퍼와 파일로 저장해, 이후 전략 로직에서 활용 가능하도록 제공
핵심 구성 요소는 다음과 같다.
- Telethon 기반 TelegramClient
- 정규표현식 기반 파싱 함수 parse_upbit_announcement
- 최근 N개 알림을 저장하는 MEM(deque)
- 마지막 처리 메시지 ID를 기록하는 last_id.txt
- 알림 원문 로그 파일 upbit_alerts.txt
2. 텔레그램 클라이언트 초기화
텔레그램 세션과 상태는 환경 변수와 로컬 파일을 조합해 관리한다.
API_ID = int(os.getenv("TG_API_ID", "---"))
API_HASH = os.getenv("TG_API_HASH", "------")
PHONE = os.getenv("TG_PHONE", "+----")
CHANNEL = os.getenv("TG_CHANNEL", "upbit_news")
SESS_TXT = os.getenv("TG_SESSION_STR_PATH", "tg_session.txt") # StringSession 저장 파일
STATE = os.getenv("TG_STATE_PATH", "last_id.txt") # 마지막 처리 ID
ALERT_TXT= os.getenv("TG_ALERT_TXT", "upbit_alerts.txt") # 알람 원문 저장
MEM_MAX = int(os.getenv("TG_MEM_MAX", "500"))
메모리 버퍼는 다음처럼 정의된다.
from collections import deque
# rec: {'name','symbol','markets','url','src_id','src_date','raw'}
MEM = deque(maxlen=MEM_MAX)
이 버퍼에 신규 상장 알림을 구조화된 dict 형태로 push해 두면, 이후 다른 모듈에서 get_mem_latest()로 최근 상장 내역을 바로 참조할 수 있다.
3. 신규 상장 공지 판별 로직
업비트 공지 채널에는 신규 상장뿐 아니라 투자유의 지정, 거래지원 종료 등 다양한 공지가 섞여 있다. 따라서 먼저 “신규 상장/거래지원 공지”만 골라내는 필터가 필요하다.
3.1 포지티브/네거티브 키워드 정의
# URL 추출
_URL_RE = re.compile(r'https?://[^\s)]+', re.IGNORECASE)
# "빅타임 (BIGTIME)" 형태의 이름/심볼 패턴
_NAME_SYMBOL_RE = re.compile(r'([가-힣A-Za-z0-9·\-\s]+?)\s*\(([A-Z0-9\-]{2,15})\)')
# 마켓 토큰(KRW/BTC/USDT)
_MARKET_TOKEN_RE = re.compile(r'\b(KRW|BTC|USDT)\b')
# 신규 상장/거래지원 포지티브 키워드
_LISTING_POS_RE = re.compile(
r'('
r'신규\s*(디지털\s*자산\s*)?(거래지원|상장)'
r'|디지털\s*자산\s*추가'
r'|KRW\s*마켓\s*디지털\s*자산\s*추가'
r'|BTC\s*마켓\s*디지털\s*자산\s*추가'
r'|USDT\s*마켓\s*디지털\s*자산\s*추가'
r')',
re.IGNORECASE
)
# 거래지원 종료/유의 지정 네거티브 키워드
_LISTING_NEG_RE = re.compile(
r'('
r'거래지원\s*(종료|중단)'
r'|상장\s*(폐지|폐지\s*안내)'
r'|투자\s*유의'
r'|유의\s*(지정|안내)'
r')',
re.IGNORECASE
)
3.2 상장 공지 여부 판별 함수
def _is_new_listing(text: str) -> bool:
"""업비트 신규 상장/신규 거래지원 공지 여부 판단."""
if not text:
return False
if _LISTING_NEG_RE.search(text):
return False
return bool(_LISTING_POS_RE.search(text))
- 내용이 비어 있으면 바로 제외.
- 먼저 네거티브 키워드(거래지원 종료, 상장 폐지, 투자유의 등)가 포함되면 제외.
- 그 다음 포지티브 키워드(신규 상장/거래지원/디지털 자산 추가 등)가 포함되면 신규 상장 공지로 간주.
4. 공지 본문 파싱: 이름, 심볼, 마켓, URL 추출
실제 파싱은 parse_upbit_announcement()에서 처리한다.
from typing import List, Dict, Optional
def parse_upbit_announcement(text: str) -> List[Dict[str, Optional[str]]]:
"""
업비트 텍스트에서 '신규 상장/신규 거래지원' 디지털 자산 리스트 파싱.
반환 예:
[
{
"name": "빅타임",
"symbol": "BIGTIME",
"markets": ["KRW", "BTC"],
"url": "https://upbit.com/service_center/notice?id=...",
},
...
]
"""
if not _is_new_listing(text):
return []
# 1) 공지 URL 추출
urls = _URL_RE.findall(text)
url = next((u for u in urls if "upbit.com" in u), urls[0] if urls else None)
# 2) 상장 마켓(KRW/BTC/USDT) 추출
markets = list(dict.fromkeys(_MARKET_TOKEN_RE.findall(text)))
# 3) 이름/심볼 패턴 추출
items: List[Dict[str, Optional[str]]] = []
for name, symbol in _NAME_SYMBOL_RE.findall(text):
nm = name.strip(" \n\t-:·")
if not nm:
continue
if "공지사항" in nm or "확인하기" in nm:
continue
items.append(
{
"name": nm,
"symbol": symbol.strip(),
"markets": markets or None,
"url": url,
}
)
# 4) 심볼 기준으로 dedup
out: Dict[str, Dict[str, Optional[str]]] = {}
for it in items:
out[it["symbol"]] = it
return list(out.values())
4.1 예시 입력과 출력
예를 들어 공지 내용이 다음과 같다고 가정한다.
[업비트] KRW 마켓 디지털 자산 추가 (빅타임(BIGTIME))
신규 디지털 자산 거래지원을 안내드립니다.
…
자세히 보기: https://upbit.com/service_center/notice?id=12345
이 텍스트를 parse_upbit_announcement(text)에 넣으면 대략 다음과 같은 구조의 리스트가 반환된다.
[
{
"name": "빅타임",
"symbol": "BIGTIME",
"markets": ["KRW"],
"url": "https://upbit.com/service_center/notice?id=12345",
}
]
이 정보만 있으면 이후 단계에서:
- symbol → 바이낸스/선물 심볼로 매핑 (예: BIGTIMEUSDT)
- markets → 업비트 상장 마켓 기준 전략 분기
- url → 사용자 알림/로그 링크 제공
등을 바로 적용할 수 있다.
5. 상태 관리: last_id, 세션, 알림 로그
5.1 마지막 메시지 ID 저장
중복 처리를 방지하기 위해 마지막으로 처리한 텔레그램 메시지 ID를 파일에 저장한다.
def _load_last_id(path: str) -> int:
try:
with open(path, "r", encoding="utf-8") as f:
return int(f.read().strip() or 0)
except Exception:
return 0
def _save_last_id(path: str, last_id: int) -> None:
tmp = path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
f.write(str(last_id))
os.replace(tmp, path)
이 값은 폴링 루프 시작 시 읽어오고, 매번 최신 메시지까지 처리한 후 다시 파일에 기록한다.
5.2 텔레그램 세션 문자열 관리
Telethon의 StringSession은 다음과 같이 파일에 보관한다.
def _load_last_id(path: str) -> int:
try:
with open(path, "r", encoding="utf-8") as f:
return int(f.read().strip() or 0)
except Exception:
return 0
def _save_last_id(path: str, last_id: int) -> None:
tmp = path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
f.write(str(last_id))
os.replace(tmp, path)
최초 1회 콘솔에서 인증 후, 세션 문자열을 파일에 저장해 두고 이후부터는 동일 세션으로 자동 접속한다.
5.3 알림 원문 로그
원문 텍스트는 별도로 파일에 적재해 두어, 나중에 파싱 로직 검증이나 리플레이에 사용할 수 있다.
def _append_alert_text(path: str, when_iso: str, raw: str) -> None:
with open(path, "a", encoding="utf-8") as f:
f.write("[%s]\n%s\n\n" % (when_iso, raw))
6. 비동기 폴링 루프: run_forever_async
실제 동작은 비동기 루프에서 이루어진다.
async def run_forever_async(poll_sec: int = 8, batch: int = 150) -> None:
"""
Jupyter/IPython에서는:
await run_forever_async()
로 호출.
"""
last_id = _load_last_id(STATE)
sess_str = _load_session_str(SESS_TXT)
async with TelegramClient(StringSession(sess_str), API_ID, API_HASH) as client:
# 최초 1회만 콘솔 인증(전화번호/코드/2FA)
await client.start(phone=PHONE)
new_str = client.session.save()
if new_str and new_str != sess_str:
_save_session_str(SESS_TXT, new_str)
entity = await client.get_entity(CHANNEL)
print("[INFO] watching:", CHANNEL, "last_id:", last_id)
while True:
try:
msgs = await client.get_messages(entity, limit=batch)
if msgs:
msgs = [m for m in msgs if isinstance(m, Message)]
msgs.sort(key=lambda m: m.id)
for msg in msgs:
if msg.id <= last_id:
continue
txt = (msg.text or msg.message or "") if msg else ""
parsed = parse_upbit_announcement(txt)
if parsed:
for it in parsed:
rec = {
**it,
"src_id": msg.id,
"src_date": msg.date.isoformat(),
"raw": txt,
}
MEM.append(rec)
_append_alert_text(ALERT_TXT, rec["src_date"], txt)
print(
"[ALERT]",
rec["symbol"],
rec["name"],
rec["markets"],
rec["src_date"],
)
last_id = max(last_id, msg.id)
_save_last_id(STATE, last_id)
await asyncio.sleep(poll_sec)
except KeyboardInterrupt:
print("\n[INFO] stopped.")
break
except Exception as e:
print("[WARN]", type(e).__name__, str(e))
await asyncio.sleep(poll_sec)
동작 순서는 다음과 같다.
- last_id와 텔레그램 세션 문자열 로드
- TelegramClient 컨텍스트 진입 및 start()
- CHANNEL(예: upbit_news) 엔티티 조회
- while True 루프에서 주기적으로 메시지 배치 조회
- 각 메시지에 대해
- 이전에 처리한 ID 이하면 스킵
- 텍스트 추출 → parse_upbit_announcement() 호출
- 신규 상장 공지가 파싱되면 MEM과 ALERT_TXT에 기록
- last_id 업데이트 및 파일 저장
- 예외 발생 시 경고 출력 후 일정 시간 대기
실제 런타임 환경에서는:
import asyncio
asyncio.run(run_forever_async(poll_sec=8, batch=150))
와 같이 호출하면, 업비트 신규 상장 공지가 올라올 때마다 콘솔에 [ALERT] SYMBOL 이름 [마켓] 시간 형태의 로그가 출력되며, 동시에 구조화 데이터가 MEM과 로그 파일에 누적된다.
== 전체 코드 ==
#=============== 텔레그램 변수 부분 ===============
# -*- coding: utf-8 -*-
import os
import re
import asyncio
from collections import deque
from typing import List, Dict, Optional
from telethon import TelegramClient
from telethon.sessions import StringSession
from telethon.tl.custom.message import Message
# === 환경 변수 ===
API_ID = int(os.getenv("TG_API_ID", "----"))
API_HASH = os.getenv("TG_API_HASH", "----")
PHONE = os.getenv("TG_PHONE", "+8210--------") # 전화번호
CHANNEL = os.getenv("TG_CHANNEL", "upbit_news") # 감시 채널(@ 없이 username)
SESS_TXT = os.getenv("TG_SESSION_STR_PATH", "tg_session.txt") # StringSession 저장 파일
STATE = os.getenv("TG_STATE_PATH", "last_id.txt") # 마지막 처리 ID 저장
ALERT_TXT= os.getenv("TG_ALERT_TXT", "upbit_alerts.txt") # 알람 원문 저장 파일
MEM_MAX = int(os.getenv("TG_MEM_MAX", "500"))
#=============== ---------------- ===============
# === 메모리 버퍼 ===
# rec: {'name','symbol','markets','url','src_id','src_date','raw'}
MEM = deque(maxlen=MEM_MAX)
# === 파싱 로직 ===
_URL_RE = re.compile(r'https?://[^\s)]+', re.IGNORECASE)
_NAME_SYMBOL_RE = re.compile(r'([가-힣A-Za-z0-9·\-\s]+?)\s*\(([A-Z0-9\-]{2,15})\)')
_MARKET_TOKEN_RE = re.compile(r'\b(KRW|BTC|USDT)\b')
# 신규 상장/거래지원 포지티브 키워드
_LISTING_POS_RE = re.compile(
r'('
r'신규\s*(디지털\s*자산\s*)?(거래지원|상장)'
r'|디지털\s*자산\s*추가'
r'|KRW\s*마켓\s*디지털\s*자산\s*추가'
r'|BTC\s*마켓\s*디지털\s*자산\s*추가'
r'|USDT\s*마켓\s*디지털\s*자산\s*추가'
r')',
re.IGNORECASE
)
# 종료/유의 공지 네거티브 키워드
_LISTING_NEG_RE = re.compile(
r'('
r'거래지원\s*(종료|중단)'
r'|상장\s*(폐지|폐지\s*안내)'
r'|투자\s*유의'
r'|유의\s*(지정|안내)'
r')',
re.IGNORECASE
)
def _is_new_listing(text: str) -> bool:
"""업비트 신규 상장/신규 거래지원 공지 여부 판단."""
if not text:
return False
if _LISTING_NEG_RE.search(text):
return False
return bool(_LISTING_POS_RE.search(text))
def parse_upbit_announcement(text: str) -> List[Dict[str, Optional[str]]]:
"""
업비트 텍스트에서 '신규 상장/신규 거래지원' 디지털 자산 리스트 파싱.
반환 예:
[
{
"name": "빅타임",
"symbol": "BIGTIME",
"markets": ["KRW", "BTC"],
"url": "https://upbit.com/service_center/notice?id=...",
...
},
...
]
"""
if not _is_new_listing(text):
return []
# URL
urls = _URL_RE.findall(text)
url = next((u for u in urls if "upbit.com" in u), urls[0] if urls else None)
# 마켓(KRW/BTC/USDT)
markets = list(dict.fromkeys(_MARKET_TOKEN_RE.findall(text)))
# 이름(심볼)
items: List[Dict[str, Optional[str]]] = []
for name, symbol in _NAME_SYMBOL_RE.findall(text):
nm = name.strip(" \n\t-:·")
if not nm:
continue
if "공지사항" in nm or "확인하기" in nm:
continue
items.append(
{
"name": nm,
"symbol": symbol.strip(),
"markets": markets or None,
"url": url,
}
)
# 심볼 기준 dedup
out: Dict[str, Dict[str, Optional[str]]] = {}
for it in items:
out[it["symbol"]] = it
return list(out.values())
# === 파일 유틸 ===
def _load_last_id(path: str) -> int:
try:
with open(path, "r", encoding="utf-8") as f:
return int(f.read().strip() or 0)
except Exception:
return 0
def _save_last_id(path: str, last_id: int) -> None:
tmp = path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
f.write(str(last_id))
os.replace(tmp, path)
def _load_session_str(path: str) -> Optional[str]:
try:
with open(path, "r", encoding="utf-8") as f:
s = f.read().strip()
return s or None
except Exception:
return None
def _save_session_str(path: str, s: str) -> None:
tmp = path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
f.write(s)
os.replace(tmp, path)
def _append_alert_text(path: str, when_iso: str, raw: str) -> None:
with open(path, "a", encoding="utf-8") as f:
f.write("[%s]\n%s\n\n" % (when_iso, raw))
# === 외부 확인용 함수 ===
def get_mem_latest(n: int = 20) -> List[Dict]:
n = max(1, min(n, MEM_MAX))
return list(MEM)[-n:]
def clear_mem() -> None:
MEM.clear()
# === 비동기 폴링 루프 ===
async def run_forever_async(poll_sec: int = 8, batch: int = 150) -> None:
"""
Jupyter/IPython에서는:
await run_forever_async()
로 호출.
"""
last_id = _load_last_id(STATE)
sess_str = _load_session_str(SESS_TXT)
async with TelegramClient(StringSession(sess_str), API_ID, API_HASH) as client:
# 최초 1회만 콘솔 인증(전화번호/코드/2FA)
await client.start(phone=PHONE)
new_str = client.session.save()
if new_str and new_str != sess_str:
_save_session_str(SESS_TXT, new_str)
entity = await client.get_entity(CHANNEL)
print("[INFO] watching:", CHANNEL, "last_id:", last_id)
while True:
try:
msgs = await client.get_messages(entity, limit=batch)
if msgs:
msgs = [m for m in msgs if isinstance(m, Message)]
msgs.sort(key=lambda m: m.id)
for msg in msgs:
if msg.id <= last_id:
continue
# 5) 메시지 텍스트 추출
txt = (msg.text or msg.message or "") if msg else ""
parsed = parse_upbit_announcement(txt)
if parsed:
for it in parsed:
rec = {
**it,
"src_id": msg.id,
"src_date": msg.date.isoformat(),
"raw": txt,
}
MEM.append(rec)
_append_alert_text(ALERT_TXT, rec["src_date"], txt)
print(
"[ALERT]",
rec["symbol"],
rec["name"],
rec["markets"],
rec["src_date"],
)
last_id = max(last_id, msg.id)
_save_last_id(STATE, last_id)
await asyncio.sleep(poll_sec)
except KeyboardInterrupt:
print("\n[INFO] stopped.")
break
except Exception as e:
print("[WARN]", type(e).__name__, str(e))
await asyncio.sleep(poll_sec)