import datetime
import hashlib
import math
import os
import sys
import textwrap
import time
import typing
from collections import namedtuple

import click
from telethon.helpers import generate_random_long
from telethon.sync import TelegramClient, connection
from telethon.tl.custom.message import Message
from telethon.tl.functions.upload import SaveBigFilePartRequest
from telethon.tl.functions.upload import GetFileRequest
from telethon.tl.types import InputFileBig
from telethon.tl.types.upload import File
from telethon.utils import get_input_location

# Markers delimiting the machine-readable metadata block inside a message.
MESSAGE_BLOCK_START = '_cup_start_'
MESSAGE_HEADER = 'TeleCup File Uploader https://lnurl.ru/telecup'
MESSAGE_BLOCK_END = '_cup_end_'

# Result of upload_file: the uploaded InputFileBig handles and the number of
# real (unpadded) bytes read from the source stream.
UploadInfo = namedtuple('UploadInfo', ['file_list', 'real_size'])
# A part message together with its parsed metadata dict.
DownloadInfo = namedtuple('DownloadInfo', ['message', 'part_info'])


class Config:
    """Shared state passed between the click group and its sub-commands."""

    def __init__(self):
        self.session_file = '.telegram-session'
        self.dialog = 'me'
        # Filled in by the `cli` group callback before any command runs.
        self.client: typing.Optional[TelegramClient] = None


pass_config = click.make_pass_decorator(Config, ensure=True)


def parse_file_size(humanString: str) -> int:
    """Convert a size such as ``'512'``, ``'10k'``, ``'1.5m'`` or ``'2G'`` to bytes.

    The suffixes k/m/g (case-insensitive) are powers of 1024; a string
    without a suffix is parsed as a plain integer.

    Raises:
        ValueError: if the numeric part cannot be parsed.
    """
    humanString = humanString.strip()
    units = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30}
    suffix = humanString[-1:].lower()
    if suffix in units:
        return int(float(humanString[:-1]) * units[suffix])
    return int(humanString)


def format_file_size(bytesCount: int) -> str:
    """Format a byte count with two decimals and a K/M/G/T suffix (base 1024)."""
    for unit in ['', 'K', 'M', 'G']:
        if bytesCount < 2 ** 10:
            return '%3.2f%s' % (bytesCount, unit)
        bytesCount /= 2 ** 10
    return '%.2f%s' % (bytesCount, 'T')


def parse_message(caption: str):
    """Extract the ``^name=value`` metadata from a TeleCup message.

    Only lines between MESSAGE_BLOCK_START and MESSAGE_BLOCK_END that begin
    with ``^`` are considered.

    Returns:
        A dict of stripped names to stripped values, or ``None`` when the
        caption is empty or does not contain a complete metadata block.
    """
    if not caption:
        return None
    blockStartPos = caption.find(MESSAGE_BLOCK_START)
    if blockStartPos < 0:
        return None
    blockStartPos += len(MESSAGE_BLOCK_START)
    # Look for the end marker only after the start marker.
    blockEndPos = caption.find(MESSAGE_BLOCK_END, blockStartPos)
    if blockEndPos < 0:
        return None

    fileInfo = {}
    for line in caption[blockStartPos:blockEndPos].split('\n'):
        if not line.startswith('^'):
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        name, separator, value = line[1:].partition('=')
        if not separator:
            continue
        fileInfo[name.strip()] = value.strip()
    return fileInfo


def _read_app_config(config_path):
    """Read ``KEY=VALUE`` lines from *config_path* into a dict.

    Lines without ``=`` are ignored; only the first ``=`` separates key and
    value, so values such as proxy secrets may contain ``=``.
    """
    values = {}
    with open(config_path, 'r') as f:
        for line in f:
            key, separator, value = line.partition('=')
            if separator:
                values[key.strip()] = value.strip()
    return values


def retrieve_app_hash(app_config):
    """Load the Telegram app credentials, prompting for them when missing.

    When the configuration file does not provide both APP_ID and APP_HASH the
    user is asked for them (and optionally for MTProto proxy settings) and the
    answers are written back to the file.

    Returns:
        A tuple ``(app_hash, app_id, proxy)`` where ``proxy`` is either
        ``None`` or a ``(host, port, secret)`` tuple.
    """
    app_id = None
    app_hash = None
    proxy = None
    config_path = os.path.realpath(app_config)

    if os.path.isfile(config_path):
        values = _read_app_config(config_path)
        if 'APP_ID' in values and 'APP_HASH' in values:
            app_id = values['APP_ID']
            app_hash = values['APP_HASH']
        if 'PROXY_HOST' in values and 'PROXY_PORT' in values and 'PROXY_SECRET' in values:
            proxy = (values['PROXY_HOST'], int(values['PROXY_PORT']), values['PROXY_SECRET'])

    if not (app_id and app_hash):
        click.echo('You have to create Telegram App configuration before using this tool')
        click.echo('Please, visit https://my.telegram.org/apps page and create an app')
        click.pause()
        app_id = click.prompt('App id', type=click.INT)
        app_hash = click.prompt('App hash').strip()
        use_proxy = click.confirm('Use MTProto proxy? Required in some countries like Russia and Iran',
                                  default=False)
        if use_proxy:
            proxy_host = click.prompt('Proxy host')
            proxy_port = click.prompt('Proxy port', type=click.INT)
            proxy_secret = click.prompt('Proxy secret')
            proxy = (proxy_host, proxy_port, proxy_secret)
        with open(config_path, 'w') as f:
            f.write(f'APP_ID={app_id}\nAPP_HASH={app_hash}\n')
            if use_proxy:
                f.write(f'PROXY_HOST={proxy_host}\nPROXY_PORT={proxy_port}\nPROXY_SECRET={proxy_secret}\n')

    return app_hash, app_id, proxy


def check_logged_in(config: Config):
    """Connect the client and abort the command if the session is not authorized."""
    config.client.connect()
    if not config.client.is_user_authorized():
        raise click.ClickException('You are not authorized. Please log in first')


def _part_caption(nameHash: str, partNumber: int, realSize: int) -> str:
    """Build the caption (with metadata block) attached to one uploaded part."""
    return textwrap.dedent(f'''
        {MESSAGE_BLOCK_START}
        {MESSAGE_HEADER}
        #telecup_part #telecup_part_{nameHash}
        ^name_hash={nameHash}
        ^part={partNumber}
        ^real_size={realSize}
        {MESSAGE_BLOCK_END}
        ''')


def _send_part(client: TelegramClient, dialog: str, fileId: int, totalChunks: int,
               nameHash: str, partNumber: int, realSize: int):
    """Send an already uploaded big file to *dialog* and return its InputFileBig."""
    readyFile = InputFileBig(fileId, totalChunks, f'{int(time.time() * 1000)}.bin')
    client.send_file(dialog, readyFile, caption=_part_caption(nameHash, partNumber, realSize))
    return readyFile


def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedStreamSize: int, nameHash: str,
                dialog: str, maxFileSize: int) -> UploadInfo:
    """Upload *source* to *dialog* as one or more padded part files.

    The stream is read in 512 KiB chunks. A new part file is started every
    ``ceil(maxFileSize / chunk size)`` chunks. The last chunk of the stream is
    padded with zero bytes, and if the stream ends before the announced number
    of chunks was sent, the remaining chunks are sent as zeros. The number of
    real bytes is recorded in every part caption.

    Args:
        client: connected Telegram client.
        source: binary stream to read from.
        expectedStreamSize: upper bound for the stream length in bytes.
        nameHash: hash identifying the logical file; used in tags and metadata.
        dialog: target conversation.
        maxFileSize: maximum size of a single part file in bytes.

    Returns:
        UploadInfo with the list of sent files and the real byte count.

    Raises:
        click.ClickException: if *maxFileSize* is not positive or the stream
            holds more than *expectedStreamSize* bytes.
    """
    if maxFileSize <= 0:
        raise click.ClickException('Part size must be positive')

    chunkSize = 2 ** 19
    # Integer chunk budget per part file. The original float quotient never
    # compared equal to the chunk counter for sizes that are not a multiple
    # of the chunk size, so such uploads never rolled over to a new part.
    maxChunksInFile = math.ceil(maxFileSize / chunkSize)

    def chunks_for(remainingBytes):
        # Number of chunks to announce for the next part file.
        return min(maxChunksInFile, math.ceil(remainingBytes / chunkSize))

    inputFiles = []
    fileId = generate_random_long()
    currentFileTotalChunks = chunks_for(expectedStreamSize)
    currentFileChunkPos = 0
    currentFileBytesWritten = 0
    realBytesWritten = 0

    while True:
        buffer = source.read(chunkSize)
        bufLen = len(buffer)
        if not bufLen:
            break
        if realBytesWritten + bufLen > expectedStreamSize:
            raise click.ClickException('Stream is larger than expected file size')
        if bufLen < chunkSize:
            # Every chunk is sent at full size; pad the tail with zeros.
            buffer = buffer + b'\x00' * (chunkSize - bufLen)

        client(SaveBigFilePartRequest(fileId, currentFileChunkPos, currentFileTotalChunks, buffer))
        currentFileChunkPos += 1
        currentFileBytesWritten += bufLen
        realBytesWritten += bufLen

        if currentFileChunkPos % 10 == 0:
            click.echo(f'{format_file_size(realBytesWritten)} bytes sent')

        if bufLen < chunkSize:
            # A short read is treated as end of stream.
            break

        if currentFileChunkPos == maxChunksInFile:
            inputFiles.append(_send_part(client, dialog, fileId, currentFileTotalChunks,
                                         nameHash, len(inputFiles) + 1, currentFileBytesWritten))
            # Start a new file
            fileId = generate_random_long()
            currentFileTotalChunks = chunks_for(expectedStreamSize - realBytesWritten)
            currentFileChunkPos = 0
            currentFileBytesWritten = 0

    if currentFileBytesWritten > 0:
        # The stream ended early: send the announced but missing chunks as zeros.
        if currentFileChunkPos < currentFileTotalChunks:
            buffer = bytes(chunkSize)
            for newChunkPos in range(currentFileChunkPos, currentFileTotalChunks):
                client(SaveBigFilePartRequest(fileId, newChunkPos, currentFileTotalChunks, buffer))
        inputFiles.append(_send_part(client, dialog, fileId, currentFileTotalChunks,
                                     nameHash, len(inputFiles) + 1, currentFileBytesWritten))

    return UploadInfo(inputFiles, realBytesWritten)


def download_part(client: TelegramClient, dest: typing.BinaryIO, dInfo: DownloadInfo):
    """Download one part and write its first ``real_size`` bytes to *dest*.

    Raises:
        click.ClickException: if the server returns no data before
            ``real_size`` bytes were received.
    """
    # The returned data-center id is not used here.
    _dcId, inputFileLocation = get_input_location(dInfo.message)
    chunkSize = 2 ** 20
    realSize = int(dInfo.part_info['real_size'])
    partNumber = dInfo.part_info['part']
    totalBytesDownloaded = 0

    while totalBytesDownloaded < realSize:
        # Always ask for a full chunk and trim the padding afterwards; a
        # limit derived from the remaining byte count is not guaranteed to
        # be an accepted request size.
        downloadResult: File = client(GetFileRequest(
            inputFileLocation, totalBytesDownloaded, chunkSize, precise=False, cdn_supported=False
        ))
        buffer = downloadResult.bytes
        if not buffer:
            # Without this guard an empty answer would loop forever.
            raise click.ClickException(
                f'Part {partNumber} ended after {totalBytesDownloaded} of {realSize} bytes')
        totalBytesDownloaded += len(buffer)
        if totalBytesDownloaded > realSize:
            # Drop the zero padding added at upload time.
            buffer = buffer[:realSize - totalBytesDownloaded]
            totalBytesDownloaded = realSize
        dest.write(buffer)
        click.echo(f"Part {partNumber}: {format_file_size(totalBytesDownloaded)} ready", err=True)


def download_file(client: TelegramClient, dest: typing.BinaryIO, fileInfo: dict, dialog: str):
    """Find every part of the file described by *fileInfo* and write them to *dest* in order.

    Raises:
        click.ClickException: if at least one part message cannot be found.
    """
    nameHash = fileInfo['name_hash']
    totalParts = int(fileInfo['total_parts'])
    partMessages: typing.List[typing.Optional[DownloadInfo]] = [None] * totalParts

    for msg in client.iter_messages(dialog, search=f"#telecup_part_{nameHash}"):
        partInfo = parse_message(msg.message)
        if not partInfo or partInfo.get('name_hash') != nameHash:
            continue
        try:
            partIndex = int(partInfo['part']) - 1
        except (KeyError, ValueError):
            continue
        # Ignore part numbers outside 1..total_parts instead of crashing or
        # silently writing to the wrong slot through a negative index.
        if 0 <= partIndex < totalParts:
            partMessages[partIndex] = DownloadInfo(msg, partInfo)

    if any(part is None for part in partMessages):
        raise click.ClickException('Missing some parts')

    # partMessages is indexed by part number, so it is already ordered.
    for part in partMessages:
        download_part(client, dest, part)
        click.echo(f"Part {part.part_info['part']} ready", err=True)


# Root command group: stores the global options and builds the Telegram
# client (optionally through an MTProto proxy) for the sub-commands.
@click.group()
@click.option('--session-file', default='.telegram-session', show_default=True,
              help='File where telegram session information will be stored (unique for each account)')
@click.option('--app-config', default='.telecup.cfg', show_default=True, help='App configuration file')
@click.option('--dialog', default='me',
              help='Name of the conversation, where files will be stored [default: Saved Messages]')
@pass_config
def cli(config: Config, session_file, app_config, dialog):
    config.session_file = session_file
    config.dialog = dialog
    APP_HASH, APP_ID, proxy = retrieve_app_hash(app_config)
    if not proxy:
        client = TelegramClient(session_file, APP_ID, APP_HASH)
    else:
        client = TelegramClient(session_file, APP_ID, APP_HASH,
                                connection=connection.ConnectionTcpMTProxyAbridged, proxy=proxy)
    config.client = client


@cli.command()
@pass_config
def login(config: Config):
    """
    Log into your telegram account interactively and save the login information to session file
    """
    config.client.start()
    click.echo(f'You are logged in as {config.client.get_me().first_name}')
    click.echo('To switch user: either pass another session-file or remove existing')


@cli.command(name='list')
@pass_config
def list_files(config: Config):
    """
    List all files, uploaded to your account by TeleCup
    """
    check_logged_in(config)
    fileMessages: typing.Iterator[Message] = config.client.iter_messages(config.dialog, search='#telecup_file')
    for msg in fileMessages:
        fileInfo = parse_message(msg.message)
        if not fileInfo:
            continue
        try:
            name = fileInfo['name']
            realSize = int(fileInfo['real_size'])
            timestamp = int(fileInfo['created_at'])
        except (KeyError, ValueError):
            # Skip messages whose metadata block is incomplete or malformed
            # rather than aborting the whole listing.
            continue
        # Timezone-aware replacement for the deprecated utcfromtimestamp().
        createdAt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        createdAtFormatted = createdAt.strftime('%d.%m.%Y %H:%M')
        click.echo(f"{name}\t{format_file_size(realSize)}\t{createdAtFormatted}")
@cli.command()
@click.option('--part-size', default='1g', show_default=True, help='Maximum size of a single file')
@click.argument('filename')
@click.argument('stream_size')
@pass_config
def upload(config: Config, part_size, filename, stream_size):
    """
    Upload a new file to your account
    """
    # The data is read from standard input; STREAM_SIZE is the announced
    # upper bound for its length.
    source = sys.stdin.buffer
    try:
        estimatedBytes = parse_file_size(stream_size)
        partSize = parse_file_size(part_size)
    except ValueError:
        # Report unparsable sizes as a usage error instead of a traceback.
        raise click.ClickException('Sizes must be numbers with an optional k, m or g suffix')
    if partSize <= 0:
        raise click.ClickException('Part size must be positive')
    if partSize > pow(2, 20) * 1536:
        raise click.ClickException('Part size must not exceed 1.5 GiB')
    if '\n' in filename or '\r' in filename:
        # The metadata block is line based; a line break in the name would
        # corrupt it.
        raise click.ClickException('File name must not contain line breaks')

    check_logged_in(config)
    try:
        nameHash = hashlib.sha256(filename.encode('utf-8')).hexdigest()
        existingMessages = config.client.get_messages(config.dialog, search=f'#telecup_file_{nameHash}')
        if len(existingMessages) > 0:
            raise click.ClickException(f'File with name {filename} already exists in dialog {config.dialog}')

        uploadInfo = upload_file(config.client, source, estimatedBytes, nameHash, config.dialog, partSize)
        if uploadInfo.real_size == 0:
            raise click.ClickException('Input stream is empty')

        # The index message ties the parts together and is what the list and
        # download commands search for.
        config.client.send_message(config.dialog, textwrap.dedent(f'''
            {MESSAGE_BLOCK_START}
            {MESSAGE_HEADER}
            #telecup_file #telecup_file_{nameHash}
            ^name={filename}
            ^name_hash={nameHash}
            ^real_size={uploadInfo.real_size}
            ^total_parts={len(uploadInfo.file_list)}
            ^created_at={int(time.time())}
            {MESSAGE_BLOCK_END}
            '''))
        click.echo('OK')
    finally:
        # Disconnect on failure as well as on success.
        config.client.disconnect()


@cli.command()
@click.argument('filename')
@pass_config
def download(config: Config, filename):
    """
    Download an existing file from your account
    """
    # The file content goes to standard output; status messages go to stderr.
    destination = sys.stdout.buffer
    wanted = filename.strip()
    check_logged_in(config)
    try:
        fileMessages: typing.Iterator[Message] = config.client.iter_messages(config.dialog,
                                                                             search='#telecup_file')
        for msg in fileMessages:
            fileInfo = parse_message(msg.message)
            if not fileInfo:
                continue
            name = fileInfo.get('name', '')
            nameHash = fileInfo.get('name_hash', '')
            if name.startswith(wanted) or nameHash.startswith(wanted):
                download_file(config.client, destination, fileInfo, config.dialog)
                # Stop at the first match so that several files sharing the
                # prefix are not concatenated on stdout.
                break
        else:
            raise click.ClickException(f'File {filename} not found in dialog {config.dialog}')
        destination.flush()
        click.echo('OK', err=True)
    finally:
        config.client.disconnect()