telecup/telecup_cli.py
2020-07-19 09:44:13 +03:00

409 lines
16 KiB
Python

import asyncio
import datetime
import hashlib
import math
import os
import random
import sys
import textwrap
import time
import typing
from collections import namedtuple
from concurrent.futures.thread import ThreadPoolExecutor
from functools import wraps
import click
from telethon.errors import BadRequestError
from telethon.helpers import generate_random_long
from telethon import TelegramClient, connection
from telethon.tl.custom.message import Message
from telethon.tl.functions.upload import SaveBigFilePartRequest
from telethon.tl.functions.upload import GetFileRequest
from telethon.tl.types import InputFileBig
from telethon.tl.types.upload import File
from telethon.utils import get_input_location
# Markers that delimit the machine-readable metadata block inside a message.
MESSAGE_BLOCK_START = '_cup_start_'
# Human-readable banner placed at the top of every metadata block
# (plain literal: there is nothing to interpolate, so no f-string is needed).
MESSAGE_HEADER = 'TeleCup File Uploader https://lnurl.ru/telecup'
MESSAGE_BLOCK_END = '_cup_end_'
# Result of upload_file(): the uploaded part handles and the number of payload bytes sent.
UploadInfo = namedtuple('UploadInfo', 'file_list real_size')


class DownloadInfo(typing.NamedTuple):
    """One stored part: the dialog it lives in, its message and its parsed metadata fields."""

    dialog: str
    message: Message
    part_info: dict
class Config:
    """Mutable state shared between the click group and its sub-commands."""

    def __init__(self):
        # Stays None until the root command group builds the Telegram client.
        self.client: typing.Optional[TelegramClient] = None
        # Conversation used for storage.
        self.dialog = 'me'
        # Path of the Telegram session file.
        self.session_file = '.telegram-session'
# Decorator that hands the shared Config object to a command as its first argument;
# ensure=True lets click create the Config if the context does not hold one yet.
pass_config = click.make_pass_decorator(Config, ensure=True)
def coro(f):
    """Wrap the coroutine function *f* so it can be called synchronously.

    The returned wrapper runs ``f(*args, **kwargs)`` to completion on the
    current event loop and returns its result.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            currentLoop = asyncio.get_event_loop()
        except RuntimeError:
            # Recent Python versions no longer create a loop implicitly here.
            # Install one explicitly so that consecutive wrapped calls (the
            # command group, then its sub-command) keep sharing the same loop.
            currentLoop = asyncio.new_event_loop()
            asyncio.set_event_loop(currentLoop)
        return currentLoop.run_until_complete(f(*args, **kwargs))
    return wrapper
def parse_file_size(humanString: str):
    """Convert a size such as ``'512'``, ``'10k'``, ``'1.5M'`` or ``'2g'`` to a byte count.

    A trailing k/m/g (any case) multiplies the preceding number by 2**10, 2**20
    or 2**30; the product is truncated to int. Without such a suffix the text
    must be a plain integer.
    """
    text = humanString.strip()
    multipliers = {'k': 2 ** 10, 'm': 2 ** 20, 'g': 2 ** 30}
    # Empty text yields an empty suffix, which falls through to int() below.
    factor = multipliers.get(text.lower()[-1:])
    if factor is None:
        return int(text)
    return int(float(text[:-1]) * factor)
def format_file_size(bytesCount: int):
    """Format a byte count with two decimals and a '', K, M, G or T suffix (1024-based)."""
    step = pow(2, 10)
    scaled = bytesCount
    for suffix in ('', 'K', 'M', 'G'):
        if scaled < step:
            return '%3.2f%s' % (scaled, suffix)
        scaled = scaled / step
    # Everything that did not fit into G is reported in T, however large.
    return '%.2f%s' % (scaled, 'T')
def parse_message(caption: str):
    """Extract the ``^name=value`` fields from the metadata block of a message.

    The block is the text between MESSAGE_BLOCK_START and MESSAGE_BLOCK_END.
    Only lines starting with ``^`` are considered; names and values are
    stripped of surrounding whitespace.

    Returns a dict of the fields, or None when *caption* is empty/None or
    does not contain both markers.
    """
    if not caption or MESSAGE_BLOCK_START not in caption or MESSAGE_BLOCK_END not in caption:
        return None
    blockStartPos = caption.index(MESSAGE_BLOCK_START) + len(MESSAGE_BLOCK_START)
    blockEndPos = caption.index(MESSAGE_BLOCK_END)
    fileInfo = dict()
    for line in caption[blockStartPos:blockEndPos].split('\n'):
        if not line.startswith('^'):
            continue
        # Split on the first '=' only, so values (e.g. file names) may contain '='.
        name, separator, value = line[1:].partition('=')
        if not separator:
            # A '^' line without '=' carries no value; skip it instead of crashing.
            continue
        fileInfo[name.strip()] = value.strip()
    return fileInfo
def retrieve_app_hash(app_config):
    """Load the Telegram app credentials and optional MTProto proxy settings.

    *app_config* is the path of a file with ``KEY=value`` lines. When the file
    is missing or lacks ``APP_ID``/``APP_HASH``, the values are asked for
    interactively and the file is (re)written.

    Returns ``(app_hash, app_id, proxy)``; *proxy* is ``(host, port, secret)``
    or None. *app_id* is a str when read from the file and an int when prompted.
    """
    app_id = None
    app_hash = None
    proxy = None
    config_path = os.path.realpath(app_config)
    if os.path.isfile(config_path):
        values = {}
        with open(config_path, 'r') as f:
            for line in f:
                # Split on the first '=' only: values such as a proxy secret may
                # themselves contain '='. Lines without '=' are ignored.
                key, separator, value = line.partition('=')
                if separator:
                    values[key.strip()] = value.strip()
        if 'APP_ID' in values and 'APP_HASH' in values:
            app_id = values['APP_ID']
            app_hash = values['APP_HASH']
        if 'PROXY_HOST' in values and 'PROXY_PORT' in values and 'PROXY_SECRET' in values:
            proxy = (values['PROXY_HOST'], int(values['PROXY_PORT']), values['PROXY_SECRET'])
    if not (app_id and app_hash):
        click.echo('You have to create Telegram App configuration before using this tool')
        click.echo('Please, visit https://my.telegram.org/apps page and create an app')
        click.pause()
        app_id = click.prompt('App id', type=click.INT)
        app_hash = click.prompt('App hash').strip()
        use_proxy = click.confirm('Use MTProto proxy? Required in some countries like Russia and Iran', default=False)
        if use_proxy:
            proxy_host = click.prompt('Proxy host')
            proxy_port = click.prompt('Proxy port', type=click.INT)
            proxy_secret = click.prompt('Proxy secret')
            proxy = (proxy_host, proxy_port, proxy_secret)
        # Persist the answers so the next run does not have to ask again.
        with open(config_path, 'w') as f:
            f.write(f'APP_ID={app_id}\nAPP_HASH={app_hash}\n')
            if use_proxy:
                f.write(f'PROXY_HOST={proxy_host}\nPROXY_PORT={proxy_port}\nPROXY_SECRET={proxy_secret}\n')
    return app_hash, app_id, proxy
async def check_logged_in(config: Config):
    """Connect the configured client and abort with a click error unless it is authorized."""
    client = config.client
    await client.connect()
    authorized = await client.is_user_authorized()
    if authorized:
        return
    raise click.ClickException('You are not authorized. Please log in first')
async def _send_part_message(client, dialog, fileId, totalChunks, partId, partNumber, realSize):
    """Post one completely uploaded part to *dialog* and return its InputFileBig handle."""
    readyFile = InputFileBig(
        fileId, totalChunks, f'{int(time.time() * 1000)}.bin'
    )
    await client.send_file(dialog, readyFile, caption=textwrap.dedent(f'''
        {MESSAGE_BLOCK_START}
        {MESSAGE_HEADER}
        #telecup_part #telecup_part_{partId}
        ^part_id={partId}
        ^part={partNumber}
        ^real_size={realSize}
        {MESSAGE_BLOCK_END}
    '''))
    return readyFile


async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedStreamSize: int, partId: str, dialog: str, maxFileSize):
    """Upload the bytes of *source* to *dialog* as one or more part files.

    The stream is read in 512 KiB chunks and sent with SaveBigFilePartRequest;
    while one chunk is being sent the next one is read on a worker thread.
    A short final chunk is padded with zero bytes, and a part that did not
    reach its planned chunk count is completed with zero chunks. Each part
    message therefore records the number of payload bytes as ``^real_size``.

    Raises click.ClickException when the stream is empty or longer than
    *expectedStreamSize*.

    Returns UploadInfo(list of InputFileBig handles, total payload bytes).
    """
    chunkSize = pow(2, 19)
    fillByte = 0x00
    # Whole number of chunks per part. A float quotient would only ever equal
    # an integer chunk position when maxFileSize is a multiple of chunkSize,
    # in which case parts would never be rotated.
    maxChunksInFile = math.ceil(maxFileSize / chunkSize)

    def plannedChunks(bytesLeft):
        # A part holds what is left of the stream, capped at the per-part maximum.
        if bytesLeft < maxFileSize:
            return math.ceil(bytesLeft / chunkSize)
        return maxChunksInFile

    fileId = generate_random_long()
    inputFiles = []
    currentFileChunkPos = 0
    currentFileTotalChunks = plannedChunks(expectedStreamSize)
    currentFileBytesWritten = 0
    realBytesWritten = 0

    buffer = source.read(chunkSize)
    if not buffer:
        raise click.ClickException('Input stream is empty')

    currentLoop = asyncio.get_running_loop()
    currentExecutor = ThreadPoolExecutor(max_workers=1)
    startedAt = time.time()
    try:
        while buffer:
            bufLen = len(buffer)
            if realBytesWritten + bufLen > expectedStreamSize:
                raise click.ClickException('Stream is larger than expected file size')
            if bufLen < chunkSize:
                # Every chunk sent has the full chunk size; pad the tail.
                buffer = buffer + bytes([fillByte]) * (chunkSize - bufLen)
            # Send the current chunk while the next one is read off-thread.
            _, buffer = await asyncio.gather(
                client(SaveBigFilePartRequest(
                    fileId, currentFileChunkPos, currentFileTotalChunks, buffer
                )),
                currentLoop.run_in_executor(currentExecutor, source.read, chunkSize),
            )
            currentFileChunkPos += 1
            currentFileBytesWritten += bufLen
            realBytesWritten += bufLen
            if currentFileChunkPos % 20 == 0:
                click.echo(f'{format_file_size(realBytesWritten)}b {(time.time() - startedAt):.3f}s\t', nl=False)
            if bufLen < chunkSize:
                # A short read marks the end of the stream.
                break
            if currentFileChunkPos == maxChunksInFile:
                # The part is full: publish it and start a new one.
                inputFiles.append(await _send_part_message(
                    client, dialog, fileId, currentFileTotalChunks,
                    partId, len(inputFiles) + 1, currentFileBytesWritten
                ))
                fileId = generate_random_long()
                currentFileTotalChunks = plannedChunks(expectedStreamSize - realBytesWritten)
                currentFileChunkPos = 0
                currentFileBytesWritten = 0

        if currentFileBytesWritten > 0:
            # The stream ended inside a part: complete the announced chunk
            # count with zero chunks, then publish the part.
            zeroChunk = bytes([fillByte]) * chunkSize
            for newChunkPos in range(currentFileChunkPos, currentFileTotalChunks):
                await client(SaveBigFilePartRequest(
                    fileId, newChunkPos, currentFileTotalChunks, zeroChunk
                ))
            inputFiles.append(await _send_part_message(
                client, dialog, fileId, currentFileTotalChunks,
                partId, len(inputFiles) + 1, currentFileBytesWritten
            ))
    finally:
        # Do not wait: after a failure a read of the source may still be
        # blocked on the worker thread.
        currentExecutor.shutdown(wait=False)
    return UploadInfo(inputFiles, realBytesWritten)
async def download_part(client: TelegramClient, dest: typing.BinaryIO, dInfo: DownloadInfo):
    """Download one part and write its first ``real_size`` bytes to *dest*.

    Chunks of 512 KiB are requested with GetFileRequest; while a chunk is
    being received, the previous one is written to *dest* on a worker thread.
    Bytes beyond ``real_size`` (the padding added on upload) are cut off.
    When the request fails with a BadRequestError mentioning "expire", the
    message is fetched again and the request is retried.

    Raises click.ClickException when the server returns an empty chunk before
    ``real_size`` bytes were received.
    """
    _, inputFileLocation = get_input_location(dInfo.message)
    chunkSize = pow(2, 19)
    realSize = int(dInfo.part_info['real_size'])
    totalBytesDownloaded = 0
    startedAt = time.time()
    buffer = None
    currentLoop = asyncio.get_running_loop()
    currentExecutor = ThreadPoolExecutor(max_workers=1)

    def writeBuffer(buf):
        if buf:
            dest.write(buf)

    try:
        while totalBytesDownloaded < realSize:
            pendingWrite = currentLoop.run_in_executor(currentExecutor, writeBuffer, buffer)
            # The chunk now belongs to the writer; forget it so that a retry
            # of the request below cannot write it a second time.
            buffer = None
            try:
                downloadResult = await client(GetFileRequest(
                    inputFileLocation,
                    totalBytesDownloaded,
                    chunkSize,
                    precise=False,
                    cdn_supported=False
                ))
            except BadRequestError as e:
                if 'expire' not in str(e.message).lower():
                    raise
                click.echo('Reloading message...', err=True)
                newMessage = await client.get_messages(dInfo.dialog, ids=dInfo.message.id)
                _, inputFileLocation = get_input_location(newMessage)
                continue
            finally:
                # Always let the previous chunk reach dest before moving on.
                await pendingWrite
            buffer = downloadResult.bytes
            bufLen = len(buffer)
            if not bufLen:
                # Without this guard the loop would never terminate.
                raise click.ClickException(
                    f"Part {dInfo.part_info['part']} ended before {realSize} bytes were received"
                )
            totalBytesDownloaded += bufLen
            if totalBytesDownloaded > realSize:
                extraBytes = totalBytesDownloaded - realSize
                buffer = buffer[:-extraBytes]
            if totalBytesDownloaded % (pow(2, 20) * 10) == 0:
                click.echo(
                    f"{dInfo.part_info['part']}p {format_file_size(totalBytesDownloaded)}b {(time.time() - startedAt):.3f}s\t",
                    nl=False, err=True
                )
        # The last chunk has no following request to overlap with.
        writeBuffer(buffer)
    finally:
        currentExecutor.shutdown()
async def download_file(client: TelegramClient, dest: typing.BinaryIO, fileInfo: dict, dialog: str):
    """Find every part of the file described by *fileInfo* and write them to *dest* in order.

    Part messages are searched in *dialog* by the ``#telecup_part_<part_id>``
    tag. Messages whose metadata does not belong to this file, or whose part
    number is missing or outside ``1..total_parts``, are ignored.

    Raises click.ClickException when at least one part could not be found.
    """
    totalParts = int(fileInfo['total_parts'])
    partMessages: typing.List[typing.Optional[DownloadInfo]] = [None] * totalParts
    async for msg in client.iter_messages(dialog, search=f"#telecup_part_{fileInfo['part_id']}"):
        partInfo = parse_message(msg.message or '')
        if not partInfo or partInfo.get('part_id') != fileInfo['part_id']:
            continue
        try:
            partNumber = int(partInfo.get('part', ''))
        except ValueError:
            continue
        # Part 0 would silently land in the last slot (index -1) and larger
        # numbers would raise IndexError, so only accept the expected range.
        if not 1 <= partNumber <= totalParts:
            continue
        partMessages[partNumber - 1] = DownloadInfo(dialog, msg, partInfo)
    if any(part is None for part in partMessages):
        raise click.ClickException('Missing some parts')
    # Slots are filled by part number, so the list is already in download order.
    for part in partMessages:
        await download_part(client, dest, part)
        click.echo(f"Part {part.part_info['part']} ready", err=True)
@click.group()
@click.option('--session-file', default='.telegram-session', show_default=True,
              help='File where telegram session information will be stored (unique for each account)')
@click.option('--app-config', default='.telecup.cfg', show_default=True, help='App configuration file')
@click.option('--dialog', default='me', help='Name of the conversation, where files will be stored [default: Saved Messages]')
@pass_config
@coro
async def cli(config: Config, session_file, app_config, dialog):
    # Root command group: stores the global options on the shared Config and
    # builds the Telegram client used by the sub-commands. Documented with
    # comments because a docstring would be shown as the group's help text.
    config.dialog = dialog
    config.session_file = session_file
    api_hash, api_id, proxy = retrieve_app_hash(app_config)
    # The MTProxy connection type is only requested when a proxy is configured.
    client_options = {}
    if proxy:
        client_options['connection'] = connection.ConnectionTcpMTProxyAbridged
        client_options['proxy'] = proxy
    config.client = TelegramClient(session_file, api_id, api_hash, **client_options)
@cli.command()
@pass_config
@coro
async def login(config: Config):
    """
    Log into your telegram account interactively and save the login information to session file
    """
    try:
        await config.client.start()
        me = await config.client.get_me()
        click.echo(f'You are logged in as {me.first_name}')
        click.echo('To switch user: either pass another session-file or remove existing')
    finally:
        # Close the connection like the upload/download commands do.
        await config.client.disconnect()
@cli.command(name='list')
@pass_config
@coro
async def list_files(config: Config):
    """
    List all files, uploaded to your account by TeleCup
    """
    await check_logged_in(config)
    try:
        async for msg in config.client.iter_messages(config.dialog, search='#telecup_file'):
            # Messages without text cannot carry a metadata block.
            fileInfo = parse_message(msg.message or '')
            if not fileInfo:
                continue
            # Timezone-aware replacement for the deprecated utcfromtimestamp();
            # the formatted UTC time is the same.
            createdAt = datetime.datetime.fromtimestamp(int(fileInfo['created_at']), datetime.timezone.utc)
            createdAtFormatted = createdAt.strftime('%d.%m.%Y %H:%M')
            click.echo(f"{fileInfo['name']}\t{format_file_size(int(fileInfo['real_size']))}\t{createdAtFormatted}")
    finally:
        # Close the connection like the upload/download commands do.
        await config.client.disconnect()
@cli.command()
@click.option('--part-size', default='1g', show_default=True, help='Maximum size of a single file')
@click.argument('filename')
@click.argument('stream_size')
@pass_config
@coro
async def upload(config: Config, part_size, filename, stream_size):
    """
    Upload a new file to your account
    """
    source = sys.stdin.buffer
    # The download command strips the requested name and stored values are
    # stripped when parsed, so a name saved with surrounding whitespace could
    # never be found again. Normalise it here as well.
    filename = filename.strip()
    estimatedBytes = parse_file_size(stream_size)
    partSize = parse_file_size(part_size)
    if partSize > pow(2, 20) * 1536:
        raise click.ClickException('Part size must be less than 1.5 Gib')
    await check_logged_in(config)
    try:
        nameHash = hashlib.sha256(filename.encode('utf-8')).hexdigest()
        existingMessages = await config.client.get_messages(config.dialog, search=f'#telecup_file_{nameHash}')
        if len(existingMessages) > 0:
            raise click.ClickException(f'File with name {filename} already exists in dialog {config.dialog}')
        # Random identifier that ties the part messages to the file message.
        partId = '%030x' % random.randrange(pow(16, 32))
        uploadInfo = await upload_file(config.client, source, estimatedBytes, partId, config.dialog, partSize)
        # The file message is sent last, after every part has been stored.
        await config.client.send_message(config.dialog, textwrap.dedent(f'''
            {MESSAGE_BLOCK_START}
            {MESSAGE_HEADER}
            #telecup_file #telecup_file_{nameHash}
            ^name={filename}
            ^part_id={partId}
            ^real_size={uploadInfo.real_size}
            ^total_parts={len(uploadInfo.file_list)}
            ^created_at={int(time.time())}
            {MESSAGE_BLOCK_END}
        '''))
        click.echo('OK')
    finally:
        # Disconnect on failure too, matching the download command.
        await config.client.disconnect()
@cli.command()
@click.argument('filename')
@pass_config
@coro
async def download(config: Config, filename):
    """
    Download an existing file from your account
    """
    # File content goes to stdout; every status message goes to stderr.
    out = sys.stdout.buffer
    await check_logged_in(config)
    try:
        wanted = filename.strip()
        digest = hashlib.sha256(wanted.encode('utf-8')).hexdigest()
        matches = await config.client.get_messages(config.dialog, search=f'#telecup_file_{digest}')
        if len(matches) == 0:
            click.echo(f'`{wanted}` not found', err=True)
            return
        info = parse_message(matches[0].message)
        # The tag only carries a hash of the name, so the stored name is compared too.
        if not (info and info['name'] == wanted):
            click.echo('Message FileInfo corrupt', err=True)
            return
        await download_file(config.client, out, info, config.dialog)
        click.echo('OK', err=True)
    finally:
        await config.client.disconnect()
# Run the click command group when this module is executed as a script.
if __name__ == '__main__':
    cli()