diff options
author | yvesf <yvesf@d0e8fea9-7529-0410-93fb-d39fd5b9c1dd> | 2009-12-04 23:40:36 +0000 |
---|---|---|
committer | yvesf <yvesf@d0e8fea9-7529-0410-93fb-d39fd5b9c1dd> | 2009-12-04 23:40:36 +0000 |
commit | 400958e540150b65cbf59467ea61aa4e654f4542 (patch) | |
tree | 3cc5ed7ce8d183534dfa58cf39caa4dd420d4d12 /aiml/Kernel.py | |
parent | 6153ba1277632d74105170c182594d3833636fa7 (diff) | |
download | omegle-400958e540150b65cbf59467ea61aa4e654f4542.tar.gz omegle-400958e540150b65cbf59467ea61aa4e654f4542.zip |
aiml bot
git-svn-id: http://xapek.org/svn/common/omegle@1473 d0e8fea9-7529-0410-93fb-d39fd5b9c1dd
Diffstat (limited to 'aiml/Kernel.py')
-rw-r--r-- | aiml/Kernel.py | 1183 |
1 files changed, 1183 insertions, 0 deletions
diff --git a/aiml/Kernel.py b/aiml/Kernel.py new file mode 100644 index 0000000..413f26d --- /dev/null +++ b/aiml/Kernel.py @@ -0,0 +1,1183 @@ +# -*- coding: latin-1 -*- +"""This file contains the public interface to the aiml module.""" +import AimlParser +import DefaultSubs +import Utils +from PatternMgr import PatternMgr +from WordSub import WordSub + +from ConfigParser import ConfigParser +import copy +import glob +import os +import random +import re +import string +import sys +import time +import threading +import xml.sax + + +class Kernel: + # module constants + _globalSessionID = "_global" # key of the global session (duh) + _maxHistorySize = 10 # maximum length of the _inputs and _responses lists + _maxRecursionDepth = 100 # maximum number of recursive <srai>/<sr> tags before the response is aborted. + # special predicate keys + _inputHistory = "_inputHistory" # keys to a queue (list) of recent user input + _outputHistory = "_outputHistory" # keys to a queue (list) of recent responses. 
+ _inputStack = "_inputStack" # Should always be empty in between calls to respond() + + def __init__(self): + self._verboseMode = True + self._version = "PyAIML 0.8.5" + self._brain = PatternMgr() + self._respondLock = threading.RLock() + self._textEncoding = "utf-8" + + # set up the sessions + self._sessions = {} + self._addSession(self._globalSessionID) + + # Set up the bot predicates + self._botPredicates = {} + self.setBotPredicate("name", "Nameless") + + # set up the word substitutors (subbers): + self._subbers = {} + self._subbers['gender'] = WordSub(DefaultSubs.defaultGender) + self._subbers['person'] = WordSub(DefaultSubs.defaultPerson) + self._subbers['person2'] = WordSub(DefaultSubs.defaultPerson2) + self._subbers['normal'] = WordSub(DefaultSubs.defaultNormal) + + # set up the element processors + self._elementProcessors = { + "bot": self._processBot, + "condition": self._processCondition, + "date": self._processDate, + "formal": self._processFormal, + "gender": self._processGender, + "get": self._processGet, + "gossip": self._processGossip, + "id": self._processId, + "input": self._processInput, + "javascript": self._processJavascript, + "learn": self._processLearn, + "li": self._processLi, + "lowercase": self._processLowercase, + "person": self._processPerson, + "person2": self._processPerson2, + "random": self._processRandom, + "text": self._processText, + "sentence": self._processSentence, + "set": self._processSet, + "size": self._processSize, + "sr": self._processSr, + "srai": self._processSrai, + "star": self._processStar, + "system": self._processSystem, + "template": self._processTemplate, + "that": self._processThat, + "thatstar": self._processThatstar, + "think": self._processThink, + "topicstar": self._processTopicstar, + "uppercase": self._processUppercase, + "version": self._processVersion, + } + + def bootstrap(self, brainFile = None, learnFiles = [], commands = []): + """Prepare a Kernel object for use. 
    def resetBrain(self):
        """Reset the brain to its initial state.

        This is essentially equivalent to:
            del(kern)
            kern = aiml.Kernel()

        The constructor is re-run on this instance, so besides the
        learned categories this also resets the sessions, the bot
        predicates, the word substitutors, the verbose flag and the
        text encoding to their constructor defaults.

        """
        # Drop the current pattern manager; __init__ creates a new one.
        del(self._brain)
        self.__init__()
% filename, + start = time.clock() + self._brain.restore(filename) + if self._verboseMode: + end = time.clock() - start + print "done (%d categories in %.2f seconds)" % (self._brain.numTemplates(), end) + + def saveBrain(self, filename): + """Dump the contents of the bot's brain to a file on disk.""" + if self._verboseMode: print "Saving brain to %s..." % filename, + start = time.clock() + self._brain.save(filename) + if self._verboseMode: + print "done (%.2f seconds)" % (time.clock() - start) + + def getPredicate(self, name, sessionID = _globalSessionID): + """Retrieve the current value of the predicate 'name' from the + specified session. + + If name is not a valid predicate in the session, the empty + string is returned. + + """ + try: return self._sessions[sessionID][name] + except KeyError: return "" + + def setPredicate(self, name, value, sessionID = _globalSessionID): + """Set the value of the predicate 'name' in the specified + session. + + If sessionID is not a valid session, it will be created. If + name is not a valid predicate in the session, it will be + created. + + """ + self._addSession(sessionID) # add the session, if it doesn't already exist. + self._sessions[sessionID][name] = value + + def getBotPredicate(self, name): + """Retrieve the value of the specified bot predicate. + + If name is not a valid bot predicate, the empty string is returned. + + """ + try: return self._botPredicates[name] + except KeyError: return "" + + def setBotPredicate(self, name, value): + """Set the value of the specified bot predicate. + + If name is not a valid bot predicate, it will be created. 
+ + """ + self._botPredicates[name] = value + # Clumsy hack: if updating the bot name, we must update the + # name in the brain as well + if name == "name": + self._brain.setBotName(self.getBotPredicate("name")) + + def setTextEncoding(self, encoding): + """Set the text encoding used when loading AIML files (Latin-1, UTF-8, etc.).""" + self._textEncoding = encoding + + def loadSubs(self, filename): + """Load a substitutions file. + + The file must be in the Windows-style INI format (see the + standard ConfigParser module docs for information on this + format). Each section of the file is loaded into its own + substituter. + + """ + inFile = file(filename) + parser = ConfigParser() + parser.readfp(inFile, filename) + inFile.close() + for s in parser.sections(): + # Add a new WordSub instance for this section. If one already + # exists, delete it. + if self._subbers.has_key(s): + del(self._subbers[s]) + self._subbers[s] = WordSub() + # iterate over the key,value pairs and add them to the subber + for k,v in parser.items(s): + self._subbers[s][k] = v + + def _addSession(self, sessionID): + """Create a new session with the specified ID string.""" + if self._sessions.has_key(sessionID): + return + # Create the session. + self._sessions[sessionID] = { + # Initialize the special reserved predicates + self._inputHistory: [], + self._outputHistory: [], + self._inputStack: [] + } + + def _deleteSession(self, sessionID): + """Delete the specified session.""" + if self._sessions.has_key(sessionID): + _sessions.pop(sessionID) + + def getSessionData(self, sessionID = None): + """Return a copy of the session data dictionary for the + specified session. + + If no sessionID is specified, return a dictionary containing + *all* of the individual session dictionaries. 
+ + """ + s = None + if sessionID is not None: + try: s = self._sessions[sessionID] + except KeyError: s = {} + else: + s = self._sessions + return copy.deepcopy(s) + + def learn(self, filename): + """Load and learn the contents of the specified AIML file. + + If filename includes wildcard characters, all matching files + will be loaded and learned. + + """ + for f in glob.glob(filename): + if self._verboseMode: print "Loading %s..." % f, + start = time.clock() + # Load and parse the AIML file. + parser = AimlParser.create_parser() + handler = parser.getContentHandler() + handler.setEncoding(self._textEncoding) + try: parser.parse(f) + except xml.sax.SAXParseException, msg: + err = "\nFATAL PARSE ERROR in file %s:\n%s\n" % (f,msg) + sys.stderr.write(err) + continue + # store the pattern/template pairs in the PatternMgr. + for key,tem in handler.categories.items(): + self._brain.add(key,tem) + # Parsing was successful. + if self._verboseMode: + print "done (%.2f seconds)" % (time.clock() - start) + + def respond(self, input, sessionID = _globalSessionID): + """Return the Kernel's response to the input string.""" + if len(input) == 0: + return "" + + #ensure that input is a unicode string + try: input = input.decode(self._textEncoding, 'replace') + except UnicodeError: pass + except AttributeError: pass + + # prevent other threads from stomping all over us. + self._respondLock.acquire() + + # Add the session, if it doesn't already exist + self._addSession(sessionID) + + # split the input into discrete sentences + sentences = Utils.sentences(input) + finalResponse = "" + for s in sentences: + # Add the input to the history list before fetching the + # response, so that <input/> tags work properly. 
+ inputHistory = self.getPredicate(self._inputHistory, sessionID) + inputHistory.append(s) + while len(inputHistory) > self._maxHistorySize: + inputHistory.pop(0) + self.setPredicate(self._inputHistory, inputHistory, sessionID) + + # Fetch the response + response = self._respond(s, sessionID) + + # add the data from this exchange to the history lists + outputHistory = self.getPredicate(self._outputHistory, sessionID) + outputHistory.append(response) + while len(outputHistory) > self._maxHistorySize: + outputHistory.pop(0) + self.setPredicate(self._outputHistory, outputHistory, sessionID) + + # append this response to the final response. + finalResponse += (response + " ") + finalResponse = finalResponse.strip() + + assert(len(self.getPredicate(self._inputStack, sessionID)) == 0) + + # release the lock and return + self._respondLock.release() + try: return finalResponse.encode(self._textEncoding) + except UnicodeError: return finalResponse + + # This version of _respond() just fetches the response for some input. + # It does not mess with the input and output histories. Recursive calls + # to respond() spawned from tags like <srai> should call this function + # instead of respond(). 
+ def _respond(self, input, sessionID): + """Private version of respond(), does the real work.""" + if len(input) == 0: + return "" + + # guard against infinite recursion + inputStack = self.getPredicate(self._inputStack, sessionID) + if len(inputStack) > self._maxRecursionDepth: + if self._verboseMode: + err = "WARNING: maximum recursion depth exceeded (input='%s')" % input.encode(self._textEncoding, 'replace') + sys.stderr.write(err) + return "" + + # push the input onto the input stack + inputStack = self.getPredicate(self._inputStack, sessionID) + inputStack.append(input) + self.setPredicate(self._inputStack, inputStack, sessionID) + + # run the input through the 'normal' subber + subbedInput = self._subbers['normal'].sub(input) + + # fetch the bot's previous response, to pass to the match() + # function as 'that'. + outputHistory = self.getPredicate(self._outputHistory, sessionID) + try: that = outputHistory[-1] + except IndexError: that = "" + subbedThat = self._subbers['normal'].sub(that) + + # fetch the current topic + topic = self.getPredicate("topic", sessionID) + subbedTopic = self._subbers['normal'].sub(topic) + + # Determine the final response. + response = "" + elem = self._brain.match(subbedInput, subbedThat, subbedTopic) + if elem is None: + if self._verboseMode: + err = "WARNING: No match found for input: %s\n" % input.encode(self._textEncoding) + sys.stderr.write(err) + else: + # Process the element into a response string. + response += self._processElement(elem, sessionID).strip() + response += " " + response = response.strip() + + # pop the top entry off the input stack. + inputStack = self.getPredicate(self._inputStack, sessionID) + inputStack.pop() + self.setPredicate(self._inputStack, inputStack, sessionID) + + return response + + def _processElement(self,elem, sessionID): + """Process an AIML element. + + The first item of the elem list is the name of the element's + XML tag. 
    # <condition>
    def _processCondition(self, elem, sessionID):
        """Process a <condition> AIML element.

        Optional element attributes:
            name: The name of a predicate to test.
            value: The value to test the predicate for.

        <condition> elements come in three flavors.  Each has different
        attributes, and each handles their contents differently.

        The simplest case is when the <condition> tag has both a 'name'
        and a 'value' attribute.  In this case, if the predicate
        'name' has the value 'value', then the contents of the element
        are processed and returned.

        If the <condition> element has only a 'name' attribute, then
        its contents are a series of <li> elements, each of which has
        a 'value' attribute.  The list is scanned from top to bottom
        until a match is found.  Optionally, the last <li> element can
        have no 'value' attribute, in which case it is processed and
        returned if no other match is found.

        If the <condition> element has neither a 'name' nor a 'value'
        attribute, then it behaves almost exactly like the previous
        case, except that each <li> subelement (except the optional
        last entry) must now include both 'name' and 'value'
        attributes.

        Returns the processed contents of the matching branch, or the
        empty string if nothing matched.  Errors raised while examining
        or processing a list item are reported (in verbose mode) and
        then re-raised to the caller.

        """
        attr = None
        response = ""
        attr = elem[1]
        
        # Case #1: test the value of a specific predicate for a
        # specific value.
        if attr.has_key('name') and attr.has_key('value'):
            val = self.getPredicate(attr['name'], sessionID)
            if val == attr['value']:
                for e in elem[2:]:
                    response += self._processElement(e,sessionID)
                return response
        else:
            # Case #2 and #3: Cycle through <li> contents, testing a
            # name and value pair for each one.
            try:
                # In case #2 the predicate name comes from the
                # <condition> tag; in case #3 it stays None and each
                # <li> supplies its own name.
                name = None
                if attr.has_key('name'):
                    name = attr['name']
                # Get the list of <li> elements
                listitems = []
                for e in elem[2:]:
                    if e[0] == 'li':
                        listitems.append(e)
                # if listitems is empty, return the empty string
                if len(listitems) == 0:
                    return ""
                # iterate through the list looking for a condition that
                # matches.
                foundMatch = False
                for li in listitems:
                    try:
                        liAttr = li[1]
                        # if this is the last list item, it's allowed
                        # to have no attributes.  We just skip it for now.
                        # (It is handled as the default after the loop.)
                        if len(liAttr.keys()) == 0 and li == listitems[-1]:
                            continue
                        # get the name of the predicate to test
                        liName = name
                        if liName == None:
                            liName = liAttr['name']
                        # get the value to check against
                        liValue = liAttr['value']
                        # do the test
                        if self.getPredicate(liName, sessionID) == liValue:
                            foundMatch = True
                            response += self._processElement(li,sessionID)
                            break
                    except:
                        # No attributes, no name/value attributes, no
                        # such predicate/session, or processing error.
                        # The message says "skipping", but the error is
                        # re-raised, so the item is not actually skipped.
                        if self._verboseMode: print "Something amiss -- skipping listitem", li
                        raise
                if not foundMatch:
                    # Check the last element of listitems.  If it has
                    # no 'name' or 'value' attribute, process it.
                    try:
                        li = listitems[-1]
                        liAttr = li[1]
                        if not (liAttr.has_key('name') or liAttr.has_key('value')):
                            response += self._processElement(li, sessionID)
                    except:
                        # listitems was empty, no attributes, missing
                        # name/value attributes, or processing error.
                        if self._verboseMode: print "error in default listitem"
                        raise
            except:
                # Some other catastrophic cataclysm
                # (also reached when one of the inner handlers re-raises)
                if self._verboseMode: print "catastrophic condition failure"
                raise
        return response
+ + """ + return self.getPredicate(elem[1]['name'], sessionID) + + # <gossip> + def _processGossip(self, elem, sessionID): + """Process a <gossip> AIML element. + + <gossip> elements are used to capture and store user input in + an implementation-defined manner, theoretically allowing the + bot to learn from the people it chats with. I haven't + descided how to define my implementation, so right now + <gossip> behaves identically to <think>. + + """ + return self._processThink(elem, sessionID) + + # <id> + def _processId(self, elem, sessionID): + """ Process an <id> AIML element. + + <id> elements return a unique "user id" for a specific + conversation. In PyAIML, the user id is the name of the + current session. + + """ + return sessionID + + # <input> + def _processInput(self, elem, sessionID): + """Process an <input> AIML element. + + Optional attribute elements: + index: The index of the element from the history list to + return. 1 means the most recent item, 2 means the one + before that, and so on. + + <input> elements return an entry from the input history for + the current session. + + """ + inputHistory = self.getPredicate(self._inputHistory, sessionID) + try: index = int(elem[1]['index']) + except: index = 1 + try: return inputHistory[-index] + except IndexError: + if self._verboseMode: + err = "No such index %d while processing <input> element.\n" % index + sys.stderr.write(err) + return "" + + # <javascript> + def _processJavascript(self, elem, sessionID): + """Process a <javascript> AIML element. + + <javascript> elements process their contents recursively, and + then run the results through a server-side Javascript + interpreter to compute the final response. Implementations + are not required to provide an actual Javascript interpreter, + and right now PyAIML doesn't; <javascript> elements are behave + exactly like <think> elements. 
+ + """ + return self._processThink(elem, sessionID) + + # <learn> + def _processLearn(self, elem, sessionID): + """Process a <learn> AIML element. + + <learn> elements process their contents recursively, and then + treat the result as an AIML file to open and learn. + + """ + filename = "" + for e in elem[2:]: + filename += self._processElement(e, sessionID) + self.learn(filename) + return "" + + # <li> + def _processLi(self,elem, sessionID): + """Process an <li> AIML element. + + Optional attribute elements: + name: the name of a predicate to query. + value: the value to check that predicate for. + + <li> elements process their contents recursively and return + the results. They can only appear inside <condition> and + <random> elements. See _processCondition() and + _processRandom() for details of their usage. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return response + + # <lowercase> + def _processLowercase(self,elem, sessionID): + """Process a <lowercase> AIML element. + + <lowercase> elements process their contents recursively, and + then convert the results to all-lowercase. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return string.lower(response) + + # <person> + def _processPerson(self,elem, sessionID): + """Process a <person> AIML element. + + <person> elements process their contents recursively, and then + convert all pronouns in the results from 1st person to 2nd + person, and vice versa. This subsitution is handled by the + aiml.WordSub module. + + If the <person> tag is used atomically (e.g. <person/>), it is + a shortcut for <person><star/></person>. 
+ + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + if len(elem[2:]) == 0: # atomic <person/> = <person><star/></person> + response = self._processElement(['star',{}], sessionID) + return self._subbers['person'].sub(response) + + # <person2> + def _processPerson2(self,elem, sessionID): + """Process a <person2> AIML element. + + <person2> elements process their contents recursively, and then + convert all pronouns in the results from 1st person to 3rd + person, and vice versa. This subsitution is handled by the + aiml.WordSub module. + + If the <person2> tag is used atomically (e.g. <person2/>), it is + a shortcut for <person2><star/></person2>. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + if len(elem[2:]) == 0: # atomic <person2/> = <person2><star/></person2> + response = self._processElement(['star',{}], sessionID) + return self._subbers['person2'].sub(response) + + # <random> + def _processRandom(self, elem, sessionID): + """Process a <random> AIML element. + + <random> elements contain zero or more <li> elements. If + none, the empty string is returned. If one or more <li> + elements are present, one of them is selected randomly to be + processed recursively and have its results returned. Only the + chosen <li> element's contents are processed. Any non-<li> contents are + ignored. + + """ + listitems = [] + for e in elem[2:]: + if e[0] == 'li': + listitems.append(e) + if len(listitems) == 0: + return "" + + # select and process a random listitem. + random.shuffle(listitems) + return self._processElement(listitems[0], sessionID) + + # <sentence> + def _processSentence(self,elem, sessionID): + """Process a <sentence> AIML element. + + <sentence> elements process their contents recursively, and + then capitalize the first letter of the results. 
+ + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + try: + response = response.strip() + words = string.split(response, " ", 1) + words[0] = string.capitalize(words[0]) + response = string.join(words) + return response + except IndexError: # response was empty + return "" + + # <set> + def _processSet(self, elem, sessionID): + """Process a <set> AIML element. + + Required element attributes: + name: The name of the predicate to set. + + <set> elements process their contents recursively, and assign the results to a predicate + (given by their 'name' attribute) in the current session. The contents of the element + are also returned. + + """ + value = "" + for e in elem[2:]: + value += self._processElement(e, sessionID) + self.setPredicate(elem[1]['name'], value, sessionID) + return value + + # <size> + def _processSize(self,elem, sessionID): + """Process a <size> AIML element. + + <size> elements return the number of AIML categories currently + in the bot's brain. + + """ + return str(self.numCategories()) + + # <sr> + def _processSr(self,elem,sessionID): + """Process an <sr> AIML element. + + <sr> elements are shortcuts for <srai><star/></srai>. + + """ + star = self._processElement(['star',{}], sessionID) + response = self._respond(star, sessionID) + return response + + # <srai> + def _processSrai(self,elem, sessionID): + """Process a <srai> AIML element. + + <srai> elements recursively process their contents, and then + pass the results right back into the AIML interpreter as a new + piece of input. The results of this new input string are + returned. + + """ + newInput = "" + for e in elem[2:]: + newInput += self._processElement(e, sessionID) + return self._respond(newInput, sessionID) + + # <star> + def _processStar(self, elem, sessionID): + """Process a <star> AIML element. + + Optional attribute elements: + index: Which "*" character in the current pattern should + be matched? 
+ + <star> elements return the text fragment matched by the "*" + character in the current input pattern. For example, if the + input "Hello Tom Smith, how are you?" matched the pattern + "HELLO * HOW ARE YOU", then a <star> element in the template + would evaluate to "Tom Smith". + + """ + try: index = int(elem[1]['index']) + except KeyError: index = 1 + # fetch the user's last input + inputStack = self.getPredicate(self._inputStack, sessionID) + input = self._subbers['normal'].sub(inputStack[-1]) + # fetch the Kernel's last response (for 'that' context) + outputHistory = self.getPredicate(self._outputHistory, sessionID) + try: that = self._subbers['normal'].sub(outputHistory[-1]) + except: that = "" # there might not be any output yet + topic = self.getPredicate("topic", sessionID) + response = self._brain.star("star", input, that, topic, index) + return response + + # <system> + def _processSystem(self,elem, sessionID): + """Process a <system> AIML element. + + <system> elements process their contents recursively, and then + attempt to execute the results as a shell command on the + server. The AIML interpreter blocks until the command is + complete, and then returns the command's output. + + For cross-platform compatibility, any file paths inside + <system> tags should use Unix-style forward slashes ("/") as a + directory separator. + + """ + # build up the command string + command = "" + for e in elem[2:]: + command += self._processElement(e, sessionID) + + # normalize the path to the command. Under Windows, this + # switches forward-slashes to back-slashes; all system + # elements should use unix-style paths for cross-platform + # compatibility. + #executable,args = command.split(" ", 1) + #executable = os.path.normpath(executable) + #command = executable + " " + args + command = os.path.normpath(command) + + # execute the command. 
+ response = "" + try: + out = os.popen(command) + except RuntimeError, msg: + if self._verboseMode: + err = "WARNING: RuntimeError while processing \"system\" element:\n%s\n" % msg.encode(self._textEncoding, 'replace') + sys.stderr.write(err) + return "There was an error while computing my response. Please inform my botmaster." + for line in out: + response += line + "\n" + response = string.join(response.splitlines()).strip() + return response + + # <template> + def _processTemplate(self,elem, sessionID): + """Process a <template> AIML element. + + <template> elements recursively process their contents, and + return the results. <template> is the root node of any AIML + response tree. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return response + + # text + def _processText(self,elem, sessionID): + """Process a raw text element. + + Raw text elements aren't really AIML tags. Text elements cannot contain + other elements; instead, the third item of the 'elem' list is a text + string, which is immediately returned. They have a single attribute, + automatically inserted by the parser, which indicates whether whitespace + in the text should be preserved or not. + + """ + try: elem[2] + "" + except TypeError: raise TypeError, "Text element contents are not text" + + # If the the whitespace behavior for this element is "default", + # we reduce all stretches of >1 whitespace characters to a single + # space. To improve performance, we do this only once for each + # text element encountered, and save the results for the future. + if elem[1]["xml:space"] == "default": + elem[2] = re.sub("\s+", " ", elem[2]) + elem[1]["xml:space"] = "preserve" + return elem[2] + + # <that> + def _processThat(self,elem, sessionID): + """Process a <that> AIML element. + + Optional element attributes: + index: Specifies which element from the output history to + return. 1 is the most recent response, 2 is the next most + recent, and so on. 
+ + <that> elements (when they appear inside <template> elements) + are the output equivilant of <input> elements; they return one + of the Kernel's previous responses. + + """ + outputHistory = self.getPredicate(self._outputHistory, sessionID) + index = 1 + try: + # According to the AIML spec, the optional index attribute + # can either have the form "x" or "x,y". x refers to how + # far back in the output history to go. y refers to which + # sentence of the specified response to return. + index = int(elem[1]['index'].split(',')[0]) + except: + pass + try: return outputHistory[-index] + except IndexError: + if self._verboseMode: + err = "No such index %d while processing <that> element.\n" % index + sys.stderr.write(err) + return "" + + # <thatstar> + def _processThatstar(self, elem, sessionID): + """Process a <thatstar> AIML element. + + Optional element attributes: + index: Specifies which "*" in the <that> pattern to match. + + <thatstar> elements are similar to <star> elements, except + that where <star/> returns the portion of the input string + matched by a "*" character in the pattern, <thatstar/> returns + the portion of the previous input string that was matched by a + "*" in the current category's <that> pattern. + + """ + try: index = int(elem[1]['index']) + except KeyError: index = 1 + # fetch the user's last input + inputStack = self.getPredicate(self._inputStack, sessionID) + input = self._subbers['normal'].sub(inputStack[-1]) + # fetch the Kernel's last response (for 'that' context) + outputHistory = self.getPredicate(self._outputHistory, sessionID) + try: that = self._subbers['normal'].sub(outputHistory[-1]) + except: that = "" # there might not be any output yet + topic = self.getPredicate("topic", sessionID) + response = self._brain.star("thatstar", input, that, topic, index) + return response + + # <think> + def _processThink(self,elem, sessionID): + """Process a <think> AIML element. 
+ + <think> elements process their contents recursively, and then + discard the results and return the empty string. They're + useful for setting predicates and learning AIML files without + generating any output. + + """ + for e in elem[2:]: + self._processElement(e, sessionID) + return "" + + # <topicstar> + def _processTopicstar(self, elem, sessionID): + """Process a <topicstar> AIML element. + + Optional element attributes: + index: Specifies which "*" in the <topic> pattern to match. + + <topicstar> elements are similar to <star> elements, except + that where <star/> returns the portion of the input string + matched by a "*" character in the pattern, <topicstar/> + returns the portion of current topic string that was matched + by a "*" in the current category's <topic> pattern. + + """ + try: index = int(elem[1]['index']) + except KeyError: index = 1 + # fetch the user's last input + inputStack = self.getPredicate(self._inputStack, sessionID) + input = self._subbers['normal'].sub(inputStack[-1]) + # fetch the Kernel's last response (for 'that' context) + outputHistory = self.getPredicate(self._outputHistory, sessionID) + try: that = self._subbers['normal'].sub(outputHistory[-1]) + except: that = "" # there might not be any output yet + topic = self.getPredicate("topic", sessionID) + response = self._brain.star("topicstar", input, that, topic, index) + return response + + # <uppercase> + def _processUppercase(self,elem, sessionID): + """Process an <uppercase> AIML element. + + <uppercase> elements process their contents recursively, and + return the results with all lower-case characters converted to + upper-case. + + """ + response = "" + for e in elem[2:]: + response += self._processElement(e, sessionID) + return string.upper(response) + + # <version> + def _processVersion(self,elem, sessionID): + """Process a <version> AIML element. + + <version> elements return the version number of the AIML + interpreter. 
+ + """ + return self.version() + + +################################################## +### Self-test functions follow ### +################################################## +def _testTag(kern, tag, input, outputList): + """Tests 'tag' by feeding the Kernel 'input'. If the result + matches any of the strings in 'outputList', the test passes. + + """ + global _numTests, _numPassed + _numTests += 1 + print "Testing <" + tag + ">:", + response = kern.respond(input).decode(kern._textEncoding) + if response in outputList: + print "PASSED" + _numPassed += 1 + return True + else: + print "FAILED (response: '%s')" % response.encode(kern._textEncoding, 'replace') + return False + +if __name__ == "__main__": + # Run some self-tests + k = Kernel() + k.bootstrap(learnFiles="self-test.aiml") + + global _numTests, _numPassed + _numTests = 0 + _numPassed = 0 + + _testTag(k, 'bot', 'test bot', ["My name is Nameless"]) + + k.setPredicate('gender', 'male') + _testTag(k, 'condition test #1', 'test condition name value', ['You are handsome']) + k.setPredicate('gender', 'female') + _testTag(k, 'condition test #2', 'test condition name value', ['']) + _testTag(k, 'condition test #3', 'test condition name', ['You are beautiful']) + k.setPredicate('gender', 'robot') + _testTag(k, 'condition test #4', 'test condition name', ['You are genderless']) + _testTag(k, 'condition test #5', 'test condition', ['You are genderless']) + k.setPredicate('gender', 'male') + _testTag(k, 'condition test #6', 'test condition', ['You are handsome']) + + # the date test will occasionally fail if the original and "test" + # times cross a second boundary. There's no good way to avoid + # this problem and still do a meaningful test, so we simply + # provide a friendly message to be printed if the test fails. + date_warning = """ + NOTE: the <date> test will occasionally report failure even if it + succeeds. So long as the response looks like a date/time string, + there's nothing to worry about. 
+ """ + if not _testTag(k, 'date', 'test date', ["The date is %s" % time.asctime()]): + print date_warning + + _testTag(k, 'formal', 'test formal', ["Formal Test Passed"]) + _testTag(k, 'gender', 'test gender', ["He'd told her he heard that her hernia is history"]) + _testTag(k, 'get/set', 'test get and set', ["I like cheese. My favorite food is cheese"]) + _testTag(k, 'gossip', 'test gossip', ["Gossip is not yet implemented"]) + _testTag(k, 'id', 'test id', ["Your id is _global"]) + _testTag(k, 'input', 'test input', ['You just said: test input']) + _testTag(k, 'javascript', 'test javascript', ["Javascript is not yet implemented"]) + _testTag(k, 'lowercase', 'test lowercase', ["The Last Word Should Be lowercase"]) + _testTag(k, 'person', 'test person', ['HE think i knows that my actions threaten him and his.']) + _testTag(k, 'person2', 'test person2', ['YOU think me know that my actions threaten you and yours.']) + _testTag(k, 'person2 (no contents)', 'test person2 I Love Lucy', ['YOU Love Lucy']) + _testTag(k, 'random', 'test random', ["response #1", "response #2", "response #3"]) + _testTag(k, 'random empty', 'test random empty', ["Nothing here!"]) + _testTag(k, 'sentence', "test sentence", ["My first letter should be capitalized."]) + _testTag(k, 'size', "test size", ["I've learned %d categories" % k.numCategories()]) + _testTag(k, 'sr', "test sr test srai", ["srai results: srai test passed"]) + _testTag(k, 'sr nested', "test nested sr test srai", ["srai results: srai test passed"]) + _testTag(k, 'srai', "test srai", ["srai test passed"]) + _testTag(k, 'srai infinite', "test srai infinite", [""]) + _testTag(k, 'star test #1', 'You should test star begin', ['Begin star matched: You should']) + _testTag(k, 'star test #2', 'test star creamy goodness middle', ['Middle star matched: creamy goodness']) + _testTag(k, 'star test #3', 'test star end the credits roll', ['End star matched: the credits roll']) + _testTag(k, 'star test #4', 'test star having multiple stars 
in a pattern makes me extremely happy', + ['Multiple stars matched: having, stars in a pattern, extremely happy']) + _testTag(k, 'system', "test system", ["The system says hello!"]) + _testTag(k, 'that test #1', "test that", ["I just said: The system says hello!"]) + _testTag(k, 'that test #2', "test that", ["I have already answered this question"]) + _testTag(k, 'thatstar test #1', "test thatstar", ["I say beans"]) + _testTag(k, 'thatstar test #2', "test thatstar", ["I just said \"beans\""]) + _testTag(k, 'thatstar test #3', "test thatstar multiple", ['I say beans and franks for everybody']) + _testTag(k, 'thatstar test #4', "test thatstar multiple", ['Yes, beans and franks for all!']) + _testTag(k, 'think', "test think", [""]) + k.setPredicate("topic", "fruit") + _testTag(k, 'topic', "test topic", ["We were discussing apples and oranges"]) + k.setPredicate("topic", "Soylent Green") + _testTag(k, 'topicstar test #1', 'test topicstar', ["Solyent Green is made of people!"]) + k.setPredicate("topic", "Soylent Ham and Cheese") + _testTag(k, 'topicstar test #2', 'test topicstar multiple', ["Both Soylents Ham and Cheese are made of people!"]) + _testTag(k, 'unicode support', u"ΤΗΙΟΊΓ", [u"Hey, you speak Chinese! ΤΗΙΟΊΓ"]) + _testTag(k, 'uppercase', 'test uppercase', ["The Last Word Should Be UPPERCASE"]) + _testTag(k, 'version', 'test version', ["PyAIML is version %s" % k.version()]) + _testTag(k, 'whitespace preservation', 'test whitespace', ["Extra Spaces\n Rule! (but not in here!) But Here They Do!"]) + + # Report test results + print "--------------------" + if _numTests == _numPassed: + print "%d of %d tests passed!" % (_numPassed, _numTests) + else: + print "%d of %d tests passed (see above for detailed errors)" % (_numPassed, _numTests) + + # Run an interactive interpreter + #print "\nEntering interactive mode (ctrl-c to exit)" + #while True: print k.respond(raw_input("> ")) |