pbnj

snippet #14

crack.py


#!/usr/bin/env python3
"""
==============================================================================
Ultimate D'Agapeyeff Cipher Solver (Parallel Memetic Algorithm)
==============================================================================
Author: Cryptography Research Community (Synthesized & Optimized)
Year: 2026

Architecture:
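- Outer Loop: Repeated generations of independent random-restart searches, one per multiprocessing worker (only the best result seen is kept; no state is passed between generations).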
- Inner Loop: Iterated Local Search (Simulated Annealing) for rapid convergence.
- Search Space: Row x Column Transposition (14! x 14!) x Polybius Square (25!)
- Optimization: Pre-computed index mapping replaces string dictionary lookups.
==============================================================================
"""

import math
import random
import time
import sys
import os
import multiprocessing as mp
import signal

# ============================================================================
# CIPHERTEXT CONFIGURATION
# ============================================================================
# Standard 395-digit D'Agapeyeff Cipher. 
# Theory dictates truncating the trailing '000' yields exactly 196 pairs (14x14 grid)
# The ciphertext digits, stored as five 78-digit chunks plus the 5-digit tail.
RAW_CIPHER = "".join((
    "756282859162916481649174858464747482848381638181747482626475838284917574658375",
    "757593636565816381758575756462829285746382757483816581848564856485856382726283",
    "628181728164637582816483638285816363630474819191846385846564856562946262859185",
    "917491727564657571658362647481828462826491819365626484849183857491816572748383",
    "858283646272626562837592726382827272838285847582818372846282837581647574858162",
    "92000",
))

# Side length of the square transposition grid (14 x 14 = 196 cells).
GRID_SIZE = 14
# First digit of a pair selects the Polybius row, second digit the column.
ROWS = list("67890")
COLS = list("12345")
# 25-letter alphabet: there is no 'J' (I/J combined).
ALPHABET = "ABCDEFGHIKLMNOPQRSTUVWXYZ"

# ============================================================================
# QUADGRAM & FITNESS SCORING
# ============================================================================
def load_quadgrams(filename="english_quadgrams.txt"):
    """Load quadgram log10 probabilities from a file, or use a built-in table.

    The file is read as whitespace-separated ``QUADGRAM COUNT`` lines; blank
    lines are skipped. Counts are converted to ``log10(count / total)``.

    Args:
        filename: Path of the quadgram count file.

    Returns:
        A ``(table, floor)`` tuple. ``table`` maps quadgram -> log10 score and
        ``floor`` is the score to use for quadgrams missing from ``table``.
        If the file does not exist or cannot be parsed, the small built-in
        table is returned with a floor of ``-5.0``.
    """
    if os.path.exists(filename):
        try:
            # Parse into a private dict so that a failure part-way through
            # can never leak raw integer counts into the fallback table.
            counts = {}
            total = 0
            with open(filename, 'r', encoding="utf-8") as f:
                for line in f:
                    fields = line.split()
                    if not fields:
                        continue  # tolerate blank lines
                    gram, count = fields  # ValueError unless exactly 2 fields
                    count = int(count)
                    counts[gram] = count
                    total += count
            if total <= 0:
                raise ValueError("file contains no usable quadgram counts")
            # math.log10 raises ValueError for zero or negative counts.
            table = {gram: math.log10(count / total)
                     for gram, count in counts.items()}
            floor = math.log10(0.01 / total)
        except (OSError, ValueError) as e:
            print(f"[-] Error loading {filename}: {e}. Using fallback.")
        else:
            print(f"[+] Loaded {len(table)} quadgrams from {filename}")
            return table, floor

    # --- Robust Fallback for standalone execution ---
    common = [
        ('TION', -2.1), ('NTHE', -2.2), ('THER', -2.3), ('THAT', -2.3),
        ('OFTH', -2.4), ('FTHE', -2.4), ('THES', -2.5), ('WITH', -2.5),
        ('INTH', -2.5), ('ATIO', -2.6), ('OTHE', -2.6), ('TTHE', -2.7),
        ('DTHE', -2.7), ('INGT', -2.8), ('ETHE', -2.8), ('SAND', -2.8),
        ('STHE', -2.8), ('HERE', -2.9), ('THEC', -2.9), ('MENT', -2.9),
        ('THIS', -2.9), ('OULD', -3.0), ('IGHT', -3.0), ('HAVE', -3.0),
        ('WHIC', -3.1), ('FROM', -3.1), ('CODE', -3.1), ('CIPH', -3.2),
    ]

    print("[!] Using built-in approximation quadgrams.")
    return dict(common), -5.0

def fast_score(text, quadgrams, floor):
    """Return a fitness score for *text* (higher is better).

    The score is the sum of the quadgram scores of every 4-character window
    (``floor`` for windows missing from ``quadgrams``), plus fixed bonuses for
    a few substrings, minus a penalty when one character repeats more than
    four times in a row.
    """
    lookup = quadgrams.get
    total = 0.0

    # Slide a 4-character window across the text.
    start = 0
    last_start = len(text) - 3
    while start < last_start:
        total += lookup(text[start:start + 4], floor)
        start += 1

    # Substring bonuses, applied once each and in this fixed order.
    bonuses = (
        ("THE", 1.5),
        ("AND", 1.0),
        ("THAT", 2.0),
        ("WITH", 2.0),
        ("CIPHER", 5.0),
        ("CODE", 5.0),
    )
    for word, bonus in bonuses:
        if word in text:
            total += bonus

    # Find the longest run of one repeated character.
    longest = 1
    streak = 1
    for previous, current in zip(text, text[1:]):
        if previous == current:
            streak += 1
            if streak > longest:
                longest = streak
        else:
            streak = 1

    if longest > 4:
        total -= longest * 2.0

    return total

# ============================================================================
# FAST INDEX-BASED TRANSPOSITION & POLYBIUS
# ============================================================================
def prepare_cipher(raw_text):
    """Convert the raw digit string into a list of Polybius cell indices.

    All whitespace is removed and a trailing ``"000"`` is dropped. The
    remaining digits are read two at a time: the first digit is looked up in
    ``ROWS`` and the second in ``COLS``, giving ``row * len(COLS) + col``.

    Args:
        raw_text: The ciphertext as a string of digits (whitespace allowed).

    Returns:
        A list with one integer per digit pair. A pair that cannot be decoded
        (a digit not present in ``ROWS``/``COLS``, or a lone trailing digit
        when the digit count is odd) is represented by ``-1``.
    """
    # Strip every kind of whitespace, not only spaces, so wrapped or grouped
    # ciphertext can be pasted in as-is.
    clean = "".join(raw_text.split())
    # Truncate the '000' padding at the end to fit the perfect 196 pair grid
    if clean.endswith("000"):
        clean = clean[:-3]

    # Digit -> position tables, built once instead of list.index() per digit.
    row_of = {digit: pos for pos, digit in enumerate(ROWS)}
    col_of = {digit: pos for pos, digit in enumerate(COLS)}
    width = len(COLS)

    indices = []
    for start in range(0, len(clean), 2):
        pair = clean[start:start + 2]
        row = row_of.get(pair[0])
        # A lone trailing digit has no column digit; treat it as invalid
        # rather than letting pair[1] raise IndexError.
        col = col_of.get(pair[1]) if len(pair) == 2 else None
        if row is None or col is None:
            indices.append(-1)  # Handle invalid pairs gracefully
        else:
            indices.append(row * width + col)

    return indices

def double_transpose(indices, row_key, col_key):
    """Reorder a flat GRID_SIZE x GRID_SIZE grid by row and column keys.

    The output is filled sequentially: for each column in ``col_key`` order,
    the cells of that column are taken in ``row_key`` order from the
    row-major ``indices`` list.
    """
    cells = [0] * (GRID_SIZE * GRID_SIZE)
    read_order = ((r, c) for c in col_key for r in row_key)
    for slot, (r, c) in enumerate(read_order):
        cells[slot] = indices[r * GRID_SIZE + c]
    return cells

# ============================================================================
# SIMULATED ANNEALING WORKER
# ============================================================================
def init_worker():
    """Make the calling process ignore SIGINT (Ctrl+C).

    Passed as the pool ``initializer`` so that an interrupt is handled by the
    main process only.
    """
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)

def annealing_worker(args):
    """Run one simulated-annealing search from a random starting state.

    Args:
        args: A 4-tuple ``(base_indices, quadgrams, floor, iterations)``:
            the cell indices to decode, the quadgram table and floor score
            handed to ``fast_score``, and the number of annealing steps.

    Returns:
        ``(best_score, best_row_key, best_col_key, best_square, best_text)``
        for the highest-scoring state accepted during the run.
    """
    base_indices, quadgrams, floor, iterations = args

    def decode(row_key, col_key, letters):
        # Transpose the cells, then map each through the square ('?' for -1).
        cells = double_transpose(base_indices, row_key, col_key)
        return "".join('?' if cell == -1 else letters[cell] for cell in cells)

    def with_swap(seq, span):
        # Return a copy of seq with two distinct random positions exchanged.
        first, second = random.sample(range(span), 2)
        clone = seq[:]
        clone[first], clone[second] = clone[second], clone[first]
        return clone

    # Random initial state; shuffled in the order rows, columns, square.
    rows = list(range(GRID_SIZE))
    cols = list(range(GRID_SIZE))
    square = list(ALPHABET)
    for component in (rows, cols, square):
        random.shuffle(component)

    text = decode(rows, cols, square)
    score = fast_score(text, quadgrams, floor)

    # Snapshot of the best state seen so far (copies, independent of current).
    best = (score, rows[:], cols[:], square[:], text)

    temp = 50.0
    cooling = 0.9995

    for _ in range(iterations):
        # 1. Propose a neighbour: swap two entries in exactly one component.
        #    Unchanged components are shared; current lists are never mutated.
        cand_rows, cand_cols, cand_square = rows, cols, square
        roll = random.random()
        if roll < 0.3:
            cand_cols = with_swap(cols, GRID_SIZE)
        elif roll < 0.6:
            cand_square = with_swap(square, 25)
        else:
            cand_rows = with_swap(rows, GRID_SIZE)

        # 2. Score the proposal.
        cand_text = decode(cand_rows, cand_cols, cand_square)
        cand_score = fast_score(cand_text, quadgrams, floor)

        # 3. Metropolis rule: always take improvements, otherwise accept with
        #    probability exp(gain / temp).
        gain = cand_score - score
        accept = gain > 0
        if not accept and temp > 0.001:
            accept = random.random() < math.exp(gain / temp)

        if accept:
            rows, cols, square = cand_rows, cand_cols, cand_square
            score = cand_score
            if cand_score > best[0]:
                best = (cand_score, rows[:], cols[:], square[:], cand_text)

        # 4. Cool down; once nearly frozen, reheat to escape a local optimum.
        temp *= cooling
        if temp < 0.05:
            temp = 20.0

    return best

# ============================================================================
# MAIN ORCHESTRATOR
# ============================================================================
def display_polybius(square):
    """Print *square* (a flat, row-major sequence) as a labelled 5x5 grid.

    Rows are labelled with the entries of ``ROWS``; the column header is the
    fixed digits 1-5.
    """
    rule = "    +---+---+---+---+---+"
    print("      1   2   3   4   5")
    print(rule)
    for band, label in enumerate(ROWS):
        cells = "".join(f" {square[band * 5 + offset]} |" for offset in range(5))
        print(f"  {label} |" + cells)
        print(rule)

def _report_new_best(generation, elapsed, score, rk, ck, sq, text):
    """Print a new global best and append it to daga_best_finds.txt."""
    # 1. Print to console
    print(f"\n[*** NEW GLOBAL BEST FOUND ***] - Gen {generation} | Elapsed: {elapsed}s")
    print(f"Fitness Score: {score:.4f}")
    print(f"Row Key: {rk}")
    print(f"Col Key: {ck}")
    print("\nPolybius Square:")
    display_polybius(sq)
    print("\nDecoded Snippet:")
    print(f"--> {text[:100]}...\n")

    # 2. Save to file so you don't lose it!
    with open("daga_best_finds.txt", "a", encoding="utf-8") as f:
        f.write(f"\n[{time.strftime('%Y-%m-%d %H:%M:%S')}] Gen {generation} | Score: {score:.4f}\n")
        f.write(f"Row Key: {rk}\nCol Key: {ck}\n")
        f.write(f"Square: {''.join(sq)}\n")
        f.write(f"Text: {text}\n")
        f.write("-" * 50 + "\n")

def crack_cipher():
    """Run the parallel search until interrupted with Ctrl+C.

    Each generation dispatches one ``annealing_worker`` job per CPU. Whenever
    a result beats the best score seen so far it is printed and appended to
    ``daga_best_finds.txt``. The worker pool is always terminated and joined
    on the way out, whether the loop ends by Ctrl+C or by an exception.
    """
    print("=" * 75)
    print("  ULTIMATE D'AGAPEYEFF CIPHER SOLVER (PARALLEL MEMETIC ALGORITHM)  ")
    print("=" * 75)

    base_indices = prepare_cipher(RAW_CIPHER)
    quadgrams, floor = load_quadgrams()

    workers = mp.cpu_count()
    print(f"[+] Launching {workers} parallel workers...")
    print("[+] Search space: 14! x 14! x 25! (Double Transposition + Polybius)")
    print("=" * 75)

    # Every generation submits the same job description to every worker
    # (50,000 inner loops each), so build the list once.
    iterations_per_job = 50000
    jobs = [(base_indices, quadgrams, floor, iterations_per_job)] * workers

    best_overall_score = -float('inf')
    start_time = time.time()
    generation = 1

    pool = mp.Pool(workers, initializer=init_worker)
    try:
        while True:
            results = pool.map(annealing_worker, jobs)

            # Process results
            for score, rk, ck, sq, text in results:
                if score > best_overall_score:
                    best_overall_score = score
                    elapsed = int(time.time() - start_time)
                    _report_new_best(generation, elapsed, score, rk, ck, sq, text)

            generation += 1

            # Non-obtrusive alive ping
            if generation % 10 == 0:
                elapsed = int(time.time() - start_time)
                sys.stdout.write(f"\r[~] Running Generation {generation} (Elapsed: {elapsed}s). Best: {best_overall_score:.4f}")
                sys.stdout.flush()

    except KeyboardInterrupt:
        print("\n\n[!] Interrupted by user. Shutting down gracefully...")
    finally:
        # Runs for Ctrl+C and for any other exception, so worker processes
        # are never left behind.
        pool.terminate()
        pool.join()

    # Only reached after a KeyboardInterrupt; other exceptions propagate.
    print("[+] Final Best Score achieved:", best_overall_score)

if __name__ == "__main__":
    # Start the solver only when this file is run as a script. The guard
    # matters for multiprocessing: on platforms whose start method imports
    # the main module in each child (e.g. Windows/macOS), an unguarded call
    # would be re-executed by every worker process.
    crack_cipher()