fight-fixes
bobloy 7 years ago
parent 3bf5f24a01
commit de2b9c352b

@ -7,8 +7,8 @@ from discord.ext import commands
from redbot.core import Config
from redbot.core.bot import Red
from .chatterbot import ChatBot
from .chatterbot.trainers import ListTrainer
from .source import ChatBot
from .source.trainers import ListTrainer
from datetime import datetime,timedelta
@ -31,6 +31,8 @@ class Chatter:
self.config.register_global(**default_global)
self.config.register_guild(**default_guild)
self.loop = asyncio.get_event_loop()
async def _get_conversation(self, ctx, in_channel: discord.TextChannel):
"""
@ -59,12 +61,13 @@ class Chatter:
return out
async def _train(self, data):
def _train(self, data):
try:
self.chatbot.train(data)
except:
return False
return True
@commands.group()
async def chatter(self, ctx: commands.Context):
"""
@ -90,12 +93,14 @@ class Chatter:
conversation = await self._get_conversation(ctx, channel)
await ctx.send("Gather successful! Training begins now")
if not conversation:
await ctx.send("Failed to gather training data")
return
if await self._train(conversation):
await ctx.send("Gather successful! Training begins now\n(**This will take a long time, be patient**)")
future = await self.loop.run_in_executor(None, self._train, conversation)
if future:
await ctx.send("Training successful!")
else:
await ctx.send("Error occurred :(")

@ -0,0 +1,13 @@
"""
ChatterBot is a machine learning, conversational dialog engine.
"""
from .chatterbot import ChatBot
__version__ = '0.8.4'
__author__ = 'Gunther Cox'
__email__ = 'gunthercx@gmail.com'
__url__ = 'https://github.com/gunthercox/ChatterBot'
__all__ = (
'ChatBot',
)

@ -0,0 +1,23 @@
import sys
if __name__ == '__main__':
import importlib
if '--version' in sys.argv:
chatterbot = importlib.import_module('chatterbot')
print(chatterbot.__version__)
if 'list_nltk_data' in sys.argv:
import os
import nltk.data
data_directories = []
# Find each data directory in the NLTK path that has content
for path in nltk.data.path:
if os.path.exists(path):
if os.listdir(path):
data_directories.append(path)
print(os.linesep.join(data_directories))

@ -0,0 +1,47 @@
import logging
class Adapter(object):
"""
A superclass for all adapter classes.
:param logger: A python logger.
"""
def __init__(self, **kwargs):
self.logger = kwargs.get('logger', logging.getLogger(__name__))
self.chatbot = kwargs.get('chatbot')
def set_chatbot(self, chatbot):
"""
Gives the adapter access to an instance of the ChatBot class.
:param chatbot: A chat bot instanse.
:type chatbot: ChatBot
"""
self.chatbot = chatbot
class AdapterMethodNotImplementedError(NotImplementedError):
"""
An exception to be raised when an adapter method has not been implemented.
Typically this indicates that the developer is expected to implement the
method in a subclass.
"""
def __init__(self, message=None):
"""
Set the message for the esception.
"""
if not message:
message = 'This method must be overridden in a subclass method.'
self.message = message
def __str__(self):
return self.message
class InvalidAdapterTypeException(Exception):
"""
An exception to be raised when an adapter
of an unexpected class type is received.
"""
pass

@ -0,0 +1,173 @@
from __future__ import unicode_literals
import logging
from .storage import StorageAdapter
from .input import InputAdapter
from .output import OutputAdapter
from . import utils
class ChatBot(object):
"""
A conversational dialog chat bot.
"""
def __init__(self, name, **kwargs):
from .logic import MultiLogicAdapter
self.name = name
kwargs['name'] = name
kwargs['chatbot'] = self
self.default_session = None
storage_adapter = kwargs.get('storage_adapter', 'chatter.source.storage.SQLStorageAdapter')
logic_adapters = kwargs.get('logic_adapters', [
'chatter.source.logic.BestMatch'
])
input_adapter = kwargs.get('input_adapter', 'chatter.source.input.VariableInputTypeAdapter')
output_adapter = kwargs.get('output_adapter', 'chatter.source.output.OutputAdapter')
# Check that each adapter is a valid subclass of it's respective parent
utils.validate_adapter_class(storage_adapter, StorageAdapter)
utils.validate_adapter_class(input_adapter, InputAdapter)
utils.validate_adapter_class(output_adapter, OutputAdapter)
self.logic = MultiLogicAdapter(**kwargs)
self.storage = utils.initialize_class(storage_adapter, **kwargs)
self.input = utils.initialize_class(input_adapter, **kwargs)
self.output = utils.initialize_class(output_adapter, **kwargs)
filters = kwargs.get('filters', tuple())
self.filters = tuple([utils.import_module(F)() for F in filters])
# Add required system logic adapter
self.logic.system_adapters.append(
utils.initialize_class('chatter.source.logic.NoKnowledgeAdapter', **kwargs)
)
for adapter in logic_adapters:
self.logic.add_adapter(adapter, **kwargs)
# Add the chatbot instance to each adapter to share information such as
# the name, the current conversation, or other adapters
self.logic.set_chatbot(self)
self.input.set_chatbot(self)
self.output.set_chatbot(self)
preprocessors = kwargs.get(
'preprocessors', [
'chatter.source.preprocessors.clean_whitespace'
]
)
self.preprocessors = []
for preprocessor in preprocessors:
self.preprocessors.append(utils.import_module(preprocessor))
# Use specified trainer or fall back to the default
trainer = kwargs.get('trainer', 'chatter.source.trainers.Trainer')
TrainerClass = utils.import_module(trainer)
self.trainer = TrainerClass(self.storage, **kwargs)
self.training_data = kwargs.get('training_data')
self.default_conversation_id = None
self.logger = kwargs.get('logger', logging.getLogger(__name__))
# Allow the bot to save input it receives so that it can learn
self.read_only = kwargs.get('read_only', False)
if kwargs.get('initialize', True):
self.initialize()
def initialize(self):
"""
Do any work that needs to be done before the responses can be returned.
"""
self.logic.initialize()
def get_response(self, input_item, conversation_id=None):
"""
Return the bot's response based on the input.
:param input_item: An input value.
:param conversation_id: The id of a conversation.
:returns: A response to the input.
:rtype: Statement
"""
if not conversation_id:
if not self.default_conversation_id:
self.default_conversation_id = self.storage.create_conversation()
conversation_id = self.default_conversation_id
input_statement = self.input.process_input_statement(input_item)
# Preprocess the input statement
for preprocessor in self.preprocessors:
input_statement = preprocessor(self, input_statement)
statement, response = self.generate_response(input_statement, conversation_id)
# Learn that the user's input was a valid response to the chat bot's previous output
previous_statement = self.storage.get_latest_response(conversation_id)
if not self.read_only:
self.learn_response(statement, previous_statement)
self.storage.add_to_conversation(conversation_id, statement, response)
# Process the response output with the output adapter
return self.output.process_response(response, conversation_id)
def generate_response(self, input_statement, conversation_id):
"""
Return a response based on a given input statement.
"""
self.storage.generate_base_query(self, conversation_id)
# Select a response to the input statement
response = self.logic.process(input_statement)
return input_statement, response
def learn_response(self, statement, previous_statement):
"""
Learn that the statement provided is a valid response.
"""
from .conversation import Response
if previous_statement:
statement.add_response(
Response(previous_statement.text)
)
self.logger.info('Adding "{}" as a response to "{}"'.format(
statement.text,
previous_statement.text
))
# Save the statement after selecting a response
self.storage.update(statement)
def set_trainer(self, training_class, **kwargs):
"""
Set the module used to train the chatbot.
:param training_class: The training class to use for the chat bot.
:type training_class: `Trainer`
:param \**kwargs: Any parameters that should be passed to the training class.
"""
if 'chatbot' not in kwargs:
kwargs['chatbot'] = self
self.trainer = training_class(self.storage, **kwargs)
@property
def train(self):
"""
Proxy method to the chat bot's trainer class.
"""
return self.trainer.train

@ -0,0 +1,331 @@
# -*- coding: utf-8 -*-
import sys
"""
This module contains various text-comparison algorithms
designed to compare one statement to another.
"""
# Use python-Levenshtein if available
try:
from Levenshtein.StringMatcher import StringMatcher as SequenceMatcher
except ImportError:
from difflib import SequenceMatcher
class Comparator:
def __call__(self, statement_a, statement_b):
return self.compare(statement_a, statement_b)
def compare(self, statement_a, statement_b):
return 0
def get_initialization_functions(self):
"""
Return all initialization methods for the comparison algorithm.
Initialization methods must start with 'initialize_' and
take no parameters.
"""
initialization_methods = [
(
method,
getattr(self, method),
) for method in dir(self) if method.startswith('initialize_')
]
return {
key: value for (key, value) in initialization_methods
}
class LevenshteinDistance(Comparator):
"""
Compare two statements based on the Levenshtein distance
of each statement's text.
For example, there is a 65% similarity between the statements
"where is the post office?" and "looking for the post office"
based on the Levenshtein distance algorithm.
"""
def compare(self, statement, other_statement):
"""
Compare the two input statements.
:return: The percent of similarity between the text of the statements.
:rtype: float
"""
PYTHON = sys.version_info[0]
# Return 0 if either statement has a falsy text value
if not statement.text or not other_statement.text:
return 0
# Get the lowercase version of both strings
if PYTHON < 3:
statement_text = unicode(statement.text.lower()) # NOQA
other_statement_text = unicode(other_statement.text.lower()) # NOQA
else:
statement_text = str(statement.text.lower())
other_statement_text = str(other_statement.text.lower())
similarity = SequenceMatcher(
None,
statement_text,
other_statement_text
)
# Calculate a decimal percent of the similarity
percent = round(similarity.ratio(), 2)
return percent
class SynsetDistance(Comparator):
"""
Calculate the similarity of two statements.
This is based on the total maximum synset similarity between each word in each sentence.
This algorithm uses the `wordnet`_ functionality of `NLTK`_ to determine the similarity
of two statements based on the path similarity between each token of each statement.
This is essentially an evaluation of the closeness of synonyms.
"""
def initialize_nltk_wordnet(self):
"""
Download required NLTK corpora if they have not already been downloaded.
"""
from .utils import nltk_download_corpus
nltk_download_corpus('corpora/wordnet')
def initialize_nltk_punkt(self):
"""
Download required NLTK corpora if they have not already been downloaded.
"""
from .utils import nltk_download_corpus
nltk_download_corpus('tokenizers/punkt')
def initialize_nltk_stopwords(self):
"""
Download required NLTK corpora if they have not already been downloaded.
"""
from .utils import nltk_download_corpus
nltk_download_corpus('corpora/stopwords')
def compare(self, statement, other_statement):
"""
Compare the two input statements.
:return: The percent of similarity between the closest synset distance.
:rtype: float
.. _wordnet: http://www.nltk.org/howto/wordnet.html
.. _NLTK: http://www.nltk.org/
"""
from nltk.corpus import wordnet
from nltk import word_tokenize
from . import utils
import itertools
tokens1 = word_tokenize(statement.text.lower())
tokens2 = word_tokenize(other_statement.text.lower())
# Remove all stop words from the list of word tokens
tokens1 = utils.remove_stopwords(tokens1, language='english')
tokens2 = utils.remove_stopwords(tokens2, language='english')
# The maximum possible similarity is an exact match
# Because path_similarity returns a value between 0 and 1,
# max_possible_similarity is the number of words in the longer
# of the two input statements.
max_possible_similarity = max(
len(statement.text.split()),
len(other_statement.text.split())
)
max_similarity = 0.0
# Get the highest matching value for each possible combination of words
for combination in itertools.product(*[tokens1, tokens2]):
synset1 = wordnet.synsets(combination[0])
synset2 = wordnet.synsets(combination[1])
if synset1 and synset2:
# Get the highest similarity for each combination of synsets
for synset in itertools.product(*[synset1, synset2]):
similarity = synset[0].path_similarity(synset[1])
if similarity and (similarity > max_similarity):
max_similarity = similarity
if max_possible_similarity == 0:
return 0
return max_similarity / max_possible_similarity
class SentimentComparison(Comparator):
"""
Calculate the similarity of two statements based on the closeness of
the sentiment value calculated for each statement.
"""
def initialize_nltk_vader_lexicon(self):
"""
Download the NLTK vader lexicon for sentiment analysis
that is required for this algorithm to run.
"""
from .utils import nltk_download_corpus
nltk_download_corpus('sentiment/vader_lexicon')
def compare(self, statement, other_statement):
"""
Return the similarity of two statements based on
their calculated sentiment values.
:return: The percent of similarity between the sentiment value.
:rtype: float
"""
from nltk.sentiment.vader import SentimentIntensityAnalyzer
sentiment_analyzer = SentimentIntensityAnalyzer()
statement_polarity = sentiment_analyzer.polarity_scores(statement.text.lower())
statement2_polarity = sentiment_analyzer.polarity_scores(other_statement.text.lower())
statement_greatest_polarity = 'neu'
statement_greatest_score = -1
for polarity in sorted(statement_polarity):
if statement_polarity[polarity] > statement_greatest_score:
statement_greatest_polarity = polarity
statement_greatest_score = statement_polarity[polarity]
statement2_greatest_polarity = 'neu'
statement2_greatest_score = -1
for polarity in sorted(statement2_polarity):
if statement2_polarity[polarity] > statement2_greatest_score:
statement2_greatest_polarity = polarity
statement2_greatest_score = statement2_polarity[polarity]
# Check if the polarity if of a different type
if statement_greatest_polarity != statement2_greatest_polarity:
return 0
values = [statement_greatest_score, statement2_greatest_score]
difference = max(values) - min(values)
return 1.0 - difference
class JaccardSimilarity(Comparator):
"""
Calculates the similarity of two statements based on the Jaccard index.
The Jaccard index is composed of a numerator and denominator.
In the numerator, we count the number of items that are shared between the sets.
In the denominator, we count the total number of items across both sets.
Let's say we define sentences to be equivalent if 50% or more of their tokens are equivalent.
Here are two sample sentences:
The young cat is hungry.
The cat is very hungry.
When we parse these sentences to remove stopwords, we end up with the following two sets:
{young, cat, hungry}
{cat, very, hungry}
In our example above, our intersection is {cat, hungry}, which has count of two.
The union of the sets is {young, cat, very, hungry}, which has a count of four.
Therefore, our `Jaccard similarity index`_ is two divided by four, or 50%.
Given our similarity threshold above, we would consider this to be a match.
.. _`Jaccard similarity index`: https://en.wikipedia.org/wiki/Jaccard_index
"""
SIMILARITY_THRESHOLD = 0.5
def initialize_nltk_wordnet(self):
"""
Download the NLTK wordnet corpora that is required for this algorithm
to run only if the corpora has not already been downloaded.
"""
from .utils import nltk_download_corpus
nltk_download_corpus('corpora/wordnet')
def compare(self, statement, other_statement):
"""
Return the calculated similarity of two
statements based on the Jaccard index.
"""
from nltk.corpus import wordnet
import nltk
import string
a = statement.text.lower()
b = other_statement.text.lower()
# Get default English stopwords and extend with punctuation
stopwords = nltk.corpus.stopwords.words('english')
stopwords.extend(string.punctuation)
stopwords.append('')
lemmatizer = nltk.stem.wordnet.WordNetLemmatizer()
def get_wordnet_pos(pos_tag):
if pos_tag[1].startswith('J'):
return (pos_tag[0], wordnet.ADJ)
elif pos_tag[1].startswith('V'):
return (pos_tag[0], wordnet.VERB)
elif pos_tag[1].startswith('N'):
return (pos_tag[0], wordnet.NOUN)
elif pos_tag[1].startswith('R'):
return (pos_tag[0], wordnet.ADV)
else:
return (pos_tag[0], wordnet.NOUN)
ratio = 0
pos_a = map(get_wordnet_pos, nltk.pos_tag(nltk.tokenize.word_tokenize(a)))
pos_b = map(get_wordnet_pos, nltk.pos_tag(nltk.tokenize.word_tokenize(b)))
lemma_a = [
lemmatizer.lemmatize(
token.strip(string.punctuation),
pos
) for token, pos in pos_a if pos == wordnet.NOUN and token.strip(
string.punctuation
) not in stopwords
]
lemma_b = [
lemmatizer.lemmatize(
token.strip(string.punctuation),
pos
) for token, pos in pos_b if pos == wordnet.NOUN and token.strip(
string.punctuation
) not in stopwords
]
# Calculate Jaccard similarity
try:
numerator = len(set(lemma_a).intersection(lemma_b))
denominator = float(len(set(lemma_a).union(lemma_b)))
ratio = numerator / denominator
except Exception as e:
print('Error', e)
return ratio >= self.SIMILARITY_THRESHOLD
# ---------------------------------------- #
levenshtein_distance = LevenshteinDistance()
synset_distance = SynsetDistance()
sentiment_comparison = SentimentComparison()
jaccard_similarity = JaccardSimilarity()

@ -0,0 +1,15 @@
"""
ChatterBot constants
"""
'''
The maximum length of characters that the text of a statement can contain.
This should be enforced on a per-model basis by the data model for each
storage adapter.
'''
STATEMENT_TEXT_MAX_LENGTH = 400
# The maximum length of characters that the name of a tag can contain
TAG_NAME_MAX_LENGTH = 50
DEFAULT_DJANGO_APP_NAME = 'django_chatterbot'

@ -0,0 +1,229 @@
class StatementMixin(object):
"""
This class has shared methods used to
normalize different statement models.
"""
def get_tags(self):
"""
Return the list of tags for this statement.
"""
return self.tags
def add_tags(self, tags):
"""
Add a list of strings to the statement as tags.
"""
for tag in tags:
self.tags.append(tag)
class Statement(StatementMixin):
"""
A statement represents a single spoken entity, sentence or
phrase that someone can say.
"""
def __init__(self, text, **kwargs):
import sys
# Try not to allow non-string types to be passed to statements
try:
text = str(text)
except UnicodeEncodeError:
pass
# Prefer decoded utf8-strings in Python 2.7
if sys.version_info[0] < 3:
try:
text = text.decode('utf-8')
except UnicodeEncodeError:
pass
self.text = text
self.tags = kwargs.pop('tags', [])
self.in_response_to = kwargs.pop('in_response_to', [])
self.extra_data = kwargs.pop('extra_data', {})
# This is the confidence with which the chat bot believes
# this is an accurate response. This value is set when the
# statement is returned by the chat bot.
self.confidence = 0
self.storage = None
def __str__(self):
return self.text
def __repr__(self):
return '<Statement text:%s>' % (self.text)
def __hash__(self):
return hash(self.text)
def __eq__(self, other):
if not other:
return False
if isinstance(other, Statement):
return self.text == other.text
return self.text == other
def save(self):
"""
Save the statement in the database.
"""
self.storage.update(self)
def add_extra_data(self, key, value):
"""
This method allows additional data to be stored on the statement object.
Typically this data is something that pertains just to this statement.
For example, a value stored here might be the tagged parts of speech for
each word in the statement text.
- key = 'pos_tags'
- value = [('Now', 'RB'), ('for', 'IN'), ('something', 'NN'), ('different', 'JJ')]
:param key: The key to use in the dictionary of extra data.
:type key: str
:param value: The value to set for the specified key.
"""
self.extra_data[key] = value
def add_response(self, response):
"""
Add the response to the list of statements that this statement is in response to.
If the response is already in the list, increment the occurrence count of that response.
:param response: The response to add.
:type response: `Response`
"""
if not isinstance(response, Response):
raise Statement.InvalidTypeException(
'A {} was received when a {} instance was expected'.format(
type(response),
type(Response(''))
)
)
updated = False
for index in range(0, len(self.in_response_to)):
if response.text == self.in_response_to[index].text:
self.in_response_to[index].occurrence += 1
updated = True
if not updated:
self.in_response_to.append(response)
def remove_response(self, response_text):
"""
Removes a response from the statement's response list based
on the value of the response text.
:param response_text: The text of the response to be removed.
:type response_text: str
"""
for response in self.in_response_to:
if response_text == response.text:
self.in_response_to.remove(response)
return True
return False
def get_response_count(self, statement):
"""
Find the number of times that the statement has been used
as a response to the current statement.
:param statement: The statement object to get the count for.
:type statement: `Statement`
:returns: Return the number of times the statement has been used as a response.
:rtype: int
"""
for response in self.in_response_to:
if statement.text == response.text:
return response.occurrence
return 0
def serialize(self):
"""
:returns: A dictionary representation of the statement object.
:rtype: dict
"""
data = {}
data['text'] = self.text
data['in_response_to'] = []
data['extra_data'] = self.extra_data
for response in self.in_response_to:
data['in_response_to'].append(response.serialize())
return data
@property
def response_statement_cache(self):
"""
This property is to allow ChatterBot Statement objects to
be swappable with Django Statement models.
"""
return self.in_response_to
class InvalidTypeException(Exception):
def __init__(self, value='Received an unexpected value type.'):
self.value = value
def __str__(self):
return repr(self.value)
class Response(object):
"""
A response represents an entity which response to a statement.
"""
def __init__(self, text, **kwargs):
from datetime import datetime
from dateutil import parser as date_parser
self.text = text
self.created_at = kwargs.get('created_at', datetime.now())
self.occurrence = kwargs.get('occurrence', 1)
if not isinstance(self.created_at, datetime):
self.created_at = date_parser.parse(self.created_at)
def __str__(self):
return self.text
def __repr__(self):
return '<Response text:%s>' % (self.text)
def __hash__(self):
return hash(self.text)
def __eq__(self, other):
if not other:
return False
if isinstance(other, Response):
return self.text == other.text
return self.text == other
def serialize(self):
data = {}
data['text'] = self.text
data['created_at'] = self.created_at.isoformat()
data['occurrence'] = self.occurrence
return data

@ -0,0 +1,11 @@
"""
Seamlessly import the external chatterbot corpus module.
View the corpus on GitHub at https://github.com/gunthercox/chatterbot-corpus
"""
from chatterbot_corpus import Corpus
__all__ = (
'Corpus',
)

@ -0,0 +1,3 @@
default_app_config = (
'chatter.source.ext.django_chatterbot.apps.DjangoChatterBotConfig'
)

@ -0,0 +1,261 @@
from ...conversation import StatementMixin
from ... import constants
from django.db import models
from django.apps import apps
from django.utils import timezone
from django.conf import settings
DJANGO_APP_NAME = constants.DEFAULT_DJANGO_APP_NAME
STATEMENT_MODEL = 'Statement'
RESPONSE_MODEL = 'Response'
if hasattr(settings, 'CHATTERBOT'):
"""
Allow related models to be overridden in the project settings.
Default to the original settings if one is not defined.
"""
DJANGO_APP_NAME = settings.CHATTERBOT.get(
'django_app_name',
DJANGO_APP_NAME
)
STATEMENT_MODEL = settings.CHATTERBOT.get(
'statement_model',
STATEMENT_MODEL
)
RESPONSE_MODEL = settings.CHATTERBOT.get(
'response_model',
RESPONSE_MODEL
)
class AbstractBaseStatement(models.Model, StatementMixin):
"""
The abstract base statement allows other models to
be created using the attributes that exist on the
default models.
"""
text = models.CharField(
unique=True,
blank=False,
null=False,
max_length=constants.STATEMENT_TEXT_MAX_LENGTH
)
extra_data = models.CharField(
max_length=500,
blank=True
)
# This is the confidence with which the chat bot believes
# this is an accurate response. This value is set when the
# statement is returned by the chat bot.
confidence = 0
class Meta:
abstract = True
def __str__(self):
if len(self.text.strip()) > 60:
return '{}...'.format(self.text[:57])
elif len(self.text.strip()) > 0:
return self.text
return '<empty>'
def __init__(self, *args, **kwargs):
super(AbstractBaseStatement, self).__init__(*args, **kwargs)
# Responses to be saved if the statement is updated with the storage adapter
self.response_statement_cache = []
@property
def in_response_to(self):
"""
Return the response objects that are for this statement.
"""
ResponseModel = apps.get_model(DJANGO_APP_NAME, RESPONSE_MODEL)
return ResponseModel.objects.filter(statement=self)
def add_extra_data(self, key, value):
"""
Add extra data to the extra_data field.
"""
import json
if not self.extra_data:
self.extra_data = '{}'
extra_data = json.loads(self.extra_data)
extra_data[key] = value
self.extra_data = json.dumps(extra_data)
def add_tags(self, tags):
"""
Add a list of strings to the statement as tags.
(Overrides the method from StatementMixin)
"""
for tag in tags:
self.tags.create(
name=tag
)
def add_response(self, statement):
"""
Add a response to this statement.
"""
self.response_statement_cache.append(statement)
def remove_response(self, response_text):
"""
Removes a response from the statement's response list based
on the value of the response text.
:param response_text: The text of the response to be removed.
:type response_text: str
"""
is_deleted = False
response = self.in_response.filter(response__text=response_text)
if response.exists():
is_deleted = True
return is_deleted
def get_response_count(self, statement):
"""
Find the number of times that the statement has been used
as a response to the current statement.
:param statement: The statement object to get the count for.
:type statement: chatterbot.conversation.Statement
:returns: Return the number of times the statement has been used as a response.
:rtype: int
"""
return self.in_response.filter(response__text=statement.text).count()
def serialize(self):
"""
:returns: A dictionary representation of the statement object.
:rtype: dict
"""
import json
data = {}
if not self.extra_data:
self.extra_data = '{}'
data['text'] = self.text
data['in_response_to'] = []
data['extra_data'] = json.loads(self.extra_data)
for response in self.in_response.all():
data['in_response_to'].append(response.serialize())
return data
class AbstractBaseResponse(models.Model):
"""
The abstract base response allows other models to
be created using the attributes that exist on the
default models.
"""
statement = models.ForeignKey(
STATEMENT_MODEL,
related_name='in_response',
on_delete=models.CASCADE
)
response = models.ForeignKey(
STATEMENT_MODEL,
related_name='responses',
on_delete=models.CASCADE
)
created_at = models.DateTimeField(
default=timezone.now,
help_text='The date and time that this response was created at.'
)
class Meta:
abstract = True
@property
def occurrence(self):
"""
Return a count of the number of times this response has occurred.
"""
ResponseModel = apps.get_model(DJANGO_APP_NAME, RESPONSE_MODEL)
return ResponseModel.objects.filter(
statement__text=self.statement.text,
response__text=self.response.text
).count()
def __str__(self):
statement = self.statement.text
response = self.response.text
return '{} => {}'.format(
statement if len(statement) <= 20 else statement[:17] + '...',
response if len(response) <= 40 else response[:37] + '...'
)
def serialize(self):
"""
:returns: A dictionary representation of the statement object.
:rtype: dict
"""
data = {}
data['text'] = self.response.text
data['created_at'] = self.created_at.isoformat()
data['occurrence'] = self.occurrence
return data
class AbstractBaseConversation(models.Model):
"""
The abstract base conversation allows other models to
be created using the attributes that exist on the
default models.
"""
responses = models.ManyToManyField(
RESPONSE_MODEL,
related_name='conversations',
help_text='The responses in this conversation.'
)
class Meta:
abstract = True
def __str__(self):
return str(self.id)
class AbstractBaseTag(models.Model):
"""
The abstract base tag allows other models to
be created using the attributes that exist on the
default models.
"""
name = models.SlugField(
max_length=constants.TAG_NAME_MAX_LENGTH
)
statements = models.ManyToManyField(
STATEMENT_MODEL,
related_name='tags'
)
class Meta:
abstract = True
def __str__(self):
return self.name

@ -0,0 +1,31 @@
from django.contrib import admin
from .models import (
Statement, Response, Conversation, Tag
)
class StatementAdmin(admin.ModelAdmin):
list_display = ('text', )
list_filter = ('text', )
search_fields = ('text', )
class ResponseAdmin(admin.ModelAdmin):
list_display = ('statement', 'response', 'occurrence', )
search_fields = ['statement__text', 'response__text']
class ConversationAdmin(admin.ModelAdmin):
list_display = ('id', )
class TagAdmin(admin.ModelAdmin):
list_display = ('name', )
list_filter = ('name', )
search_fields = ('name', )
admin.site.register(Statement, StatementAdmin)
admin.site.register(Response, ResponseAdmin)
admin.site.register(Conversation, ConversationAdmin)
admin.site.register(Tag, TagAdmin)

@ -0,0 +1,8 @@
from django.apps import AppConfig
class DjangoChatterBotConfig(AppConfig):
name = 'chatter.source.ext.django_chatterbot'
label = 'django_chatterbot'
verbose_name = 'Django ChatterBot'

@ -0,0 +1,42 @@
"""
These factories are used to generate fake data for testing.
"""
import factory
from . import models
from ... import constants
from factory.django import DjangoModelFactory
class StatementFactory(DjangoModelFactory):
text = factory.Faker(
'text',
max_nb_chars=constants.STATEMENT_TEXT_MAX_LENGTH
)
class Meta:
model = models.Statement
class ResponseFactory(DjangoModelFactory):
statement = factory.SubFactory(StatementFactory)
response = factory.SubFactory(StatementFactory)
class Meta:
model = models.Response
class ConversationFactory(DjangoModelFactory):
class Meta:
model = models.Conversation
class TagFactory(DjangoModelFactory):
name = factory.Faker('word')
class Meta:
model = models.Tag

@ -0,0 +1,29 @@
from django.core.management.base import BaseCommand
class Command(BaseCommand):
"""
A Django management command for calling a
chat bot's training method.
"""
help = 'Trains the database used by the chat bot'
can_import_settings = True
def handle(self, *args, **options):
from ..... import ChatBot
from ... import settings
chatterbot = ChatBot(**settings.CHATTERBOT)
chatterbot.train(chatterbot.training_data)
# Django 1.8 does not define SUCCESS
if hasattr(self.style, 'SUCCESS'):
style = self.style.SUCCESS
else:
style = self.style.NOTICE
self.stdout.write(style('Starting training...'))
training_class = chatterbot.trainer.__class__.__name__
self.stdout.write(style('ChatterBot trained using "%s"' % training_class))

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = []
operations = [
migrations.CreateModel(
name='Response',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('occurrence', models.PositiveIntegerField(default=0)),
],
),
migrations.CreateModel(
name='Statement',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('text', models.CharField(max_length=255, unique=True)),
],
),
migrations.AddField(
model_name='response',
name='response',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='+', to='django_chatterbot.Statement'),
),
migrations.AddField(
model_name='response',
name='statement',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='in_response_to', to='django_chatterbot.Statement'),
),
]

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.2 on 2016-10-30 12:13
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='statement',
name='extra_data',
field=models.CharField(default='{}', max_length=500),
preserve_default=False,
),
]

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2016-12-12 00:06
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0002_statement_extra_data'),
]
operations = [
migrations.AlterField(
model_name='response',
name='occurrence',
field=models.PositiveIntegerField(default=1),
),
]

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.3 on 2016-12-04 23:52
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0003_change_occurrence_default'),
]
operations = [
migrations.AlterField(
model_name='response',
name='statement',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='in_response', to='django_chatterbot.Statement'),
),
migrations.AlterField(
model_name='response',
name='response',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='responses', to='django_chatterbot.Statement'),
),
]

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.1 on 2016-12-29 19:20
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0004_rename_in_response_to'),
]
operations = [
migrations.AddField(
model_name='statement',
name='created_at',
field=models.DateTimeField(
default=django.utils.timezone.now,
help_text='The date and time that this statement was created at.'
),
),
]

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2017-01-17 07:02
from __future__ import unicode_literals
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0005_statement_created_at'),
]
operations = [
migrations.CreateModel(
name='Conversation',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
],
),
migrations.AlterField(
model_name='statement',
name='created_at',
field=models.DateTimeField(default=django.utils.timezone.now, help_text='The date and time that this statement was created at.'),
),
migrations.AddField(
model_name='conversation',
name='statements',
field=models.ManyToManyField(help_text='The statements in this conversation.', related_name='conversation', to='django_chatterbot.Statement'),
),
]

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-07-18 00:16
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0006_create_conversation'),
]
operations = [
migrations.AddField(
model_name='response',
name='created_at',
field=models.DateTimeField(
default=django.utils.timezone.now,
help_text='The date and time that this response was created at.'
),
),
]

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-07-18 11:25
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0007_response_created_at'),
]
operations = [
migrations.RemoveField(
model_name='conversation',
name='statements',
),
migrations.RemoveField(
model_name='response',
name='occurrence',
),
migrations.RemoveField(
model_name='statement',
name='created_at',
),
migrations.AddField(
model_name='conversation',
name='responses',
field=models.ManyToManyField(help_text='The responses in this conversation.', related_name='conversations', to='django_chatterbot.Response'),
),
]

@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11a1 on 2017-07-07 00:12
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0008_update_conversations'),
]
operations = [
migrations.CreateModel(
name='Tag',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.SlugField()),
],
options={
'abstract': False,
},
),
migrations.AlterField(
model_name='statement',
name='text',
field=models.CharField(max_length=255, unique=True),
),
migrations.AddField(
model_name='tag',
name='statements',
field=models.ManyToManyField(related_name='tags', to='django_chatterbot.Statement'),
),
]

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.4 on 2017-08-16 00:56
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0009_tags'),
]
operations = [
migrations.AlterField(
model_name='statement',
name='text',
field=models.CharField(max_length=400, unique=True),
),
]

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.4 on 2017-08-20 13:55
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_chatterbot', '0010_statement_text'),
]
operations = [
migrations.AlterField(
model_name='statement',
name='extra_data',
field=models.CharField(blank=True, max_length=500),
),
]

@ -0,0 +1,34 @@
from .abstract_models import (
AbstractBaseConversation, AbstractBaseResponse,
AbstractBaseStatement, AbstractBaseTag
)
class Statement(AbstractBaseStatement):
"""
A statement represents a single spoken entity, sentence or
phrase that someone can say.
"""
pass
class Response(AbstractBaseResponse):
"""
A connection between a statement and anther statement
that response to it.
"""
pass
class Conversation(AbstractBaseConversation):
"""
A sequence of statements representing a conversation.
"""
pass
class Tag(AbstractBaseTag):
"""
A label that categorizes a statement.
"""
pass

@ -0,0 +1,19 @@
"""
Default ChatterBot settings for Django.
"""
from django.conf import settings
from ... import constants
CHATTERBOT_SETTINGS = getattr(settings, 'CHATTERBOT', {})
CHATTERBOT_DEFAULTS = {
'name': 'ChatterBot',
'storage_adapter': 'chatter.source.storage.DjangoStorageAdapter',
'input_adapter': 'chatter.source.input.VariableInputTypeAdapter',
'output_adapter': 'chatter.source.output.OutputAdapter',
'django_app_name': constants.DEFAULT_DJANGO_APP_NAME
}
CHATTERBOT = CHATTERBOT_DEFAULTS.copy()
CHATTERBOT.update(CHATTERBOT_SETTINGS)

@ -0,0 +1,11 @@
from django.conf.urls import url
from .views import ChatterBotView
urlpatterns = [
url(
r'^$',
ChatterBotView.as_view(),
name='chatterbot',
),
]

@ -0,0 +1,118 @@
import json
from django.views.generic import View
from django.http import JsonResponse
from ... import ChatBot
from . import settings
class ChatterBotViewMixin(object):
"""
Subclass this mixin for access to the 'chatterbot' attribute.
"""
chatterbot = ChatBot(**settings.CHATTERBOT)
def validate(self, data):
"""
Validate the data recieved from the client.
* The data should contain a text attribute.
"""
from django.core.exceptions import ValidationError
if 'text' not in data:
raise ValidationError('The attribute "text" is required.')
def get_conversation(self, request):
"""
Return the conversation for the session if one exists.
Create a new conversation if one does not exist.
"""
from .models import Conversation, Response
class Obj(object):
def __init__(self):
self.id = None
self.statements = []
conversation = Obj()
conversation.id = request.session.get('conversation_id', 0)
existing_conversation = False
try:
Conversation.objects.get(id=conversation.id)
existing_conversation = True
except Conversation.DoesNotExist:
conversation_id = self.chatterbot.storage.create_conversation()
request.session['conversation_id'] = conversation_id
conversation.id = conversation_id
if existing_conversation:
responses = Response.objects.filter(
conversations__id=conversation.id
)
for response in responses:
conversation.statements.append(response.statement.serialize())
conversation.statements.append(response.response.serialize())
return conversation
class ChatterBotView(ChatterBotViewMixin, View):
"""
Provide an API endpoint to interact with ChatterBot.
"""
def post(self, request, *args, **kwargs):
"""
Return a response to the statement in the posted data.
"""
input_data = json.loads(request.read().decode('utf-8'))
self.validate(input_data)
conversation = self.get_conversation(request)
response = self.chatterbot.get_response(input_data, conversation.id)
response_data = response.serialize()
return JsonResponse(response_data, status=200)
def get(self, request, *args, **kwargs):
"""
Return data corresponding to the current conversation.
"""
conversation = self.get_conversation(request)
data = {
'detail': 'You should make a POST request to this endpoint.',
'name': self.chatterbot.name,
'conversation': conversation.statements
}
# Return a method not allowed response
return JsonResponse(data, status=405)
def patch(self, request, *args, **kwargs):
"""
The patch method is not allowed for this endpoint.
"""
data = {
'detail': 'You should make a POST request to this endpoint.'
}
# Return a method not allowed response
return JsonResponse(data, status=405)
def delete(self, request, *args, **kwargs):
"""
The delete method is not allowed for this endpoint.
"""
data = {
'detail': 'You should make a POST request to this endpoint.'
}
# Return a method not allowed response
return JsonResponse(data, status=405)

@ -0,0 +1,132 @@
from sqlalchemy import Table, Column, Integer, DateTime, ForeignKey, PickleType
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import declared_attr, declarative_base
from ...constants import TAG_NAME_MAX_LENGTH, STATEMENT_TEXT_MAX_LENGTH
from .types import UnicodeString
from ...conversation import StatementMixin
class ModelBase(object):
"""
An augmented base class for SqlAlchemy models.
"""
@declared_attr
def __tablename__(cls):
"""
Return the lowercase class name as the name of the table.
"""
return cls.__name__.lower()
id = Column(
Integer,
primary_key=True,
autoincrement=True
)
Base = declarative_base(cls=ModelBase)
tag_association_table = Table(
'tag_association',
Base.metadata,
Column('tag_id', Integer, ForeignKey('tag.id')),
Column('statement_id', Integer, ForeignKey('statement.id'))
)
class Tag(Base):
"""
A tag that describes a statement.
"""
name = Column(UnicodeString(TAG_NAME_MAX_LENGTH))
class Statement(Base, StatementMixin):
"""
A Statement represents a sentence or phrase.
"""
text = Column(UnicodeString(STATEMENT_TEXT_MAX_LENGTH), unique=True)
tags = relationship(
'Tag',
secondary=lambda: tag_association_table,
backref='statements'
)
extra_data = Column(PickleType)
in_response_to = relationship(
'Response',
back_populates='statement_table'
)
def get_tags(self):
"""
Return a list of tags for this statement.
"""
return [tag.name for tag in self.tags]
def get_statement(self):
from ...conversation import Statement as StatementObject
from ...conversation import Response as ResponseObject
statement = StatementObject(
self.text,
tags=[tag.name for tag in self.tags],
extra_data=self.extra_data
)
for response in self.in_response_to:
statement.add_response(
ResponseObject(text=response.text, occurrence=response.occurrence)
)
return statement
class Response(Base):
"""
Response, contains responses related to a given statement.
"""
text = Column(UnicodeString(STATEMENT_TEXT_MAX_LENGTH))
created_at = Column(
DateTime(timezone=True),
server_default=func.now()
)
occurrence = Column(Integer, default=1)
statement_text = Column(UnicodeString(STATEMENT_TEXT_MAX_LENGTH), ForeignKey('statement.text'))
statement_table = relationship(
'Statement',
back_populates='in_response_to',
cascade='all',
uselist=False
)
conversation_association_table = Table(
'conversation_association',
Base.metadata,
Column('conversation_id', Integer, ForeignKey('conversation.id')),
Column('statement_id', Integer, ForeignKey('statement.id'))
)
class Conversation(Base):
"""
A conversation.
"""
statements = relationship(
'Statement',
secondary=lambda: conversation_association_table,
backref='conversations'
)

@ -0,0 +1,21 @@
from sqlalchemy.types import TypeDecorator, Unicode
class UnicodeString(TypeDecorator):
"""
Type for unicode strings.
"""
impl = Unicode
def process_bind_param(self, value, dialect):
"""
Coerce Python bytestrings to unicode before
saving them to the database.
"""
import sys
if sys.version_info[0] < 3:
if isinstance(value, str):
value = value.decode('utf-8')
return value

@ -0,0 +1,47 @@
"""
Filters set the base query that gets passed to the storage adapter.
"""
class Filter(object):
"""
A base filter object from which all other
filters should be subclassed.
"""
def filter_selection(self, chatterbot, conversation_id):
"""
Because this is the base filter class, this method just
returns the storage adapter's base query. Other filters
are expected to override this method.
"""
return chatterbot.storage.base_query
class RepetitiveResponseFilter(Filter):
"""
A filter that eliminates possibly repetitive responses to prevent
a chat bot from repeating statements that it has recently said.
"""
def filter_selection(self, chatterbot, conversation_id):
text_of_recent_responses = []
# TODO: Add a larger quantity of response history
latest_response = chatterbot.storage.get_latest_response(conversation_id)
if latest_response:
text_of_recent_responses.append(latest_response.text)
# Return the query with no changes if there are no statements to exclude
if not text_of_recent_responses:
return super(RepetitiveResponseFilter, self).filter_selection(
chatterbot,
conversation_id
)
query = chatterbot.storage.base_query.statement_text_not_in(
text_of_recent_responses
)
return query

@ -0,0 +1,18 @@
from .input_adapter import InputAdapter
from .microsoft import Microsoft
from .gitter import Gitter
from .hipchat import HipChat
from .mailgun import Mailgun
from .terminal import TerminalAdapter
from .variable_input_type_adapter import VariableInputTypeAdapter
__all__ = (
'InputAdapter',
'Microsoft',
'Gitter',
'HipChat',
'Mailgun',
'TerminalAdapter',
'VariableInputTypeAdapter',
)

@ -0,0 +1,176 @@
from __future__ import unicode_literals
from time import sleep
from . import InputAdapter
from ..conversation import Statement
class Gitter(InputAdapter):
"""
An input adapter that allows a ChatterBot instance to get
input statements from a Gitter room.
"""
def __init__(self, **kwargs):
super(Gitter, self).__init__(**kwargs)
self.gitter_host = kwargs.get('gitter_host', 'https://api.gitter.im/v1/')
self.gitter_room = kwargs.get('gitter_room')
self.gitter_api_token = kwargs.get('gitter_api_token')
self.only_respond_to_mentions = kwargs.get('gitter_only_respond_to_mentions', True)
self.sleep_time = kwargs.get('gitter_sleep_time', 4)
authorization_header = 'Bearer {}'.format(self.gitter_api_token)
self.headers = {
'Authorization': authorization_header,
'Content-Type': 'application/json',
'Accept': 'application/json'
}
# Join the Gitter room
room_data = self.join_room(self.gitter_room)
self.room_id = room_data.get('id')
user_data = self.get_user_data()
self.user_id = user_data[0].get('id')
self.username = user_data[0].get('username')
def _validate_status_code(self, response):
code = response.status_code
if code not in [200, 201]:
raise self.HTTPStatusException('{} status code recieved'.format(code))
def join_room(self, room_name):
"""
Join the specified Gitter room.
"""
import requests
endpoint = '{}rooms'.format(self.gitter_host)
response = requests.post(
endpoint,
headers=self.headers,
json={'uri': room_name}
)
self.logger.info('{} joining room {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
return response.json()
def get_user_data(self):
import requests
endpoint = '{}user'.format(self.gitter_host)
response = requests.get(
endpoint,
headers=self.headers
)
self.logger.info('{} retrieving user data {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
return response.json()
def mark_messages_as_read(self, message_ids):
"""
Mark the specified message ids as read.
"""
import requests
endpoint = '{}user/{}/rooms/{}/unreadItems'.format(
self.gitter_host, self.user_id, self.room_id
)
response = requests.post(
endpoint,
headers=self.headers,
json={'chat': message_ids}
)
self.logger.info('{} marking messages as read {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
return response.json()
def get_most_recent_message(self):
"""
Get the most recent message from the Gitter room.
"""
import requests
endpoint = '{}rooms/{}/chatMessages?limit=1'.format(self.gitter_host, self.room_id)
response = requests.get(
endpoint,
headers=self.headers
)
self.logger.info('{} getting most recent message'.format(
response.status_code
))
self._validate_status_code(response)
data = response.json()
if data:
return data[0]
return None
def _contains_mention(self, mentions):
for mention in mentions:
if self.username == mention.get('screenName'):
return True
return False
def should_respond(self, data):
"""
Takes the API response data from a single message.
Returns true if the chat bot should respond.
"""
if data:
unread = data.get('unread', False)
if self.only_respond_to_mentions:
if unread and self._contains_mention(data['mentions']):
return True
else:
return False
elif unread:
return True
return False
def remove_mentions(self, text):
"""
Return a string that has no leading mentions.
"""
import re
text_without_mentions = re.sub(r'@\S+', '', text)
# Remove consecutive spaces
text_without_mentions = re.sub(' +', ' ', text_without_mentions.strip())
return text_without_mentions
def process_input(self, statement):
new_message = False
while not new_message:
data = self.get_most_recent_message()
if self.should_respond(data):
self.mark_messages_as_read([data['id']])
new_message = True
sleep(self.sleep_time)
text = self.remove_mentions(data['text'])
statement = Statement(text)
return statement
class HTTPStatusException(Exception):
"""
Exception raised when unexpected non-success HTTP
status codes are returned in a response.
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)

@ -0,0 +1,113 @@
from __future__ import unicode_literals
from time import sleep
from . import InputAdapter
from ..conversation import Statement
class HipChat(InputAdapter):
"""
An input adapter that allows a ChatterBot instance to get
input statements from a HipChat room.
"""
def __init__(self, **kwargs):
super(HipChat, self).__init__(**kwargs)
self.hipchat_host = kwargs.get('hipchat_host')
self.hipchat_access_token = kwargs.get('hipchat_access_token')
self.hipchat_room = kwargs.get('hipchat_room')
self.session_id = str(self.chatbot.default_session.uuid)
import requests
self.session = requests.Session()
self.session.verify = kwargs.get('ssl_verify', True)
authorization_header = 'Bearer {}'.format(self.hipchat_access_token)
self.headers = {
'Authorization': authorization_header,
'Content-Type': 'application/json'
}
# This is a list of the messages that have been responded to
self.recent_message_ids = self.get_initial_ids()
def get_initial_ids(self):
"""
Returns a list of the most recent message ids.
"""
data = self.view_recent_room_history(
self.hipchat_room,
max_results=75
)
results = set()
for item in data['items']:
results.add(item['id'])
return results
def view_recent_room_history(self, room_id_or_name, max_results=1):
"""
https://www.hipchat.com/docs/apiv2/method/view_recent_room_history
"""
recent_histroy_url = '{}/v2/room/{}/history?max-results={}'.format(
self.hipchat_host,
room_id_or_name,
max_results
)
response = self.session.get(
recent_histroy_url,
headers=self.headers
)
return response.json()
def get_most_recent_message(self, room_id_or_name):
"""
Return the most recent message from the HipChat room.
"""
data = self.view_recent_room_history(room_id_or_name)
items = data['items']
if not items:
return None
return items[-1]
def process_input(self, statement):
"""
Process input from the HipChat room.
"""
new_message = False
response_statement = self.chatbot.storage.get_latest_response(
self.session_id
)
if response_statement:
last_message_id = response_statement.extra_data.get(
'hipchat_message_id', None
)
if last_message_id:
self.recent_message_ids.add(last_message_id)
while not new_message:
data = self.get_most_recent_message(self.hipchat_room)
if data and data['id'] not in self.recent_message_ids:
self.recent_message_ids.add(data['id'])
new_message = True
else:
pass
sleep(3.5)
text = data['message']
statement = Statement(text)
statement.add_extra_data('hipchat_message_id', data['id'])
return statement

@ -0,0 +1,33 @@
from __future__ import unicode_literals
from ..adapters import Adapter
class InputAdapter(Adapter):
"""
This is an abstract class that represents the
interface that all input adapters should implement.
"""
def process_input(self, *args, **kwargs):
"""
Returns a statement object based on the input source.
"""
raise self.AdapterMethodNotImplementedError()
def process_input_statement(self, *args, **kwargs):
"""
Return an existing statement object (if one exists).
"""
input_statement = self.process_input(*args, **kwargs)
self.logger.info('Received input statement: {}'.format(input_statement.text))
existing_statement = self.chatbot.storage.find(input_statement.text)
if existing_statement:
self.logger.info('"{}" is a known statement'.format(input_statement.text))
input_statement = existing_statement
else:
self.logger.info('"{}" is not a known statement'.format(input_statement.text))
return input_statement

@ -0,0 +1,61 @@
from __future__ import unicode_literals
import datetime
from . import InputAdapter
from ..conversation import Statement
class Mailgun(InputAdapter):
"""
Get input from Mailgun.
"""
def __init__(self, **kwargs):
super(Mailgun, self).__init__(**kwargs)
# Use the bot's name for the name of the sender
self.name = kwargs.get('name')
self.from_address = kwargs.get('mailgun_from_address')
self.api_key = kwargs.get('mailgun_api_key')
self.endpoint = kwargs.get('mailgun_api_endpoint')
def get_email_stored_events(self):
import requests
yesterday = datetime.datetime.now() - datetime.timedelta(1)
return requests.get(
'{}/events'.format(self.endpoint),
auth=('api', self.api_key),
params={
'begin': yesterday.isoformat(),
'ascending': 'yes',
'limit': 1
}
)
def get_stored_email_urls(self):
response = self.get_email_stored_events()
data = response.json()
for item in data.get('items', []):
if 'storage' in item:
if 'url' in item['storage']:
yield item['storage']['url']
def get_message(self, url):
import requests
return requests.get(
url,
auth=('api', self.api_key)
)
def process_input(self, statement):
urls = self.get_stored_email_urls()
url = list(urls)[0]
response = self.get_message(url)
message = response.json()
text = message.get('stripped-text')
return Statement(text)

@ -0,0 +1,115 @@
from __future__ import unicode_literals
from time import sleep
from . import InputAdapter
from ..conversation import Statement
class Microsoft(InputAdapter):
"""
An input adapter that allows a ChatterBot instance to get
input statements from a Microsoft Bot using *Directline client protocol*.
https://docs.botframework.com/en-us/restapi/directline/#navtitle
"""
def __init__(self, **kwargs):
super(Microsoft, self).__init__(**kwargs)
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
self.directline_host = kwargs.get('directline_host', 'https://directline.botframework.com')
# NOTE: Direct Line client credentials are different from your bot's
# credentials
self.direct_line_token_or_secret = kwargs.\
get('direct_line_token_or_secret')
authorization_header = 'BotConnector {}'.\
format(self.direct_line_token_or_secret)
self.headers = {
'Authorization': authorization_header,
'Content-Type': 'application/json',
'Accept': 'application/json',
'charset': 'utf-8'
}
conversation_data = self.start_conversation()
self.conversation_id = conversation_data.get('conversationId')
self.conversation_token = conversation_data.get('token')
def _validate_status_code(self, response):
code = response.status_code
if not code == 200:
raise self.HTTPStatusException('{} status code recieved'.
format(code))
def start_conversation(self):
import requests
endpoint = '{host}/api/conversations'.format(host=self.directline_host)
response = requests.post(
endpoint,
headers=self.headers,
verify=False
)
self.logger.info('{} starting conversation {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
return response.json()
def get_most_recent_message(self):
import requests
endpoint = '{host}/api/conversations/{id}/messages'\
.format(host=self.directline_host,
id=self.conversation_id)
response = requests.get(
endpoint,
headers=self.headers,
verify=False
)
self.logger.info('{} retrieving most recent messages {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
data = response.json()
if data['messages']:
last_msg = int(data['watermark'])
return data['messages'][last_msg - 1]
return None
def process_input(self, statement):
new_message = False
data = None
while not new_message:
data = self.get_most_recent_message()
if data and data['id']:
new_message = True
else:
pass
sleep(3.5)
text = data['text']
statement = Statement(text)
self.logger.info('processing user statement {}'.format(statement))
return statement
class HTTPStatusException(Exception):
"""
Exception raised when unexpected non-success HTTP
status codes are returned in a response.
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)

@ -0,0 +1,18 @@
from __future__ import unicode_literals
from . import InputAdapter
from ..conversation import Statement
from ..utils import input_function
class TerminalAdapter(InputAdapter):
"""
A simple adapter that allows ChatterBot to
communicate through the terminal.
"""
def process_input(self, *args, **kwargs):
"""
Read the user's input from the terminal.
"""
user_input = input_function()
return Statement(user_input)

@ -0,0 +1,65 @@
from __future__ import unicode_literals
from . import InputAdapter
from ..conversation import Statement
class VariableInputTypeAdapter(InputAdapter):
JSON = 'json'
TEXT = 'text'
OBJECT = 'object'
VALID_FORMATS = (JSON, TEXT, OBJECT, )
def detect_type(self, statement):
import sys
if sys.version_info[0] < 3:
string_types = basestring # NOQA
else:
string_types = str
if hasattr(statement, 'text'):
return self.OBJECT
if isinstance(statement, string_types):
return self.TEXT
if isinstance(statement, dict):
return self.JSON
input_type = type(statement)
raise self.UnrecognizedInputFormatException(
'The type {} is not recognized as a valid input type.'.format(
input_type
)
)
def process_input(self, statement):
input_type = self.detect_type(statement)
# Return the statement object without modification
if input_type == self.OBJECT:
return statement
# Convert the input string into a statement object
if input_type == self.TEXT:
return Statement(statement)
# Convert input dictionary into a statement object
if input_type == self.JSON:
input_json = dict(statement)
text = input_json['text']
del input_json['text']
return Statement(text, **input_json)
class UnrecognizedInputFormatException(Exception):
"""
Exception raised when an input format is specified that is
not in the VariableInputTypeAdapter.VALID_FORMATS variable.
"""
def __init__(self, value='The input format was not recognized.'):
self.value = value
def __str__(self):
return repr(self.value)

@ -0,0 +1,20 @@
from .logic_adapter import LogicAdapter
from .best_match import BestMatch
from .low_confidence import LowConfidenceAdapter
from .mathematical_evaluation import MathematicalEvaluation
from .multi_adapter import MultiLogicAdapter
from .no_knowledge_adapter import NoKnowledgeAdapter
from .specific_response import SpecificResponseAdapter
from .time_adapter import TimeLogicAdapter
__all__ = (
'LogicAdapter',
'BestMatch',
'LowConfidenceAdapter',
'MathematicalEvaluation',
'MultiLogicAdapter',
'NoKnowledgeAdapter',
'SpecificResponseAdapter',
'TimeLogicAdapter',
)

@ -0,0 +1,84 @@
from __future__ import unicode_literals
from .logic_adapter import LogicAdapter
class BestMatch(LogicAdapter):
"""
A logic adapter that returns a response based on known responses to
the closest matches to the input statement.
"""
def get(self, input_statement):
"""
Takes a statement string and a list of statement strings.
Returns the closest matching statement from the list.
"""
statement_list = self.chatbot.storage.get_response_statements()
if not statement_list:
if self.chatbot.storage.count():
# Use a randomly picked statement
self.logger.info(
'No statements have known responses. ' +
'Choosing a random response to return.'
)
random_response = self.chatbot.storage.get_random()
random_response.confidence = 0
return random_response
else:
raise self.EmptyDatasetException()
closest_match = input_statement
closest_match.confidence = 0
# Find the closest matching known statement
for statement in statement_list:
confidence = self.compare_statements(input_statement, statement)
if confidence > closest_match.confidence:
statement.confidence = confidence
closest_match = statement
return closest_match
def can_process(self, statement):
"""
Check that the chatbot's storage adapter is available to the logic
adapter and there is at least one statement in the database.
"""
return self.chatbot.storage.count()
def process(self, input_statement):
# Select the closest match to the input statement
closest_match = self.get(input_statement)
self.logger.info('Using "{}" as a close match to "{}"'.format(
input_statement.text, closest_match.text
))
# Get all statements that are in response to the closest match
response_list = self.chatbot.storage.filter(
in_response_to__contains=closest_match.text
)
if response_list:
self.logger.info(
'Selecting response from {} optimal responses.'.format(
len(response_list)
)
)
response = self.select_response(input_statement, response_list)
response.confidence = closest_match.confidence
self.logger.info('Response selected. Using "{}"'.format(response.text))
else:
response = self.chatbot.storage.get_random()
self.logger.info(
'No response to "{}" found. Selecting a random response.'.format(
closest_match.text
)
)
# Set confidence to zero because a random response is selected
response.confidence = 0
return response

@ -0,0 +1,100 @@
from __future__ import unicode_literals
from ..adapters import Adapter
from ..utils import import_module
class LogicAdapter(Adapter):
"""
This is an abstract class that represents the interface
that all logic adapters should implement.
:param statement_comparison_function: The dot-notated import path to a statement comparison function.
Defaults to ``levenshtein_distance``.
:param response_selection_method: The a response selection method.
Defaults to ``get_first_response``.
"""
def __init__(self, **kwargs):
super(LogicAdapter, self).__init__(**kwargs)
from ..comparisons import levenshtein_distance
from ..response_selection import get_first_response
# Import string module parameters
if 'statement_comparison_function' in kwargs:
import_path = kwargs.get('statement_comparison_function')
if isinstance(import_path, str):
kwargs['statement_comparison_function'] = import_module(import_path)
if 'response_selection_method' in kwargs:
import_path = kwargs.get('response_selection_method')
if isinstance(import_path, str):
kwargs['response_selection_method'] = import_module(import_path)
# By default, compare statements using Levenshtein distance
self.compare_statements = kwargs.get(
'statement_comparison_function',
levenshtein_distance
)
# By default, select the first available response
self.select_response = kwargs.get(
'response_selection_method',
get_first_response
)
def get_initialization_functions(self):
"""
Return a dictionary of functions to be run once when the chat bot is instantiated.
"""
return self.compare_statements.get_initialization_functions()
def initialize(self):
for function in self.get_initialization_functions().values():
function()
def can_process(self, statement):
"""
A preliminary check that is called to determine if a
logic adapter can process a given statement. By default,
this method returns true but it can be overridden in
child classes as needed.
:rtype: bool
"""
return True
def process(self, statement):
"""
Override this method and implement your logic for selecting a response to an input statement.
A confidence value and the selected response statement should be returned.
The confidence value represents a rating of how accurate the logic adapter
expects the selected response to be. Confidence scores are used to select
the best response from multiple logic adapters.
The confidence value should be a number between 0 and 1 where 0 is the
lowest confidence level and 1 is the highest.
:param statement: An input statement to be processed by the logic adapter.
:type statement: Statement
:rtype: Statement
"""
raise self.AdapterMethodNotImplementedError()
@property
def class_name(self):
"""
Return the name of the current logic adapter class.
This is typically used for logging and debugging.
"""
return str(self.__class__.__name__)
class EmptyDatasetException(Exception):
def __init__(self, value='An empty set was received when at least one statement was expected.'):
self.value = value
def __str__(self):
return repr(self.value)

@ -0,0 +1,58 @@
from __future__ import unicode_literals
from ..conversation import Statement
from .best_match import BestMatch
class LowConfidenceAdapter(BestMatch):
"""
Returns a default response with a high confidence
when a high confidence response is not known.
:kwargs:
* *threshold* (``float``) --
The low confidence value that triggers this adapter.
Defaults to 0.65.
* *default_response* (``str``) or (``iterable``)--
The response returned by this logic adaper.
* *response_selection_method* (``str``) or (``callable``)
The a response selection method.
Defaults to ``get_first_response``.
"""
def __init__(self, **kwargs):
super(LowConfidenceAdapter, self).__init__(**kwargs)
self.confidence_threshold = kwargs.get('threshold', 0.65)
default_responses = kwargs.get(
'default_response', "I'm sorry, I do not understand."
)
# Convert a single string into a list
if isinstance(default_responses, str):
default_responses = [
default_responses
]
self.default_responses = [
Statement(text=default) for default in default_responses
]
def process(self, input_statement):
"""
Return a default response with a high confidence if
a high confidence response is not known.
"""
# Select the closest match to the input statement
closest_match = self.get(input_statement)
# Choose a response from the list of options
response = self.select_response(input_statement, self.default_responses)
# Confidence should be high only if it is less than the threshold
if closest_match.confidence < self.confidence_threshold:
response.confidence = 1
else:
response.confidence = 0
return response

@ -0,0 +1,67 @@
from __future__ import unicode_literals
from . import LogicAdapter
from ..conversation import Statement
class MathematicalEvaluation(LogicAdapter):
"""
The MathematicalEvaluation logic adapter parses input to determine
whether the user is asking a question that requires math to be done.
If so, the equation is extracted from the input and returned with
the evaluated result.
For example:
User: 'What is three plus five?'
Bot: 'Three plus five equals eight'
:kwargs:
* *language* (``str``) --
The language is set to 'ENG' for English by default.
"""
def __init__(self, **kwargs):
super(MathematicalEvaluation, self).__init__(**kwargs)
self.language = kwargs.get('language', 'ENG')
self.cache = {}
def can_process(self, statement):
"""
Determines whether it is appropriate for this
adapter to respond to the user input.
"""
response = self.process(statement)
self.cache[statement.text] = response
return response.confidence == 1
def process(self, statement):
"""
Takes a statement string.
Returns the equation from the statement with the mathematical terms solved.
"""
from mathparse import mathparse
input_text = statement.text
# Use the result cached by the process method if it exists
if input_text in self.cache:
cached_result = self.cache[input_text]
self.cache = {}
return cached_result
# Getting the mathematical terms within the input statement
expression = mathparse.extract_expression(input_text, language=self.language)
response = Statement(text=expression)
try:
response.text += ' = ' + str(
mathparse.parse(expression, language=self.language)
)
# The confidence is 1 if the expression could be evaluated
response.confidence = 1
except mathparse.PostfixTokenEvaluationException:
response.confidence = 0
return response

@ -0,0 +1,153 @@
from __future__ import unicode_literals
from collections import Counter
from .. import utils
from .logic_adapter import LogicAdapter
class MultiLogicAdapter(LogicAdapter):
"""
MultiLogicAdapter allows ChatterBot to use multiple logic
adapters. It has methods that allow ChatterBot to add an
adapter, set the chat bot, and process an input statement
to get a response.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Logic adapters added by the chat bot
self.adapters = []
# Required logic adapters that must always be present
self.system_adapters = []
def get_initialization_functions(self):
"""
Get the initialization functions for each logic adapter.
"""
functions_dict = {}
# Iterate over each adapter and get its initialization functions
for logic_adapter in self.get_adapters():
functions = logic_adapter.get_initialization_functions()
functions_dict.update(functions)
return functions_dict
def process(self, statement):
"""
Returns the output of a selection of logic adapters
for a given input statement.
:param statement: The input statement to be processed.
"""
results = []
result = None
max_confidence = -1
for adapter in self.get_adapters():
if adapter.can_process(statement):
output = adapter.process(statement)
results.append((output.confidence, output, ))
self.logger.info(
'{} selected "{}" as a response with a confidence of {}'.format(
adapter.class_name, output.text, output.confidence
)
)
if output.confidence > max_confidence:
result = output
max_confidence = output.confidence
else:
self.logger.info(
'Not processing the statement using {}'.format(adapter.class_name)
)
# If multiple adapters agree on the same statement,
# then that statement is more likely to be the correct response
if len(results) >= 3:
statements = [s[1] for s in results]
count = Counter(statements)
most_common = count.most_common()
if most_common[0][1] > 1:
result = most_common[0][0]
max_confidence = self.get_greatest_confidence(result, results)
result.confidence = max_confidence
return result
def get_greatest_confidence(self, statement, options):
"""
Returns the greatest confidence value for a statement that occurs
multiple times in the set of options.
:param statement: A statement object.
:param options: A tuple in the format of (confidence, statement).
"""
values = []
for option in options:
if option[1] == statement:
values.append(option[0])
return max(values)
def get_adapters(self):
"""
Return a list of all logic adapters being used, including system logic adapters.
"""
adapters = []
adapters.extend(self.adapters)
adapters.extend(self.system_adapters)
return adapters
def add_adapter(self, adapter, **kwargs):
"""
Appends a logic adapter to the list of logic adapters being used.
:param adapter: The logic adapter to be added.
:type adapter: `LogicAdapter`
"""
utils.validate_adapter_class(adapter, LogicAdapter)
adapter = utils.initialize_class(adapter, **kwargs)
self.adapters.append(adapter)
def insert_logic_adapter(self, logic_adapter, insert_index, **kwargs):
"""
Adds a logic adapter at a specified index.
:param logic_adapter: The string path to the logic adapter to add.
:type logic_adapter: str
:param insert_index: The index to insert the logic adapter into the list at.
:type insert_index: int
"""
utils.validate_adapter_class(logic_adapter, LogicAdapter)
NewAdapter = utils.import_module(logic_adapter)
adapter = NewAdapter(**kwargs)
self.adapters.insert(insert_index, adapter)
def remove_logic_adapter(self, adapter_name):
"""
Removes a logic adapter from the chat bot.
:param adapter_name: The class name of the adapter to remove.
:type adapter_name: str
"""
for index, adapter in enumerate(self.adapters):
if adapter_name == type(adapter).__name__:
del self.adapters[index]
return True
return False
def set_chatbot(self, chatbot):
"""
Set the chatbot for each of the contained logic adapters.
"""
super(MultiLogicAdapter, self).set_chatbot(chatbot)
for adapter in self.get_adapters():
adapter.set_chatbot(chatbot)

@ -0,0 +1,26 @@
from __future__ import unicode_literals
from .logic_adapter import LogicAdapter
class NoKnowledgeAdapter(LogicAdapter):
"""
This is a system adapter that is automatically added
to the list of logic adapters during initialization.
This adapter is placed at the beginning of the list
to be given the highest priority.
"""
def process(self, statement):
"""
If there are no known responses in the database,
then a confidence of 1 should be returned with
the input statement.
Otherwise, a confidence of 0 should be returned.
"""
if self.chatbot.storage.count():
statement.confidence = 0
else:
statement.confidence = 1
return statement

@ -0,0 +1,38 @@
from __future__ import unicode_literals
from .logic_adapter import LogicAdapter
class SpecificResponseAdapter(LogicAdapter):
"""
Return a specific response to a specific input.
:kwargs:
* *input_text* (``str``) --
The input text that triggers this logic adapter.
* *output_text* (``str``) --
The output text returned by this logic adapter.
"""
def __init__(self, **kwargs):
super(SpecificResponseAdapter, self).__init__(**kwargs)
from ..conversation import Statement
self.input_text = kwargs.get('input_text')
output_text = kwargs.get('output_text')
self.response_statement = Statement(output_text)
def can_process(self, statement):
if statement == self.input_text:
return True
return False
def process(self, statement):
if statement == self.input_text:
self.response_statement.confidence = 1
else:
self.response_statement.confidence = 0
return self.response_statement

@ -0,0 +1,91 @@
from __future__ import unicode_literals
from datetime import datetime
from .logic_adapter import LogicAdapter
class TimeLogicAdapter(LogicAdapter):
"""
The TimeLogicAdapter returns the current time.
:kwargs:
* *positive* (``list``) --
The time-related questions used to identify time questions.
Defaults to a list of English sentences.
* *negative* (``list``) --
The non-time-related questions used to identify time questions.
Defaults to a list of English sentences.
"""
def __init__(self, **kwargs):
super(TimeLogicAdapter, self).__init__(**kwargs)
from nltk import NaiveBayesClassifier
self.positive = kwargs.get('positive', [
'what time is it',
'hey what time is it',
'do you have the time',
'do you know the time',
'do you know what time it is',
'what is the time'
])
self.negative = kwargs.get('negative', [
'it is time to go to sleep',
'what is your favorite color',
'i had a great time',
'thyme is my favorite herb',
'do you have time to look at my essay',
'how do you have the time to do all this'
'what is it'
])
labeled_data = (
[(name, 0) for name in self.negative] +
[(name, 1) for name in self.positive]
)
train_set = [
(self.time_question_features(text), n) for (text, n) in labeled_data
]
self.classifier = NaiveBayesClassifier.train(train_set)
def time_question_features(self, text):
"""
Provide an analysis of significant features in the string.
"""
features = {}
# A list of all words from the known sentences
all_words = " ".join(self.positive + self.negative).split()
# A list of the first word in each of the known sentence
all_first_words = []
for sentence in self.positive + self.negative:
all_first_words.append(
sentence.split(' ', 1)[0]
)
for word in text.split():
features['first_word({})'.format(word)] = (word in all_first_words)
for word in text.split():
features['contains({})'.format(word)] = (word in all_words)
for letter in 'abcdefghijklmnopqrstuvwxyz':
features['count({})'.format(letter)] = text.lower().count(letter)
features['has({})'.format(letter)] = (letter in text.lower())
return features
def process(self, statement):
from ..conversation import Statement
now = datetime.now()
time_features = self.time_question_features(statement.text.lower())
confidence = self.classifier.classify(time_features)
response = Statement('The current time is ' + now.strftime('%I:%M %p'))
response.confidence = confidence
return response

@ -0,0 +1,15 @@
from .output_adapter import OutputAdapter
from .microsoft import Microsoft
from .terminal import TerminalAdapter
from .mailgun import Mailgun
from .gitter import Gitter
from .hipchat import HipChat
__all__ = (
'OutputAdapter',
'Microsoft',
'TerminalAdapter',
'Mailgun',
'Gitter',
'HipChat',
)

@ -0,0 +1,85 @@
from __future__ import unicode_literals
from .output_adapter import OutputAdapter
class Gitter(OutputAdapter):
"""
An output adapter that allows a ChatterBot instance to send
responses to a Gitter room.
"""
def __init__(self, **kwargs):
super(Gitter, self).__init__(**kwargs)
self.gitter_host = kwargs.get('gitter_host', 'https://api.gitter.im/v1/')
self.gitter_room = kwargs.get('gitter_room')
self.gitter_api_token = kwargs.get('gitter_api_token')
authorization_header = 'Bearer {}'.format(self.gitter_api_token)
self.headers = {
'Authorization': authorization_header,
'Content-Type': 'application/json; charset=utf-8',
'Accept': 'application/json'
}
# Join the Gitter room
room_data = self.join_room(self.gitter_room)
self.room_id = room_data.get('id')
def _validate_status_code(self, response):
code = response.status_code
if code not in [200, 201]:
raise self.HTTPStatusException('{} status code recieved'.format(code))
def join_room(self, room_name):
"""
Join the specified Gitter room.
"""
import requests
endpoint = '{}rooms'.format(self.gitter_host)
response = requests.post(
endpoint,
headers=self.headers,
json={'uri': room_name}
)
self.logger.info('{} status joining room {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
return response.json()
def send_message(self, text):
"""
Send a message to a Gitter room.
"""
import requests
endpoint = '{}rooms/{}/chatMessages'.format(self.gitter_host, self.room_id)
response = requests.post(
endpoint,
headers=self.headers,
json={'text': text}
)
self.logger.info('{} sending message to {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
return response.json()
def process_response(self, statement, session_id=None):
self.send_message(statement.text)
return statement
class HTTPStatusException(Exception):
"""
Exception raised when unexpected non-success HTTP
status codes are returned in a response.
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)

@ -0,0 +1,67 @@
from __future__ import unicode_literals
import json
from .output_adapter import OutputAdapter
class HipChat(OutputAdapter):
"""
An output adapter that allows a ChatterBot instance to send
responses to a HipChat room.
"""
def __init__(self, **kwargs):
super(HipChat, self).__init__(**kwargs)
self.hipchat_host = kwargs.get("hipchat_host")
self.hipchat_access_token = kwargs.get("hipchat_access_token")
self.hipchat_room = kwargs.get("hipchat_room")
authorization_header = "Bearer {}".format(self.hipchat_access_token)
self.headers = {
'Authorization': authorization_header,
'Content-Type': 'application/json'
}
import requests
self.session = requests.Session()
self.session.verify = kwargs.get('ssl_verify', True)
def send_message(self, room_id_or_name, message):
"""
Send a message to a HipChat room.
https://www.hipchat.com/docs/apiv2/method/send_message
"""
message_url = "{}/v2/room/{}/message".format(
self.hipchat_host,
room_id_or_name
)
response = self.session.post(
message_url,
headers=self.headers,
data=json.dumps({
'message': message
})
)
return response.json()
def reply_to_message(self):
"""
The HipChat api supports responding to a given message.
This may be a good feature to implement in the future to
help with multi-user conversations.
https://www.hipchat.com/docs/apiv2/method/reply_to_message
"""
raise self.AdapterMethodNotImplementedError()
def process_response(self, statement, session_id=None):
data = self.send_message(self.hipchat_room, statement.text)
# Update the output statement with the message id
self.chatbot.storage.update(
statement.add_extra_data('hipchat_message_id', data['id'])
)
return statement

@ -0,0 +1,49 @@
from __future__ import unicode_literals
from .output_adapter import OutputAdapter
class Mailgun(OutputAdapter):
def __init__(self, **kwargs):
super(Mailgun, self).__init__(**kwargs)
# Use the bot's name for the name of the sender
self.name = kwargs.get('name')
self.from_address = kwargs.get('mailgun_from_address')
self.api_key = kwargs.get('mailgun_api_key')
self.endpoint = kwargs.get('mailgun_api_endpoint')
self.recipients = kwargs.get('mailgun_recipients')
def send_message(self, subject, text, from_address, recipients):
"""
* subject: Subject of the email.
* text: Text body of the email.
* from_email: The email address that the message will be sent from.
* recipients: A list of recipient email addresses.
"""
import requests
return requests.post(
self.endpoint,
auth=('api', self.api_key),
data={
'from': '%s <%s>' % (self.name, from_address),
'to': recipients,
'subject': subject,
'text': text
})
def process_response(self, statement, session_id=None):
"""
Send the response statement as an email.
"""
subject = 'Message from %s' % (self.name)
self.send_message(
subject,
statement.text,
self.from_address,
self.recipients
)
return statement

@ -0,0 +1,109 @@
from __future__ import unicode_literals
import json
from .output_adapter import OutputAdapter
class Microsoft(OutputAdapter):
"""
An output adapter that allows a ChatterBot instance to send
responses to a Microsoft bot using *Direct Line client protocol*.
"""
def __init__(self, **kwargs):
super(Microsoft, self).__init__(**kwargs)
self.directline_host = kwargs.get(
'directline_host',
'https://directline.botframework.com'
)
self.direct_line_token_or_secret = kwargs.get(
'direct_line_token_or_secret'
)
self.conversation_id = kwargs.get('conversation_id')
authorization_header = 'BotConnector {}'.format(
self.direct_line_token_or_secret
)
self.headers = {
'Authorization': authorization_header,
'Content-Type': 'application/json'
}
def _validate_status_code(self, response):
status_code = response.status_code
if status_code not in [200, 204]:
raise self.HTTPStatusException('{} status code recieved'.format(status_code))
def get_most_recent_message(self):
"""
Return the most recently sent message.
"""
import requests
endpoint = '{host}/api/conversations/{id}/messages'.format(
host=self.directline_host,
id=self.conversation_id
)
response = requests.get(
endpoint,
headers=self.headers,
verify=False
)
self.logger.info('{} retrieving most recent messages {}'.format(
response.status_code, endpoint
))
self._validate_status_code(response)
data = response.json()
if data['messages']:
last_msg = int(data['watermark'])
return data['messages'][last_msg - 1]
return None
def send_message(self, conversation_id, message):
"""
Send a message to a HipChat room.
https://www.hipchat.com/docs/apiv2/method/send_message
"""
import requests
message_url = "{host}/api/conversations/{conversationId}/messages".format(
host=self.directline_host,
conversationId=conversation_id
)
response = requests.post(
message_url,
headers=self.headers,
data=json.dumps({
'message': message
})
)
self.logger.info('{} sending message {}'.format(
response.status_code, message_url
))
self._validate_status_code(response)
# Microsoft return 204 on operation succeeded and no content was returned.
return self.get_most_recent_message()
def process_response(self, statement, session_id=None):
data = self.send_message(self.conversation_id, statement.text)
self.logger.info('processing user response {}'.format(data))
return statement
class HTTPStatusException(Exception):
"""
Exception raised when unexpected non-success HTTP
status codes are returned in a response.
"""
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save