|
import os |
|
import google_auth_oauthlib.flow |
|
from google.oauth2.credentials import Credentials |
|
from googleapiclient.discovery import build |
|
from google.auth.transport.requests import Request |
|
import discord |
|
import logging |
|
import re |
|
import asyncio |
|
import subprocess |
|
from huggingface_hub import InferenceClient |
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
from youtube_transcript_api.formatters import TextFormatter |
|
from dotenv import load_dotenv |
|
|
|
|
|
# Load variables from a .env file into the environment before any of the
# os.getenv() lookups in this module run.
load_dotenv()

# Google OAuth configuration: the requested scope and the module-level
# credentials slot that the start-up code further down fills in.
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]
creds = None

# Files used by the OAuth flow.
credentials_path = 'JSON_TOKEN.json'  # OAuth client secrets
token_path = 'token.json'             # cached user token (written by authorize())
auth_code_path = 'auth_code.txt'      # where the operator saves the auth code

# Log everything from DEBUG upwards to a stream handler.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s:%(levelname)s:%(name)s:%(message)s',
    handlers=[logging.StreamHandler()],
)

# Gateway intents for the Discord client; on_message reads message.content,
# so the message-content intent is switched on explicitly.
intents = discord.Intents.default()
intents.guilds = True
intents.messages = True
intents.guild_messages = True
intents.message_content = True

# Hugging Face inference client used to generate the replies.
hf_client = InferenceClient(
    "CohereForAI/c4ai-command-r-plus",
    token=os.getenv("HF_TOKEN"),
)
|
|
|
def authorize():
    """Run the manual OAuth consent flow and cache the resulting token.

    Prints the consent URL, waits for the operator to save the authorization
    code into ``auth_code_path`` and press Enter, exchanges that code for
    credentials, and writes the credentials as JSON to ``token_path``.

    Returns:
        The credentials object produced by the flow.

    Raises:
        FileNotFoundError: If ``auth_code_path`` does not exist once the
            operator has confirmed.
    """
    flow = google_auth_oauthlib.flow.InstalledAppFlow.from_client_secrets_file(
        credentials_path, SCOPES)
    # NOTE(review): no redirect_uri is configured on the flow before
    # authorization_url() is called; confirm the manual code exchange works
    # with the OAuth client type in use.
    auth_url, _ = flow.authorization_url(prompt='consent')
    print('Please go to this URL and finish the authentication: {}'.format(auth_url))

    print(f'Enter the authorization code in the file {auth_code_path} and press Enter')
    input('Press Enter after saving the authorization code...')

    # Open directly instead of checking os.path.exists() first: one less
    # filesystem round trip and no window between the check and the read.
    try:
        with open(auth_code_path, 'r', encoding='utf-8') as code_file:
            code = code_file.read().strip()
    except FileNotFoundError:
        # The original message was stored mis-encoded; this is the restored
        # Korean text ("file does not exist").
        raise FileNotFoundError(
            f"'{auth_code_path}' 파일이 존재하지 않습니다.") from None

    flow.fetch_token(code=code)
    credentials = flow.credentials

    # Cache the token so later runs can skip the interactive flow.
    with open(token_path, 'w', encoding='utf-8') as token_file:
        token_file.write(credentials.to_json())

    return credentials
|
|
|
|
|
# Reuse the cached token when one exists on disk; otherwise run the
# interactive authorization flow.
creds = (
    Credentials.from_authorized_user_file(token_path, SCOPES)
    if os.path.exists(token_path)
    else authorize()
)

# Missing or invalid credentials are refreshed when they are expired and carry
# a refresh token; in every other case the authorization flow is run again.
if not (creds and creds.valid):
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        creds = authorize()

# YouTube Data API v3 client shared by the comment helpers below.
youtube_service = build('youtube', 'v3', credentials=creds)

# ID of the single Discord channel the bot listens to. int() raises at import
# time if DISCORD_CHANNEL_ID is unset or not numeric.
SPECIFIC_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID"))
|
|
|
class MyClient(discord.Client): |
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
self.is_processing = False |
|
|
|
async def on_ready(self): |
|
logging.info(f'{self.user}๋ก ๋ก๊ทธ์ธ๋์์ต๋๋ค!') |
|
|
|
|
|
subprocess.Popen(["python", "web.py"]) |
|
logging.info("Web.py server has been started.") |
|
|
|
|
|
channel = self.get_channel(SPECIFIC_CHANNEL_ID) |
|
if channel: |
|
await channel.send("์ ํ๋ธ ๋น๋์ค URL์ ์
๋ ฅํ๋ฉด, ์๋ง๊ณผ ๋๊ธ์ ๊ธฐ๋ฐ์ผ๋ก ๋ต๊ธ์ ์์ฑํฉ๋๋ค.") |
|
|
|
async def on_message(self, message): |
|
if message.author == self.user: |
|
return |
|
if not self.is_message_in_specific_channel(message): |
|
return |
|
if self.is_processing: |
|
return |
|
self.is_processing = True |
|
try: |
|
video_id = extract_video_id(message.content) |
|
if video_id: |
|
transcript = await get_best_available_transcript(video_id) |
|
comments = await get_video_comments(video_id) |
|
if comments and transcript: |
|
replies = await generate_replies(comments, transcript) |
|
await create_thread_and_send_replies(message, video_id, comments, replies) |
|
await post_replies_to_youtube(video_id, comments, replies) |
|
else: |
|
await message.channel.send("์๋ง์ด๋ ๋๊ธ์ ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค.") |
|
else: |
|
await message.channel.send("์ ํจํ ์ ํ๋ธ ๋น๋์ค URL์ ์ ๊ณตํด ์ฃผ์ธ์.") |
|
finally: |
|
self.is_processing = False |
|
|
|
def is_message_in_specific_channel(self, message): |
|
|
|
return message.channel.id == SPECIFIC_CHANNEL_ID or ( |
|
isinstance(message.channel, discord.Thread) and message.channel.parent_id == SPECIFIC_CHANNEL_ID |
|
) |
|
|
|
# Host, optional path prefix, then an 11-character video ID. Compiled once at
# import time. The ID is limited to the characters YouTube IDs use and must not
# be followed by another ID character, so longer path segments such as
# "/channel/..." or "/playlist" are not mistaken for IDs.
_YOUTUBE_URL_RE = re.compile(
    r'\b(?:youtube|youtu|youtube-nocookie)\.(?:com|be)/'
    r'(?:watch\?v=|embed/|v/|shorts/|live/|.+?[?&]v=)?'
    r'([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])'
)


def extract_video_id(url):
    """Extract the video ID from text containing a YouTube video URL.

    The URL may appear anywhere in *url* (for example in the middle of a chat
    message) and may use any subdomain.

    Args:
        url: Text to search, typically the content of a Discord message.

    Returns:
        The 11-character video ID, or ``None`` if *url* is not a string or
        contains no recognizable YouTube video URL.
    """
    if not isinstance(url, str):
        return None

    match = _YOUTUBE_URL_RE.search(url)
    if match is None:
        return None

    video_id = match.group(1)
    logging.debug(f'Extracted video ID: {video_id}')
    return video_id
|
|
|
def _fetch_transcript_entries(video_id):
    """Blocking helper: return raw transcript entries for *video_id*, or None.

    Tries Korean, then English, then any transcript the video lists.
    """
    for label, languages in (('Korean', ['ko']), ('English', ['en'])):
        try:
            return YouTubeTranscriptApi.get_transcript(video_id, languages=languages)
        except Exception as e:
            # Best-effort: any failure just moves on to the next option.
            logging.warning(f'Error fetching {label} transcript: {e}')

    # Last resort: take whatever transcript the video offers. The previous
    # code called find_manually_created_transcript() without the language
    # codes it requires, so this fallback could never succeed.
    try:
        for candidate in YouTubeTranscriptApi.list_transcripts(video_id):
            try:
                return candidate.fetch()
            except Exception as e:
                logging.warning(f'Error fetching alternative transcript: {e}')
    except Exception as e:
        logging.error(f'Error fetching alternative transcript: {e}')
        return None

    logging.error('Error fetching alternative transcript: no usable transcript found')
    return None


async def get_best_available_transcript(video_id):
    """Fetch the transcript of a YouTube video as plain text.

    The network calls are blocking, so they run in the default executor to
    keep the Discord event loop responsive.

    Args:
        video_id: YouTube video ID.

    Returns:
        The transcript formatted as text, or ``None`` if no transcript could
        be fetched.
    """
    loop = asyncio.get_running_loop()
    transcript = await loop.run_in_executor(None, _fetch_transcript_entries, video_id)
    if transcript is None:
        return None

    transcript_text = TextFormatter().format_transcript(transcript)
    logging.debug(f'Fetched transcript: {transcript_text}')
    return transcript_text
|
|
|
async def get_video_comments(video_id):
    """Fetch top-level comments of a YouTube video.

    Only the first page of results (at most 100 comment threads) is read.

    Args:
        video_id: YouTube video ID.

    Returns:
        A list of ``(comment_text, comment_id)`` tuples. The list is empty
        when the video has no comments or the API request fails, so the
        caller can report the failure instead of crashing.
    """
    loop = asyncio.get_running_loop()
    try:
        request = youtube_service.commentThreads().list(
            part='snippet',
            videoId=video_id,
            maxResults=100,
        )
        # execute() performs blocking HTTP; run it off the event loop.
        response = await loop.run_in_executor(None, request.execute)
    except Exception as e:
        # E.g. comments disabled or quota exhausted: treat as "no comments".
        logging.error(f'Error fetching comments for video {video_id}: {e}')
        return []

    top_level = (
        item['snippet']['topLevelComment'] for item in response.get('items', [])
    )
    comments = [(c['snippet']['textOriginal'], c['id']) for c in top_level]

    logging.debug(f'Fetched comments: {comments}')
    return comments
|
|
|
async def generate_replies(comments, transcript):
    """Generate one LLM reply per comment, using the transcript as context.

    Args:
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        transcript: Video transcript text, sent as the system message.

    Returns:
        A list of reply strings in the same order as *comments*. When the
        model returns no usable content, a fixed fallback text is used so the
        list stays aligned with *comments*.
    """
    fallback_reply = "답글을 생성할 수 없습니다."
    # The system message is identical for every comment; build it once.
    system_message = {"role": "system", "content": f"비디오 자막: {transcript}"}
    loop = asyncio.get_running_loop()

    replies = []
    for comment, _ in comments:
        messages = [system_message, {"role": "user", "content": comment}]
        # chat_completion is a blocking HTTP call; run it in the executor.
        # `messages` is bound as a default so the lambda keeps this
        # iteration's value.
        response = await loop.run_in_executor(
            None,
            lambda messages=messages: hf_client.chat_completion(
                messages, max_tokens=400, temperature=0.7, top_p=0.85),
        )

        reply = None
        if response.choices and response.choices[0].message:
            content = response.choices[0].message['content']
            # content may be missing or blank; fall back in that case.
            if content:
                reply = content.strip()
        replies.append(reply or fallback_reply)

    logging.debug(f'Generated replies: {replies}')
    return replies
|
|
|
async def create_thread_and_send_replies(message, video_id, comments, replies):
    """Post each comment/reply pair as an embed in a Discord thread.

    Args:
        message: The Discord message that triggered the run.
        video_id: YouTube video ID (unused here; kept for the call signature).
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        replies: Reply strings aligned with *comments*.
    """
    # Discord rejects embeds whose description exceeds 4096 characters.
    description_limit = 4096

    if isinstance(message.channel, discord.Thread):
        # The message was already posted inside a thread, which cannot host
        # another thread; answer in place.
        thread = message.channel
    else:
        thread = await message.channel.create_thread(
            name=f"{message.author.name}의 댓글 답글", message=message)

    for (comment, _), reply in zip(comments, replies):
        description = f"**댓글**: {comment}\n**답글**: {reply}"
        if len(description) > description_limit:
            description = description[:description_limit - 3] + '...'
        await thread.send(embed=discord.Embed(description=description))
|
|
|
async def post_replies_to_youtube(video_id, comments, replies):
    """Post each generated reply to YouTube under its parent comment.

    A failure for one comment is logged and does not stop the remaining
    replies from being posted.

    Args:
        video_id: YouTube video ID (unused here; kept for the call signature).
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        replies: Reply strings aligned with *comments*.
    """
    loop = asyncio.get_running_loop()
    for (_, comment_id), reply in zip(comments, replies):
        try:
            request = youtube_service.comments().insert(
                part='snippet',
                body={
                    'snippet': {
                        'parentId': comment_id,
                        'textOriginal': reply,
                    }
                },
            )
            # execute() performs blocking HTTP; run it off the event loop.
            response = await loop.run_in_executor(None, request.execute)
        except Exception as e:
            logging.error(f'Error posting reply to comment {comment_id}: {e}')
            # Only HTTP errors carry resp/content; reading them unguarded
            # would raise inside this handler and abort the whole loop.
            resp = getattr(e, 'resp', None)
            content = getattr(e, 'content', None)
            logging.debug(f'Response: {resp} | Content: {content}')
        else:
            logging.debug(f'Posted reply to comment: {comment_id} with response: {response}')
|
|
|
# Script entry point: build the client with the module-level intents and run
# it with the token from the DISCORD_TOKEN environment variable.
if __name__ == "__main__":
    discord_client = MyClient(intents=intents)
    discord_client.run(os.environ.get('DISCORD_TOKEN'))
|
|