mirror of
https://github.com/HackTricks-wiki/hacktricks-cloud.git
synced 2025-12-12 15:50:19 -08:00
Recreating repository history for branch master
This commit is contained in:
381
scripts/translator.py
Normal file
381
scripts/translator.py
Normal file
@@ -0,0 +1,381 @@
|
||||
import argparse
|
||||
import os
|
||||
from openai import OpenAI #pip3 install openai
|
||||
import time
|
||||
import shutil
|
||||
import tempfile
|
||||
import subprocess
|
||||
import sys
|
||||
import tiktoken
|
||||
import concurrent.futures
|
||||
from tqdm import tqdm #pip3 install tqdm
|
||||
import traceback
|
||||
|
||||
|
||||
|
||||
# Branch the script switches back to after working on a language branch.
MASTER_BRANCH = "master"

# Verbosity switch; the only code that would consult it is currently commented out.
VERBOSE = True

# Token budget per chunk when splitting pages (originally sized for
# gpt-4-1106-preview); the entry point lowers it to 2000 for gpt-3.5 models.
MAX_TOKENS = 10000
|
||||
|
||||
def reportTokens(prompt, model):
    """Return the number of tokens `prompt` occupies for `model`.

    Args:
        prompt: Text to measure.
        model: Model name used to pick the tiktoken encoding.

    Returns:
        The token count as an int.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # tiktoken raises KeyError for model names it has no mapping for
        # (custom or very new models). The count only drives chunk sizing, so
        # an approximate count from a general-purpose encoding is good enough
        # and far better than aborting the whole run.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(prompt))
|
||||
|
||||
|
||||
def check_git_dir(path):
    """Tell whether `path` directly contains a `.git` directory."""
    git_dir = os.path.join(path, '.git')
    return os.path.isdir(git_dir)
|
||||
|
||||
def get_branch_files(branch):
    """Return the set of file paths tracked in `branch`.

    Args:
        branch: Name of the branch (or any tree-ish) to list.

    Returns:
        A set of repository-relative paths. The set is empty when git
        produces no output, e.g. because the branch does not exist yet.
    """
    # -z makes git emit NUL-terminated, unquoted paths. Without it, names
    # containing unusual characters are quoted and escaped, so they would not
    # match the real paths when later passed to other git commands.
    # Passing an argument list (instead of splitting a formatted string)
    # keeps the branch name as one argument whatever it contains.
    result = subprocess.run(
        ["git", "ls-tree", "-r", "--name-only", "-z", branch],
        stdout=subprocess.PIPE,
    )
    listing = result.stdout.decode()
    return {name for name in listing.split("\0") if name}
|
||||
|
||||
def delete_unique_files(branch):
    """Run `git rm` on every file tracked in `branch` but absent from master.

    Checks out `branch`, removes the extra files one by one, then checks out
    the master branch again and reports how many files were removed. Nothing
    happens when `branch` has no extra files.

    NOTE(review): no commit is created here before switching back to master;
    confirm the removals actually persist on `branch`.
    """
    master_files = get_branch_files(MASTER_BRANCH)
    extra_files = get_branch_files(branch) - master_files

    if not extra_files:
        return

    # Work on the language branch while removing its stale files.
    subprocess.run(["git", "checkout", branch])
    for stale_path in extra_files:
        subprocess.run(["git", "rm", stale_path])

    # Go back to where the rest of the script expects to be.
    subprocess.run(["git", "checkout", MASTER_BRANCH])

    print(f"[+] Deleted {len(extra_files)} files from branch: {branch}")
|
||||
|
||||
|
||||
def cp_translation_to_repo_dir_and_check_gh_branch(branch, temp_folder, translate_files):
    """Copy the translated tree into the repository on `branch`.

    Checks out `branch` (creating it when it does not exist), mirrors every
    file found under `temp_folder` into the current working directory and,
    when `translate_files` is truthy, commits everything and returns to the
    master branch. Otherwise the working tree is left on `branch` without a
    commit.

    Args:
        branch: Language branch that receives the files.
        temp_folder: Directory holding the translated files.
        translate_files: List of translated paths (used in the commit
            message), or a falsy value to skip the commit.
    """
    ref_check = subprocess.run(['git', 'show-ref', '--verify', '--quiet', 'refs/heads/' + branch])
    if ref_check.returncode == 0:
        subprocess.run(['git', 'checkout', branch])
    else:
        # The branch is missing: create it from the current HEAD.
        subprocess.run(['git', 'checkout', '-b', branch])

    repo_root = os.getcwd()
    for src_dir, _subdirs, names in os.walk(temp_folder):
        # Same relative location, but rooted at the repository directory.
        target_dir = os.path.join(repo_root, os.path.relpath(src_dir, temp_folder))
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

        for name in names:
            shutil.copy2(os.path.join(src_dir, name), target_dir)

    print(f"Translated files copied to branch: {branch}")

    if not translate_files:
        print("No commiting anything, leaving in language branch")
        return

    # The subject line is capped at 72 characters.
    commit_subject = f"Translated {translate_files} to {branch}"[:72]
    subprocess.run(['git', 'add', "-A"])
    subprocess.run(['git', 'commit', '-m', commit_subject])
    subprocess.run(['git', 'checkout', MASTER_BRANCH])
    print("Commit created and moved to master branch")
|
||||
|
||||
|
||||
def translate_text(language, text, file_path, model, cont=0, slpitted=False, client=None):
    """Translate one chunk of markdown to `language` via the chat completions API.

    Args:
        language: Target language name inserted into the prompt.
        text: Chunk to translate; a falsy value is returned unchanged.
        file_path: Path of the page the chunk belongs to (prompt and log context).
        model: Model name passed to the API.
        cont: Number of failed attempts so far. Once it exceeds 6 the
            original text is returned untranslated.
        slpitted: True when this chunk is the second half of a previous
            split; such a chunk is returned as-is instead of being split again.
        client: Object exposing `chat.completions.create(...)`.

    Returns:
        The translated text, or the original `text` when translation gave up.

    Raises:
        SystemExit: When the API reports that the quota has been exceeded.
    """
    if not text:
        return text

    messages = [
        {"role": "system", "content": "You are a professional hacker, translator and writer. You write everything super clear and as concise as possible without loosing information. Do not return invalid Unicode output."},
        {"role": "system", "content": f"The following is content from a hacking book about hacking techiques. The following content is from the file {file_path}. Translate the relevant English text to {language} and return the translation keeping excatly the same markdown and html syntax. Do not translate things like code, hacking technique names, hacking word, cloud/SaaS platform names (like Workspace, aws, gcp...), the word 'leak', pentesting, and markdown tags. Also don't add any extra stuff apart from the translation and markdown syntax."},
        {"role": "user", "content": text},
    ]

    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0,
        )
    except Exception as e:
        print("Python Exception: " + str(e))
        if cont > 6:
            print(f"Page {file_path} could not be translated due to count with text: {text}\nReturning text as is.")
            return text

        error_text = str(e).lower()

        if "exceeded your current quota" in error_text:
            print("Critical error: Quota exceeded")
            sys.exit(1)

        if "is currently overloaded" in error_text:
            print("Overloaded, waiting 30 seconds")
            # Count this attempt too, otherwise a permanently overloaded
            # service keeps the retry loop alive forever.
            cont += 1
            time.sleep(30)

        elif "timeout" in error_text:
            print("Timeout, waiting 30 seconds")
            cont += 1
            time.sleep(30)

        elif "rate limit" in error_text:
            print("Rate limit, waiting 60 seconds")
            cont += 1
            time.sleep(60)

        elif "maximum context length" in error_text or "generated invalid unicode output" in error_text:
            if "maximum context length" in error_text:
                print("Maximum context length, splitting text in two and translating separately")
            else:
                print("Invalid unicode error detected.")

            if slpitted:
                print(f"Page {file_path} could not be translated.\nReturning text as is.")
                return text

            # Halve the chunk on a line boundary and translate each part on
            # its own. Only the first half may be split again.
            lines = text.split('\n')
            middle = len(lines) // 2
            first_half = '\n'.join(lines[:middle])
            second_half = '\n'.join(lines[middle:])
            return (
                translate_text(language, first_half, file_path, model, cont, False, client)
                + '\n'
                + translate_text(language, second_half, file_path, model, cont, True, client)
            )

        else:
            # Unrecognised failure (bad credentials, unknown model, missing
            # client...). It must consume a retry as well; without this the
            # function recursed until RecursionError on any persistent error.
            cont += 1

        print("Retrying translation")
        return translate_text(language, text, file_path, model, cont, False, client)

    response_message = response.choices[0].message.content.strip()

    # The model sometimes changes the number of leading "#". When the source
    # chunk starts with a level-1 heading, drop up to three leading characters
    # until the answer starts with "# " again; give up (and keep the answer
    # untouched) if that does not work.
    cont2 = 0
    while text.startswith('# ') and not response_message[cont2:].startswith('# '):
        cont2 += 1
        if cont2 > 3:
            cont2 = 0
            print(f"Error with initial '#', something went wrong, recheck: {response_message[:30]}")
            break

    return response_message[cont2:]
|
||||
|
||||
|
||||
def split_text(text, model):
    """Split a markdown document into chunks that are translated one by one.

    Fenced code blocks (delimited by lines starting with ```) become chunks of
    their own, fences included, with their content kept verbatim. Each such
    chunk starts with ``` so callers can recognise it. The remaining text is
    cut whenever adding a line would exceed MAX_TOKENS, or at a heading line
    once the chunk is over 80% of MAX_TOKENS.

    Args:
        text: Full markdown content.
        model: Model name used to count tokens.

    Returns:
        A list of chunk strings. Some may be empty, for instance around a
        code block.
    """
    chunks = []
    chunk = ''
    in_code_block = False

    for line in text.split('\n'):
        if line.startswith('```'):
            # A closing fence belongs to the code chunk being finished.
            if in_code_block:
                chunk += line + '\n'

            in_code_block = not in_code_block
            chunks.append(chunk.strip())
            chunk = ''

            # An opening fence is the first line of the new code chunk.
            if in_code_block:
                chunk += line + '\n'
            continue

        if in_code_block:
            # Code is not translated: keep it exactly as written (stripping
            # would destroy its indentation) and never cut it on a token
            # limit, which would send the tail of the block to the translator.
            chunk += line + '\n'
            continue

        stripped = line.strip()
        token_count = reportTokens(chunk + "\n" + stripped, model)
        # Prefer to cut at a heading once the chunk is getting large.
        at_heading_cut = line.startswith('#') and token_count > MAX_TOKENS * 0.8
        if at_heading_cut or token_count > MAX_TOKENS:
            chunks.append(chunk.strip())
            chunk = ''

        chunk += stripped + '\n'

    chunks.append(chunk.strip())
    return chunks
|
||||
|
||||
|
||||
def copy_gitbook_dir(source_path, dest_path):
    """Copy the `.gitbook/` folder of `source_path` into `dest_path`.

    Prints an error and copies nothing when the source folder is missing.
    """
    folder_name = ".gitbook/"
    src = os.path.join(source_path, folder_name)
    dst = os.path.join(dest_path, folder_name)

    if os.path.exists(src):
        shutil.copytree(src, dst)
        print(f"Copied .gitbook folder from {src} to {dst}")
        return

    print(f"Error: {src} does not exist.")
|
||||
|
||||
def copy_summary(source_path, dest_path):
    """Copy `SUMMARY.md` from `source_path` to `dest_path`, untranslated."""
    summary_name = "SUMMARY.md"
    shutil.copy2(
        os.path.join(source_path, summary_name),
        os.path.join(dest_path, summary_name),
    )
    print("[+] Copied SUMMARY.md")
|
||||
|
||||
def translate_file(language, file_path, file_dest_path, model, client):
    """Translate one markdown file and write the result to `file_dest_path`.

    Paths ending in `SUMMARY.md` are skipped. The content is split into
    chunks; chunks starting with ``` are copied as they are and every other
    chunk goes through `translate_text`. Each chunk is followed by a newline
    in the output.

    Args:
        language: Target language name.
        file_path: Markdown file to read (UTF-8).
        file_dest_path: Where to write the translation; parent directories
            are created when missing.
        model: Model name used for token counting and translation.
        client: API client handed to `translate_text`.
    """
    if file_path.endswith('SUMMARY.md'):
        return

    with open(file_path, 'r', encoding='utf-8') as source:
        content = source.read()

    content_chunks = split_text(content, model)

    started_at = time.time()
    pieces = []
    for chunk in content_chunks:
        if chunk.startswith('```'):
            # Code blocks are never sent to the translator.
            pieces.append(chunk)
        else:
            pieces.append(translate_text(language, chunk, file_path, model, cont=0, slpitted=False, client=client))
    elapsed_time = time.time() - started_at

    translated_content = ''.join(piece + '\n' for piece in pieces)

    # The destination directory may not exist yet.
    os.makedirs(os.path.dirname(file_dest_path), exist_ok=True)
    with open(file_dest_path, 'w', encoding='utf-8') as target:
        target.write(translated_content)

    print(f"Page {file_path} translated in {elapsed_time:.2f} seconds")
|
||||
|
||||
|
||||
def translate_directory(language, source_path, dest_path, model, num_threads, client):
    """Translate every markdown file under `source_path` into `dest_path`.

    `SUMMARY.md` files are ignored, and so is any file whose destination
    already exists. Translations run in a pool of `num_threads` threads; a
    failure in one file is printed with its traceback and does not stop the
    others.

    Args:
        language: Target language name.
        source_path: Root directory to scan for `.md` files.
        dest_path: Root directory receiving the translated files, mirroring
            the relative layout of `source_path`.
        model: Model name handed to `translate_file`.
        num_threads: Maximum number of worker threads.
        client: API client handed to `translate_file`.
    """
    all_markdown_files = []
    for subdir, _dirs, files in os.walk(source_path):
        for name in files:
            if not name.endswith('.md') or name == "SUMMARY.md":
                continue
            source_filepath = os.path.join(subdir, name)
            relative = os.path.relpath(source_filepath, source_path)
            all_markdown_files.append((source_filepath, os.path.join(dest_path, relative)))

    print(f"Translating {len(all_markdown_files)} files")

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for source_filepath, dest_filepath in all_markdown_files:
            # An existing destination is treated as already translated.
            if os.path.exists(dest_filepath):
                continue
            os.makedirs(os.path.dirname(dest_filepath), exist_ok=True)
            futures.append(
                executor.submit(translate_file, language, source_filepath, dest_filepath, model, client)
            )

        for finished in concurrent.futures.as_completed(futures):
            try:
                finished.result()
            except Exception as exc:
                tb = traceback.format_exc()
                print(f'Translation generated an exception: {exc}')
                print("Traceback:", tb)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Entry point: translate either a list of files (-f) or the whole book
    # (-d) into a temporary folder, then copy the result into the language
    # branch of the git repository found in the current or parent directory.
    print("- Version 1.1.1")

    parser = argparse.ArgumentParser(description='Translate gitbook and copy to a new branch.')
    parser.add_argument('-d', '--directory', action='store_true', help='Translate a full directory.')
    parser.add_argument('-l', '--language', required=True, help='Target language for translation.')
    parser.add_argument('-b', '--branch', required=True, help='Branch name to copy translated files.')
    parser.add_argument('-k', '--api-key', required=True, help='API key to use.')
    parser.add_argument('-m', '--model', default="gpt-4o-mini", help='The openai model to use. By default: gpt-4o-mini')
    parser.add_argument('-o', '--org-id', help='The org ID to use (if not set the default one will be used).')
    parser.add_argument('-f', '--file-paths', help='If this is set, only the indicated files will be translated (" , " separated).')
    # NOTE(review): with action='store_false' this attribute defaults to True
    # and becomes False when the flag is given, so passing --dont-cd is what
    # triggers the chdir below. That looks inverted relative to the flag name
    # and help text; left as-is because existing invocations may rely on it.
    parser.add_argument('-n', '--dont-cd', action='store_false', help="If this is true, the script won't change the current directory.")
    parser.add_argument('-t', '--threads', default=5, type=int, help="Number of threads to use to translate a directory.")
    args = parser.parse_args()

    # The script lives one directory below the book root.
    source_folder = os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))
    language = args.language.capitalize()
    branch = args.branch
    model = args.model
    org_id = args.org_id
    num_threads = args.threads

    client = OpenAI(
        api_key=args.api_key,
        organization=org_id
    )

    # Smaller chunks for gpt-3.5 models.
    if "gpt-3.5" in model:
        MAX_TOKENS = 2000

    # Locate the git repository: current directory first, then its parent.
    current_dir = os.getcwd()
    if check_git_dir(current_dir):
        print('Found .git directory in current directory: ' + current_dir)
    else:
        parent_dir = os.path.dirname(current_dir)
        if check_git_dir(parent_dir):
            print('Found .git directory in parent directory: ' + parent_dir)
            os.chdir(parent_dir)
            print('Current working directory has been changed to: ' + os.getcwd())
        else:
            print('No .git directory found in current or parent directory. Exiting.')
            sys.exit(1)

    current_dir = os.getcwd()
    print(f"The translated files will be copied to {current_dir}, make sure this is the expected folder.")

    if not args.dont_cd:
        os.chdir(source_folder)

    # Translations are staged here before being copied into the repository.
    dest_folder = tempfile.mkdtemp()
    try:
        translate_files = None  # stays None in directory mode, which skips the commit
        if args.file_paths:
            translate_files = [f for f in args.file_paths.split(' , ') if f]

            # One pool for all requested files, so they are translated
            # concurrently (a pool per file ran them one after another).
            with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
                futures = [
                    executor.submit(translate_file, language, file_path, os.path.join(dest_folder, file_path), model, client)
                    for file_path in translate_files
                ]

                for future in concurrent.futures.as_completed(futures):
                    try:
                        future.result()
                    except Exception as exc:
                        print(f'Translation generated an exception: {exc}')

            # Delete possibly removed files from the master branch
            delete_unique_files(branch)

        elif args.directory:
            translate_directory(language, source_folder, dest_folder, model, num_threads, client)

        else:
            print("You need to indicate either a directory or a list of files to translate.")
            sys.exit(1)

        copy_summary(source_folder, dest_folder)
        copy_gitbook_dir(source_folder, dest_folder)

        # Create the branch and copy the translated files
        cp_translation_to_repo_dir_and_check_gh_branch(branch, dest_folder, translate_files)
    finally:
        # The staging folder is no longer needed once the files are copied
        # (or the run was aborted).
        shutil.rmtree(dest_folder, ignore_errors=True)
|
||||
Reference in New Issue
Block a user