본문 바로가기
카테고리 없음

텔레그램 공지 채널 파싱 (feat. upbit)

by 조기정 2025. 11. 28.

업비트 신규 상장 공지를 텔레그램으로 파싱해 자동 감지하는 구조 정리

 

업비트 신규 상장(또는 신규 거래지원) 공지는 상장 직후 유동성이 집중되기 때문에, 단순 알림 수준을 넘어 전략 진입 트리거로 활용할 수 있다. 본 글에서는 텔레그램 채널 upbit_news를 구독하면서, 신규 상장 공지만 필터링해 코인 심볼/마켓 정보를 구조화하는 파싱 모듈을 정리한다.

1. 전체 구조 개요

구현된 모듈의 역할은 다음 네 가지로 요약된다.

  1. 텔레그램 클라이언트로 upbit_news 채널 메시지 수신
  2. 메시지 텍스트에서 신규 상장/신규 거래지원 공지만 필터링
  3. 공지 본문에서
    • 코인 이름
    • 심볼
    • 상장 마켓(KRW/BTC/USDT)
    • 공지 URL
      을 정규표현식으로 추출해 구조화
  4. 추출 결과를 메모리 버퍼와 파일로 저장해, 이후 전략 로직에서 활용 가능하도록 제공

핵심 구성 요소는 다음과 같다.

  • Telethon 기반 TelegramClient
  • 정규표현식 기반 파싱 함수 parse_upbit_announcement
  • 최근 N개 알림을 저장하는 MEM(deque)
  • 마지막 처리 메시지 ID를 기록하는 last_id.txt
  • 알림 원문 로그 파일 upbit_alerts.txt

2. 텔레그램 클라이언트 초기화

텔레그램 세션과 상태는 환경 변수와 로컬 파일을 조합해 관리한다.

 
API_ID   = int(os.getenv("TG_API_ID", "---"))
API_HASH = os.getenv("TG_API_HASH", "------")
PHONE    = os.getenv("TG_PHONE", "+----")
CHANNEL  = os.getenv("TG_CHANNEL", "upbit_news")

SESS_TXT = os.getenv("TG_SESSION_STR_PATH", "tg_session.txt")  # StringSession 저장 파일
STATE    = os.getenv("TG_STATE_PATH", "last_id.txt")           # 마지막 처리 ID
ALERT_TXT= os.getenv("TG_ALERT_TXT", "upbit_alerts.txt")       # 알람 원문 저장
MEM_MAX  = int(os.getenv("TG_MEM_MAX", "500"))

메모리 버퍼는 다음처럼 정의된다.

 
from collections import deque

# rec: {'name','symbol','markets','url','src_id','src_date','raw'}
MEM = deque(maxlen=MEM_MAX)

이 버퍼에 신규 상장 알림을 구조화된 dict 형태로 push해 두면, 이후 다른 모듈에서 get_mem_latest()로 최근 상장 내역을 바로 참조할 수 있다.


3. 신규 상장 공지 판별 로직

업비트 공지 채널에는 신규 상장뿐 아니라 투자유의 지정, 거래지원 종료 등 다양한 공지가 섞여 있다. 따라서 먼저 “신규 상장/거래지원 공지”만 골라내는 필터가 필요하다.

3.1 포지티브/네거티브 키워드 정의

 
# URL 추출
_URL_RE = re.compile(r'https?://[^\s)]+', re.IGNORECASE)

# "빅타임 (BIGTIME)" 형태의 이름/심볼 패턴
_NAME_SYMBOL_RE = re.compile(r'([가-힣A-Za-z0-9·\-\s]+?)\s*\(([A-Z0-9\-]{2,15})\)')

# 마켓 토큰(KRW/BTC/USDT)
_MARKET_TOKEN_RE = re.compile(r'\b(KRW|BTC|USDT)\b')

# 신규 상장/거래지원 포지티브 키워드
_LISTING_POS_RE = re.compile(
    r'('
    r'신규\s*(디지털\s*자산\s*)?(거래지원|상장)'
    r'|디지털\s*자산\s*추가'
    r'|KRW\s*마켓\s*디지털\s*자산\s*추가'
    r'|BTC\s*마켓\s*디지털\s*자산\s*추가'
    r'|USDT\s*마켓\s*디지털\s*자산\s*추가'
    r')',
    re.IGNORECASE
)

# 거래지원 종료/유의 지정 네거티브 키워드
_LISTING_NEG_RE = re.compile(
    r'('
    r'거래지원\s*(종료|중단)'
    r'|상장\s*(폐지|폐지\s*안내)'
    r'|투자\s*유의'
    r'|유의\s*(지정|안내)'
    r')',
    re.IGNORECASE
)

3.2 상장 공지 여부 판별 함수

 
def _is_new_listing(text: str) -> bool:
    """업비트 신규 상장/신규 거래지원 공지 여부 판단."""
    if not text:
        return False
    if _LISTING_NEG_RE.search(text):
        return False
    return bool(_LISTING_POS_RE.search(text))
  • 내용이 비어 있으면 바로 제외.
  • 먼저 네거티브 키워드(거래지원 종료, 상장 폐지, 투자유의 등)가 포함되면 제외.
  • 그 다음 포지티브 키워드(신규 상장/거래지원/디지털 자산 추가 등)가 포함되면 신규 상장 공지로 간주.

4. 공지 본문 파싱: 이름, 심볼, 마켓, URL 추출

실제 파싱은 parse_upbit_announcement()에서 처리한다.

 
from typing import List, Dict, Optional

def parse_upbit_announcement(text: str) -> List[Dict[str, Optional[str]]]:
    """
    업비트 텍스트에서 '신규 상장/신규 거래지원' 디지털 자산 리스트 파싱.

    반환 예:
    [
        {
            "name": "빅타임",
            "symbol": "BIGTIME",
            "markets": ["KRW", "BTC"],
            "url": "https://upbit.com/service_center/notice?id=...",
        },
        ...
    ]
    """
    if not _is_new_listing(text):
        return []

    # 1) 공지 URL 추출
    urls = _URL_RE.findall(text)
    url = next((u for u in urls if "upbit.com" in u), urls[0] if urls else None)

    # 2) 상장 마켓(KRW/BTC/USDT) 추출
    markets = list(dict.fromkeys(_MARKET_TOKEN_RE.findall(text)))

    # 3) 이름/심볼 패턴 추출
    items: List[Dict[str, Optional[str]]] = []
    for name, symbol in _NAME_SYMBOL_RE.findall(text):
        nm = name.strip(" \n\t-:·")
        if not nm:
            continue
        if "공지사항" in nm or "확인하기" in nm:
            continue

        items.append(
            {
                "name": nm,
                "symbol": symbol.strip(),
                "markets": markets or None,
                "url": url,
            }
        )

    # 4) 심볼 기준으로 dedup
    out: Dict[str, Dict[str, Optional[str]]] = {}
    for it in items:
        out[it["symbol"]] = it

    return list(out.values())

4.1 예시 입력과 출력

예를 들어 공지 내용이 다음과 같다고 가정한다.

[업비트] KRW 마켓 디지털 자산 추가 (빅타임(BIGTIME))
신규 디지털 자산 거래지원을 안내드립니다.

자세히 보기: https://upbit.com/service_center/notice?id=12345

이 텍스트를 parse_upbit_announcement(text)에 넣으면 대략 다음과 같은 구조의 리스트가 반환된다.

[
    {
        "name": "빅타임",
        "symbol": "BIGTIME",
        "markets": ["KRW"],
        "url": "https://upbit.com/service_center/notice?id=12345",
    }
]

이 정보만 있으면 이후 단계에서:

  • symbol → 바이낸스/선물 심볼로 매핑 (예: BIGTIMEUSDT)
  • markets → 업비트 상장 마켓 기준 전략 분기
  • url → 사용자 알림/로그 링크 제공

등을 바로 적용할 수 있다.


5. 상태 관리: last_id, 세션, 알림 로그

5.1 마지막 메시지 ID 저장

중복 처리를 방지하기 위해 마지막으로 처리한 텔레그램 메시지 ID를 파일에 저장한다.

 
def _load_last_id(path: str) -> int:
    try:
        with open(path, "r", encoding="utf-8") as f:
            return int(f.read().strip() or 0)
    except Exception:
        return 0


def _save_last_id(path: str, last_id: int) -> None:
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(str(last_id))
    os.replace(tmp, path)

이 값은 폴링 루프 시작 시 읽어오고, 매번 최신 메시지까지 처리한 후 다시 파일에 기록한다.

5.2 텔레그램 세션 문자열 관리

Telethon의 StringSession은 다음과 같이 파일에 보관한다.

 
def _load_last_id(path: str) -> int:
    try:
        with open(path, "r", encoding="utf-8") as f:
            return int(f.read().strip() or 0)
    except Exception:
        return 0


def _save_last_id(path: str, last_id: int) -> None:
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(str(last_id))
    os.replace(tmp, path)

최초 1회 콘솔에서 인증 후, 세션 문자열을 파일에 저장해 두고 이후부터는 동일 세션으로 자동 접속한다.

5.3 알림 원문 로그

원문 텍스트는 별도로 파일에 적재해 두어, 나중에 파싱 로직 검증이나 리플레이에 사용할 수 있다.

 
def _append_alert_text(path: str, when_iso: str, raw: str) -> None:
    with open(path, "a", encoding="utf-8") as f:
        f.write("[%s]\n%s\n\n" % (when_iso, raw))

6. 비동기 폴링 루프: run_forever_async

실제 동작은 비동기 루프에서 이루어진다.

async def run_forever_async(poll_sec: int = 8, batch: int = 150) -> None:
    """
    Jupyter/IPython에서는:
        await run_forever_async()
    로 호출.
    """
    last_id = _load_last_id(STATE)
    sess_str = _load_session_str(SESS_TXT)

    async with TelegramClient(StringSession(sess_str), API_ID, API_HASH) as client:
        # 최초 1회만 콘솔 인증(전화번호/코드/2FA)
        await client.start(phone=PHONE)

        new_str = client.session.save()
        if new_str and new_str != sess_str:
            _save_session_str(SESS_TXT, new_str)

        entity = await client.get_entity(CHANNEL)
        print("[INFO] watching:", CHANNEL, "last_id:", last_id)

        while True:
            try:
                msgs = await client.get_messages(entity, limit=batch)
                if msgs:
                    msgs = [m for m in msgs if isinstance(m, Message)]
                    msgs.sort(key=lambda m: m.id)

                    for msg in msgs:
                        if msg.id <= last_id:
                            continue

                        txt = (msg.text or msg.message or "") if msg else ""
                        parsed = parse_upbit_announcement(txt)

                        if parsed:
                            for it in parsed:
                                rec = {
                                    **it,
                                    "src_id": msg.id,
                                    "src_date": msg.date.isoformat(),
                                    "raw": txt,
                                }
                                MEM.append(rec)
                                _append_alert_text(ALERT_TXT, rec["src_date"], txt)
                                print(
                                    "[ALERT]",
                                    rec["symbol"],
                                    rec["name"],
                                    rec["markets"],
                                    rec["src_date"],
                                )

                        last_id = max(last_id, msg.id)

                    _save_last_id(STATE, last_id)

                await asyncio.sleep(poll_sec)

            except KeyboardInterrupt:
                print("\n[INFO] stopped.")
                break
            except Exception as e:
                print("[WARN]", type(e).__name__, str(e))
                await asyncio.sleep(poll_sec)

동작 순서는 다음과 같다.

  1. last_id와 텔레그램 세션 문자열 로드
  2. TelegramClient 컨텍스트 진입 및 start()
  3. CHANNEL(예: upbit_news) 엔티티 조회
  4. while True 루프에서 주기적으로 메시지 배치 조회
  5. 각 메시지에 대해
    • 이전에 처리한 ID 이하면 스킵
    • 텍스트 추출 → parse_upbit_announcement() 호출
    • 신규 상장 공지가 파싱되면 MEM과 ALERT_TXT에 기록
    • last_id 업데이트 및 파일 저장
  6. 예외 발생 시 경고 출력 후 일정 시간 대기

실제 런타임 환경에서는:

 
import asyncio
asyncio.run(run_forever_async(poll_sec=8, batch=150))

와 같이 호출하면, 업비트 신규 상장 공지가 올라올 때마다 콘솔에 [ALERT] SYMBOL 이름 [마켓] 시간 형태의 로그가 출력되며, 동시에 구조화 데이터가 MEM과 로그 파일에 누적된다.

 

== 전체 코드 ==

#=============== 텔레그램 변수 부분 ===============
# -*- coding: utf-8 -*-
import os
import re
import asyncio
from collections import deque
from typing import List, Dict, Optional

from telethon import TelegramClient
from telethon.sessions import StringSession
from telethon.tl.custom.message import Message

# === 환경 변수 ===
API_ID   = int(os.getenv("TG_API_ID", "----"))
API_HASH = os.getenv("TG_API_HASH", "----")
PHONE    = os.getenv("TG_PHONE", "+8210--------")              # 전화번호
CHANNEL  = os.getenv("TG_CHANNEL", "upbit_news")               # 감시 채널(@ 없이 username)
SESS_TXT = os.getenv("TG_SESSION_STR_PATH", "tg_session.txt")  # StringSession 저장 파일
STATE    = os.getenv("TG_STATE_PATH", "last_id.txt")           # 마지막 처리 ID 저장
ALERT_TXT= os.getenv("TG_ALERT_TXT", "upbit_alerts.txt")       # 알람 원문 저장 파일
MEM_MAX  = int(os.getenv("TG_MEM_MAX", "500"))
#=============== ---------------- ===============


# === 메모리 버퍼 ===
# rec: {'name','symbol','markets','url','src_id','src_date','raw'}
MEM = deque(maxlen=MEM_MAX)

# === 파싱 로직 ===
_URL_RE = re.compile(r'https?://[^\s)]+', re.IGNORECASE)
_NAME_SYMBOL_RE = re.compile(r'([가-힣A-Za-z0-9·\-\s]+?)\s*\(([A-Z0-9\-]{2,15})\)')
_MARKET_TOKEN_RE = re.compile(r'\b(KRW|BTC|USDT)\b')

# 신규 상장/거래지원 포지티브 키워드
_LISTING_POS_RE = re.compile(
    r'('
    r'신규\s*(디지털\s*자산\s*)?(거래지원|상장)'
    r'|디지털\s*자산\s*추가'
    r'|KRW\s*마켓\s*디지털\s*자산\s*추가'
    r'|BTC\s*마켓\s*디지털\s*자산\s*추가'
    r'|USDT\s*마켓\s*디지털\s*자산\s*추가'
    r')',
    re.IGNORECASE
)

# 종료/유의 공지 네거티브 키워드
_LISTING_NEG_RE = re.compile(
    r'('
    r'거래지원\s*(종료|중단)'
    r'|상장\s*(폐지|폐지\s*안내)'
    r'|투자\s*유의'
    r'|유의\s*(지정|안내)'
    r')',
    re.IGNORECASE
)


def _is_new_listing(text: str) -> bool:
    """업비트 신규 상장/신규 거래지원 공지 여부 판단."""
    if not text:
        return False
    if _LISTING_NEG_RE.search(text):
        return False
    return bool(_LISTING_POS_RE.search(text))


def parse_upbit_announcement(text: str) -> List[Dict[str, Optional[str]]]:
    """
    업비트 텍스트에서 '신규 상장/신규 거래지원' 디지털 자산 리스트 파싱.

    반환 예:
    [
        {
            "name": "빅타임",
            "symbol": "BIGTIME",
            "markets": ["KRW", "BTC"],
            "url": "https://upbit.com/service_center/notice?id=...",
            ...
        },
        ...
    ]
    """
    if not _is_new_listing(text):
        return []

    # URL
    urls = _URL_RE.findall(text)
    url = next((u for u in urls if "upbit.com" in u), urls[0] if urls else None)

    # 마켓(KRW/BTC/USDT)
    markets = list(dict.fromkeys(_MARKET_TOKEN_RE.findall(text)))

    # 이름(심볼)
    items: List[Dict[str, Optional[str]]] = []
    for name, symbol in _NAME_SYMBOL_RE.findall(text):
        nm = name.strip(" \n\t-:·")
        if not nm:
            continue
        if "공지사항" in nm or "확인하기" in nm:
            continue
        items.append(
            {
                "name": nm,
                "symbol": symbol.strip(),
                "markets": markets or None,
                "url": url,
            }
        )

    # 심볼 기준 dedup
    out: Dict[str, Dict[str, Optional[str]]] = {}
    for it in items:
        out[it["symbol"]] = it
    return list(out.values())


# === 파일 유틸 ===
def _load_last_id(path: str) -> int:
    try:
        with open(path, "r", encoding="utf-8") as f:
            return int(f.read().strip() or 0)
    except Exception:
        return 0


def _save_last_id(path: str, last_id: int) -> None:
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(str(last_id))
    os.replace(tmp, path)


def _load_session_str(path: str) -> Optional[str]:
    try:
        with open(path, "r", encoding="utf-8") as f:
            s = f.read().strip()
            return s or None
    except Exception:
        return None


def _save_session_str(path: str, s: str) -> None:
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        f.write(s)
    os.replace(tmp, path)


def _append_alert_text(path: str, when_iso: str, raw: str) -> None:
    with open(path, "a", encoding="utf-8") as f:
        f.write("[%s]\n%s\n\n" % (when_iso, raw))


# === 외부 확인용 함수 ===
def get_mem_latest(n: int = 20) -> List[Dict]:
    n = max(1, min(n, MEM_MAX))
    return list(MEM)[-n:]


def clear_mem() -> None:
    MEM.clear()


# === 비동기 폴링 루프 ===
async def run_forever_async(poll_sec: int = 8, batch: int = 150) -> None:
    """
    Jupyter/IPython에서는:
        await run_forever_async()
    로 호출.
    """
    last_id = _load_last_id(STATE)
    sess_str = _load_session_str(SESS_TXT)

    async with TelegramClient(StringSession(sess_str), API_ID, API_HASH) as client:
        # 최초 1회만 콘솔 인증(전화번호/코드/2FA)
        await client.start(phone=PHONE)

        new_str = client.session.save()
        if new_str and new_str != sess_str:
            _save_session_str(SESS_TXT, new_str)

        entity = await client.get_entity(CHANNEL)
        print("[INFO] watching:", CHANNEL, "last_id:", last_id)

        while True:
            try:
                msgs = await client.get_messages(entity, limit=batch)
                if msgs:
                    msgs = [m for m in msgs if isinstance(m, Message)]
                    msgs.sort(key=lambda m: m.id)

                    for msg in msgs:
                        if msg.id <= last_id:
                            continue

                        # 5) 메시지 텍스트 추출
                        txt = (msg.text or msg.message or "") if msg else ""
                        parsed = parse_upbit_announcement(txt)

                        if parsed:
                            for it in parsed:
                                rec = {
                                    **it,
                                    "src_id": msg.id,
                                    "src_date": msg.date.isoformat(),
                                    "raw": txt,
                                }
                                MEM.append(rec)
                                _append_alert_text(ALERT_TXT, rec["src_date"], txt)
                                print(
                                    "[ALERT]",
                                    rec["symbol"],
                                    rec["name"],
                                    rec["markets"],
                                    rec["src_date"],
                                )

                        last_id = max(last_id, msg.id)

                    _save_last_id(STATE, last_id)

                await asyncio.sleep(poll_sec)

            except KeyboardInterrupt:
                print("\n[INFO] stopped.")
                break
            except Exception as e:
                print("[WARN]", type(e).__name__, str(e))
                await asyncio.sleep(poll_sec)