diff --git a/requirements.txt b/requirements.txt index 389ea499..f12b5437 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ pydantic==2.11.4 python-dotenv==1.1.0 python-telegram-bot==22.1 python-twitter-v2==0.9.2 +tweepy==4.14.0 Requests==2.32.3 SQLAlchemy==2.0.41 starlette==0.46.2 diff --git a/services/runner/tasks/tweet_task.py b/services/runner/tasks/tweet_task.py index 10ae8fb1..394dd3a1 100644 --- a/services/runner/tasks/tweet_task.py +++ b/services/runner/tasks/tweet_task.py @@ -10,8 +10,16 @@ QueueMessageType, XCredsFilter, ) +import re +from io import BytesIO +from urllib.parse import urlparse + +import requests +import tweepy + from lib.logger import configure_logger from lib.twitter import TwitterService +from lib.utils import extract_image_urls from services.discord import create_discord_service from services.runner.base import BaseTask, JobContext, RunnerConfig, RunnerResult @@ -34,6 +42,56 @@ def __init__(self, config: Optional[RunnerConfig] = None): self._pending_messages: Optional[List[QueueMessage]] = None self.twitter_service = None + def _get_extension(self, url: str) -> str: + path = urlparse(url).path.lower() + for ext in [".png", ".jpg", ".jpeg", ".gif"]: + if path.endswith(ext): + return ext + return ".jpg" + + def _post_tweet_with_media( + self, + image_url: str, + text: str, + reply_id: Optional[str] = None, + ): + try: + headers = { + "User-Agent": "Mozilla/5.0" + } + response = requests.get(image_url, headers=headers, timeout=10) + response.raise_for_status() + auth = tweepy.OAuth1UserHandler( + self.twitter_service.consumer_key, + self.twitter_service.consumer_secret, + self.twitter_service.access_token, + self.twitter_service.access_secret, + ) + api = tweepy.API(auth) + extension = self._get_extension(image_url) + media = api.media_upload( + filename=f"image{extension}", + file=BytesIO(response.content), + ) + + client = tweepy.Client( + consumer_key=self.twitter_service.consumer_key, + consumer_secret=self.twitter_service.consumer_secret, + access_token=self.twitter_service.access_token, + access_token_secret=self.twitter_service.access_secret, + ) + + result = client.create_tweet( + text=text, + media_ids=[media.media_id_string], + reply_in_reply_to_tweet_id=reply_id, + ) + if result and result.data: + return type("Obj", (), {"id": result.data["id"]})() + except Exception as e: + logger.error(f"Failed to post tweet with media: {str(e)}") + return None + async def _initialize_twitter_service(self, dao_id: UUID) -> bool: """Initialize Twitter service with credentials for the given DAO.""" try: @@ -186,17 +244,32 @@ async def _process_tweet_message( ) # Extract tweet text directly from the message format - tweet_text = message.message["message"] + original_text = message.message["message"] logger.info(f"Sending tweet for DAO {message.dao_id}") - logger.debug(f"Tweet content: {tweet_text}") + logger.debug(f"Tweet content: {original_text}") + + # Look for image URLs in the text + image_urls = extract_image_urls(original_text) + image_url = image_urls[0] if image_urls else None + tweet_text = original_text + + if image_url: + tweet_text = re.sub(re.escape(image_url), "", original_text).strip() + tweet_text = re.sub(r"\s+", " ", tweet_text) # Prepare tweet parameters tweet_params = {"text": tweet_text} if message.tweet_id: tweet_params["reply_in_reply_to_tweet_id"] = message.tweet_id - # Send tweet using Twitter service - tweet_response = await self.twitter_service._apost_tweet(**tweet_params) + if image_url: + tweet_response = self._post_tweet_with_media( + image_url=image_url, + text=tweet_text, + reply_id=message.tweet_id, + ) + else: + tweet_response = await self.twitter_service._apost_tweet(**tweet_params) if not tweet_response: return TweetProcessingResult( @@ -214,8 +287,13 @@ async def _process_tweet_message( discord_service = create_discord_service() if discord_service: - discord_result = discord_service.send_message(tweet_text) - logger.info(f"Discord message sent: {discord_result['success']}") + embeds = None + if image_url: + embeds = [{"image": {"url": image_url}}] + discord_result = discord_service.send_message(tweet_text, embeds=embeds) + logger.info( + f"Discord message sent: {discord_result['success']}" + ) except Exception as e: logger.warning(f"Failed to send Discord message: {str(e)}") @@ -281,4 +359,4 @@ async def _execute_impl(self, context: JobContext) -> List[TweetProcessingResult return results -tweet_task = TweetTask() \ No newline at end of file +tweet_task = TweetTask()