More precize control over existing parts

This commit is contained in:
Egor Aristov 2020-06-10 08:35:54 +03:00
parent 5dca2487cd
commit bf01c21a01

View File

@ -3,6 +3,7 @@ import datetime
import hashlib import hashlib
import math import math
import os import os
import random
import sys import sys
import textwrap import textwrap
import time import time
@ -116,7 +117,7 @@ async def check_logged_in(config: Config):
raise click.ClickException('You are not authorized. Please log in first') raise click.ClickException('You are not authorized. Please log in first')
async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedStreamSize: int, nameHash: str, dialog: str, maxFileSize): async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedStreamSize: int, partId: str, dialog: str, maxFileSize):
fileId = generate_random_long() fileId = generate_random_long()
chunkSize = pow(2, 19) chunkSize = pow(2, 19)
maxChunksInFile = maxFileSize / chunkSize maxChunksInFile = maxFileSize / chunkSize
@ -130,8 +131,8 @@ async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedS
currentFileBytesWritten = 0 currentFileBytesWritten = 0
realBytesWritten = 0 realBytesWritten = 0
nextBuffer = source.read(chunkSize) buffer = source.read(chunkSize)
if not len(nextBuffer): if not len(buffer):
raise click.ClickException('Input stream is empty') raise click.ClickException('Input stream is empty')
currentLoop = asyncio.get_running_loop() currentLoop = asyncio.get_running_loop()
currentExecutor = ThreadPoolExecutor(max_workers=1) currentExecutor = ThreadPoolExecutor(max_workers=1)
@ -141,7 +142,6 @@ async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedS
lastRealTimeMeasurement = time.time() lastRealTimeMeasurement = time.time()
while True: while True:
buffer = nextBuffer
bufLen = len(buffer) bufLen = len(buffer)
if not bufLen: if not bufLen:
break break
@ -154,14 +154,14 @@ async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedS
fileId, currentFileChunkPos, currentFileTotalChunks, buffer fileId, currentFileChunkPos, currentFileTotalChunks, buffer
)) ))
taskGetNextBuffer = currentLoop.run_in_executor(currentExecutor, readNextPortion, chunkSize) taskGetNextBuffer = currentLoop.run_in_executor(currentExecutor, readNextPortion, chunkSize)
clientResponse, nextBuffer = await asyncio.gather(taskSend, taskGetNextBuffer, loop=currentLoop, return_exceptions=False) clientResponse, buffer = await asyncio.gather(taskSend, taskGetNextBuffer, loop=currentLoop, return_exceptions=False)
currentFileChunkPos += 1 currentFileChunkPos += 1
currentFileBytesWritten += bufLen currentFileBytesWritten += bufLen
realBytesWritten += bufLen realBytesWritten += bufLen
if currentFileChunkPos % 10 == 0: if currentFileChunkPos % 20 == 0:
click.echo(f'{format_file_size(realBytesWritten)}b {(time.time() - lastRealTimeMeasurement):.3f}s \t ', nl=False) click.echo(f'{format_file_size(realBytesWritten)}b {(time.time() - lastRealTimeMeasurement):.3f}s\t', nl=False)
lastRealTimeMeasurement = time.time() # lastRealTimeMeasurement = time.time()
if bufLen < chunkSize: if bufLen < chunkSize:
break break
@ -173,8 +173,8 @@ async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedS
await client.send_file(dialog, readyFile, caption=textwrap.dedent(f''' await client.send_file(dialog, readyFile, caption=textwrap.dedent(f'''
{MESSAGE_BLOCK_START} {MESSAGE_BLOCK_START}
{MESSAGE_HEADER} {MESSAGE_HEADER}
#telecup_part #telecup_part_{nameHash} #telecup_part #telecup_part_{partId}
^name_hash={nameHash} ^part_id={partId}
^part={len(inputFiles) + 1} ^part={len(inputFiles) + 1}
^real_size={currentFileBytesWritten} ^real_size={currentFileBytesWritten}
{MESSAGE_BLOCK_END} {MESSAGE_BLOCK_END}
@ -203,8 +203,8 @@ async def upload_file(client: TelegramClient, source: typing.BinaryIO, expectedS
await client.send_file(dialog, readyFile, caption=textwrap.dedent(f''' await client.send_file(dialog, readyFile, caption=textwrap.dedent(f'''
{MESSAGE_BLOCK_START} {MESSAGE_BLOCK_START}
{MESSAGE_HEADER} {MESSAGE_HEADER}
#telecup_part #telecup_part_{nameHash} #telecup_part #telecup_part_{partId}
^name_hash={nameHash} ^part_id={partId}
^part={len(inputFiles) + 1} ^part={len(inputFiles) + 1}
^real_size={currentFileBytesWritten} ^real_size={currentFileBytesWritten}
{MESSAGE_BLOCK_END} {MESSAGE_BLOCK_END}
@ -218,6 +218,8 @@ async def download_part(client: TelegramClient, dest: typing.BinaryIO, dInfo: Do
chunkSize = pow(2, 20) chunkSize = pow(2, 20)
realSize = int(dInfo.part_info['real_size']) realSize = int(dInfo.part_info['real_size'])
totalBytesDownloaded = 0 totalBytesDownloaded = 0
lastRealTimeMeasurement = time.time()
while totalBytesDownloaded < realSize: while totalBytesDownloaded < realSize:
offset = totalBytesDownloaded offset = totalBytesDownloaded
limit = chunkSize if offset + chunkSize < realSize else realSize - offset limit = chunkSize if offset + chunkSize < realSize else realSize - offset
@ -241,15 +243,19 @@ async def download_part(client: TelegramClient, dest: typing.BinaryIO, dInfo: Do
buffer = buffer[:-extraBytes] buffer = buffer[:-extraBytes]
dest.write(buffer) dest.write(buffer)
click.echo(f"Part {dInfo.part_info['part']}: {format_file_size(totalBytesDownloaded)} ready", err=True) if totalBytesDownloaded % (pow(2, 20) * 10) == 0:
click.echo(
f"{dInfo.part_info['part']}p {format_file_size(totalBytesDownloaded)}b {(time.time() - lastRealTimeMeasurement):.3f}s\t",
nl=False, err=True
)
async def download_file(client: TelegramClient, dest: typing.BinaryIO, fileInfo: dict, dialog: str): async def download_file(client: TelegramClient, dest: typing.BinaryIO, fileInfo: dict, dialog: str):
partMessagesSearchResults = client.iter_messages(dialog, search=f"#telecup_part_{fileInfo['name_hash']}") partMessagesSearchResults = client.iter_messages(dialog, search=f"#telecup_part_{fileInfo['part_id']}")
partMessages: typing.List[typing.Optional[DownloadInfo]] = [None] * int(fileInfo['total_parts']) partMessages: typing.List[typing.Optional[DownloadInfo]] = [None] * int(fileInfo['total_parts'])
async for msg in partMessagesSearchResults: async for msg in partMessagesSearchResults:
partInfo = parse_message(msg.message) partInfo = parse_message(msg.message)
if not partInfo or partInfo['name_hash'] != fileInfo['name_hash']: if not partInfo or partInfo['part_id'] != fileInfo['part_id']:
continue continue
partMessages[int(partInfo['part']) - 1] = DownloadInfo(msg, partInfo) partMessages[int(partInfo['part']) - 1] = DownloadInfo(msg, partInfo)
if any(part is None for part in partMessages): if any(part is None for part in partMessages):
@ -334,13 +340,14 @@ async def upload(config: Config, part_size, filename, stream_size):
if len(existingMessages) > 0: if len(existingMessages) > 0:
raise click.ClickException(f'File with name {filename} already exists in dialog {config.dialog}') raise click.ClickException(f'File with name {filename} already exists in dialog {config.dialog}')
uploadInfo = await upload_file(config.client, source, estimatedBytes, nameHash, config.dialog, partSize) partId = '%030x' % random.randrange(pow(16, 32))
uploadInfo = await upload_file(config.client, source, estimatedBytes, partId, config.dialog, partSize)
await config.client.send_message(config.dialog, textwrap.dedent(f''' await config.client.send_message(config.dialog, textwrap.dedent(f'''
{MESSAGE_BLOCK_START} {MESSAGE_BLOCK_START}
{MESSAGE_HEADER} {MESSAGE_HEADER}
#telecup_file #telecup_file_{nameHash} #telecup_file #telecup_file_{nameHash}
^name={filename} ^name={filename}
^name_hash={nameHash} ^part_id={partId}
^real_size={uploadInfo.real_size} ^real_size={uploadInfo.real_size}
^total_parts={len(uploadInfo.file_list)} ^total_parts={len(uploadInfo.file_list)}
^created_at={int(time.time())} ^created_at={int(time.time())}