|
import telegram |
|
from telegram.ext import Updater,CommandHandler,MessageHandler,Filters,ConversationHandler |
|
import os, shutil |
|
from telegram import ReplyKeyboardRemove, ReplyKeyboardMarkup |
|
from youtube import * |
|
import threading |
|
|
|
|
|
# Port number taken from the PORT environment variable (5000 when unset).
PORT = int(os.environ.get('PORT', 5000))

# Telegram bot credential.  It can be supplied through the TELEGRAM_BOT_TOKEN
# environment variable; the literal below is kept only as a
# backward-compatible fallback.
# NOTE(review): a credential committed to source should be treated as leaked;
# rotate it and drop the fallback once deployments set the variable.
token = os.environ.get(
    'TELEGRAM_BOT_TOKEN',
    '5728368003:AAFGFs7r1LAJB-flrJ9et5J7Okzgw1_4s_Y',
)

# Conversation state identifiers used as keys of the ConversationHandler
# state table and as return values of the callbacks.
START_CO, GET_WORD, GET_NUMBER, GET_CHANNEL_URL, GET_URL, CONFIRMATION = range(1, 7)
|
|
|
# Main menu: one button per row.
reply_keyboeard_start = [
    ['Download entire channel'],
    ['Download with searching word'],
    ['Download one video'],
    ['See processes'],
    ['exit'],
]
markup_start = ReplyKeyboardMarkup(
    reply_keyboeard_start,
    one_time_keyboard=True,
    resize_keyboard=True,
)

# Navigation row shown while the bot waits for free-text input.
reply_keyboeard_back = [
    ['back', 'π home', 'exit'],
]
markup_back = ReplyKeyboardMarkup(
    reply_keyboeard_back,
    one_time_keyboard=True,
    resize_keyboard=True,
)

# Confirmation prompt: confirm on its own row, navigation below it.
reply_keyboeard_confirmation = [
    ['I confirm'],
    ['π home', 'exit'],
]
markup_confirmation = ReplyKeyboardMarkup(
    reply_keyboeard_confirmation,
    one_time_keyboard=True,
    resize_keyboard=True,
)
|
|
|
|
|
|
|
def start(update, context):
    """Send the main-menu keyboard and move the conversation to START_CO."""
    message = update.message
    message.reply_text('Choose between options : ', reply_markup=markup_start)
    return START_CO
|
|
|
def start_co(update, context):
    """Handle a main-menu choice and ask for the input that choice needs.

    The user's download folder is reset via ``remake_folder`` on every call.
    A recognised menu entry gets its prompt (with the navigation keyboard) and
    the matching input state is returned.  Any other text re-shows the main
    menu and returns ``START_CO`` instead of ending silently with ``None``,
    which is what happened when this callback was reached with a text such as
    the home button.
    """
    user = update.message.from_user
    text = update.message.text

    remake_folder(str(user.id))

    # menu label -> (prompt to send, state that consumes the answer)
    choices = {
        'Download entire channel': (
            'Enter URL of one video on channel you want to download all of that videos.',
            GET_CHANNEL_URL,
        ),
        'Download with searching word': (
            'Enter word you want to search.',
            GET_WORD,
        ),
        'Download one video': (
            'Enter link of that video.',
            GET_URL,
        ),
    }

    choice = choices.get(text)
    if choice is None:
        # Unknown text: show the menu again rather than leaving the user
        # without any reply.
        update.message.reply_text('Choose between options : ', reply_markup=markup_start)
        return START_CO

    prompt, next_state = choice
    update.message.reply_text(prompt, reply_markup=markup_back)
    return next_state
|
|
|
def get_channel_url(update, context):
    """Resolve a channel from a video URL and ask the user to confirm.

    ``'back'`` returns to the main menu.  Otherwise the text is passed to
    ``find_channel_id`` and the result to ``get_videos_from_channel``.  On
    success the video list is stored in ``user_data['list_of_urls']`` and the
    conversation moves to ``CONFIRMATION``.  Every failure path now replies
    and returns ``START_CO``; previously one of them ended with no reply and
    an implicit ``None``.
    """
    user_data = context.user_data
    text = update.message.text

    if text == 'back':
        update.message.reply_text('Choose ...', reply_markup=markup_start)
        return START_CO

    # Named channel_id so the builtin id() is not shadowed.
    channel_id = find_channel_id(text)
    if not channel_id:
        update.message.reply_text('Can not find id of channel', reply_markup=markup_start)
        return START_CO

    list_of_urls = get_videos_from_channel(channel_id)
    if not list_of_urls:
        update.message.reply_text('Can not find any videos on this channel', reply_markup=markup_start)
        return START_CO

    user_data['list_of_urls'] = list_of_urls
    update.message.reply_text(
        f'There is {len(list_of_urls)} videos on this channel',
        reply_markup=markup_confirmation,
    )
    return CONFIRMATION
|
|
|
def get_word_for_search(update, context):
    """Remember the search word and ask how many videos to fetch.

    ``'back'`` returns to the main menu (``START_CO``); any other text is
    stored under ``user_data['search_word']`` and the conversation moves to
    ``GET_NUMBER``.
    """
    store = context.user_data
    message = update.message
    typed = message.text

    if typed != 'back':
        store['search_word'] = typed
        message.reply_text('How many videos you wanna download ?', reply_markup=markup_back)
        return GET_NUMBER

    message.reply_text('Choose ...', reply_markup=markup_start)
    return START_CO
|
|
|
def get_number_of_videos(update, context):
    """Read the requested video count, run the search and ask to confirm.

    ``'back'`` returns to ``GET_WORD``.  Text that is not a positive integer
    is rejected with 'Wrong input' and the state stays ``GET_NUMBER``.  The
    stored search word and the count are passed to
    ``find_videos_with_search``; a non-empty result is saved in
    ``user_data['list_of_urls']`` and the conversation moves to
    ``CONFIRMATION``.  An empty result is reported and the user is sent back
    to the main menu instead of being asked to confirm nothing.
    """
    user_data = context.user_data
    number = update.message.text

    if number == 'back':
        update.message.reply_text('Enter word you want to search.', reply_markup=markup_back)
        return GET_WORD

    try:
        number = int(number)
    except (TypeError, ValueError):
        # Only conversion failures mean bad input; anything else should surface.
        number = None

    if number is None or number < 1:
        update.message.reply_text('Wrong input', reply_markup=markup_back)
        return GET_NUMBER

    search_word = user_data['search_word']
    list_of_urls = find_videos_with_search(search_word, number)
    if not list_of_urls:
        update.message.reply_text('Can not find any videos for this search', reply_markup=markup_start)
        return START_CO

    user_data['list_of_urls'] = list_of_urls

    text = (
        '\n'
        f'Search word : {search_word}\n'
        f'Number of videos : {number}\n'
        'If it is ok pleas confirm.'
    )
    update.message.reply_text(text, reply_markup=markup_confirmation)
    return CONFIRMATION
|
|
|
def one_video_download(update, context):
    """Download a single video and send it back to the user.

    ``'back'`` returns to the main menu.  Otherwise the stripped message text
    is passed to ``Download`` together with the user id.  A truthy result is
    opened as a binary file and sent with ``reply_video`` (presumably it is
    the path of the downloaded file -- confirm against ``Download``).  A falsy
    result or any exception produces a failure message.  Every path returns
    ``START_CO``.
    """
    user = update.message.from_user
    text = update.message.text
    url = text.strip()

    if text == 'back':
        update.message.reply_text('Choose ...', reply_markup=markup_start)
        return START_CO

    try:
        status = Download(url, user.id)
        if status:
            # The context manager closes the handle after the upload; it was
            # previously left open.
            with open(status, 'rb') as video_file:
                update.message.reply_video(video=video_file, reply_markup=markup_start)
        else:
            update.message.reply_text(f"could not download the video {url}", reply_markup=markup_start)
    except Exception:
        # Best-effort: any download or upload failure is reported to the user.
        # Unlike the former bare except, KeyboardInterrupt and SystemExit are
        # not swallowed.
        update.message.reply_text(f"could not download {url}", reply_markup=markup_start)

    return START_CO
|
|
|
|
|
|
|
def do_downloading(user_data, user, update):
    """Download every queued video and send each one to the chat.

    Iterates over ``user_data['list_of_urls']``; each entry is read through
    its ``'url'`` and ``'title'`` keys.  A truthy ``Download`` result is opened
    as a binary file and sent with the title as caption (presumably it is the
    downloaded file's path -- confirm against ``Download``).  A failure on one
    video is reported to the user and the loop moves on to the next entry.
    """
    for url in user_data['list_of_urls']:
        try:
            status = Download(url['url'], user.id)
            if status:
                # Close each file after sending so that a long list does not
                # accumulate open handles.
                with open(status, 'rb') as video_file:
                    update.message.reply_video(video=video_file, caption=url['title'])
            else:
                update.message.reply_text(f"could not download the video {url['url']}")
        except Exception:
            # Best-effort per video; KeyboardInterrupt and SystemExit are no
            # longer swallowed as they were by the bare except.
            update.message.reply_text(
                f"could not download {url['url']}",
                reply_markup=ReplyKeyboardRemove(),
            )
|
|
|
def how_many_thread_is_alive(update, context):
    """Report how many of this user's download threads are still running.

    Counts the entries of ``user_data['thread']`` (if any) whose ``is_alive()``
    is true, replies with that count and returns ``START_CO``.
    """
    workers = context.user_data.get('thread') or ()
    running = sum(1 for worker in workers if worker.is_alive())

    update.message.reply_text(f'there is {running} process is going.', reply_markup=markup_start)
    return START_CO
|
|
|
def confirmation(update, context):
    """Start the queued downloads in a background thread once confirmed.

    Any text other than ``'I confirm'`` re-shows the main menu.  On
    confirmation a thread running ``do_downloading`` is started and recorded
    in ``user_data['thread']`` so that ``how_many_thread_is_alive`` can count
    it.  Both paths return ``START_CO``.
    """
    user_data = context.user_data
    user = update.message.from_user
    text = update.message.text

    if text != 'I confirm':
        update.message.reply_text('Choose ...', reply_markup=markup_start)
        return START_CO

    t = threading.Thread(target=do_downloading, args=(user_data, user, update))
    # setdefault creates the per-user list on first use; indexing
    # user_data['thread'] directly raised KeyError because nothing in this
    # file initialises it.
    user_data.setdefault('thread', []).append(t)
    t.start()

    # Sent as soon as the thread is started, not when the downloads finish.
    update.message.reply_text('finish proces', reply_markup=markup_start)
    return START_CO
|
|
|
|
|
def stop_conversation(update, context):
    """Say goodbye, remove the reply keyboard and end the conversation."""
    hide_keyboard = ReplyKeyboardRemove()
    update.message.reply_text('goodbye', reply_markup=hide_keyboard)
    return ConversationHandler.END
|
|
|
def cancle(update, context):
    """Reply 'bye', remove the reply keyboard and end the conversation."""
    hide_keyboard = ReplyKeyboardRemove()
    update.message.reply_text('bye', reply_markup=hide_keyboard)
    return ConversationHandler.END
|
|
|
def timeout(update, context):
    """Reset the user's download folder and tell them the time is out.

    The folder reset is best-effort: a failure is printed and does not stop
    the notification from being sent.
    """
    user = update.message.from_user
    try:
        remake_folder(str(user.id))
    except Exception as exc:
        # Keep the clean-up best-effort but make failures visible.  The former
        # bare "except: pass" hid them and also swallowed KeyboardInterrupt
        # and SystemExit.
        print('Failed to reset folder for user %s. Reason: %s' % (user.id, exc))

    update.message.reply_text('the time is out.', reply_markup=ReplyKeyboardRemove())
|
|
|
def error(update, context):
    """Print the update together with the error carried by the context."""
    failure = context.error
    print(update, failure)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level Bot client built from the configured token.
bot = telegram.Bot(token)
|
|
|
def main():
    """Build the conversation handler, register it and poll until stopped.

    Reads the module-level ``states`` table, which the ``__main__`` guard
    assigns before calling this function.

    The fallbacks now agree with the per-state ``same`` handler list:
    ``/cancle`` ends the conversation through ``cancle`` (it used to restart
    through ``start``), and the home button shows the main menu through
    ``start`` (it used to call ``start_co``, which has no branch for that
    text).
    """
    updater = Updater(token, use_context=True)
    dp = updater.dispatcher

    conv_handler = ConversationHandler(
        entry_points=[CommandHandler('start', start)],
        states=states,
        fallbacks=[
            CommandHandler('cancle', cancle),
            CommandHandler('start', start),
            MessageHandler(Filters.regex('^exit$'), stop_conversation),
            MessageHandler(Filters.regex('^π home$'), start),
        ],
        conversation_timeout=50000,
    )

    dp.add_handler(conv_handler)
    dp.add_error_handler(error)

    print('trying to connect to telegram api ...')
    updater.start_polling()

    # Printed once start_polling() returns.
    print('connected to telegram api : 200 ')

    updater.idle()
|
|
|
|
|
|
|
def remake_folder(folder_name):
    """Ensure ``Downloads/<folder_name>`` exists and is empty.

    If the directory exists, every file, symlink and sub-directory inside it
    is removed; a failure on one entry is printed and the remaining entries
    are still processed.  If it does not exist it is created together with
    any missing parent, so the first call no longer fails when ``Downloads``
    itself is absent.

    Args:
        folder_name: Name of the sub-folder under ``Downloads`` (callers in
            this file pass the user id as a string).
    """
    folder_name = f'Downloads/{folder_name}'

    if not os.path.exists(folder_name):
        # makedirs also creates the parent directory; os.mkdir raised
        # FileNotFoundError when 'Downloads' was missing.
        os.makedirs(folder_name, exist_ok=True)
        return

    for filename in os.listdir(folder_name):
        file_path = os.path.join(folder_name, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        except Exception as e:
            print('Failed to delete %s. Reason: %s' % (file_path, e))
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Handlers shared by every free-text state: cancel, exit and home.
    same = [
        CommandHandler('cancle', cancle),
        MessageHandler(Filters.regex('^exit$'), stop_conversation),
        MessageHandler(Filters.regex('^π home$'), start),
    ]

    # Main menu: /start plus one exact-match handler per menu button.
    states = {
        START_CO: [CommandHandler('start', start)] + [
            MessageHandler(Filters.regex(pattern), callback)
            for pattern, callback in (
                ('^Download entire channel$', start_co),
                ('^Download with searching word$', start_co),
                ('^Download one video$', start_co),
                ('^See processes$', how_many_thread_is_alive),
            )
        ],
    }

    # Free-text states: the shared handlers are tried first, then /start,
    # then any text goes to the state's own callback.
    states.update({
        state: same + [
            CommandHandler('start', start),
            MessageHandler(Filters.text, callback),
        ]
        for state, callback in (
            (GET_WORD, get_word_for_search),
            (GET_NUMBER, get_number_of_videos),
            (GET_CHANNEL_URL, get_channel_url),
            (GET_URL, one_video_download),
        )
    })

    # Confirmation accepts only /start or the exact confirm button text.
    states[CONFIRMATION] = [
        CommandHandler('start', start),
        MessageHandler(Filters.regex('^I confirm$'), confirmation),
    ]

    main()