数独求解工具

对象有一本做数独的书,里面满满的都是题目,然而,看到这前三关直接不淡定了,这尼玛是人能做出来的?

试了半天无果之后,果断还是决定上代码了。

就这么个破玩意儿,愣是跑了十几分钟才出来。

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import threading
import queue
import time
import json
import copy
import concurrent.futures
import os
import multiprocessing

# --- 数独核心逻辑 ---
def is_valid(board, row, col, num):
    """Return True when ``num`` does not yet occur in the row, column or box.

    ``board`` is indexed as ``board[row][col]`` over a 9x9 grid. The cell at
    ``(row, col)`` itself is part of all three scans, so a cell that already
    holds ``num`` makes the function return False.
    """
    # Row and column are scanned together, one index at a time.
    if any(board[row][k] == num or board[k][col] == num for k in range(9)):
        return False
    # Top-left corner of the 3x3 box containing (row, col).
    box_top = row - row % 3
    box_left = col - col % 3
    return all(
        board[r][c] != num
        for r in range(box_top, box_top + 3)
        for c in range(box_left, box_left + 3)
    )

def solve_sudoku_all_with_progress(board, branch_id, progress_queue):
    """Enumerate every completion of ``board`` by backtracking.

    Cells are visited in row-major order. Each digit placement counts as one
    visited node; every 1000 nodes, and once more at the end, the running
    count is pushed to ``progress_queue`` as ``(branch_id, count)``.

    ``board`` is mutated during the search and restored on backtrack.

    Returns:
        A tuple ``(solutions, visited)`` where ``solutions`` is a list of
        deep-copied solved grids and ``visited`` is the placement count.
    """
    found = []
    placements = 0

    def search(grid, cell=0):
        nonlocal placements
        # Skip cells that already hold a digit.
        while cell < 81 and grid[cell // 9][cell % 9] != 0:
            cell += 1
        if cell == 81:
            found.append(copy.deepcopy(grid))
            return
        r, c = divmod(cell, 9)
        for digit in range(1, 10):
            if not is_valid(grid, r, c, digit):
                continue
            grid[r][c] = digit
            placements += 1
            if placements % 1000 == 0:
                progress_queue.put((branch_id, placements))
            search(grid, cell + 1)
            grid[r][c] = 0

    search(board)
    # Push the final node count when the search ends.
    progress_queue.put((branch_id, placements))
    return found, placements

def worker(new_board, branch_id, progress_queue):
    """Solve one branch; module-level function submitted to the process pool.

    Returns the ``(solutions, visited)`` tuple produced by
    ``solve_sudoku_all_with_progress`` for ``new_board``.
    """
    outcome = solve_sudoku_all_with_progress(
        new_board, branch_id, progress_queue
    )
    return outcome

class SudokuGUI:
    """Tkinter window that enumerates every solution of a 9x9 Sudoku.

    The first empty cell is expanded into one branch per legal digit and each
    branch is solved in its own worker process. A background thread gathers
    the results and talks to the GUI only through ``self.progress_queue``,
    which ``_poll_progress`` drains every 50 ms on the Tk thread. Messages on
    that queue are tagged tuples:

    * ``('progress', finished, total, speed, nodes)``
    * ``('done', solutions)``
    * ``('error', message)``
    """

    def __init__(self, root):
        """Build the widgets on ``root`` and start the progress poll loop."""
        self.root = root
        self.root.title('多进程数独求解器')
        self.root.geometry('520x650')
        self.root.resizable(False, False)
        self.entries = [[None for _ in range(9)] for _ in range(9)]
        # True while a background solve is running; blocks re-entrant
        # solve/clear/load requests that would act on disabled entries.
        self._solving = False
        self._build_ui()
        self.progress_queue = queue.Queue()
        self._poll_progress()

    def _build_ui(self):
        """Create the title, the 9x9 entry grid, the buttons and status rows."""
        self.root.configure(bg='#f7f7fa')
        style = ttk.Style()
        style.theme_use('clam')
        style.configure('TButton', font=('Segoe UI', 12), padding=6)
        style.configure('TLabel', font=('Segoe UI', 11), background='#f7f7fa')
        style.configure('TProgressbar', thickness=18, troughcolor='#e0e0e0', background='#4caf50')
        title = ttk.Label(self.root, text='多进程数独求解器', font=('Segoe UI', 22, 'bold'))
        title.pack(pady=(18, 8))
        frame = tk.Frame(self.root, bg='#f7f7fa')
        frame.pack(pady=8)
        for i in range(9):
            for j in range(9):
                e = tk.Entry(frame, width=2, font=('Consolas', 22), justify='center', bd=2, relief='ridge', bg='#fff', fg='#222')
                # Extra leading padding every third row/column outlines the boxes.
                e.grid(row=i, column=j, padx=(2 if j % 3 == 0 else 0, 2), pady=(2 if i % 3 == 0 else 0, 2))
                self.entries[i][j] = e
        btn_frame = tk.Frame(self.root, bg='#f7f7fa')
        btn_frame.pack(pady=10)
        ttk.Button(btn_frame, text='求解所有解', command=self.solve).grid(row=0, column=0, padx=8)
        ttk.Button(btn_frame, text='清空', command=self.clear).grid(row=0, column=1, padx=8)
        ttk.Button(btn_frame, text='保存', command=self.save).grid(row=0, column=2, padx=8)
        ttk.Button(btn_frame, text='加载', command=self.load).grid(row=0, column=3, padx=8)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(self.root, variable=self.progress_var, maximum=1, length=400)
        self.progress_bar.pack(pady=(10, 2))
        self.progress_label = ttk.Label(self.root, text='进度: 0/0  节点: 0  速度: -- 节点/秒')
        self.progress_label.pack()
        self.status_label = ttk.Label(self.root, text='', font=('Segoe UI', 10))
        self.status_label.pack(pady=(8, 0))
        self.thread_label = ttk.Label(self.root, text='线程状态: 空闲', font=('Segoe UI', 10))
        self.thread_label.pack(pady=(2, 0))

    def get_board(self):
        """Read the entry grid into a 9x9 list of ints (0 for an empty cell).

        Shows an error dialog and returns None when a cell holds anything
        other than an integer from 1 to 9.
        """
        board = []
        for i in range(9):
            row = []
            for j in range(9):
                val = self.entries[i][j].get()
                if val == '':
                    row.append(0)
                    continue
                try:
                    num = int(val)
                    if not 1 <= num <= 9:
                        raise ValueError
                except ValueError:
                    messagebox.showerror('输入错误', f'第{i+1}行第{j+1}列输入无效')
                    return None
                row.append(num)
            board.append(row)
        return board

    @staticmethod
    def _find_conflict(board):
        """Return ``(row, col)`` of the first given that repeats a digit.

        A given conflicts when the same non-zero digit already appeared
        earlier (in row-major order) in its row, column or 3x3 box. Returns
        None when the givens are consistent.
        """
        rows = [set() for _ in range(9)]
        cols = [set() for _ in range(9)]
        boxes = [set() for _ in range(9)]
        for r in range(9):
            for c in range(9):
                digit = board[r][c]
                if digit == 0:
                    continue
                b = 3 * (r // 3) + c // 3
                if digit in rows[r] or digit in cols[c] or digit in boxes[b]:
                    return (r, c)
                rows[r].add(digit)
                cols[c].add(digit)
                boxes[b].add(digit)
        return None

    def clear(self):
        """Empty every cell and reset the progress and status displays."""
        if self._solving:
            # Entries are disabled during a solve, so clearing would only
            # wipe the progress display of the running job.
            return
        for i in range(9):
            for j in range(9):
                self.entries[i][j].delete(0, tk.END)
        self.progress_var.set(0)
        self.progress_label.config(text='进度: 0/0  节点: 0  速度: -- 节点/秒')
        self.status_label.config(text='')
        self.thread_label.config(text='线程状态: 空闲')

    def save(self):
        """Write the raw cell texts to a JSON file chosen by the user."""
        data = [[self.entries[i][j].get() for j in range(9)] for i in range(9)]
        file_path = filedialog.asksaveasfilename(defaultextension='.json', filetypes=[('JSON文件', '*.json')])
        if not file_path:
            return
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False)
        except OSError as exc:
            messagebox.showerror('保存失败', str(exc))
            return
        self.status_label.config(text='已保存')

    def load(self):
        """Fill the grid from a JSON file holding a 9x9 list of cell values.

        Unreadable files, invalid JSON and data that is not a 9x9 list of
        lists are reported in an error dialog and leave the grid untouched.
        """
        if self._solving:
            # Disabled entries silently ignore delete/insert.
            return
        file_path = filedialog.askopenfilename(defaultextension='.json', filetypes=[('JSON文件', '*.json')])
        if not file_path:
            return
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            messagebox.showerror('加载失败', str(exc))
            return
        well_formed = (
            isinstance(data, list)
            and len(data) == 9
            and all(isinstance(row, list) and len(row) == 9 for row in data)
        )
        if not well_formed:
            messagebox.showerror('加载失败', '文件内容不是 9×9 的数独棋盘')
            return
        for i, row in enumerate(data):
            for j, cell in enumerate(row):
                entry = self.entries[i][j]
                entry.delete(0, tk.END)
                if cell:
                    entry.insert(0, str(cell))
        self.status_label.config(text='已加载')

    def solve(self):
        """Validate the grid and start enumerating all solutions."""
        if self._solving:
            return
        board = self.get_board()
        if board is None:
            return
        # The solver only checks digits it places itself, so contradictory
        # givens must be rejected here or they would yield bogus solutions.
        conflict = self._find_conflict(board)
        if conflict is not None:
            row, col = conflict
            messagebox.showerror('输入错误', f'第{row+1}行第{col+1}列的数字与同行、同列或同宫的数字冲突')
            return
        self.progress_var.set(0)
        self.progress_label.config(text='进度: 0/0  节点: 0  速度: -- 节点/秒')
        self.status_label.config(text='正在求解...')
        self.thread_label.config(text='线程状态: 启动中...')
        self._solving = True
        self.disable_inputs()
        self._parallel_solve(board)

    def disable_inputs(self):
        """Make every grid cell non-editable."""
        for row in self.entries:
            for entry in row:
                entry.config(state='disabled')

    def enable_inputs(self):
        """Make every grid cell editable again."""
        for row in self.entries:
            for entry in row:
                entry.config(state='normal')

    def _parallel_solve(self, board):
        """Split ``board`` on its first empty cell and solve the branches.

        A board without empty cells is reported as its own single solution;
        an empty cell with no legal digit is reported as unsolvable. Otherwise
        a daemon thread runs the process pool so the Tk loop stays responsive.
        """
        first_empty = next(
            ((i, j) for i in range(9) for j in range(9) if board[i][j] == 0),
            None,
        )
        if first_empty is None:
            self.on_solve_done([copy.deepcopy(board)])
            return
        i, j = first_empty
        candidates = [num for num in range(1, 10) if is_valid(board, i, j, num)]
        if not candidates:
            # Nothing to fan out; avoid starting a manager and a pool.
            self.on_solve_done([])
            return
        self._solving = True
        self.thread_label.config(text='线程状态: 运行中 (多进程)')
        threading.Thread(
            target=self._run_workers,
            args=(board, first_empty, candidates),
            daemon=True,
        ).start()

    def _run_workers(self, board, cell, candidates):
        """Background-thread body: run one worker process per candidate.

        Never touches widgets; progress, the final result and failures are
        posted to ``self.progress_queue`` for the Tk thread to handle.
        """
        import traceback

        row, col = cell
        total = len(candidates)
        finished = 0
        branch_nodes = dict.fromkeys(range(total), 0)
        solutions = []
        started = time.time()
        stop_listening = threading.Event()

        def publish():
            nodes = sum(branch_nodes.values())
            elapsed = time.time() - started
            speed = nodes / elapsed if elapsed > 0 else 0
            self.progress_queue.put(('progress', finished, total, speed, nodes))

        try:
            # The context manager shuts the manager process down afterwards.
            with multiprocessing.Manager() as mp_manager:
                # Manager queue proxies can be passed to pool workers.
                mp_queue = mp_manager.Queue()

                def listen():
                    while not stop_listening.is_set():
                        try:
                            branch_id, nodes = mp_queue.get(timeout=0.1)
                        except queue.Empty:
                            continue
                        except (EOFError, OSError):
                            # Manager connection is gone; nothing more to read.
                            return
                        # Counts only grow; ignore late, stale updates.
                        branch_nodes[branch_id] = max(branch_nodes[branch_id], nodes)
                        publish()

                listener = threading.Thread(target=listen, daemon=True)
                listener.start()
                try:
                    # Cap the pool at the number of branches (at most 9).
                    pool_size = min(total, os.cpu_count() or 1)
                    with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size) as executor:
                        futures = []
                        for idx, num in enumerate(candidates):
                            branch = copy.deepcopy(board)
                            branch[row][col] = num
                            futures.append(executor.submit(worker, branch, idx, mp_queue))
                        for idx, fut in enumerate(futures):
                            result, visited = fut.result()
                            solutions.extend(result)
                            finished += 1
                            branch_nodes[idx] = max(branch_nodes[idx], visited)
                            publish()
                finally:
                    # Stop the listener before the manager goes away.
                    stop_listening.set()
                    listener.join(timeout=1)
        except Exception as exc:
            traceback.print_exc()
            self.progress_queue.put(('error', str(exc)))
        else:
            self.progress_queue.put(('done', solutions))

    def _poll_progress(self):
        """Drain ``progress_queue`` on the Tk thread, then reschedule itself."""
        try:
            while True:
                self._dispatch(self.progress_queue.get_nowait())
        except queue.Empty:
            pass
        finally:
            # Keep polling even if a handler raised.
            self.root.after(50, self._poll_progress)

    def _dispatch(self, message):
        """Apply one tagged message from the background thread to the UI."""
        kind, *payload = message
        if kind == 'progress':
            self._update_progress(*payload)
        elif kind == 'done':
            self.thread_label.config(text='线程状态: 已完成 (多进程)')
            self.on_solve_done(payload[0])
        elif kind == 'error':
            # Return the UI to a usable state before reporting the failure.
            self._solving = False
            self.enable_inputs()
            self.status_label.config(text='求解失败')
            self.thread_label.config(text='线程状态: 空闲')
            messagebox.showerror('主进程异常', payload[0])

    def _update_progress(self, finished, total, speed, nodes):
        """Show branch completion, node count and nodes-per-second."""
        progress = finished / total if total > 0 else 0
        speed_str = f"{speed:.1f}" if speed > 0 else "--"
        self.progress_var.set(progress)
        self.progress_label.config(text=f'进度: {finished}/{total}  节点: {nodes}  速度: {speed_str} 节点/秒')

    def on_solve_done(self, solutions):
        """Re-enable editing and present ``solutions`` (or report none)."""
        self._solving = False
        self.enable_inputs()
        self.thread_label.config(text='线程状态: 空闲')
        if not solutions:
            self.status_label.config(text='无解')
            messagebox.showinfo('结果', '无解')
            return
        self.status_label.config(text=f'共找到 {len(solutions)} 个解')
        self.show_solution_window(solutions)

    def show_solution_window(self, solutions):
        """Open a window that pages through ``solutions``.

        Digits that were givens in the main grid are drawn in red, digits
        filled in by the solver in green.
        """
        win = tk.Toplevel(self.root)
        win.title(f'共找到{len(solutions)}个解')
        win.geometry('420x420')
        current = [0]  # index of the solution on display
        original = [[self.entries[i][j].get() != '' for j in range(9)] for i in range(9)]
        sol_entries = [[tk.Entry(win, width=2, font=('Consolas', 18), justify='center', state='readonly') for _ in range(9)] for _ in range(9)]
        for i in range(9):
            for j in range(9):
                sol_entries[i][j].grid(row=i, column=j, padx=(2 if j % 3 == 0 else 0, 2), pady=(2 if i % 3 == 0 else 0, 2))

        def show(idx_val):
            for i in range(9):
                for j in range(9):
                    cell = sol_entries[i][j]
                    # A readonly entry must be made writable to change its text.
                    cell.config(state='normal')
                    cell.delete(0, tk.END)
                    cell.insert(0, str(solutions[idx_val][i][j]))
                    cell.config(fg='#e53935' if original[i][j] else '#388e3c')
                    cell.config(state='readonly')

        def step(delta):
            # Wrap around at both ends of the solution list.
            current[0] = (current[0] + delta) % len(solutions)
            show(current[0])

        btn_prev = ttk.Button(win, text='上一解', command=lambda: step(-1))
        btn_next = ttk.Button(win, text='下一解', command=lambda: step(1))
        btn_prev.grid(row=9, column=2, columnspan=2, pady=8)
        btn_next.grid(row=9, column=5, columnspan=2, pady=8)
        if len(solutions) == 1:
            btn_prev.config(state='disabled')
            btn_next.config(state='disabled')
        show(0)

if __name__ == "__main__":
    # Windows compatibility for multiprocessing; must run before the GUI starts.
    multiprocessing.freeze_support()
    window = tk.Tk()
    gui = SudokuGUI(window)
    window.mainloop()

 

☆版权☆

* 网站名称:obaby@mars
* 网址:https://lang.ma/
* 个性:https://oba.by/
* 本文标题: 《数独求解工具》
* 本文链接:https://danteng.me/2025/08/21136
* 短链接:https://oba.by/?p=21136
* 转载文章请标明文章来源,原文标题以及原文链接。请遵从 《署名-非商业性使用-相同方式共享 2.5 中国大陆 (CC BY-NC-SA 2.5 CN) 》许可协议。


You may also like

5 comments

  1.  Level 1
    Microsoft Edge 138 Microsoft Edge 138 Windows 11 Windows 11 cn中国–上海–上海 电信

    我没玩过这个,好像从小就不会玩。
    玩法都不懂的,去编程写个游戏就更不可能了哈哈

    1. 公主 Queen 
      Google Chrome 134 Google Chrome 134 Mac OS X 10.15 Mac OS X 10.15 cn中国–山东–青岛 移动

      主要是缺的数太多了,实在是算不出来,😂

    1. 公主 Queen 
      Google Chrome 134 Google Chrome 134 Mac OS X 10.15 Mac OS X 10.15 cn中国–山东–青岛 移动

      偶尔,那本书放那里好多年了,都没做。
      我看了下,根本做不了一点。

  2.  Level 6
    Google Chrome 127 Google Chrome 127 GNU/Linux GNU/Linux cn中国 中国联通

    刚开始的时候,数独和扫雷高级版我分辨不出来,我总觉得原理是一样的,都挺锻炼智力的。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注