An Interactive Sudoku Solver in Python – Part 1: The Single Cell

I love playing Sudoku. I find it therapeutic, but I do get stuck on the harder puzzles that require a trial-and-error approach. I like to mark the possibilities in the corners of a cell, and on those puzzles I run out of space rather quickly.

Anyway, solving the harder puzzles is not the point of this exercise in writing a Sudoku solver. I just thought it would be fun to write a solver, and more importantly, I wanted to learn more about using Tkinter.

To begin with, here is the GitHub repository so you can refer to the code:

https://github.com/tanyanghan/sudoku

When you look at a Sudoku grid, the single cell is the most fundamental element. It can hold a number from 1 to 9. I start by defining a class that represents the single cell. In the first commit of sudoku_simply.py, you will see the class definition for Sudoku_Cell:

# This class defines a single cell on a Sudoku grid
class Sudoku_Cell:
    def __init__(self, value=0):
        if not value:
            self.possible_values = [1,2,3,4,5,6,7,8,9]
            self.cells_need_updating = False
        else:
            self.possible_values = [value]
            self.cells_need_updating = True

The class has two data members. The data member possible_values is the list of numbers that the individual cell could contain, and if only one value is left in the list, that value is the final value of the cell.

The other data member, cells_need_updating, is a Boolean that indicates whether the other cells in the same row, column, or 3×3 grid area need to be updated because this cell has arrived at its final value. This is important for how the solver works.

The components: the Single Cell, the Row, the Column and the 3×3 grid area.

Whenever a cell has arrived at its final value, the other cells in the same row, column, or 3×3 grid area need to remove that value from their own list of possible_values. This is achieved by calling the method remove_possible_value() on each of those cells. The code below is from the initial commit of sudoku_simply.py:

    def remove_possible_value(self,value):
        if len(self.possible_values) == 1:
            # we have already arrived at an answer previously, do nothing
            return

        try:
            self.possible_values.remove(value)
        except ValueError:
            # value not found, continue
            pass
        else:
            # value removed, check if we have arrived at an answer
            if len(self.possible_values) == 1:
                # cell value has been determined, flag that we need to update
                # other cells
                self.cells_need_updating = True

In this method, we do nothing and return if the cell has already arrived at its final value. Otherwise, we try to remove the value that is passed into the method from the list of possible_values. If the value was not in the list of possible_values, we quietly exit the function. If we do succeed in removing the value from the list, we check if we have arrived at the final value, and if so, we set cells_need_updating to True.

The method other_cells_need_updating() checks whether the cells_need_updating Boolean is set, and the method get_value() returns the final value, or zero if the cell has not yet arrived at its final value.
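Here is a minimal sketch of how these pieces might fit together. The constructor and remove_possible_value() repeat the code above. The bodies of other_cells_need_updating() and get_value() are assumptions based only on the description in this post, so the versions in the repository may differ. The propagate_in_row() helper is hypothetical and exists only to show the flag being used on a single row.

```python
class Sudoku_Cell:
    def __init__(self, value=0):
        if not value:
            self.possible_values = [1, 2, 3, 4, 5, 6, 7, 8, 9]
            self.cells_need_updating = False
        else:
            self.possible_values = [value]
            self.cells_need_updating = True

    def remove_possible_value(self, value):
        if len(self.possible_values) == 1:
            return
        try:
            self.possible_values.remove(value)
        except ValueError:
            pass
        else:
            if len(self.possible_values) == 1:
                self.cells_need_updating = True

    def other_cells_need_updating(self):
        # Assumed behaviour: simply report the flag.
        return self.cells_need_updating

    def get_value(self):
        # Assumed behaviour: the final value if known, otherwise 0.
        if len(self.possible_values) == 1:
            return self.possible_values[0]
        return 0


def propagate_in_row(row):
    """Hypothetical helper: keep pushing final values to the rest of the row."""
    changed = True
    while changed:
        changed = False
        for cell in row:
            if cell.other_cells_need_updating():
                for other in row:
                    if other is not cell:
                        other.remove_possible_value(cell.get_value())
                # This cell has now informed its neighbours.
                cell.cells_need_updating = False
                changed = True


# A row with eight given digits and one blank cell at index 3.
row = [Sudoku_Cell(v) for v in (5, 3, 4, 0, 7, 8, 9, 1, 2)]
propagate_in_row(row)
print(row[3].get_value())  # prints 6, the only digit missing from the row
```

The blank cell starts with all nine possibilities, loses one each time a given cell reports its value, and raises its own flag once only a single possibility remains.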

With the Sudoku_Cell class more or less defined, we’ll look at the Sudoku grid as a whole in the next post.

Using logging.config.dictConfig with QueueHandler and Timing Measurements Posting to Slack

Following on from the previous post, I made a quick timing measurement of posting four messages to Slack from three locations:

Location                     Time taken (seconds)
San Francisco, California    2.70512
Kuala Lumpur, Malaysia       5.56670
Suzhou, China                12.67123

As you can see, if you’re not using the asynchronous QueueListener then your program could be stuck for over 12 seconds waiting on the communication with Slack.
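To see the effect without a Slack account, here is a small sketch that uses only the standard library (Python 3.2 or later). SlowHandler is a hypothetical stand-in for a handler that posts to Slack, and the 0.2 second delay is an invented latency, not a measurement. The logging calls in the main thread only put records on the queue, while the listener thread pays the delay.

```python
import logging
import logging.handlers
import queue
import time


class SlowHandler(logging.Handler):
    """Hypothetical stand-in for a Slack handler: each emit takes 0.2 s."""

    def __init__(self):
        super().__init__()
        self.delivered = []

    def emit(self, record):
        time.sleep(0.2)  # simulated network latency
        self.delivered.append(record.getMessage())


log_queue = queue.Queue(-1)
slow_handler = SlowHandler()
listener = logging.handlers.QueueListener(log_queue, slow_handler)

demo_logger = logging.getLogger("queue_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False
demo_logger.addHandler(logging.handlers.QueueHandler(log_queue))

listener.start()
start = time.time()
for i in range(4):
    demo_logger.info("message %d", i)
time_logging = time.time() - start  # the main thread only enqueues

listener.stop()  # blocks until the queued records have been handled
time_total = time.time() - start

print("Time spent in logging calls: %.5f" % time_logging)
print("Time until all messages were delivered: %.5f" % time_total)
```

The first figure should be close to zero, and the second should be at least four times the simulated delay.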

Here’s an example JSON dictionary config file to use QueueHandler in logging:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "simple": {
            "format": "%(asctime)s - %(levelname)s - %(message)s"
        }
    },

    "handlers": {
        "consoleHandler": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            "formatter": "simple",
            "stream": "ext://sys.stdout"
        },        
        "slack-queue": {
            "class":"SlackQueue.sqHandler",
            "level":"DEBUG",
            "formatter": "simple",
            "queue": "ext://Global.slack_queue"
        }
    },

    "root": {
        "level": "DEBUG",
        "handlers": ["consoleHandler", "slack-queue"]
    }
}

Of note here is the ext:// prefix, which lets you pass a reference to an external object into the logging dictConfig. I have put the instantiation of the queue object in a file named Global.py:

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

#Instantiate a queue for the handler/listener
slack_queue = queue.Queue(-1)
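The ext:// prefix can be tried out with nothing but the standard library. The sketch below passes the config as a Python dict instead of a JSON file and uses sys.stderr as the external object. The logger name ext_demo is made up for this example. After dictConfig runs, the handler holds the actual stream object and not the string.

```python
import logging
import logging.config
import sys

config = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "DEBUG",
            # dictConfig resolves this to the sys.stderr object itself
            # before passing it to the handler's constructor.
            "stream": "ext://sys.stderr",
        }
    },
    "loggers": {
        # "ext_demo" is a hypothetical logger name for this sketch.
        "ext_demo": {
            "level": "DEBUG",
            "handlers": ["console"],
            "propagate": False,
        }
    },
}

logging.config.dictConfig(config)
console_handler = logging.getLogger("ext_demo").handlers[0]
print(console_handler.stream is sys.stderr)
```

The "queue" entry in the JSON file above relies on the same mechanism: ext://Global.slack_queue is looked up as the slack_queue attribute of the importable module Global.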

The slack_queue object is required by both the QueueHandler, which is instantiated by dictConfig, and the QueueListener, which I instantiate in my main program, test_program_timing.py:

import logging
import logging.config
import json
import atexit
import SlackQueue
import Global
import time

time_program_start = time.time()
time_program_end = 0.0

def print_timing():
    time_slack_queue_finish = time.time()
    print ("Time taken by main program: %.5f"
                %(time_program_end - time_program_start))
    print ("Extra time taken by sending Slack messages: %.5f"
                %(time_slack_queue_finish - time_program_end))

#Load the logging config json file.
with open('Logger.json','r') as f:
    logging.config.dictConfig(json.load(f))

#Instantiate the queue listener with the same queue above, and also set
#the settings for communicating with Slack.
#You will need to get your own Incoming Webhook URL from Slack's Custom
#Integration setup.
SlackListener = SlackQueue.sqListener(
        Global.slack_queue,
        logging_url="https://hooks.slack.com/services/<your_webhook_url>",
        channel="#your_channel",
        username="Penguin",
        icon_emoji = ":penguin:"
    )

#Start the listener
SlackListener.start()

#Print out the timing measurement right at the end.
atexit.register(print_timing)

#Register the listener stop method to run upon exit in order to allow the 
#queue to be emptied before exiting.
#If you miss this step, the queue might not be emptied before the program
#exits and you won't receive all the messages on Slack.
atexit.register(SlackListener.stop)

#Begin logging
logging.info("Test info message")

#You can temporarily change the Slack settings via the "extra" parameter.
#The settings in extra will only apply to this one log message.
logging.debug("Another test message",extra= {
                                                "channel":"@someone",
                                                "icon_emoji":":coffee:",
                                                "username":"Latte"
                                            })

#You can put the <!channel> tag in the message to send an announcement.
msg="Some sort of warning!"
logging.warning("<!channel>: %s"%msg)

logging.error("Gasp!")

time_program_end = time.time()

I registered print_timing() with atexit.register() before registering SlackListener.stop so that the timing printout happens right at the end, after the queue has been cleared. This works because atexit runs registered functions in “last in, first out” order.
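The ordering is easy to check in isolation. In this sketch, stop_listener() is a hypothetical stand-in for SlackListener.stop, and both functions only record and print their names.

```python
import atexit

order = []


def print_timing():
    order.append("print_timing")
    print("print_timing runs last")


def stop_listener():
    # Hypothetical stand-in for SlackListener.stop in the program above.
    order.append("stop_listener")
    print("stop_listener runs first")


atexit.register(print_timing)   # registered first, so it runs last
atexit.register(stop_listener)  # registered last, so it runs first
```

When the interpreter exits, stop_listener() is called before print_timing(), which is the order the timing measurement needs.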

You can find all the code on GitHub.

Leave a comment below if you have any comments or feedback on how to improve the implementation.

Asynchronous Posting to Slack from Python Logging QueueHandler

We had a problem at work where the regular blocking call to requests.post() was getting in the way at physical locations with poor latency to the Slack servers, such as within China. What takes a second in sunny California suddenly took 8 to 12 seconds from behind the Great Firewall.

The solution is to integrate Slack into Python logging and use the logging QueueHandler to make it asynchronous.

Here’s my implementation of an extension to the QueueHandler and QueueListener classes:

from logging import Handler
import requests
import json
#QueueHandler and QueueListener are part of the standard Python logging
#module from Python 3.2 onwards; but if you're using Python 2.x, you can
#just copy the QueueHandler and QueueListener code and save it into your
#own queueHandler.py.
try:
    #check if they're part of standard Python logging module
    from logging.handlers import QueueHandler
    from logging.handlers import QueueListener
except ImportError:
    #if not, use your own copy
    from queueHandler import QueueHandler
    from queueHandler import QueueListener


class sqHandler(QueueHandler):
    def __init__(self, queue=None):
        QueueHandler.__init__(self,queue)

    def prepare(self, record):
        """
        Override the method to allow the formatter to work.
        """
        record.msg = self.format(record)
        record.args = None
        record.exc_info = None
        return record

class sqListener(QueueListener,Handler):
    def __init__(self, queue=None, logging_url="", channel="", username="",
                 icon_emoji = ""):
        """
        logging_url, channel, username, icon_emoji can all be overridden
        by the extra dictionary parameter of a logging record
        For example:
            logging.info('Test message',extra={'channel':'@someone',
                                               'username':'testbot',
                                               'icon_emoji':':penguin:'})
        """
        QueueListener.__init__(self,queue)
        Handler.__init__(self)
        self.logging_url = logging_url
        self.payload = {
            "channel": channel,
            "username": username,
            "icon_emoji": icon_emoji
            }

    def handle(self, record):
        """
        Override the QueueListener.handle method with the Handler.handle 
        method
        """
        Handler.handle(self, record)

    def emit(self, record):
        #make a copy of the default settings
        new_logging_url = self.logging_url
        new_payload = self.payload.copy()

        #override default settings if necessary
        if hasattr(record,'logging_url'):
            new_logging_url = record.logging_url

        for key in self.payload.keys():
            if hasattr(record,key):
                new_payload[key] = getattr(record, key)

        #format the message and add to payload
        msg = self.format(record)
        new_payload["text"] = '%s' % msg

        #post the request
        requests.post(new_logging_url, data=json.dumps(new_payload))

And here’s a usage example:

import logging
try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2
import atexit
import sys
import SlackQueue

    
#Set up the root logger
logging.basicConfig(stream=sys.stdout,
                    level=logging.DEBUG, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
root = logging.getLogger()

#Instantiate a queue for the handler/listener
slack_queue = queue.Queue(-1)

#Instantiate a queue handler using the queue above
SlackHandler = SlackQueue.sqHandler(slack_queue)
SlackHandler.setLevel("DEBUG")

#Set a formatter
simple = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
SlackHandler.setFormatter(simple)

#Add the queue handler to the root logger
root.addHandler(SlackHandler)

#Instantiate the queue listener with the same queue above, and also set
#the settings for communicating with Slack.
#You will need to get your own Incoming Webhook URL from Slack's Custom
#Integration setup.
SlackListener = SlackQueue.sqListener(
        slack_queue,
        logging_url="https://hooks.slack.com/services/<your_webhook_url>",
        channel="#your_channel",
        username="Penguin",
        icon_emoji = ":penguin:"
    )

#Start the listener
SlackListener.start()

#Register the listener stop method to run upon exit in order to allow the 
#queue to be emptied before exiting.
#If you miss this step, the queue might not be emptied before the program
#exits and you won't receive all the messages on Slack.
atexit.register(SlackListener.stop)


#Begin logging
logging.info("Test info message")

#You can temporarily change the Slack settings via the "extra" parameter.
#The settings in extra will only apply to this one log message.
logging.debug("Another test message",extra= {
                                                "channel":"@someone",
                                                "icon_emoji":":coffee:",
                                                "username":"Latte"
                                            })

#You can put the <!channel> tag in the message to send an announcement.
msg="Some sort of warning!"
logging.warning("<!channel>: %s"%msg)

logging.error("Gasp!")

You can use the extra parameter in logging to alter the Slack settings for a particular message. It can be useful if you decide that a particular logging message should ping a certain person directly, or post to a different channel from the default channel.
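The override works because the logging module copies every key in extra onto the log record as an attribute, which is what the hasattr() checks in emit() rely on. The sketch below shows this without contacting Slack. CapturingHandler, build_payload() and the logger name extra_demo are hypothetical names for this example. build_payload() mirrors the override loop in emit() but is not the code from SlackQueue.py.

```python
import logging


class CapturingHandler(logging.Handler):
    """Hypothetical handler that keeps the records instead of sending them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


DEFAULTS = {
    "channel": "#your_channel",
    "username": "Penguin",
    "icon_emoji": ":penguin:",
}


def build_payload(record, defaults):
    # Start from the defaults, then apply any per-message overrides that
    # arrived on the record through the "extra" parameter.
    payload = defaults.copy()
    for key in defaults:
        if hasattr(record, key):
            payload[key] = getattr(record, key)
    payload["text"] = record.getMessage()
    return payload


capture = CapturingHandler()
extra_logger = logging.getLogger("extra_demo")
extra_logger.setLevel(logging.DEBUG)
extra_logger.propagate = False
extra_logger.addHandler(capture)

extra_logger.info("Default settings")
extra_logger.info("Direct message", extra={"channel": "@someone"})

for record in capture.records:
    print(build_payload(record, DEFAULTS))
```

The first payload keeps the default channel, while the second one is addressed to @someone and leaves the username and emoji unchanged.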

You can find all the code on GitHub.

If you’re using Python 2.x and need the source of queueHandler.py, check out Vinay Sajip’s blog post, specifically his whole script example.

Leave a comment below if you have any comments or feedback on how to improve the implementation.

References:
Logging for Slackers – Alan Gardner
Improved QueueHandler, QueueListener: dealing with handlers that block – Vinay Sajip