390 lines
15 KiB
Python
390 lines
15 KiB
Python
import asyncio
|
|
import datetime
|
|
import hashlib
|
|
import math
|
|
import os
|
|
import random
|
|
import sys
|
|
import textwrap
|
|
import time
|
|
import typing
|
|
from collections import namedtuple
|
|
from concurrent.futures.thread import ThreadPoolExecutor
|
|
from functools import wraps
|
|
|
|
import click
|
|
from telethon.errors import BadRequestError
|
|
from telethon.helpers import generate_random_long
|
|
from telethon import TelegramClient, connection
|
|
from telethon.tl.custom.message import Message
|
|
from telethon.tl.functions.upload import SaveBigFilePartRequest
|
|
from telethon.tl.functions.upload import GetFileRequest
|
|
from telethon.tl.types import InputFileBig
|
|
from telethon.tl.types.upload import File
|
|
from telethon.utils import get_input_location
|
|
|
|
MESSAGE_BLOCK_START = '_cup_start_'
|
|
MESSAGE_HEADER = f'TeleCup File Uploader https://lnurl.ru/telecup'
|
|
MESSAGE_BLOCK_END = '_cup_end_'
|
|
|
|
UploadInfo = namedtuple('UploadInfo', ['file_list', 'real_size'])
|
|
DownloadInfo = typing.NamedTuple('DownloadInfo', [('message', Message), ('part_info', dict)])
|
|
|
|
|
|
class Config:
|
|
def __init__(self):
|
|
self.session_file = '.telegram-session'
|
|
self.dialog = 'me'
|
|
self.client: typing.Optional[TelegramClient] = None
|
|
|
|
|
|
pass_config = click.make_pass_decorator(Config, ensure=True)
|
|
|
|
|
|
def coro(f):
|
|
@wraps(f)
|
|
def wrapper(*args, **kwargs):
|
|
currentLoop = asyncio.get_event_loop()
|
|
return currentLoop.run_until_complete(f(*args, **kwargs))
|
|
return wrapper
|
|
|
|
|
|
def parse_file_size(humanString: str):
|
|
humanString = humanString.strip()
|
|
units = {"k": pow(2, 10), "m": pow(2, 20), "g": pow(2, 30)}
|
|
for unit in units.keys():
|
|
if humanString.lower().endswith(unit):
|
|
return int(float(humanString[:-1]) * units[unit])
|
|
return int(humanString)
|
|
|
|
|
|
def format_file_size(bytesCount: int):
|
|
for unit in ['', 'K', 'M', 'G']:
|
|
if bytesCount < pow(2, 10):
|
|
return '%3.2f%s' % (bytesCount, unit)
|
|
bytesCount /= pow(2, 10)
|
|
return '%.2f%s' % (bytesCount, 'T')
|
|
|
|
|
|
def parse_message(caption: str):
|
|
if MESSAGE_BLOCK_START not in caption or MESSAGE_BLOCK_END not in caption:
|
|
return None
|
|
blockStartPos = caption.index(MESSAGE_BLOCK_START)
|
|
blockEndPos = caption.index(MESSAGE_BLOCK_END)
|
|
block = caption[blockStartPos + len(MESSAGE_BLOCK_START):blockEndPos]
|
|
fileInfo = dict()
|
|
for line in block.split('\n'):
|
|
if len(line) > 0 and line[0] == '^':
|
|
name, value = line[1:].split('=')
|
|
fileInfo[name.strip()] = value.strip()
|
|
return fileInfo
|
|
|
|
|
|
def retrieve_app_hash(app_config):
|
|
app_id = None
|
|
app_hash = None
|
|
proxy = None
|
|
config_path = os.path.realpath(app_config)
|
|
if os.path.isfile(config_path):
|
|
with open(config_path, 'r') as f:
|
|
values = {line.split('=')[0].strip(): line.split('=')[1].strip() for line in f.readlines() if len(line.strip()) > 0}
|
|
if 'APP_ID' in values and 'APP_HASH' in values:
|
|
app_id = values['APP_ID']
|
|
app_hash = values['APP_HASH']
|
|
if 'PROXY_HOST' in values and 'PROXY_PORT' in values and 'PROXY_SECRET' in values:
|
|
proxy = (values['PROXY_HOST'], int(values['PROXY_PORT']), values['PROXY_SECRET'])
|
|
if not (app_id and app_hash):
|
|
click.echo('You have to create Telegram App configuration before using this tool')
|
|
click.echo('Please, visit https://my.telegram.org/apps page and create an app')
|
|
click.pause()
|
|
app_id = click.prompt('App id', type=click.INT)
|
|
app_hash = click.prompt('App hash').strip()
|
|
use_proxy = click.confirm('Use MTProto proxy? Required in some countries like Russia and Iran', default=False)
|
|
if use_proxy:
|
|
proxy_host = click.prompt('Proxy host')
|
|
proxy_port = click.prompt('Proxy port', type=click.INT)
|
|
proxy_secret = click.prompt('Proxy secret')
|
|
proxy = (proxy_host, proxy_port, proxy_secret)
|
|
with open(config_path, 'w') as f:
|
|
f.write(f'APP_ID={app_id}\nAPP_HASH={app_hash}\n')
|
|
if use_proxy:
|
|
f.write(f'PROXY_HOST={proxy_host}\nPROXY_PORT={proxy_port}\nPROXY_SECRET={proxy_secret}\n')
|
|
return app_hash, app_id, proxy
|
|
|
|
|
|
async def check_logged_in(config: Config):
|
|
await config.client.connect()
|
|
if not await config.client.is_user_authorized():
|
|
raise click.ClickException('You are not authorized. Please log in first')
|
|
|
|
|
|
async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedStreamSize: int, partId: str, dialog: str, maxFileSize):
|
|
fileId = generate_random_long()
|
|
chunkSize = pow(2, 19)
|
|
maxChunksInFile = maxFileSize / chunkSize
|
|
fillByte = 0x00
|
|
|
|
inputFiles = []
|
|
currentFileChunkPos = 0
|
|
currentFileTotalChunks = math.ceil(expectedStreamSize / chunkSize) \
|
|
if expectedStreamSize < maxFileSize \
|
|
else math.ceil(maxFileSize / chunkSize)
|
|
currentFileBytesWritten = 0
|
|
realBytesWritten = 0
|
|
|
|
buffer = source.read(chunkSize)
|
|
if not len(buffer):
|
|
raise click.ClickException('Input stream is empty')
|
|
currentLoop = asyncio.get_running_loop()
|
|
currentExecutor = ThreadPoolExecutor(max_workers=1)
|
|
|
|
def readNextPortion(readBytes=chunkSize):
|
|
return source.read(readBytes)
|
|
|
|
lastRealTimeMeasurement = time.time()
|
|
while True:
|
|
bufLen = len(buffer)
|
|
if not bufLen:
|
|
break
|
|
if realBytesWritten + bufLen > expectedStreamSize:
|
|
raise click.ClickException('Stream is larger than expected file size')
|
|
if bufLen < chunkSize:
|
|
buffer = buffer + bytearray([fillByte] * (chunkSize - bufLen))
|
|
|
|
taskSend = client(SaveBigFilePartRequest(
|
|
fileId, currentFileChunkPos, currentFileTotalChunks, buffer
|
|
))
|
|
taskGetNextBuffer = currentLoop.run_in_executor(currentExecutor, readNextPortion, chunkSize)
|
|
clientResponse, buffer = await asyncio.gather(taskSend, taskGetNextBuffer, loop=currentLoop, return_exceptions=False)
|
|
|
|
currentFileChunkPos += 1
|
|
currentFileBytesWritten += bufLen
|
|
realBytesWritten += bufLen
|
|
if currentFileChunkPos % 20 == 0:
|
|
click.echo(f'{format_file_size(realBytesWritten)}b {(time.time() - lastRealTimeMeasurement):.3f}s\t', nl=False)
|
|
# lastRealTimeMeasurement = time.time()
|
|
|
|
if bufLen < chunkSize:
|
|
break
|
|
|
|
if currentFileChunkPos == maxChunksInFile:
|
|
readyFile = InputFileBig(
|
|
fileId, currentFileTotalChunks, f'{int(time.time() * 1000)}.bin'
|
|
)
|
|
await client.send_file(dialog, readyFile, caption=textwrap.dedent(f'''
|
|
{MESSAGE_BLOCK_START}
|
|
{MESSAGE_HEADER}
|
|
#telecup_part #telecup_part_{partId}
|
|
^part_id={partId}
|
|
^part={len(inputFiles) + 1}
|
|
^real_size={currentFileBytesWritten}
|
|
{MESSAGE_BLOCK_END}
|
|
'''))
|
|
inputFiles.append(readyFile)
|
|
|
|
# Start a new file
|
|
fileId = generate_random_long()
|
|
currentFileTotalChunks = math.ceil((expectedStreamSize - realBytesWritten) / chunkSize) \
|
|
if (expectedStreamSize - realBytesWritten) < maxFileSize \
|
|
else math.ceil(maxFileSize / chunkSize)
|
|
currentFileChunkPos = 0
|
|
currentFileBytesWritten = 0
|
|
|
|
if currentFileBytesWritten > 0:
|
|
if currentFileChunkPos < currentFileTotalChunks:
|
|
buffer = bytes(bytearray([fillByte] * chunkSize))
|
|
for newChunkPos in range(currentFileChunkPos, currentFileTotalChunks):
|
|
await client(SaveBigFilePartRequest(
|
|
fileId, newChunkPos, currentFileTotalChunks, buffer
|
|
))
|
|
|
|
readyFile = InputFileBig(
|
|
fileId, currentFileTotalChunks, f'{int(time.time() * 1000)}.bin'
|
|
)
|
|
await client.send_file(dialog, readyFile, caption=textwrap.dedent(f'''
|
|
{MESSAGE_BLOCK_START}
|
|
{MESSAGE_HEADER}
|
|
#telecup_part #telecup_part_{partId}
|
|
^part_id={partId}
|
|
^part={len(inputFiles) + 1}
|
|
^real_size={currentFileBytesWritten}
|
|
{MESSAGE_BLOCK_END}
|
|
'''))
|
|
inputFiles.append(readyFile)
|
|
return UploadInfo(inputFiles, realBytesWritten)
|
|
|
|
|
|
async def download_part(client: TelegramClient, dest: typing.BinaryIO, dInfo: DownloadInfo):
|
|
dcId, inputFileLocation = get_input_location(dInfo.message)
|
|
chunkSize = pow(2, 20)
|
|
realSize = int(dInfo.part_info['real_size'])
|
|
totalBytesDownloaded = 0
|
|
lastRealTimeMeasurement = time.time()
|
|
|
|
while totalBytesDownloaded < realSize:
|
|
offset = totalBytesDownloaded
|
|
limit = chunkSize if offset + chunkSize < realSize else realSize - offset
|
|
if limit % pow(2, 12) != 0:
|
|
extraBytes = pow(2, 12) - limit % pow(2, 12)
|
|
limit += extraBytes
|
|
|
|
try:
|
|
downloadResult: File = await client(GetFileRequest(
|
|
inputFileLocation,
|
|
offset,
|
|
limit,
|
|
precise=False,
|
|
cdn_supported=False
|
|
))
|
|
except BadRequestError as e:
|
|
if 'expire' in str(e.message).lower():
|
|
click.echo('Reloading message...', err=True)
|
|
newMessage = await client.get_messages(ids=dInfo.message.id)
|
|
dcId, inputFileLocation = get_input_location(newMessage)
|
|
continue
|
|
buffer = downloadResult.bytes
|
|
bufLen = len(buffer)
|
|
totalBytesDownloaded += bufLen
|
|
|
|
if totalBytesDownloaded > realSize:
|
|
extraBytes = totalBytesDownloaded - realSize
|
|
buffer = buffer[:-extraBytes]
|
|
|
|
dest.write(buffer)
|
|
if totalBytesDownloaded % (pow(2, 20) * 10) == 0:
|
|
click.echo(
|
|
f"{dInfo.part_info['part']}p {format_file_size(totalBytesDownloaded)}b {(time.time() - lastRealTimeMeasurement):.3f}s\t",
|
|
nl=False, err=True
|
|
)
|
|
|
|
|
|
async def download_file(client: TelegramClient, dest: typing.BinaryIO, fileInfo: dict, dialog: str):
|
|
partMessagesSearchResults = client.iter_messages(dialog, search=f"#telecup_part_{fileInfo['part_id']}")
|
|
partMessages: typing.List[typing.Optional[DownloadInfo]] = [None] * int(fileInfo['total_parts'])
|
|
async for msg in partMessagesSearchResults:
|
|
partInfo = parse_message(msg.message)
|
|
if not partInfo or partInfo['part_id'] != fileInfo['part_id']:
|
|
continue
|
|
partMessages[int(partInfo['part']) - 1] = DownloadInfo(msg, partInfo)
|
|
if any(part is None for part in partMessages):
|
|
raise click.ClickException('Missing some parts')
|
|
partMessages.sort(key=lambda dInfo: int(dInfo.part_info['part']))
|
|
for part in partMessages:
|
|
await download_part(client, dest, part)
|
|
click.echo(f"Part {part.part_info['part']} ready", err=True)
|
|
|
|
|
|
@click.group()
|
|
@click.option('--session-file', default='.telegram-session', show_default=True,
|
|
help='File where telegram session information will be stored (unique for each account)')
|
|
@click.option('--app-config', default='.telecup.cfg', show_default=True, help='App configuration file')
|
|
@click.option('--dialog', default='me', help='Name of the conversation, where files will be stored [default: Saved Messages]')
|
|
@pass_config
|
|
@coro
|
|
async def cli(config: Config, session_file, app_config, dialog):
|
|
config.session_file = session_file
|
|
config.dialog = dialog
|
|
|
|
APP_HASH, APP_ID, proxy = retrieve_app_hash(app_config)
|
|
|
|
if not proxy:
|
|
client = TelegramClient(session_file, APP_ID, APP_HASH)
|
|
else:
|
|
client = TelegramClient(session_file, APP_ID, APP_HASH,
|
|
connection=connection.ConnectionTcpMTProxyAbridged,
|
|
proxy=proxy)
|
|
config.client = client
|
|
|
|
|
|
@cli.command()
|
|
@pass_config
|
|
@coro
|
|
async def login(config: Config):
|
|
"""
|
|
Log into your telegram account interactively and save the login information to session file
|
|
"""
|
|
await config.client.start()
|
|
me = await config.client.get_me()
|
|
click.echo(f'You are logged in as {me.first_name}')
|
|
click.echo('To switch user: either pass another session-file or remove existing')
|
|
|
|
|
|
@cli.command(name='list')
|
|
@pass_config
|
|
@coro
|
|
async def list_files(config: Config):
|
|
"""
|
|
List all files, uploaded to your account by TeleCup
|
|
"""
|
|
await check_logged_in(config)
|
|
fileMessages = config.client.iter_messages(config.dialog, search='#telecup_file')
|
|
async for msg in fileMessages:
|
|
fileInfo = parse_message(msg.message)
|
|
if not fileInfo:
|
|
continue
|
|
timestamp = int(fileInfo['created_at'])
|
|
createdAtFormatted = datetime.datetime.utcfromtimestamp(timestamp).strftime('%d.%m.%Y %H:%M')
|
|
click.echo(f"{fileInfo['name']}\t{format_file_size(int(fileInfo['real_size']))}\t{createdAtFormatted}")
|
|
|
|
|
|
@cli.command()
|
|
@click.option('--part-size', default='1g', show_default=True, help='Maximum size of a single file')
|
|
@click.argument('filename')
|
|
@click.argument('stream_size')
|
|
@pass_config
|
|
@coro
|
|
async def upload(config: Config, part_size, filename, stream_size):
|
|
"""
|
|
Upload a new file to your account
|
|
"""
|
|
source = sys.stdin.buffer
|
|
estimatedBytes = parse_file_size(stream_size)
|
|
partSize = parse_file_size(part_size)
|
|
if partSize > pow(2, 20) * 1536:
|
|
raise click.ClickException('Part size must be less than 1.5 Gib')
|
|
await check_logged_in(config)
|
|
|
|
nameHash = hashlib.sha256(filename.encode('utf-8')).hexdigest()
|
|
existingMessages = await config.client.get_messages(config.dialog, search=f'#telecup_file_{nameHash}')
|
|
if len(existingMessages) > 0:
|
|
raise click.ClickException(f'File with name {filename} already exists in dialog {config.dialog}')
|
|
|
|
partId = '%030x' % random.randrange(pow(16, 32))
|
|
uploadInfo = await upload_file(config.client, source, estimatedBytes, partId, config.dialog, partSize)
|
|
await config.client.send_message(config.dialog, textwrap.dedent(f'''
|
|
{MESSAGE_BLOCK_START}
|
|
{MESSAGE_HEADER}
|
|
#telecup_file #telecup_file_{nameHash}
|
|
^name={filename}
|
|
^part_id={partId}
|
|
^real_size={uploadInfo.real_size}
|
|
^total_parts={len(uploadInfo.file_list)}
|
|
^created_at={int(time.time())}
|
|
{MESSAGE_BLOCK_END}
|
|
'''))
|
|
click.echo('OK')
|
|
await config.client.disconnect()
|
|
|
|
|
|
@cli.command()
|
|
@click.argument('filename')
|
|
@pass_config
|
|
@coro
|
|
async def download(config: Config, filename):
|
|
"""
|
|
Download an existing file from your account
|
|
"""
|
|
destination = sys.stdout.buffer
|
|
await check_logged_in(config)
|
|
|
|
fileMessages = config.client.iter_messages(config.dialog, search='#telecup_file')
|
|
async for msg in fileMessages:
|
|
fileInfo = parse_message(msg.message)
|
|
if fileInfo and (
|
|
fileInfo['name'] == filename.strip() or
|
|
fileInfo['name_hash'].startswith(filename.strip())
|
|
):
|
|
await download_file(config.client, destination, fileInfo, config.dialog)
|
|
click.echo('OK', err=True)
|
|
await config.client.disconnect()
|