import logging |
import os |
import requests |
import sys |
import aiogram |
import random |
import asyncio |
from typing import List |
from requests.exceptions import ConnectionError |
from aiogram import Bot, Dispatcher, F, types |
from aiogram.fsm.storage.memory import MemoryStorage |
from aiogram.types import ContentType, Message |
from aiogram_media_group import MediaGroupFilter, media_group_handler |
from dotenv import load_dotenv |
from vk_api import VkApi |
from vk_api.upload import VkUpload |
from vk_api.utils import get_random_id |
from PIL import Image, UnidentifiedImageError |
from rlottie_python import LottieAnimation |
from moviepy.editor import VideoFileClip |
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()
logging.basicConfig(level=logging.INFO)

# --- VK side -----------------------------------------------------------------
VK_API_TOKEN = os.getenv('VK_API_TOKEN')
VK_GROUP_ID = os.getenv('VK_GROUP_ID')
vk_session = VkApi(token=VK_API_TOKEN)
vk = vk_session.get_api()
uploader = VkUpload(vk)

# --- Telegram side -----------------------------------------------------------
# Read from the environment the same way as the VK settings. The bot
# constructor needs the token; the post builders need the channel username to
# build the "source" link (https://t.me/<username>/<message_id>).
TELEGRAM_API_TOKEN = os.getenv('TELEGRAM_API_TOKEN')
TELEGRAM_CHANNEL_USERNAME = os.getenv('TELEGRAM_CHANNEL_USERNAME')

PROXY_URL = os.getenv('PROXY_URL')
proxy = PROXY_URL
proxies = {"http": proxy, "https": proxy}

if PROXY_URL:
    # aiogram 3 takes the proxy on the HTTP session, not on the Bot itself.
    from aiogram.client.session.aiohttp import AiohttpSession

    bot = Bot(token=TELEGRAM_API_TOKEN, session=AiohttpSession(proxy=PROXY_URL))
else:
    bot = Bot(token=TELEGRAM_API_TOKEN)

storage = MemoryStorage()
# NOTE(review): extra keyword arguments given to Dispatcher appear to be kept
# as workflow data rather than configuring networking; the argument is left
# unchanged for backward compatibility - confirm whether it is still needed.
dp = Dispatcher(storage=storage, proxy=proxies)
def add_entry(message_id, post_id):
    """Append one ``message_id:post_id`` record to ``data.txt``.

    The file is opened in append mode, so earlier records are kept and the
    file is created on first use.
    """
    record = f'{message_id}:{post_id}\n'
    with open('data.txt', 'a') as storage_file:
        storage_file.write(record)
def get_entry(message_id) -> int:
    """Return the VK post id recorded for a Telegram message id.

    ``data.txt`` is scanned line by line (each line is ``message_id:post_id``)
    and the post id of the first matching line is returned.

    Args:
        message_id: Telegram message id to look up (compared as an int).

    Returns:
        The post id stored next to ``message_id``.

    Raises:
        KeyError: if no record matches. This also covers the case where
            ``data.txt`` does not exist yet, so callers only have to handle
            one "unknown message" error.
    """
    try:
        with open('data.txt', 'r') as f:
            for line in f:
                fields = line.split(':')
                if len(fields) < 2:
                    # Blank or malformed line: nothing to compare against.
                    continue
                try:
                    if int(fields[0]) == message_id:
                        return int(fields[1])
                except ValueError:
                    # Non-numeric garbage must not hide valid records below it.
                    continue
    except FileNotFoundError:
        # Nothing has been recorded yet; treated the same as "not found".
        pass
    raise KeyError(f'{message_id} is not in the file!')
def create_vk_post(text: str, message_id, photo_list=None, video_list=None, doc_list=None, audio_list=None, gif_list=None):
    """Upload local files to VK and publish a wall post that links back to Telegram.

    Args:
        text: Body of the post; a "source" line with the Telegram link is appended.
        message_id: Telegram message id, used for the source link and stored
            together with the resulting VK post id via ``add_entry``.
        photo_list: Paths of photos to upload to the group wall.
        video_list: Paths of videos to upload to the group.
        doc_list: Paths of files to upload as documents.
        audio_list: Paths of audio files to upload.
        gif_list: Paths of files to upload as documents (attached after audio).

    Returns:
        None. Any exception raised while uploading or posting is logged and
        swallowed, so the caller never sees it.
    """

    def upload_as_documents(paths):
        # Upload every existing path as a VK document; missing files are skipped.
        uploaded = []
        for file_path in paths:
            if not os.path.exists(file_path):
                continue
            with open(file_path, 'rb') as handle:
                uploaded.append(uploader.document(doc=handle, title=os.path.basename(file_path)))
        return uploaded

    def document_attachment_ids(items):
        # Build "doc<owner>_<id>" strings, ignoring responses without the expected keys.
        ids = []
        for item in items:
            if "doc" in item and "owner_id" in item["doc"] and "id" in item["doc"]:
                ids.append(f'doc{item["doc"]["owner_id"]}_{item["doc"]["id"]}')
        return ids

    try:
        # Uploads happen in a fixed order: photos, videos, documents, audio, gifs.
        uploaded_photos = uploader.photo_wall(photos=photo_list, group_id=VK_GROUP_ID) if photo_list else []

        uploaded_videos = []
        if video_list:
            for video_path in video_list:
                uploaded_videos.append(uploader.video(video_file=video_path, group_id=int(VK_GROUP_ID)))

        uploaded_docs = upload_as_documents(doc_list) if doc_list else []

        uploaded_audios = []
        if audio_list:
            for audio_path in audio_list:
                if not os.path.exists(audio_path):
                    continue
                with open(audio_path, 'rb') as handle:
                    uploaded_audios.append(
                        uploader.audio(audio=handle, artist="Unknown Artist", title=os.path.basename(audio_path))
                    )

        uploaded_gifs = upload_as_documents(gif_list) if gif_list else []

        # Attachment ids are listed in the same order as the uploads.
        attachment_ids = []
        for photo in uploaded_photos:
            if "owner_id" in photo and "id" in photo:
                attachment_ids.append(f'photo{photo.get("owner_id")}_{photo["id"]}')
        for video in uploaded_videos:
            if "owner_id" in video and "video_id" in video:
                attachment_ids.append(f'video{video["owner_id"]}_{video["video_id"]}')
        attachment_ids.extend(document_attachment_ids(uploaded_docs))
        for audio in uploaded_audios:
            if "owner_id" in audio and "id" in audio:
                attachment_ids.append(f'audio{audio["owner_id"]}_{audio["id"]}')
        attachment_ids.extend(document_attachment_ids(uploaded_gifs))

        source_link = f'https://t.me/{TELEGRAM_CHANNEL_USERNAME}/{message_id}'
        post_text = f'{text}\n\nИсточник: {source_link}'
        response = vk.wall.post(
            owner_id=f'-{VK_GROUP_ID}',
            message=post_text,
            attachments=','.join(attachment_ids),
            from_group=1,
            copyright=source_link,
            random_id=get_random_id(),
        )
        # Remember which VK post mirrors this Telegram message.
        add_entry(message_id, response['post_id'])
    except Exception as e:
        logging.error(f'Error while creating VK post: {e}')
def edit_vk_post(post_id, new_text, message_id):
    """Replace the text of an existing VK wall post, keeping its attachments.

    The current post is fetched first so that its attachments can be passed
    back to ``wall.edit`` together with the new text.

    Args:
        post_id: Id of the VK wall post to edit.
        new_text: New body; the Telegram source line is appended to it.
        message_id: Telegram message id used to build the source link.

    Raises:
        Whatever the VK API calls raise; nothing is caught here.
    """
    old_post = vk.wall.getById(posts=f'-{VK_GROUP_ID}_{post_id}')[0]

    attachments = []
    for attachment in old_post.get('attachments', []):
        kind = attachment.get('type')
        payload = attachment.get(kind) or {}
        if 'owner_id' in payload and 'id' in payload:
            attachments.append(f'{kind}{payload["owner_id"]}_{payload["id"]}')
        elif kind == 'link' and payload.get('url'):
            # Link attachments carry no owner/id pair; they are re-attached by URL.
            attachments.append(payload['url'])
        else:
            # An attachment that cannot be re-referenced must not abort the edit.
            logging.warning(f'Skipping unsupported attachment while editing VK post {post_id}: {kind}')

    source_link = f'https://t.me/{TELEGRAM_CHANNEL_USERNAME}/{message_id}'
    edited_text = f'{new_text}\n\nИсточник: {source_link}'
    vk.wall.edit(
        message=edited_text,
        post_id=post_id,
        from_group=1,
        owner_id=f'-{VK_GROUP_ID}',
        copyright=source_link,
        # Comma-joined, the same format create_vk_post uses for wall.post.
        attachments=','.join(attachments),
    )
async def download_file_with_retries(file_id: str, destination: str, retries: int = 3, delay: int = 5):
    """Download a Telegram file to ``destination``, retrying on failure.

    Args:
        file_id: Telegram file id to fetch.
        destination: Local path to write to. Its parent directory is created
            when the path has one.
        retries: Maximum number of attempts.
        delay: Seconds to sleep between attempts (not after the last one).

    Returns:
        True once an attempt succeeds, False when every attempt failed (or
        ``retries`` is zero). Download errors are logged, never raised.
    """
    target_dir = os.path.dirname(destination)
    # A bare file name has no directory part, and os.makedirs('') would raise.
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)

    for attempt in range(retries):
        try:
            file = await bot.get_file(file_id)
            await bot.download_file(file.file_path, destination)
            return True
        except Exception as e:
            logging.error(f'Error while downloading file (attempt {attempt + 1}/{retries}): {e}')
            if attempt < retries - 1:
                await asyncio.sleep(delay)
    return False
@media_group_handler
async def album_handler(messages: List[Message]):
    """Repost a Telegram media group (album) as a single VK post.

    Every item of the album is downloaded into ``./files``, grouped by media
    kind, and handed to ``create_vk_post`` in one call. The first non-empty
    caption found in the album becomes the post text, and the id of the first
    message is used as the Telegram source id. Downloaded files are removed
    afterwards.

    Args:
        messages: The messages that make up the album.
    """
    random_number = random.randint(1000000, 9999999)
    photo_list, video_list, doc_list, audio_list, gif_list = [], [], [], [], []
    text = None

    for index, message in enumerate(messages):
        if message.caption and not text:
            text = message.caption

        if message.photo:
            # The last PhotoSize is used (presumably the largest one - confirm).
            file_id = message.photo[-1].file_id
            path = f'./files/photo_{random_number}_{index}.jpg'
            bucket = photo_list
        elif message.video:
            file_id = message.video.file_id
            path = f'./files/video_{random_number}_{index}.mp4'
            bucket = video_list
        elif message.animation:
            # Checked before ``document``: Telegram also fills ``document`` for
            # animations, so the opposite order would never reach this branch.
            file_id = message.animation.file_id
            path = f'./files/gif_{random_number}_{index}.mp4'
            bucket = gif_list
        elif message.document:
            file_id = message.document.file_id
            path = f'./files/doc_{random_number}_{index}_{message.document.file_name}'
            bucket = doc_list
        elif message.audio:
            file_id = message.audio.file_id
            path = f'./files/audio_{random_number}_{index}_{message.audio.file_name}'
            bucket = audio_list
        else:
            continue

        if await download_file_with_retries(file_id, path):
            bucket.append(path)

    message_id = messages[0].message_id
    post_text = text if text else ''
    try:
        await asyncio.to_thread(
            create_vk_post, post_text, message_id, photo_list, video_list, doc_list, audio_list, gif_list
        )
    finally:
        # Always clean up, and do not let one missing file stop the rest.
        for path in photo_list + video_list + doc_list + audio_list + gif_list:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
async def photo_video_handler(message: Message):
    """Repost a single photo or video message to VK.

    The media is downloaded into ``./files``, posted with the message caption
    (empty string when there is none) and the local file is removed after
    posting. Nothing is posted when the download fails.
    """
    caption = message.caption or ''
    suffix = random.randint(1000000, 9999999)

    if message.photo:
        local_path = f'./files/photo_{suffix}.jpg'
        file_id = message.photo[-1].file_id
        media_args = ([local_path],)
    elif message.video:
        local_path = f'./files/video_{suffix}.mp4'
        file_id = message.video.file_id
        media_args = (None, [local_path])
    else:
        return

    if not await download_file_with_retries(file_id, local_path):
        return

    await asyncio.to_thread(create_vk_post, caption, message.message_id, *media_args)
    os.remove(local_path)
async def document_handler(message: Message):
    """Repost a document message to VK as a document attachment.

    The file is stored under ``./files`` with a random prefix plus the
    original file name, posted with the caption, then deleted locally.
    Nothing happens when the download fails.
    """
    caption = message.caption or ''
    suffix = random.randint(1000000, 9999999)
    document = message.document
    local_path = f'./files/doc_{suffix}_{document.file_name}'

    downloaded = await download_file_with_retries(document.file_id, local_path)
    if not downloaded:
        return

    await asyncio.to_thread(create_vk_post, caption, message.message_id, None, None, [local_path])
    os.remove(local_path)
async def audio_handler(message: Message):
    """Repost an audio message to VK as an audio attachment.

    ``message.audio`` is a single audio object, so exactly one file is
    downloaded into ``./files``, posted with the caption (empty string when
    absent) and removed afterwards. Nothing is posted when the download fails.
    """
    text = message.caption or ''
    random_number = random.randint(1000000, 9999999)
    audio = message.audio
    path = f'./files/audio_{random_number}_0_{audio.file_name}'

    if not await download_file_with_retries(audio.file_id, path):
        return
    if not os.path.exists(path):
        return

    try:
        await asyncio.to_thread(create_vk_post, text, message.message_id, None, None, None, [path])
    finally:
        # The temporary file is removed whether or not posting went through.
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
async def video_handler(message: Message):
    """Repost a video message to VK, keeping the original file name in the path.

    The video is downloaded into ``./files``; when the download succeeds and
    the file is present on disk it is posted with the caption and then
    deleted locally.
    """
    caption = message.caption or ''
    suffix = random.randint(1000000, 9999999)
    video = message.video
    local_path = f'./files/video_{suffix}_{video.file_name}'

    if not await download_file_with_retries(video.file_id, local_path):
        return
    if not os.path.exists(local_path):
        return

    await asyncio.to_thread(create_vk_post, caption, message.message_id, None, [local_path])
    os.remove(local_path)
async def text_handler(message: Message):
    """Repost a plain text message to VK without attachments.

    The blocking VK call runs in a worker thread via ``asyncio.to_thread``.
    """
    body, source_id = message.text, message.message_id
    await asyncio.to_thread(create_vk_post, body, source_id)
async def edited_handler(message: Message):
    """Mirror an edit of a Telegram message onto the matching VK post.

    Does nothing when the message has no id or when no VK post was recorded
    for it (``get_entry`` raises ``KeyError``). Otherwise the VK post text is
    replaced with the message text, falling back to the caption and then to
    an empty string.
    """
    source_id = message.message_id
    if source_id is None:
        return

    try:
        vk_post_id = get_entry(source_id)
    except KeyError:
        # This message was never reposted, so there is nothing to edit.
        return

    new_body = message.text or message.caption or ''
    await asyncio.to_thread(edit_vk_post, vk_post_id, new_body, source_id)
async def sticker_handler(message: Message):
    """Repost a Telegram sticker to VK.

    Animated (``.tgs``) and video stickers are converted to a GIF and posted
    as a document; static stickers are converted to PNG and posted as a
    photo. Both the downloaded source and the converted file are removed
    afterwards, whether or not conversion and posting succeeded. Conversion
    errors are logged and not raised.
    """
    sticker = message.sticker
    random_number = random.randint(1000000, 9999999)

    # Name the download after its real container so tools that look at the
    # extension are not misled.
    if sticker.is_animated:
        extension = 'tgs'
    elif sticker.is_video:
        extension = 'webm'
    else:
        extension = 'webp'
    path = f'./files/sticker_{random_number}.{extension}'

    if not await download_file_with_retries(sticker.file_id, path):
        return

    is_moving = sticker.is_animated or sticker.is_video
    converted_path = f'./files/sticker_{random_number}.{"gif" if is_moving else "png"}'
    try:
        if sticker.is_animated:
            animation = LottieAnimation.from_tgs(path)
            frames = [
                animation.render_pillow_frame(frame_num=i)
                for i in range(animation.lottie_animation_get_totalframe())
            ]
            frames[0].save(
                converted_path,
                save_all=True,
                append_images=frames[1:],
                loop=0,
                # Per-frame duration in milliseconds, derived from the frame rate.
                duration=1000 // animation.lottie_animation_get_framerate(),
            )
        elif sticker.is_video:
            clip = VideoFileClip(path)
            try:
                clip.write_gif(converted_path)
            finally:
                clip.close()
        else:
            with Image.open(path) as im:
                im.save(converted_path, 'PNG')

        if is_moving:
            await asyncio.to_thread(
                create_vk_post, '', message.message_id, None, None, None, None, [converted_path]
            )
        else:
            await asyncio.to_thread(
                create_vk_post, '', message.message_id, [converted_path], None, None, None, None
            )
    except Exception as e:
        if is_moving:
            logging.error(f'Error while converting animated sticker: {e}')
        else:
            logging.error(f'Error while converting static sticker: {e}')
    finally:
        # Best-effort cleanup of both temporary files; it must never raise.
        for leftover in (path, converted_path):
            try:
                os.remove(leftover)
            except FileNotFoundError:
                pass
            except PermissionError as e:
                logging.warning(f'Could not remove temporary file {leftover}: {e}')
async def voice_handler(message: Message):
    """Repost a voice message to VK as an audio attachment.

    The voice note is downloaded to ``./files`` (5 attempts, 10 seconds
    apart). It is posted only when the download reports success and the file
    is no larger than 10 MiB. The temporary file is always removed.
    """
    text = message.caption or ''
    random_number = random.randint(1000000, 9999999)
    path = f'./files/voice_{random_number}.ogg'
    max_size = 10 * 1024 * 1024

    try:
        try:
            downloaded = await download_file_with_retries(message.voice.file_id, path, retries=5, delay=10)
        except Exception as e:
            logging.error(f'Error while downloading voice message: {e}')
            return

        # A failed download can leave a partial file behind, so the return
        # value is checked as well as the file's presence.
        if not downloaded or not os.path.exists(path):
            logging.error(f'Failed to download voice message {message.voice.file_id}')
            return

        file_size = os.path.getsize(path)
        if file_size > max_size:
            logging.warning(f'Voice message is too large to repost: {file_size} bytes')
            return

        await asyncio.to_thread(create_vk_post, text, message.message_id, None, None, None, [path])
    finally:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
async def animation_handler(message: Message):
    """Repost a Telegram animation to VK as a GIF document.

    The animation is downloaded as MP4, converted to GIF and posted with the
    caption. The whole download-and-convert step is tried up to three times,
    sleeping five seconds after an attempt that raised. Temporary files are
    removed after every attempt, successful or not.
    """
    text = message.caption or ''
    random_number = random.randint(1000000, 9999999)
    path = f'./files/animation_{random_number}.mp4'
    gif_path = f'./files/animation_{random_number}.gif'
    retries = 3
    delay = 5

    for attempt in range(retries):
        try:
            if not await download_file_with_retries(message.animation.file_id, path):
                # The download helper already logged and retried on its own.
                continue
            clip = VideoFileClip(path)
            try:
                clip.write_gif(gif_path)
            finally:
                # Release the underlying reader even when conversion fails.
                clip.close()
            await asyncio.to_thread(
                create_vk_post, text, message.message_id, None, None, None, None, [gif_path]
            )
            return
        except Exception as e:
            logging.error(f'Error while processing animation (attempt {attempt + 1}/{retries}): {e}')
            if attempt < retries - 1:
                await asyncio.sleep(delay)
        finally:
            # Runs on success, failure and ``continue`` alike, so nothing is left behind.
            for leftover in (path, gif_path):
                try:
                    os.remove(leftover)
                except FileNotFoundError:
                    pass

    logging.error(f'Failed to download animation {message.animation.file_id} after {retries} attempts')
async def main():
    """Register every channel-post handler on the dispatcher and start polling.

    Handlers are registered in the order listed below; the edited-post
    handler is registered without a filter.
    """
    # NOTE(review): photo_video_handler already accepts VIDEO and is registered
    # before video_handler, so video_handler looks unreachable - confirm.
    channel_post_routes = (
        (album_handler, MediaGroupFilter()),
        (photo_video_handler, F.content_type.in_([ContentType.PHOTO, ContentType.VIDEO])),
        (document_handler, F.content_type == ContentType.DOCUMENT),
        (audio_handler, F.content_type == ContentType.AUDIO),
        (video_handler, F.content_type == ContentType.VIDEO),
        (text_handler, F.content_type == ContentType.TEXT),
        (sticker_handler, F.content_type == ContentType.STICKER),
        (animation_handler, F.content_type == ContentType.ANIMATION),
        (voice_handler, F.content_type == ContentType.VOICE),
    )
    for handler, content_filter in channel_post_routes:
        dp.channel_post.register(handler, content_filter)

    dp.edited_channel_post.register(edited_handler)
    await dp.start_polling(bot)


if __name__ == '__main__':
    # Script entry point: announce start-up, then run the bot until polling stops.
    logging.info('Starting bot')
    asyncio.run(main())