"""Huffman file compressor/decompressor with a small click command line.

Layout of a ``.hfm`` file, as a stream of bits packed into bytes:

1. Width field: the number of bits used by the table-length field, written
   in binary.  Groups of 7 bits are stored in continuation bytes whose high
   bit is ``1``; the last (1-7 bit) group is stored right-aligned in a byte
   whose high bit is ``0``.
2. One byte holding how many bits of that last width byte are significant.
3. Table length: the size of the code table in bits, in plain binary.
4. Code table: for every symbol, 8 bits of symbol value, 8 bits of code
   length, then the code itself.
5. Payload: the Huffman codes of the input bytes, back to back.

The bit stream is zero-padded to a whole byte, and one trailer byte is
appended holding ``bit_count % 8``: the number of valid bits in the last
data byte, where 0 means that all 8 bits are valid.
"""
from collections import Counter
from os.path import dirname, basename, join, abspath as path
from time import time
from datetime import datetime as dt
import click
from urllib.parse import quote as url


class Log:
    """Minimal append-only text logger."""

    def __init__(self, path, name):
        """Remember the log file *path* and the default source *name*."""
        self.path = path
        self.name = name

    def log(self, msg, name=None):
        """Append one ``timestamp | name | msg`` line to the log file.

        *name* defaults to the name given to the constructor.
        """
        if name is None:
            name = self.name
        now = dt.today()
        # Milliseconds must be zero-padded: 5000 microseconds is "005",
        # not "500" (which is what slicing str(microsecond) produced).
        stamp = f'{now:%Y-%m-%d %H:%M:%S},{now.microsecond // 1000:03d}'
        with open(self.path, 'a', encoding='utf-8') as file:
            file.write(f'{stamp} | {name} | {msg}\n')


log = Log(join(dirname(__file__), 'hfm.log'), 'hfm-py')
output = ''


def _weight(group):
    """Sort key: the accumulated frequency of a ``(symbols, weight)`` group."""
    return group[1]


def huffman(data):
    """Build a Huffman code for the symbols of *data*.

    Returns a dict mapping every distinct symbol to its code, a string of
    ``'0'``/``'1'`` characters.  Empty input yields an empty dict; input with
    a single distinct symbol assigns that symbol the one-bit code ``'0'``.
    """
    counts = Counter(data)
    codes = dict.fromkeys(counts, '')
    groups = sorted((([sym], n) for sym, n in counts.items()), key=_weight)

    if len(groups) == 1:
        # Nothing to merge with, but the symbol still needs a non-empty code
        # so that the payload records how many times it occurs.
        codes[groups[0][0][0]] = '0'
        return codes

    # The list is kept sorted by weight, so the two lightest groups are
    # always the first two entries.
    while len(groups) >= 2:
        (light_syms, light_n), (heavy_syms, heavy_n) = groups[0], groups[1]
        for sym in light_syms:
            codes[sym] = '0' + codes[sym]
        for sym in heavy_syms:
            codes[sym] = '1' + codes[sym]
        groups[:2] = [(light_syms + heavy_syms, light_n + heavy_n)]
        groups.sort(key=_weight)  # stable, so ties keep their order
    return codes


def shichiro_encode(table):
    """Serialize a ``{symbol: code}`` table into a bit string.

    Each entry becomes 8 bits of symbol value, 8 bits of code length and then
    the code itself.  An empty table serializes to the empty string.
    """
    return ''.join(
        f'{int(symbol):08b}{len(code):08b}{code}'
        for symbol, code in table.items()
    )


def shichiro_decode(byts):
    """Parse a bit string produced by ``shichiro_encode``.

    *byts* is a string (or an iterable of strings) of ``'0'``/``'1'``
    characters.  Returns a dict mapping each code to its integer symbol, i.e.
    the inverse of the mapping that was encoded.  Fewer than 8 trailing bits
    are ignored.

    Raises:
        ValueError: if an entry has a symbol but no complete length field.
    """
    bits = ''.join(byts)
    table = {}
    pos = 0
    while len(bits) - pos >= 8:
        if len(bits) - pos < 16:
            raise ValueError('truncated Huffman table entry')
        symbol = int(bits[pos:pos + 8], 2)
        length = int(bits[pos + 8:pos + 16], 2)
        pos += 16
        table[bits[pos:pos + length]] = symbol
        pos += length
    return table


def _pack_header(table_bits):
    """Return the header bit string (width, width size, length, table)."""
    tablen = format(len(table_bits), 'b')
    bitlen = format(len(tablen), 'b')
    parts = []
    while len(bitlen) > 7:
        parts.append('1' + bitlen[:7])  # continuation byte
        bitlen = bitlen[7:]
    parts.append(bitlen.rjust(8, '0'))
    parts.append(format(len(bitlen), '08b'))
    parts.append(tablen)
    parts.append(table_bits)
    return ''.join(parts)


def _unpack_header(bits):
    """Split a decoded bit stream into ``(table_bits, payload_bits)``.

    Raises:
        ValueError: if the header fields are malformed.
        IndexError: if the stream ends inside the width field.
    """
    pos = 0
    width_parts = []
    while bits[pos] == '1':  # continuation byte: 7 width bits follow
        width_parts.append(bits[pos + 1:pos + 8])
        pos += 8
    used = int(bits[pos + 8:pos + 16], 2)
    if used > 7:
        raise ValueError('corrupt header: bad width size')
    # The final width byte is right-aligned, so its significant bits are the
    # LAST `used` bits of the byte.
    width_parts.append(bits[pos + 8 - used:pos + 8])
    pos += 16
    width = int(''.join(width_parts), 2)
    table_len = int(bits[pos:pos + width], 2)
    table_start = pos + width
    table_end = table_start + table_len
    return bits[table_start:table_end], bits[table_end:]


def _pack_bits(bits):
    """Pack a bit string into bytes and append the valid-bit-count trailer."""
    packed = [int(bits[i:i + 8].ljust(8, '0'), 2) for i in range(0, len(bits), 8)]
    packed.append(len(bits) % 8)
    return bytes(packed)


def _unpack_bits(blob):
    """Inverse of ``_pack_bits``: return the original bit string.

    Raises:
        ValueError: if *blob* is too short or its trailer byte is invalid.
    """
    if len(blob) < 2:
        raise ValueError('file too short to be a .hfm archive')
    valid = blob[-1]
    if valid > 7:
        raise ValueError('corrupt trailer byte')
    if valid == 0:
        # The packer writes bit_count % 8, so 0 means the last data byte is
        # completely used, not that it is empty.
        valid = 8
    bits = ''.join(f'{byte:08b}' for byte in blob[:-1])
    return bits[:len(bits) - (8 - valid)]


def _strip_hfm(name):
    """Return *name* without a trailing ``.hfm``; other names are unchanged."""
    return name[:-4] if name[-4:] == '.hfm' else name


def compress_file(filename):
    """Compress *filename* into ``<filename>.hfm``.

    Progress is written to the module logger, and the ``origSize`` and
    ``compSize`` JSON fields are appended to the module-level ``output``.
    """
    global output
    log.log(f"Loading '{filename}'...")
    with open(filename, 'rb') as file:  # get data
        data = file.read()
    log.log(f'Original size: {len(data)} bytes.')
    log.log('Creating Huffman table...')
    hf = huffman(data)
    table = shichiro_encode(hf)
    log.log('Embedding Huffman table...')
    header = _pack_header(table)
    log.log(f'Huffman table size: {len(header)} bits.')
    log.log('Compressing...')
    payload = ''.join(hf[byte] for byte in data)  # encode to Huffman
    out = _pack_bits(header + payload)
    log.log(f'Compressed size: {len(out)} bytes.')
    log.log(f"Saving to '{filename}.hfm'...")
    with open(f'{filename}.hfm', 'wb') as file:  # save Huffman code
        file.write(out)
    log.log('SUCCESSFULLY COMPRESSED')
    output += f'"origSize":{len(data)},'
    output += f'"compSize":{len(out)},'


def decompress_file(filename):
    """Decompress a ``.hfm`` archive.

    The result is written to *filename* with a trailing ``.hfm`` removed; if
    the name has no such suffix the result is written to *filename* itself.
    Progress is written to the module logger, and the ``compSize`` and
    ``origSize`` JSON fields are appended to the module-level ``output``.
    """
    global output
    log.log(f"Loading '{filename}'...")
    with open(filename, 'rb') as file:  # get data
        blob = file.read()
    comp_size = len(blob)
    bits = _unpack_bits(blob)
    log.log('Extracting Huffman table...')
    table_bits, payload = _unpack_header(bits)
    table = shichiro_decode(table_bits)
    log.log('Decompressing...')
    out = bytearray()
    current = ''
    for bit in payload:  # decode Huffman
        current += bit
        if current in table:
            out.append(table[current])
            current = ''
    filename = _strip_hfm(filename)
    log.log(f"Saving to '{filename}'...")
    with open(f'{filename}', 'wb') as file:  # save decoded data
        file.write(bytes(out))
    log.log('SUCCESSFULLY DECOMPRESSED')
    output += f'"compSize":{comp_size},'
    output += f'"origSize":{len(out)},'


@click.command(options_metavar='[-c / -d]')
@click.argument('files', nargs=-1, metavar='')
@click.option('-c/-d', 'comp', default=True, help='Compress/decompress mode selectors.')
def CLI(files, comp):
    """Compress (``-c``, default) or decompress (``-d``) every given file.

    One JSON-style object per file is appended to ``output`` and the
    accumulated text is printed once all files have been processed.  A file
    that fails is reported as ``{"status":false}`` and the error is logged.
    """
    global output
    log.log(f'hfm {"-c" * comp}{"-d" * (not comp)} "' + "\" \"".join(files) + '"')
    for file in files:
        output += '{'
        stime = time()
        try:
            if comp:
                compress_file(path(file))
                link = url(basename(file)) + '.hfm'
            else:
                decompress_file(path(file))
                # Mirror the name decompress_file actually wrote to.
                link = url(_strip_hfm(basename(file)))
            wtime = time() - stime
            output += '"status":true,'
            output += f'"time":{round(wtime, 3)},'
            output += f'"dlink":"./files/{link}"'
        except Exception as err:
            # Best effort per file: report the failure but keep going.
            log.log(f'FAILED on {file!r}: {err!r}')
            output += '"status":false'
        output += '}'
    print(output)


if __name__ == '__main__':
    CLI()