Examples¶
qa
special
¶
worldknowledge
special
¶
domain
¶
WorldKnowledgeDomain (LookupDomain)
¶
Question answering for the world knowledge domain.
Attributes:
Name | Type | Description
---|---|---
artificial_id_counter | int | pseudo identifier for each entry
name_lex | Dict[str,str] | lexicon for matching topic's names to their KG entity
Source code in adviser/examples/qa/worldknowledge/domain.py
class WorldKnowledgeDomain(LookupDomain):
# Attribute docstrings currently not considered by mkdocstrings -> write plugin?
"""
Question answering for the world knowledge domain.
Attributes:
- artificial_id_counter (int): pseudo identifier for each entry
- name_lex (Dict[str,str]): lexicon for matching topic's names to their KG entity
"""
def __init__(self):
"""Calls super class' constructor and loads name lexicon"""
LookupDomain.__init__(self, 'CSQA', 'World Knowledge')
self.artificial_id_counter = 1  # int: pseudo identifier for each entry
self.name_lex = self._init_name_lexicon()
"""Dict[str,str]: lexicon for matching topic's names to their KG entity."""
def _init_name_lexicon(self):
with open(os.path.join(get_root_dir(), 'resources', 'ontologies', 'qa', 'name_dict.json')) as f:
return json.load(f)
def _find_topic_entities(self, term):
entities = self.name_lex.get(term, [])
return [(ent['id'], ent['label']) for ent in entities]
def _perform_out_query(self, relation, topic):
url = 'https://query.wikidata.org/sparql'
query = """ SELECT ?item ?itemLabel
WHERE
{
wd:%s wdt:%s ?item.
?item rdfs:label ?itemLabel.
FILTER(LANG(?itemLabel) = 'en').
}""" % (topic, relation)
body = {'query': query}
r = requests.post(url, params={'format': 'json', 'content-type': 'application/sparql-query', 'user-agent': 'Python 3.6.8'}, data=body)
data = r.json()
return [res['itemLabel']['value'] for res in data['results']['bindings']]
def _perform_in_query(self, relation, topic):
url = 'https://query.wikidata.org/sparql'
query = """ SELECT ?item ?itemLabel
WHERE
{
?item wdt:%s wd:%s.
?item rdfs:label ?itemLabel.
FILTER(LANG(?itemLabel) = 'en').
}""" % (relation, topic)
body = {'query': query}
r = requests.post(url, params = {
'format': 'json',
'content-type': 'application/sparql-query',
'user-agent': 'Python 3.6.8'
}, data=body)
data = r.json()
return [res['itemLabel']['value'] for res in data['results']['bindings']]
def find_entities(self, constraints: dict):
""" Returns all entities from the data backend that meet the constraints.
Args:
constraints (dict): slot-value mapping of constraints
"""
assert 'relation' in constraints
assert 'topic' in constraints
assert 'direction' in constraints
topics = self._find_topic_entities(constraints['topic'])
if not topics:
return []
answers = []
for topic_id, topic_label in topics:
answer_ids = []
if constraints['direction'] == 'out':
answer_ids = self._perform_out_query(constraints['relation'], topic_id)
for answer_id in answer_ids:
answers.append({
'subject': topic_label,
'predicate': constraints['relation'],
'object': answer_id
})
self.artificial_id_counter += 1
else:
answer_ids = self._perform_in_query(constraints['relation'], topic_id)
for answer_id in answer_ids:
answers.append({
'subject': answer_id,
'predicate': constraints['relation'],
'object': topic_label
})
self.artificial_id_counter += 1
return answers
def find_info_about_entity(self, entity_id, requested_slots: Iterable):
""" Returns the values (stored in the data backend) of the specified slots for the
specified entity.
Args:
entity_id (str): primary key value of the entity
requested_slots (dict): slot-value mapping of constraints
"""
raise BaseException('should not be called')
def get_domain_name(self):
return "qa"
def get_requestable_slots(self) -> List[str]:
""" Returns a list of all slots requestable by the user. """
return ['subject', 'predicate', 'object', 'object_type']
def get_system_requestable_slots(self) -> List[str]:
""" Returns a list of all slots requestable by the system. """
return ['relation', 'topic', 'direction']
def get_informable_slots(self) -> List[str]:
""" Returns a list of all informable slots. """
return ['relation', 'topic', 'direction']
def get_mandatory_slots(self) -> List[str]:
""" Returns a list of all mandatory slots. """
return ['relation', 'topic', 'direction']
def get_possible_values(self, slot: str) -> List[str]:
""" Returns all possible values for an informable slot
Args:
slot (str): name of the slot
Returns:
a list of strings, each string representing one possible value for
the specified slot.
"""
raise BaseException('should not be called')
def get_primary_key(self) -> str:
""" Returns the slot name that will be used as the 'name' of an entry """
return 'artificial_id'
def get_keyword(self):
return 'world knowledge'
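The sketch below shows how the domain might be queried directly; it is illustrative, not part of the module. The import path and constraint values are assumptions: the relation is assumed to be a Wikidata property id (as produced by the semantic parser), the topic must be a key of the name lexicon, and the call issues a live SPARQL request against Wikidata.

```python
# Hypothetical usage sketch; import path and constraint values are assumptions.
from examples.qa.worldknowledge.domain import WorldKnowledgeDomain

domain = WorldKnowledgeDomain()          # loads name_dict.json from the resources dir
triples = domain.find_entities({
    'relation': 'P36',                   # assumed: Wikidata property id, "capital"
    'topic': 'germany',                  # assumed: key present in the name lexicon
    'direction': 'out',                  # query outgoing edges of the topic entity
})
for triple in triples:                   # e.g. {'subject': ..., 'predicate': ..., 'object': ...}
    print(triple['subject'], triple['predicate'], triple['object'])
```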
__init__(self)
special
¶
Calls super class' constructor and loads name lexicon
Source code in adviser/examples/qa/worldknowledge/domain.py
def __init__(self):
"""Calls super class' constructor and loads name lexicon"""
LookupDomain.__init__(self, 'CSQA', 'World Knowledge')
self.artificial_id_counter = 1  # int: pseudo identifier for each entry
self.name_lex = self._init_name_lexicon()
"""Dict[str,str]: lexicon for matching topic's names to their KG entity."""
find_entities(self, constraints)
¶
Returns all entities from the data backend that meet the constraints.
Parameters:
Name | Type | Description | Default
---|---|---|---
constraints | dict | slot-value mapping of constraints | required
Source code in adviser/examples/qa/worldknowledge/domain.py
def find_entities(self, constraints: dict):
""" Returns all entities from the data backend that meet the constraints.
Args:
constraints (dict): slot-value mapping of constraints
"""
assert 'relation' in constraints
assert 'topic' in constraints
assert 'direction' in constraints
topics = self._find_topic_entities(constraints['topic'])
if not topics:
return []
answers = []
for topic_id, topic_label in topics:
answer_ids = []
if constraints['direction'] == 'out':
answer_ids = self._perform_out_query(constraints['relation'], topic_id)
for answer_id in answer_ids:
answers.append({
'subject': topic_label,
'predicate': constraints['relation'],
'object': answer_id
})
self.artificial_id_counter += 1
else:
answer_ids = self._perform_in_query(constraints['relation'], topic_id)
for answer_id in answer_ids:
answers.append({
'subject': answer_id,
'predicate': constraints['relation'],
'object': topic_label
})
self.artificial_id_counter += 1
return answers
find_info_about_entity(self, entity_id, requested_slots)
¶
Returns the values (stored in the data backend) of the specified slots for the specified entity.
Parameters:
Name | Type | Description | Default
---|---|---|---
entity_id | str | primary key value of the entity | required
requested_slots | dict | slot-value mapping of constraints | required
Source code in adviser/examples/qa/worldknowledge/domain.py
def find_info_about_entity(self, entity_id, requested_slots: Iterable):
""" Returns the values (stored in the data backend) of the specified slots for the
specified entity.
Args:
entity_id (str): primary key value of the entity
requested_slots (dict): slot-value mapping of constraints
"""
raise BaseException('should not be called')
get_domain_name(self)
¶
get_informable_slots(self)
¶
Returns a list of all informable slots.
get_keyword(self)
¶
get_mandatory_slots(self)
¶
Returns a list of all mandatory slots.
get_possible_values(self, slot)
¶
Returns all possible values for an informable slot
Parameters:
Name | Type | Description | Default
---|---|---|---
slot | str | name of the slot | required

Returns:

Type | Description
---|---
List[str] | a list of strings, each string representing one possible value for the specified slot.
Source code in adviser/examples/qa/worldknowledge/domain.py
def get_possible_values(self, slot: str) -> List[str]:
""" Returns all possible values for an informable slot
Args:
slot (str): name of the slot
Returns:
a list of strings, each string representing one possible value for
the specified slot.
"""
raise BaseException('should not be called')
get_primary_key(self)
¶
Returns the slot name that will be used as the 'name' of an entry.
get_requestable_slots(self)
¶
Returns a list of all slots requestable by the user.
get_system_requestable_slots(self)
¶
Returns a list of all slots requestable by the system.
get_root_dir()
¶
multinlg
¶
Handcrafted (i.e. template-based) Natural Language Generation Module
MultiNLG (HandcraftedNLG)
¶
Extension of the handcrafted NLG by allowing multiple system acts.
This change is necessary for QA, since the policy publishes multiple system acts.
Source code in adviser/examples/qa/worldknowledge/multinlg.py
class MultiNLG(HandcraftedNLG):
"""Extension of the handcrafted NLG by allowing multiple system acts.
This change is necessary for QA, since the policy publishes multiple system acts.
"""
def __init__(self, domain: Domain, template_file: str = None, sub_topic_domains: Dict[str, str] = {},
logger: DiasysLogger = DiasysLogger(), template_file_german: str = None,
language: Language = None):
# only calls the super class' constructor
HandcraftedNLG.__init__(self, domain, template_file, sub_topic_domains, logger, template_file_german, language)
@PublishSubscribe(sub_topics=["sys_acts"], pub_topics=["sys_utterance"])
def publish_system_utterance(self, sys_acts: List[SysAct] = None) -> dict(sys_utterance=str):
"""Generates the system utterance and publishes it.
Instead of the HandcraftedNLG class, this class takes multiple system acts and
applies the templates for each system act separately. Each message is then printed in a
new line.
Args:
sys_acts: The list of system acts as published by the (QA) policy
Returns:
a dict containing the system utterance
"""
message = '\n'.join([self.generate_system_utterance(sys_act) for sys_act in sys_acts])
return {'sys_utterance': message}
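To make the multi-act behaviour concrete, here is a minimal, self-contained sketch of the joining logic; the stand-in class and its canned utterance template are hypothetical, and only the newline join mirrors publish_system_utterance.

```python
from typing import List

class FakeNLG:
    """Hypothetical stand-in for HandcraftedNLG's template lookup."""
    def generate_system_utterance(self, sys_act: str) -> str:
        return f"The answer is {sys_act}."   # canned template, illustrative only

    def publish_system_utterance(self, sys_acts: List[str]) -> dict:
        # same joining logic as MultiNLG: one line per system act
        message = '\n'.join(self.generate_system_utterance(a) for a in sys_acts)
        return {'sys_utterance': message}

print(FakeNLG().publish_system_utterance(['Berlin', 'Hamburg'])['sys_utterance'])
# The answer is Berlin.
# The answer is Hamburg.
```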
__init__(self, domain, template_file=None, sub_topic_domains={}, logger=DiasysLogger(), template_file_german=None, language=None)
special
¶
Source code in adviser/examples/qa/worldknowledge/multinlg.py
def __init__(self, domain: Domain, template_file: str = None, sub_topic_domains: Dict[str, str] = {},
logger: DiasysLogger = DiasysLogger(), template_file_german: str = None,
language: Language = None):
# only calls the super class' constructor
HandcraftedNLG.__init__(self, domain, template_file, sub_topic_domains, logger, template_file_german, language)
publish_system_utterance(self, sys_acts=None)
¶
Generates the system utterance and publishes it.
Source code in adviser/examples/qa/worldknowledge/multinlg.py
@PublishSubscribe(sub_topics=["sys_acts"], pub_topics=["sys_utterance"])
def publish_system_utterance(self, sys_acts: List[SysAct] = None) -> dict(sys_utterance=str):
    """Generates the system utterance and publishes it.
    Instead of the HandcraftedNLG class, this class takes multiple system acts and
    applies the templates for each system act separately. Each message is then printed in a
    new line.
    Args:
        sys_acts: The list of system acts as published by the (QA) policy
    Returns:
        a dict containing the system utterance
    """
    message = '\n'.join([self.generate_system_utterance(sys_act) for sys_act in sys_acts])
    return {'sys_utterance': message}
neuralmodels
special
¶
director
¶
Classifier (Module)
¶
Neural network for predicting the relation's direction (outgoing or incoming).
The model uses a question encoder to classify the question as one of the two classes "outgoing" or "incoming".
Attributes:
Name | Type | Description
---|---|---
hidden_dim | int | Size of the Bi-LSTM's hidden layer
out_dim | int | Size of the output layer (here: 2)
diminisher | nn.Module | Fine-tuning embedding layer, good for reducing the Bi-LSTM's size
lstm | nn.Module | Bi-LSTM for encoding a question
hidden2tag | nn.Module | Output layer
Source code in adviser/examples/qa/worldknowledge/neuralmodels/director.py
class Classifier(nn.Module):
"""Neural network for predicting the relation's direction (outgoing or incoming).
The model uses a question encoder to classify the question as one of the two classes
"outgoing" or "incoming".
Attributes:
hidden_dim (int): Size of the Bi_LSTM's hidden layer
out_dim (int): Size of the output layer (here: 2)
diminisher (nn.Module): Fine-tuning embedding layer, good for reducing Bi-LSTM's size
lstm (nn.Module): Bi-LSTM for encoding a question
hidden2tag (nn.Module): Output layer
"""
def __init__(self, emb_dim: int, lstm_out_dim: int, num_classes: int):
"""Initialises all required elements of the neural network.
Args:
emb_dim: Output size of the fine-tuning embedding layer
lstm_out_dim: Output size of the Bi-LSTM
num_classes: Size of the output layer (in this context, always 2)
"""
super(Classifier, self).__init__()
self.hidden_dim = lstm_out_dim
self.out_dim = num_classes
self.diminisher = nn.Linear(768, emb_dim)
self.lstm = nn.LSTM(emb_dim, lstm_out_dim, bidirectional=True)
self.hidden2tag = nn.Linear(lstm_out_dim*2, self.out_dim)
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
"""Application of the neural network on a given input question.
Args:
embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|
Returns:
Probabilities of the two classes "incoming" and "outgoing"
"""
embeds = self.diminisher(embeds)
lstm_out, _ = self.lstm(embeds)
tag_space = self.hidden2tag(lstm_out[0])
tag_scores = F.log_softmax(tag_space, dim=1)
return tag_scores
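A quick shape check, assuming the sizes used by QuestionParser._load_direction_model further below (75/500/2) and that the code runs in the context of director.py; the input tensor is random, so the output is meaningless apart from its shape.

```python
import torch

model = Classifier(emb_dim=75, lstm_out_dim=500, num_classes=2)
embeds = torch.randn(40, 1, 768)   # |Token| x |Batch| x |Embedding Size| (768 = BERT)
scores = model(embeds)             # log-probabilities over the two direction classes
print(scores.shape)                # torch.Size([1, 2])
```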
__init__(self, emb_dim, lstm_out_dim, num_classes)
special
¶
Initialises all required elements of the neural network.
Parameters:
Name | Type | Description | Default
---|---|---|---
emb_dim | int | Output size of the fine-tuning embedding layer | required
lstm_out_dim | int | Output size of the Bi-LSTM | required
num_classes | int | Size of the output layer (in this context, always 2) | required
Source code in adviser/examples/qa/worldknowledge/neuralmodels/director.py
def __init__(self, emb_dim: int, lstm_out_dim: int, num_classes: int):
"""Initialises all required elements of the neural network.
Args:
emb_dim: Output size of the fine-tuning embedding layer
lstm_out_dim: Output size of the Bi-LSTM
num_classes: Size of the output layer (in this context, always 2)
"""
super(Classifier, self).__init__()
self.hidden_dim = lstm_out_dim
self.out_dim = num_classes
self.diminisher = nn.Linear(768, emb_dim)
self.lstm = nn.LSTM(emb_dim, lstm_out_dim, bidirectional=True)
self.hidden2tag = nn.Linear(lstm_out_dim*2, self.out_dim)
forward(self, embeds)
¶
Application of the neural network on a given input question.
Parameters:
Name | Type | Description | Default
---|---|---|---
embeds | Tensor | Tensor containing the embeddings of shape \|Token\| x \|Batch\| x \|Embedding Size\| | required

Returns:

Type | Description
---|---
Tensor | Probabilities of the two classes "incoming" and "outgoing"
Source code in adviser/examples/qa/worldknowledge/neuralmodels/director.py
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
"""Application of the neural network on a given input question.
Args:
embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|
Returns:
Probabilities of the two classes "incoming" and "outgoing"
"""
embeds = self.diminisher(embeds)
lstm_out, _ = self.lstm(embeds)
tag_space = self.hidden2tag(lstm_out[0])
tag_scores = F.log_softmax(tag_space, dim=1)
return tag_scores
simpledot
¶
SimpleDot (Module)
¶
Neural network for predicting the relation of a question.
The simple dot approach compares a question with each possible relation candidate by taking the (simple) dot product between the encoded question and every encoded relation.
Attributes:
Name | Type | Description
---|---|---
softmax | bool | whether or not the scores should be converted to probabilities
hidden | int | size of the hidden layer
relations_tensor | torch.autograd.Variable | embeddings for all relation descriptions
diminisher | nn.Module | Fine-tuning embedding layer, good for reducing the Bi-LSTMs' size
lstm_question | nn.Module | Bi-LSTM for encoding a question
lstm_relation | nn.Module | Bi-LSTM for encoding a relation
Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
class SimpleDot(nn.Module):
"""Neural network for predicting the relation of a question.
The simple dot approach compares a question with each possible relation candidate by
taking the (simple) dot product between the encoded question and every encoded relation.
Attributes:
softmax (bool): whether or not the scores should be converted to probabilities
hidden (int): size of the hidden layer
relations_tensor (torch.autograd.Variable): embeddings for all relation descriptions
diminisher (nn.Module): Fine-tuning embedding layer, good for reducing Bi-LSTMs' size
lstm_question (nn.Module): Bi-LSTM for encoding a question
lstm_relation (nn.Module): Bi-LSTM for encoding a relation
"""
def __init__(self, emb_dim: int, hidden_dim: int, softmax: bool = True):
"""Initialises all required elements of the neural network.
Args:
emb_dim: Output size of the fine-tuning embedding layer
hidden_dim: Output size of the Bi-LSTM
softmax: Whether or not a softmax is applied on the output layer
"""
super(SimpleDot, self).__init__()
self.softmax = softmax
self.hidden = hidden_dim
self.relations_tensor = self._initialise_relations_tensor()
self.diminisher = nn.Linear(768, emb_dim)
self.lstm_question = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
self.lstm_relation = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
def _initialise_relations_tensor(self, max_rel_len: int = 30) -> torch.autograd.Variable:
"""Creates a tensor containing word embeddings of all relation descriptions.
To be processable, the tensor is transformed to the shape
|token| x |relation| x |embeddings|.
Keyword Arguments:
max_rel_len: maximum number of tokens in the relation descriptions
Returns:
A tensor containing word embeddings of all relation descriptions
"""
relations_list = []
tokens = []
with open(os.path.join(get_root_dir(), 'resources', 'ontologies', 'qa', 'csqa_relation_embeddings.bin'), 'rb') as f:
rels = pickle.load(f)
relations_list = [emb[1] for emb in rels]
tokens = [emb[0] for emb in rels]
relations = [] # R x T_R x E
for relation in relations_list:
relation.extend([np.zeros(768, dtype='float32')] * (max_rel_len - len(relation)))
relations.append(relation[:max_rel_len])
# T_R x R x E
return torch.autograd.Variable(torch.Tensor(relations).transpose(0,1), requires_grad=False)
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
"""Application of the neural network on a given input question.
Args:
embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|
Returns:
Probabilities for the relation classes
"""
embeds = self.diminisher(embeds)
relations_embeds = self.diminisher(self.relations_tensor)
question_out, _ = self.lstm_question(embeds) # T_Q x B x H
relation_out, _ = self.lstm_relation(relations_embeds) # T_R x R x H
last_question_out = question_out[0][:,self.hidden:] # B x H
last_relation_out = relation_out[0][:,self.hidden:] # R x H
# relation prediction
rel_scores = torch.matmul(last_question_out, last_relation_out.transpose(0,1))
if self.softmax:
rel_scores = F.log_softmax(rel_scores, dim=1)
return rel_scores
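The scoring step is easy to isolate from the resource loading; below is a minimal sketch of the dot-product comparison, with random tensors standing in for the Bi-LSTM encodings (all sizes are illustrative).

```python
import torch
import torch.nn.functional as F

batch, num_relations, hidden = 4, 10, 400            # illustrative sizes
question_enc = torch.randn(batch, hidden)            # B x H, one vector per question
relation_enc = torch.randn(num_relations, hidden)    # R x H, one vector per relation

# every question is scored against every relation candidate
rel_scores = torch.matmul(question_enc, relation_enc.transpose(0, 1))  # B x R
rel_log_probs = F.log_softmax(rel_scores, dim=1)
print(rel_log_probs.shape)   # torch.Size([4, 10])
```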
__init__(self, emb_dim, hidden_dim, softmax=True)
special
¶
Initialises all required elements of the neural network.
Parameters:
Name | Type | Description | Default
---|---|---|---
emb_dim | int | Output size of the fine-tuning embedding layer | required
hidden_dim | int | Output size of the Bi-LSTM | required
softmax | bool | Whether or not a softmax is applied on the output layer | True
Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
def __init__(self, emb_dim: int, hidden_dim: int, softmax: bool = True):
"""Initialises all required elements of the neural network.
Args:
emb_dim: Output size of the fine-tuning embedding layer
hidden_dim: Output size of the Bi-LSTM
softmax: Whether or not a softmax is applied on the output layer
"""
super(SimpleDot, self).__init__()
self.softmax = softmax
self.hidden = hidden_dim
self.relations_tensor = self._initialise_relations_tensor()
self.diminisher = nn.Linear(768, emb_dim)
self.lstm_question = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
self.lstm_relation = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
forward(self, embeds)
¶
Application of the neural network on a given input question.
Parameters:
Name | Type | Description | Default
---|---|---|---
embeds | Tensor | Tensor containing the embeddings of shape \|Token\| x \|Batch\| x \|Embedding Size\| | required

Returns:

Type | Description
---|---
Tensor | Probabilities for the relation classes
Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
"""Application of the neural network on a given input question.
Args:
embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|
Returns:
Probabilities for the relation classes
"""
embeds = self.diminisher(embeds)
relations_embeds = self.diminisher(self.relations_tensor)
question_out, _ = self.lstm_question(embeds) # T_Q x B x H
relation_out, _ = self.lstm_relation(relations_embeds) # T_R x R x H
last_question_out = question_out[0][:,self.hidden:] # B x H
last_relation_out = relation_out[0][:,self.hidden:] # R x H
# relation prediction
rel_scores = torch.matmul(last_question_out, last_relation_out.transpose(0,1))
if self.softmax:
rel_scores = F.log_softmax(rel_scores, dim=1)
return rel_scores
get_root_dir()
¶
tagger
¶
TAGS
¶
Tagger (Module)
¶
Neural network for predicting the topic entities.
The model uses a question encoder and classifies each token using the BIO tag set.
Attributes:
Name | Type | Description
---|---|---
hidden_dim | int | Size of the Bi-LSTM's hidden layer
diminisher | nn.Module | Fine-tuning embedding layer, good for reducing the Bi-LSTM's size
lstm | nn.Module | Bi-LSTM
hidden2label | nn.Module | Output layer
Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
class Tagger(nn.Module):
"""Neural network for predicting the topic entities.
The model uses a question encoder and classifies each token using the BIO tag set.
Attributes:
hidden_dim (int): Size of the Bi_LSTM's hidden layer
diminisher (nn.Module): Fine-tuning embedding layer, good for reducing Bi-LSTM's size
lstm (nn.Module): Bi-LSTM
hidden2label (nn.Module): Output layer
"""
def __init__(self, emb_dim: int, hidden_dim: int):
"""Initialises all required elements of the neural network.
Args:
emb_dim: Output size of the fine-tuning embedding layer
hidden_dim: Hidden layer size of the Bi-LSTM
"""
super(Tagger, self).__init__()
self.hidden_dim = hidden_dim
self.diminisher = nn.Linear(768, emb_dim)
self.lstm = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
self.hidden2label = nn.Linear(hidden_dim*2, len(TAGS))
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
"""Application of the neural network on a given input question.
Args:
embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|
Returns:
Probabilities of each BIO tag for all tokens
"""
embeds = self.diminisher(embeds)
lstm_out, _ = self.lstm(embeds)
label_space = self.hidden2label(lstm_out[1:])
label_scores = F.log_softmax(label_space, dim=2)
return label_scores
__init__(self, emb_dim, hidden_dim)
special
¶
Initialises all required elements of the neural network.
Parameters:
Name | Type | Description | Default
---|---|---|---
emb_dim | int | Output size of the fine-tuning embedding layer | required
hidden_dim | int | Hidden layer size of the Bi-LSTM | required
Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
def __init__(self, emb_dim: int, hidden_dim: int):
"""Initialises all required elements of the neural network.
Args:
emb_dim: Output size of the fine-tuning embedding layer
hidden_dim: Hidden layer size of the Bi-LSTM
"""
super(Tagger, self).__init__()
self.hidden_dim = hidden_dim
self.diminisher = nn.Linear(768, emb_dim)
self.lstm = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
self.hidden2label = nn.Linear(hidden_dim*2, len(TAGS))
forward(self, embeds)
¶
Application of the neural network on a given input question.
Parameters:
Name | Type | Description | Default
---|---|---|---
embeds | Tensor | Tensor containing the embeddings of shape \|Token\| x \|Batch\| x \|Embedding Size\| | required

Returns:

Type | Description
---|---
Tensor | Probabilities of each BIO tag for all tokens
Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
"""Application of the neural network on a given input question.
Args:
embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|
Returns:
Probabilities of each BIO tag for all tokens
"""
embeds = self.diminisher(embeds)
lstm_out, _ = self.lstm(embeds)
label_space = self.hidden2label(lstm_out[1:])
label_scores = F.log_softmax(label_space, dim=2)
return label_scores
extract_entities(tokens, tag_idxs)
¶
Extracts entities using the predicted BIO tags for each token
Parameters:
Name | Type | Description | Default
---|---|---|---
tokens | List[str] | question's tokens | required
tag_idxs | List[int] | index of the BIO tag for each token in the question | required

Returns:

Type | Description
---|---
List[List[str]] | List of entities, i.e. list of connected tokens
Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
def extract_entities(tokens: List[str], tag_idxs: List[int]) -> List[List[str]]:
"""Extracts entities using the predicted BIO tags for each token
Arguments:
tokens: question's tokens
tag_idxs: index of the BIO tag for each token in the question
Returns:
List of entities, i.e. list of connected tokens
"""
entities = []
curr_entity = []
tags = [TAGS[tag_idx] for tag_idx in tag_idxs]
for i, (token, tag) in enumerate(zip(tokens, tags)):
if tag == 'I':
curr_entity.append(token)
continue
else:
if curr_entity:
entities.append(curr_entity)
curr_entity = []
if tag == 'B':
curr_entity.append(token)
if curr_entity:
entities.append(curr_entity)
return entities
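A short usage sketch; it assumes a BIO ordering of the module-level TAGS list as ['O', 'B', 'I'] (the actual ordering lives in tagger.py and is not shown on this page), so the indices below are illustrative.

```python
# Hedged example: TAGS ordering assumed to be ['O', 'B', 'I'].
tokens = ['who', 'wrote', 'the', 'lord', 'of', 'the', 'rings']
tag_idxs = [0, 0, 1, 2, 2, 2, 2]   # O O B I I I I under the assumed ordering
print(extract_entities(tokens, tag_idxs))
# [['the', 'lord', 'of', 'the', 'rings']]
```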
policyqa
¶
QaPolicy (Service)
¶
Policy module for question answering.
Provides a simple rule-based policy for question answering. The QA module assumes that the user acts contain information about relation, topic entities and relation direction. Adequate answers are looked up in the knowledge base and published.
The difference from the default HandcraftedPolicy is that no BST is needed and that multiple system acts can be published.
Source code in adviser/examples/qa/worldknowledge/policyqa.py
class QaPolicy(Service):
"""Policy module for question answering.
Provides a simple rule-based policy for question answering.
The QA module assumes that the user acts contain information about relation, topic entities
and relation direction.
Adequate answers are looked up in the knowledge base and published.
The difference from the default HandcraftedPolicy is that no BST is needed and that multiple
system acts can be published.
"""
def __init__(self, domain: LookupDomain, logger: DiasysLogger = DiasysLogger()):
# only call super class' constructor
Service.__init__(self, domain=domain, debug_logger=logger)
@PublishSubscribe(sub_topics=["user_acts"], pub_topics=["sys_acts"])
def generate_sys_acts(self, user_acts: List[UserAct] = None) -> dict(sys_acts=List[SysAct]):
"""Generates system acts by looking up answers to the given user question.
Args:
user_acts: The list of user acts containing information about the predicted relation,
topic entities and relation direction
Returns:
dict with 'sys_acts' as key and list of system acts as value
"""
if user_acts is None:
return { 'sys_acts': [SysAct(SysActionType.Welcome)]}
elif any([user_act.type == UserActionType.Bye for user_act in user_acts]):
return { 'sys_acts': [SysAct(SysActionType.Bye)] }
elif not user_acts:
return { 'sys_acts': [SysAct(SysActionType.Bad)] }
user_acts = [user_act for user_act in user_acts if user_act.type != UserActionType.SelectDomain]
if len(user_acts) == 0:
return { 'sys_acts': [SysAct(SysActionType.Welcome)]}
relation = [user_act.value for user_act in user_acts \
if user_act.type == UserActionType.Inform and user_act.slot == 'relation'][0]
topics = [user_act.value for user_act in user_acts \
if user_act.type == UserActionType.Inform and user_act.slot == 'topic']
direction = [user_act.value for user_act in user_acts \
if user_act.type == UserActionType.Inform and user_act.slot == 'direction'][0]
if not topics:
return { 'sys_acts': [SysAct(SysActionType.Bad)] }
# currently, short answers are used for world knowledge
answers = self._get_short_answers(relation, topics, direction)
sys_acts = [SysAct(SysActionType.InformByName, slot_values=answer) for answer in answers]
self.debug_logger.dialog_turn("System Action: " + '; '.join(
[str(sys_act) for sys_act in sys_acts]))
return {'sys_acts': sys_acts}
def _get_short_answers(self, relation: str, topics: List[str], direction: str) \
-> dict(answer=str):
"""Looks up answers and only returns the answer string"""
answers = []
for topic in topics:
triples = self.domain.find_entities({
'relation': relation,
'topic': topic,
'direction': direction
})
for triple in triples:
if direction == 'in':
answers.append({'answer': triple['subject']})
else:
answers.append({'answer': triple['object']})
if not answers:
answers.append({'answer': 'Sorry, I don\'t know.'})
return answers
def _get_triples(self, relation, topics, direction):
"""Looks up answers and stores them as triples"""
answers = []
for topic in topics:
answers.extend(self.domain.find_entities({
'relation': relation,
'topic': topic,
'direction': direction
}))
if not answers:
for topic in topics:
answers.append({
'subject': 'unknown' if direction == 'in' else topic,
'predicate': relation,
'object': 'unknown' if direction == 'out' else topic
})
return answers
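The short-answer path can be traced without any models or services; the sketch below re-implements the control flow of _get_short_answers around a stubbed lookup function. Everything here is hypothetical except the subject/object selection rule, which follows the source above.

```python
def get_short_answers(find_entities, relation, topics, direction):
    """Standalone re-implementation of QaPolicy._get_short_answers (illustrative)."""
    answers = []
    for topic in topics:
        for triple in find_entities({'relation': relation, 'topic': topic,
                                     'direction': direction}):
            # incoming questions answer with the subject, outgoing with the object
            key = 'subject' if direction == 'in' else 'object'
            answers.append({'answer': triple[key]})
    if not answers:
        answers.append({'answer': "Sorry, I don't know."})
    return answers

# stub lookup standing in for WorldKnowledgeDomain.find_entities
stub = lambda c: [{'subject': c['topic'], 'predicate': c['relation'], 'object': 'Berlin'}]
print(get_short_answers(stub, 'P36', ['germany'], 'out'))   # [{'answer': 'Berlin'}]
```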
__init__(self, domain, logger=DiasysLogger())
special
¶
generate_sys_acts(self, user_acts=None)
¶
Generates system acts by looking up answers to the given user question.
Source code in adviser/examples/qa/worldknowledge/policyqa.py
@PublishSubscribe(sub_topics=["user_acts"], pub_topics=["sys_acts"])
def generate_sys_acts(self, user_acts: List[UserAct] = None) -> dict(sys_acts=List[SysAct]):
    """Generates system acts by looking up answers to the given user question.
    Args:
        user_acts: The list of user acts containing information about the predicted relation,
            topic entities and relation direction
    Returns:
        dict with 'sys_acts' as key and list of system acts as value
    """
    if user_acts is None:
        return {'sys_acts': [SysAct(SysActionType.Welcome)]}
    elif any([user_act.type == UserActionType.Bye for user_act in user_acts]):
        return {'sys_acts': [SysAct(SysActionType.Bye)]}
    elif not user_acts:
        return {'sys_acts': [SysAct(SysActionType.Bad)]}
    user_acts = [user_act for user_act in user_acts if user_act.type != UserActionType.SelectDomain]
    if len(user_acts) == 0:
        return {'sys_acts': [SysAct(SysActionType.Welcome)]}
    relation = [user_act.value for user_act in user_acts
                if user_act.type == UserActionType.Inform and user_act.slot == 'relation'][0]
    topics = [user_act.value for user_act in user_acts
              if user_act.type == UserActionType.Inform and user_act.slot == 'topic']
    direction = [user_act.value for user_act in user_acts
                 if user_act.type == UserActionType.Inform and user_act.slot == 'direction'][0]
    if not topics:
        return {'sys_acts': [SysAct(SysActionType.Bad)]}
    # currently, short answers are used for world knowledge
    answers = self._get_short_answers(relation, topics, direction)
    sys_acts = [SysAct(SysActionType.InformByName, slot_values=answer) for answer in answers]
    self.debug_logger.dialog_turn("System Action: " + '; '.join(
        [str(sys_act) for sys_act in sys_acts]))
    return {'sys_acts': sys_acts}
semanticparser
¶
QuestionParser (Service)
¶
Semantic parsing module for question answering
Attributes:
Name | Type | Description
---|---|---
device | torch.device | PyTorch device object, either CPU or GPU
nn_relation | nn.Module | neural network for relation prediction
nn_entity | nn.Module | neural network for topic entity prediction
nn_direction | nn.Module | neural network for relation direction prediction
tags | List[str] | relation tags
max_seq_len | int | maximum number of tokens per question
tokenizer | BertTokenizerFast | tokenizer matching the BERT embedder
embedder | BertModel | BERT model used to create the embeddings
Source code in adviser/examples/qa/worldknowledge/semanticparser.py
class QuestionParser(Service):
"""Semantic parsing module for question answering
Attributes:
device (torch.device): PyTorch device object, either CPU or GPU
nn_relation (nn.Module): neural network for relation prediction
nn_entity (nn.Module): neural network for topic entity prediction
nn_direction (nn.Module): neural network for relation direction prediction
tags (List[str]): relation tags
max_seq_len (int): maximum number of tokens per question
tokenizer (BertTokenizerFast): tokenizer matching the BERT embedder
embedder (BertModel): BERT model used to create the embeddings
"""
def __init__(self, domain: LookupDomain, \
logger: DiasysLogger = DiasysLogger(), device: str = 'cpu', cache_dir: str = None):
"""Creates neural networks for semantic parsing and other required utils
Args:
domain: the QA domain
logger: the logger
device: PyTorch device name
cache_dir: the cache directory for transformers' models
"""
Service.__init__(self, domain=domain, debug_logger=logger)
self.device = torch.device(device)
self.nn_relation = self._load_relation_model()
self.nn_entity = self._load_entity_model()
self.nn_direction = self._load_direction_model()
self.tags = self._load_tag_set()
self.max_seq_len = 40
self.tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
self.embedder = BertModel.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
def _load_relation_model(self):
model = SimpleDot(100, 400, True).to(self.device)
model.load_state_dict(torch.load(os.path.join(get_root_dir(), 'resources', 'models', 'qa', 'simpledot.pt'), map_location=self.device))
model.eval()
return model
def _load_entity_model(self):
model = Tagger(100, 400)
model.load_state_dict(torch.load(os.path.join(get_root_dir(), 'resources', 'models', 'qa', 'tagger.pt'), map_location=self.device))
model.eval()
return model
def _load_direction_model(self):
model = Classifier(75, 500, 2).to(self.device)
model.load_state_dict(torch.load(os.path.join(get_root_dir(), 'resources', 'models', 'qa', 'director.pt'), map_location=self.device))
model.eval()
return model
def _load_tag_set(self):
csqa_tags = []
with open(os.path.join(get_root_dir(), 'resources', 'ontologies', 'qa', 'csqa_tags.json')) as f:
csqa_tags = json.load(f)
return csqa_tags
@PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
def parse_user_utterance(self, user_utterance: str = None) -> dict(user_acts=List[UserAct]):
"""Parses the user utterance.
Responsible for detecting user acts with their respective slot-values from the user
utterance by predicting relation, topic entities and the relation's direction.
Args:
user_utterance: the last user input as string
Returns:
A dictionary with the key "user_acts" and the value containing a list of user actions
"""
result = {}
self.user_acts = []
user_utterance = user_utterance.strip()
if not user_utterance:
return {'user_acts': None}
elif user_utterance.lower().replace(' ', '').endswith('bye'):
return {'user_acts': [UserAct(user_utterance, UserActionType.Bye)]}
if self.domain.get_keyword() in user_utterance.lower():
self.user_acts.append(UserAct(user_utterance, UserActionType.SelectDomain))
user_utterance = user_utterance.lower().replace(self.domain.get_keyword(), "")
if len(user_utterance) == 0:
return {'user_acts': self.user_acts}
tokens, embeddings = self._preprocess_utterance(user_utterance)
relation_out = self._predict_relation(embeddings)
entities_out = self._predict_topic_entities(embeddings)
direction_out = self._predict_direction(embeddings)
relation_pred = self._lookup_relation(relation_out)
entities_pred = extract_entities(tokens, entities_out[:,0])
direction_pred = self._lookup_direction(direction_out)
self.user_acts.extend([
UserAct(user_utterance, UserActionType.Inform, 'relation', relation_pred, 1.0),
UserAct(user_utterance, UserActionType.Inform, 'direction', direction_pred, 1.0)
])
for t in entities_pred:
self.user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'topic', self.tokenizer.convert_tokens_to_string(t), 1.0))
result['user_acts'] = self.user_acts
self.debug_logger.dialog_turn("User Actions: %s" % str(self.user_acts))
return result
def _preprocess_utterance(self, utterance):
encoded_input = self.tokenizer(utterance, return_tensors='pt', truncation=True, max_length=self.max_seq_len)
with torch.no_grad():
embeddings: torch.Tensor = self.embedder(**encoded_input).last_hidden_state
embeddings = torch.cat((embeddings, embeddings.new_zeros(1,self.max_seq_len - embeddings.size(1),768)), dim=1)
return self.tokenizer.tokenize(utterance, add_special_tokens=False), embeddings.permute(1,0,2)
def _predict_relation(self, embeddings):
with torch.no_grad():
rel_scores = self.nn_relation(embeddings)
_, pred_rel = torch.max(rel_scores, 1)
return pred_rel
def _lookup_relation(self, prediction):
return self.tags[int(prediction[0])]
def _predict_topic_entities(self, embeddings):
with torch.no_grad():
ent_scores = self.nn_entity(embeddings)
_, preds_ent = torch.max(ent_scores, 2)
return preds_ent
def _predict_direction(self, embeddings):
with torch.no_grad():
tag_scores = self.nn_direction(embeddings)
_, preds = torch.max(tag_scores, 1)
return preds
def _lookup_direction(self, prediction):
return ['out', 'in'][int(prediction[0])]
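All three predictions reduce to an argmax over the network outputs; here is a hedged sketch with random scores in place of the real models (the shapes follow the classes above, and the relation count is made up).

```python
import torch

rel_scores = torch.randn(1, 300)     # B x |relations|; 300 is an assumed tag-set size
ent_scores = torch.randn(39, 1, 3)   # (tokens - 1) x B x |BIO tags|
dir_scores = torch.randn(1, 2)       # B x 2

_, pred_rel = torch.max(rel_scores, 1)    # index into self.tags
_, preds_ent = torch.max(ent_scores, 2)   # one BIO tag index per token
_, pred_dir = torch.max(dir_scores, 1)    # 0 -> 'out', 1 -> 'in'
print(int(pred_rel[0]), preds_ent[:, 0].tolist()[:5], ['out', 'in'][int(pred_dir[0])])
```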
__init__(self, domain, logger=DiasysLogger(), device='cpu', cache_dir=None)
special
¶
Creates neural networks for semantic parsing and other required utils
Parameters:
Name | Type | Description | Default
---|---|---|---
domain | LookupDomain | the QA domain | required
logger | DiasysLogger | the logger | DiasysLogger()
device | str | PyTorch device name | 'cpu'
cache_dir | str | the cache directory for transformers' models | None
Source code in adviser/examples/qa/worldknowledge/semanticparser.py
def __init__(self, domain: LookupDomain, \
logger: DiasysLogger = DiasysLogger(), device: str = 'cpu', cache_dir: str = None):
"""Creates neural networks for semantic parsing and other required utils
Args:
domain: the QA domain
logger: the logger
device: PyTorch device name
cache_dir: the cache directory for transformers' models
"""
Service.__init__(self, domain=domain, debug_logger=logger)
self.device = torch.device(device)
self.nn_relation = self._load_relation_model()
self.nn_entity = self._load_entity_model()
self.nn_direction = self._load_direction_model()
self.tags = self._load_tag_set()
self.max_seq_len = 40
self.tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
self.embedder = BertModel.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
parse_user_utterance(self, user_utterance=None)
¶
Parses the user utterance.
Source code in adviser/examples/qa/worldknowledge/semanticparser.py
@PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
def parse_user_utterance(self, user_utterance: str = None) -> dict(user_acts=List[UserAct]):
    """Parses the user utterance.
    Responsible for detecting user acts with their respective slot-values from the user
    utterance by predicting relation, topic entities and the relation's direction.
    Args:
        user_utterance: the last user input as string
    Returns:
        A dictionary with the key "user_acts" and the value containing a list of user actions
    """
    result = {}
    self.user_acts = []
    user_utterance = user_utterance.strip()
    if not user_utterance:
        return {'user_acts': None}
    elif user_utterance.lower().replace(' ', '').endswith('bye'):
        return {'user_acts': [UserAct(user_utterance, UserActionType.Bye)]}
    if self.domain.get_keyword() in user_utterance.lower():
        self.user_acts.append(UserAct(user_utterance, UserActionType.SelectDomain))
        user_utterance = user_utterance.lower().replace(self.domain.get_keyword(), "")
    if len(user_utterance) == 0:
        return {'user_acts': self.user_acts}
    tokens, embeddings = self._preprocess_utterance(user_utterance)
    relation_out = self._predict_relation(embeddings)
    entities_out = self._predict_topic_entities(embeddings)
    direction_out = self._predict_direction(embeddings)
    relation_pred = self._lookup_relation(relation_out)
    entities_pred = extract_entities(tokens, entities_out[:, 0])
    direction_pred = self._lookup_direction(direction_out)
    self.user_acts.extend([
        UserAct(user_utterance, UserActionType.Inform, 'relation', relation_pred, 1.0),
        UserAct(user_utterance, UserActionType.Inform, 'direction', direction_pred, 1.0)
    ])
    for t in entities_pred:
        self.user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'topic',
                                      self.tokenizer.convert_tokens_to_string(t), 1.0))
    result['user_acts'] = self.user_acts
    self.debug_logger.dialog_turn("User Actions: %s" % str(self.user_acts))
    return result
get_root_dir()
¶
webapi
special
¶
mensa
special
¶
domain
¶
SLOT_VALUES
¶
MensaDomain (LookupDomain)
¶
Domain for the Mensa API
Attributes:
Name | Type | Description
---|---|---
parser | MensaParser | HTML file parser for dynamically building a pseudo database
last_results | List[dict] | Current results which the user might request info about
Source code in adviser/examples/webapi/mensa/domain.py
class MensaDomain(LookupDomain):
"""Domain for the Mensa API
Attributes:
parser (MensaParser): HTML file parser for dynamically building a pseudo database
last_results (List[dict]): Current results which the user might request info about
"""
def __init__(self):
LookupDomain.__init__(self, 'MensaAPI', 'Mensa Food')
self.parser = MensaParser()
self.last_results = []
def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
""" Returns all entities from the data backend that meet the constraints.
Args:
constraints (dict): Slot-value mapping of constraints.
If empty, all entities in the database will be returned.
requested_slots (Iterable): list of slots that should be returned in addition to the
system requestable slots and the primary key
"""
if 'day' in constraints:
meals = self.parser.get_meals(constraints['day'])
results = [meal.as_dict() for meal in meals]
for slot in constraints:
if slot == 'day':
continue
results = [candidate for candidate in results if candidate[slot] == constraints[slot]]
for i, result in enumerate(results):
result['artificial_id'] = i+1
if list(requested_slots):
cleaned_results = [{slot: result_dict[slot] for slot in requested_slots} for result_dict in results]
else:
cleaned_results = results
self.last_results = results
return cleaned_results
else:
return []
def find_info_about_entity(self, entity_id: str, requested_slots: Iterable):
""" Returns the values (stored in the data backend) of the specified slots for the
specified entity.
Args:
entity_id (str): primary key value of the entity
requested_slots (dict): slot-value mapping of constraints
"""
result = {slot: self.last_results[int(entity_id)-1][slot] for slot in requested_slots}
result['artificial_id'] = entity_id
return [result]
def get_requestable_slots(self) -> List[str]:
""" Returns a list of all slots requestable by the user. """
return ['name', 'type', 'price', 'allergens', 'vegan', 'vegetarian', 'fish', 'pork']
def get_system_requestable_slots(self) -> List[str]:
""" Returns a list of all slots requestable by the system. """
return ['day', 'type', 'vegan', 'vegetarian', 'fish', 'pork']
def get_informable_slots(self) -> List[str]:
""" Returns a list of all informable slots. """
return ['day', 'type', 'vegan', 'vegetarian', 'fish', 'pork']
def get_mandatory_slots(self) -> List[str]:
""" Returns a list of all mandatory slots. """
return ['day']
def get_default_inform_slots(self) -> List[str]:
""" Returns a list of all default Inform slots. """
return ['name']
def get_possible_values(self, slot: str) -> List[str]:
""" Returns all possible values for an informable slot
Args:
slot (str): name of the slot
Returns:
a list of strings, each string representing one possible value for
the specified slot.
"""
assert slot in SLOT_VALUES
return SLOT_VALUES[slot]
def get_primary_key(self) -> str:
""" Returns the slot name that will be used as the 'name' of an entry """
return 'artificial_id'
def get_keyword(self):
return 'mensa'
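A hedged usage sketch; find_entities triggers a live POST request to the canteen web service, so it only works online, and the expected format of the day value is an assumption here.

```python
# Illustrative only; requires network access to the mensa web API.
domain = MensaDomain()
results = domain.find_entities({'day': 'monday', 'vegan': 'true'})  # day format assumed
for meal in results:
    # boolean slots are stored as the strings 'true'/'false' (see Meal.as_dict)
    print(meal['artificial_id'], meal['name'], meal['price'])
```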
__init__(self)
special
¶
find_entities(self, constraints, requested_slots=iter(()))
¶
Returns all entities from the data backend that meet the constraints.
Parameters:
Name | Type | Description | Default
---|---|---|---
constraints | dict | Slot-value mapping of constraints. If empty, all entities in the database will be returned. | required
requested_slots | Iterable | list of slots that should be returned in addition to the system requestable slots and the primary key | iter(())
Source code in adviser/examples/webapi/mensa/domain.py
def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
""" Returns all entities from the data backend that meet the constraints.
Args:
constraints (dict): Slot-value mapping of constraints.
If empty, all entities in the database will be returned.
requested_slots (Iterable): list of slots that should be returned in addition to the
system requestable slots and the primary key
"""
if 'day' in constraints:
meals = self.parser.get_meals(constraints['day'])
results = [meal.as_dict() for meal in meals]
for slot in constraints:
if slot == 'day':
continue
results = [candidate for candidate in results if candidate[slot] == constraints[slot]]
for i, result in enumerate(results):
result['artificial_id'] = i+1
if list(requested_slots):
cleaned_results = [{slot: result_dict[slot] for slot in requested_slots} for result_dict in results]
else:
cleaned_results = results
self.last_results = results
return cleaned_results
else:
return []
find_info_about_entity(self, entity_id, requested_slots)
¶
Returns the values (stored in the data backend) of the specified slots for the specified entity.
Parameters:
Name | Type | Description | Default
---|---|---|---
entity_id | str | primary key value of the entity | required
requested_slots | dict | slot-value mapping of constraints | required
Source code in adviser/examples/webapi/mensa/domain.py
def find_info_about_entity(self, entity_id: str, requested_slots: Iterable):
""" Returns the values (stored in the data backend) of the specified slots for the
specified entity.
Args:
entity_id (str): primary key value of the entity
requested_slots (dict): slot-value mapping of constraints
"""
result = {slot: self.last_results[int(entity_id)-1][slot] for slot in requested_slots}
result['artificial_id'] = entity_id
return [result]
get_default_inform_slots(self)
¶
Returns a list of all default Inform slots.
get_informable_slots(self)
¶
Returns a list of all informable slots.
get_keyword(self)
¶
get_mandatory_slots(self)
¶
Returns a list of all mandatory slots.
get_possible_values(self, slot)
¶
Returns all possible values for an informable slot
Parameters:
Name | Type | Description | Default
---|---|---|---
slot | str | name of the slot | required

Returns:

Type | Description
---|---
List[str] | a list of strings, each string representing one possible value for the specified slot.
Source code in adviser/examples/webapi/mensa/domain.py
get_primary_key(self)
¶
Returns the slot name that will be used as the 'name' of an entry.
get_requestable_slots(self)
¶
Returns a list of all slots requestable by the user.
get_system_requestable_slots(self)
¶
Returns a list of all slots requestable by the system.
nlu
¶
MensaNLU (HandcraftedNLU)
¶
Adapted handcrafted NLU for the mensa domain.
The default handcrafted NLU is adapted to automatically add the user act request(name). This is necessary because the name is not the primary key, i.e. it is not printed by default once an element is found. To force the Policy to automatically inform about the name, too, a request for the name is added in each turn.
Source code in adviser/examples/webapi/mensa/nlu.py
class MensaNLU(HandcraftedNLU):
"""Adapted handcrafted NLU for the mensa domain.
The default handcrafted NLU is adapted to automatically add the user act request(name).
This is necessary because the name is not the primary key, i.e. it is not printed by default
once an element is found. To force the Policy to automatically inform about the name, too,
a request for the name is added in each turn.
"""
def __init__(self, domain: LookupDomain, logger: DiasysLogger = DiasysLogger()):
# only calls super class' constructor
HandcraftedNLU.__init__(self, domain, logger)
@PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
def extract_user_acts(self, user_utterance: str = None, sys_act: SysAct = None, beliefstate: BeliefState = None) \
-> dict(user_acts=List[UserAct]):
"""Original code but adapted to automatically add a request(name) act"""
result = {}
# Setting request everything to False at every turn
self.req_everything = False
self.user_acts = []
# slots_requested & slots_informed store slots requested and informed in this turn
# they are used later for disambiguation
self.slots_requested, self.slots_informed = set(), set()
if user_utterance is not None:
user_utterance = user_utterance.strip()
self._match_general_act(user_utterance)
self._match_domain_specific_act(user_utterance)
# Solving ambiguities from regexes, especially with requests and informs happening
# simultaneously on the same slot and two slots taking the same value
self._disambiguate_co_occurrence(beliefstate)
self._solve_informable_values()
# If nothing else has been matched, see if the user chose a domain; otherwise if it's
# not the first turn, it's a bad act
if len(self.user_acts) == 0:
if self.domain.get_keyword() in user_utterance:
self.user_acts.append(UserAct(text=user_utterance if user_utterance else "",
act_type=UserActionType.SelectDomain))
elif self.sys_act_info['last_act'] is not None:
# start of dialogue or no regex matched
self.user_acts.append(UserAct(text=user_utterance if user_utterance else "",
act_type=UserActionType.Bad))
self._assign_scores()
result['user_acts'] = self.user_acts
self.logger.dialog_turn("User Actions: %s" % str(self.user_acts))
return result
__init__(self, domain, logger=DiasysLogger())
special
¶
extract_user_acts(self, user_utterance=None, sys_act=None, beliefstate=None)
¶
Original code but adapted to automatically add a request(name) act.
Source code in adviser/examples/webapi/mensa/nlu.py
@PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
def extract_user_acts(self, user_utterance: str = None, sys_act: SysAct = None, beliefstate: BeliefState = None) \
        -> dict(user_acts=List[UserAct]):
    """Original code but adapted to automatically add a request(name) act"""
    result = {}
    # Setting request everything to False at every turn
    self.req_everything = False
    self.user_acts = []
    # slots_requested & slots_informed store slots requested and informed in this turn
    # they are used later for disambiguation
    self.slots_requested, self.slots_informed = set(), set()
    if user_utterance is not None:
        user_utterance = user_utterance.strip()
        self._match_general_act(user_utterance)
        self._match_domain_specific_act(user_utterance)
    # Solving ambiguities from regexes, especially with requests and informs happening
    # simultaneously on the same slot and two slots taking the same value
    self._disambiguate_co_occurrence(beliefstate)
    self._solve_informable_values()
    # If nothing else has been matched, see if the user chose a domain; otherwise if it's
    # not the first turn, it's a bad act
    if len(self.user_acts) == 0:
        if self.domain.get_keyword() in user_utterance:
            self.user_acts.append(UserAct(text=user_utterance if user_utterance else "",
                                          act_type=UserActionType.SelectDomain))
        elif self.sys_act_info['last_act'] is not None:
            # start of dialogue or no regex matched
            self.user_acts.append(UserAct(text=user_utterance if user_utterance else "",
                                          act_type=UserActionType.Bad))
    self._assign_scores()
    result['user_acts'] = self.user_acts
    self.logger.dialog_turn("User Actions: %s" % str(self.user_acts))
    return result
parser
¶
Allergen (Enum)
¶
This enum provides the allergens used in the mensa menu.
Source code in adviser/examples/webapi/mensa/parser.py
class Allergen(Enum):
"""This enum provides the allergens used in the mensa menu."""
Egg = 'Ei'
Peanut = 'En'
Fish = 'Fi'
Wheat = 'GlW'
Spelt = 'GlD'
KhorasanWheat = 'GlKW'
Rye = 'GlR'
Barley = 'GlG'
Millet = 'GlH'
Shellfishes = 'Kr'
Lactose = 'La'
Lupin = 'Lu'
Almonds = 'NuM'
Hazelnuts = 'NuH'
Walnuts = 'NuW'
Cashews = 'NuC'
Pecan = 'NuPe'
BrazilNut = 'NuPa'
Pistachio = 'NuPi'
Macadamia = 'NuMa'
Sesame = 'Se'
Mustard = 'Sf'
Celery = 'Sl'
Soy = 'So'
Sulfite = 'Sw'
Mollusca = 'Wt'
DishType (Enum)
¶
This enum provides the dish types used in the mensa menu.
Source code in adviser/examples/webapi/mensa/parser.py
class DishType(Enum):
"""This enum provides the dish types used in the mensa menu."""
Starter = 'starter'
Buffet = 'buffet'
MainDish = 'main_dish'
SideDish = 'side_dish'
Dessert = 'dessert'
@staticmethod
def from_website_name(website_name: str) -> 'DishType':
"""Converts the type as listed on the website into the type used in the dialog system.
Args:
website_name: The name as used in the response to the POST request.
Returns:
The corresponding enum member.
"""
if website_name == 'STARTER':
return DishType.Starter
elif website_name == 'BUFFET':
return DishType.Buffet
elif website_name == 'MAIN DISH':
return DishType.MainDish
elif website_name == 'SIDE DISH':
return DishType.SideDish
elif website_name == 'DESSERT':
return DishType.Dessert
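A small usage check (the 'SOUP' input is hypothetical and simply exercises the implicit None fall-through for unlisted website names):

```python
assert DishType.from_website_name('MAIN DISH') is DishType.MainDish
assert DishType.from_website_name('DESSERT') is DishType.Dessert
print(DishType.from_website_name('SOUP'))   # None: unlisted names fall through
```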
Location (Enum)
¶
This enum provides the possible mensa locations.
Source code in adviser/examples/webapi/mensa/parser.py
Meal
¶
Source code in adviser/examples/webapi/mensa/parser.py
class Meal():
def __init__(self, name: str, day: str, prices: Tuple[float], price_quantity: str,\
allergens:List[Allergen], vegan: bool, vegetarian: bool, fish: bool, pork: bool,\
dish_type: DishType):
"""The class for a meal consisting of a name and several properties (slot-value pairs).
Args:
name: The name of the meal.
day: The day on which the meal is offered.
prices: The price for students and guests.
price_quantity: The unit for which the price is valid.
allergens: The allergens of this meal.
vegan: Whether the meal is vegan or not.
vegetarian: Whether the meal is vegetarian or not.
fish: Whether the meal contains fish or not.
pork: Whether the meal contains pork or not.
dish_type: The type of the dish. (Starter, Buffet, Main Dish, Side Dish or Dessert)
"""
self.name = name
self.day = day
self.prices = prices
self.price_quantity = price_quantity
self.allergens = allergens
self.vegan = vegan
self.vegetarian = vegetarian
self.fish = fish
self.pork = pork
self.dish_type = dish_type
def as_dict(self) -> Dict[str, str]:
"""A dict representation of the meal."""
return {
'name': self.name,
'day': self.day,
'type': self.dish_type.value,
'price': str(self.prices[0]),
'allergens': ', '.join([allergen.value for allergen in self.allergens]) if\
self.allergens is not None else 'none',
'vegan': str(self.vegan).lower(),
'vegetarian': str(self.vegetarian).lower(),
'fish': str(self.fish).lower(),
'pork': str(self.pork).lower()
}
def __str__(self) -> str:
"""The string representation of the meal."""
return (f"Meal(name={self.name}, day={self.day}, prices={self.prices},\
price_quantity={self.price_quantity}, "
f"allergens={self.allergens}, vegan={self.vegan}, vegetarian={self.vegetarian}, "
f"fish={self.fish}, pork={self.pork}, dish_type={self.dish_type})")
def __repr__(self) -> str:
"""The string representation of the meal."""
return str(self)
__init__(self, name, day, prices, price_quantity, allergens, vegan, vegetarian, fish, pork, dish_type)
special
¶
The class for a meal consisting of a name and several properties (slot-value pairs).
Parameters:
Name | Type | Description | Default
---|---|---|---
name | str | The name of the meal. | required
day | str | The day on which the meal is offered. | required
prices | Tuple[float] | The price for students and guests. | required
price_quantity | str | The unit for which the price is valid. | required
allergens | List[Allergen] | The allergens of this meal. | required
vegan | bool | Whether the meal is vegan or not. | required
vegetarian | bool | Whether the meal is vegetarian or not. | required
fish | bool | Whether the meal contains fish or not. | required
pork | bool | Whether the meal contains pork or not. | required
dish_type | DishType | The type of the dish (Starter, Buffet, Main Dish, Side Dish or Dessert). | required
Source code in adviser/examples/webapi/mensa/parser.py
def __init__(self, name: str, day: str, prices: Tuple[float], price_quantity: str,\
allergens:List[Allergen], vegan: bool, vegetarian: bool, fish: bool, pork: bool,\
dish_type: DishType):
"""The class for a meal consisting of a name and several properties (slot-value pairs).
Args:
name: The name of the meal.
day: The day on which the meal is offered.
prices: The price for students and guests.
price_quantity: The unit for which the price is valid.
allergens: The allergens of this meal.
vegan: Whether the meal is vegan or not.
vegetarian: Whether the meal is vegetarian or not.
fish: Whether the meal contains fish or not.
pork: Whether the meal contains pork or not.
dish_type: The type of the dish. (Starter, Buffet, Main Dish, Side Dish or Dessert)
"""
self.name = name
self.day = day
self.prices = prices
self.price_quantity = price_quantity
self.allergens = allergens
self.vegan = vegan
self.vegetarian = vegetarian
self.fish = fish
self.pork = pork
self.dish_type = dish_type
__repr__(self)
special
¶
__str__(self)
special
¶
The string representation of the meal.
Source code in adviser/examples/webapi/mensa/parser.py
def __str__(self) -> str:
"""The string representation of the meal."""
return (f"Meal(name={self.name}, day={self.day}, prices={self.prices},\
price_quantity={self.price_quantity}, "
f"allergens={self.allergens}, vegan={self.vegan}, vegetarian={self.vegetarian}, "
f"fish={self.fish}, pork={self.pork}, dish_type={self.dish_type})")
as_dict(self)
¶
A dict representation of the meal.
Source code in adviser/examples/webapi/mensa/parser.py
def as_dict(self) -> Dict[str, str]:
"""A dict representation of the meal."""
return {
'name': self.name,
'day': self.day,
'type': self.dish_type.value,
'price': str(self.prices[0]),
'allergens': ', '.join([allergen.value for allergen in self.allergens]) if\
self.allergens is not None else 'none',
'vegan': str(self.vegan).lower(),
'vegetarian': str(self.vegetarian).lower(),
'fish': str(self.fish).lower(),
'pork': str(self.pork).lower()
}
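For illustration, a minimal usage sketch; the concrete values are made up and DishType.MAIN_DISH is an assumed enum member (check parser.py for the actual definitions):
# hypothetical example; DishType.MAIN_DISH is an assumed enum member
meal = Meal(name='Spaghetti Bolognese', day='Monday', prices=(3.2, 4.5),
            price_quantity='plate', allergens=None, vegan=False,
            vegetarian=False, fish=False, pork=False,
            dish_type=DishType.MAIN_DISH)
print(meal.as_dict())
# -> {'name': 'Spaghetti Bolognese', 'day': 'Monday', 'type': <DishType value>,
#     'price': '3.2', 'allergens': 'none', 'vegan': 'false',
#     'vegetarian': 'false', 'fish': 'false', 'pork': 'false'}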
MensaParser
¶
Source code in adviser/examples/webapi/mensa/parser.py
class MensaParser():
def __init__(self, cache: bool = True):
"""
The class to issue post requests and parse the response. Will also take care of caching the
parser's results.
Args:
cache (bool): Whether to cache results or not.
"""
#: dict: storage to cache parsed meals
self.storage = {}
self.cache = cache
def _parse(self, date: datetime.datetime) -> List[Meal]:
"""
Issues a request for the given date. The response will be parsed and a list of meals
returned.
Args:
date: The date for which the data will be parsed.
Returns:
:obj:`list` of Meal: List of parsed meals
"""
date_str = date.strftime('%Y-%m-%d')
date_next_week_str = (date + datetime.timedelta(days=7)).strftime('%Y-%m-%d')
data = {
'func': 'make_spl',
# currently we stick with only one mensa location
'locId': Location.STUTTGART_VAIHINGEN.value,
'date': date_str,
'lang': 'en',
'startThisWeek': date_str,
'startNextWeek': date_next_week_str
}
# issue post request
response = requests.post('https://sws2.maxmanager.xyz/inc/ajax-php_konnektor.inc.php',\
headers={}, cookies={}, data=data)
tree = html.fromstring(response.content.decode(response.encoding))
meals = [self._parse_meal(meal, date.strftime('%A')) for meal in\
tree.xpath('//div[contains(@class, "splMeal")]')]
if self.cache:
# store under the calendar date so the lookup in get_meals succeeds
self.storage[date.date()] = meals
return meals
def _parse_meal(self, meal: html.HtmlElement, day: str) -> Meal:
"""Parse all necessary properties of a meal from html.
Args:
meal: The html.HtmlElement which will be parsed.
day: The day for which this meal is valid.
"""
# name
name = meal.xpath('./div[1]/span/text()')[0].strip()
# price & quantity
div_price =\
meal.xpath('./div[4]/div[1]/text()[preceding-sibling::br or following-sibling::br]')
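# e.g. a price text like '3,20 €  4,50 €' yields the groups ('3,20', '4,50')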
prices = re.search(r'(\d*,\d*).*?(\d*,\d*)', div_price[0]).groups()
# substitute comma by dot to correctly parse float
prices = tuple(map(lambda price: float(price.replace(',','.')), prices))
if len(div_price) > 1:
price_quantity = re.search(r'\(per (\d*.?)\)', div_price[1]).group(1)
else:
price_quantity = "plate"
# allergens
allergens = meal.xpath('./div[3]/div[1]/div[contains(@class, "azn") and\
not(contains(@class, "ptr")) and\
not(contains(./preceding-sibling::div/@class, "hidden"))]/text()')
if len(allergens) > 8:
# allergens are included
allergens = [Allergen(allergen) for allergen in allergens[0].strip().strip('()')\
.split(', ') if allergen != '' and allergen not in map(str, list(range(1,12)))]
else:
# there are no allergens
allergens = None
# some tags / binary slots
tags = meal.xpath('./div[4]/div[2]/img/@title')
vegan = 'vegan' in tags
vegetarian = 'vegetarian' in tags
fish = 'MSC (MSC-C-51632)' in tags
pork = 'pork' in tags or 'beef/pork' in tags
dish_type = meal.xpath('./preceding-sibling::div[contains(@class, "gruppenkopf")][1]\
/div[contains(@class, "gruppenname")]/text()')[0]
return Meal(name, day, prices, price_quantity, allergens, vegan, vegetarian, fish, pork,\
DishType.from_website_name(dish_type))
def _parse_date(self, date: str) -> datetime.datetime:
"""Parse the given string as date. Allowed is a date given as Y-m-d or one of today,
tomorrow and monday-sunday.
Raises:
ParseDateError: If ``date`` could not be parsed.
"""
try:
# try to parse date
date = datetime.datetime.strptime(date, '%Y-%m-%d')
except ValueError:
# cover some specific cases, e.g. today, tomorrow, wednesday
weekdays =\
['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
if date == 'today':
date = datetime.datetime.today()
elif date == 'tomorrow':
date = datetime.datetime.today() + datetime.timedelta(days=1)
elif date.lower() in weekdays:
today = datetime.datetime.today().weekday()
weekday = weekdays.index(date.lower())
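# e.g. today = Wednesday (2), date = 'monday' (0): 0 <= 2, so 0 + 7 - 2 = 5 days ahead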
if weekday <= today:
# the named weekday is today or already past, so use the same weekday next week (<= vs. <)
weekday += 7
date = datetime.datetime.today() + datetime.timedelta(days=weekday-today)
else:
raise ParseDateError
return date
def get_meals(self, date: str, use_cache: bool = True) -> List[Meal]:
"""
Gets the meals for a specified day by either looking them up in the cache or by issuing and
parsing a post request.
Args:
date (str): The date for which the data will be returned.
Can be a string in the format 'Y-m-d' or one of today, tomorrow and monday-sunday.
use_cache (bool): If False will always query the server instead of using the cache.
Returns:
:obj:`list` of Meal: List of meals for specified date
"""
date = self._parse_date(date)
if use_cache and date.date() in self.storage:
# NOTE: cached data could be stale
return self.storage[date.date()]
else:
# issue request to server
return self._parse(date)
__init__(self, cache=True)
special
¶
The class to issue post requests and parse the response. Will also take care of caching the parser's results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cache | bool | Whether to cache results or not. | True |
Source code in adviser/examples/webapi/mensa/parser.py
def __init__(self, cache: bool = True):
"""
The class to issue post requests and parse the response. Will also take care of caching the
parser's results.
Args:
cache (bool): Whether to cache results or not.
"""
#: dict: storage to cache parsed meals
self.storage = {}
self.cache = cache
get_meals(self, date, use_cache=True)
¶
Gets the meals for a specified day by either looking them up in the cache or by issuing and
parsing a post request.
Args:
date (str): The date for which the data will be returned.
Can be a string in the format 'Y-m-d' or one of today, tomorrow and monday-sunday.
use_cache (bool): If False will always query the server instead of using the cache.
Returns:
Type | Description |
---|---|
List[Meal] | List of meals for specified date |
Source code in adviser/examples/webapi/mensa/parser.py
def get_meals(self, date: str, use_cache: bool = True) -> List[Meal]:
"""
Gets the meals for a specified day by either looking them up in the cache or by issuing and
parsing a post request.
Args:
date (str): The date for which the data will be returned.
Can be a string in the format 'Y-m-d' or one of today, tomorrow and monday-sunday.
use_cache (bool): If False will always query the server instead of using the cache.
Returns:
:obj:`list` of Meal: List of meals for specified date
"""
date = self._parse_date(date)
if use_cache and date.date() in self.storage:
# NOTE: cached data could be stale
return self.storage[date.date()]
else:
# issue request to server
return self._parse(date)
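A short usage sketch (this issues a live request to the canteen backend, so results depend on the current menu):
parser = MensaParser()
meals = parser.get_meals('tomorrow')  # also accepts e.g. '2024-06-03' or 'friday'
for meal in meals:
    print(meal.as_dict()['name'])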
ParseDateError (Exception)
¶
weather
special
¶
domain
¶
API_KEY
¶
WeatherDomain (LookupDomain)
¶
Domain for the Weather API.
Attributes:
Name | Type | Description |
---|---|---|
last_results | List[dict] | Current results which the user might request info about |
Source code in adviser/examples/webapi/weather/domain.py
class WeatherDomain(LookupDomain):
"""Domain for the Weather API.
Attributes:
last_results (List[dict]): Current results which the user might request info about
"""
def __init__(self):
LookupDomain.__init__(self, 'WeatherAPI', 'Weather')
self.last_results = []
def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
""" Returns all entities from the data backend that meet the constraints.
Args:
constraints (dict): Slot-value mapping of constraints.
If empty, all entities in the database will be returned.
requested_slots (Iterable): list of slots that should be returned in addition to the
system requestable slots and the primary key
"""
if 'location' in constraints and 'date' in constraints:
forecast = self._query(constraints['location'], constraints['date'])
if forecast is None:
return []
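# the API reports temperatures in Kelvin; convert to whole degrees Celsius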
temperature = int('%.0f' % (float(forecast['main']['temp']) - 273.15))
description = forecast['weather'][0]['description']
result_dict = {
'artificial_id': str(len(self.last_results)),
'temperature': temperature,
'description': description,
'location': constraints['location'],
'date': constraints['date'],
}
if any(True for _ in requested_slots):
cleaned_result_dict = {slot: result_dict[slot] for slot in requested_slots}
else:
cleaned_result_dict = result_dict
self.last_results.append(cleaned_result_dict)
return [cleaned_result_dict]
else:
return []
def find_info_about_entity(self, entity_id, requested_slots: Iterable):
""" Returns the values (stored in the data backend) of the specified slots for the
specified entity.
Args:
entity_id (str): primary key value of the entity
requested_slots (dict): slot-value mapping of constraints
"""
return [self.last_results[int(entity_id)]]
def get_requestable_slots(self) -> List[str]:
""" Returns a list of all slots requestable by the user. """
return ['temperature', 'description']
def get_system_requestable_slots(self) -> List[str]:
""" Returns a list of all slots requestable by the system. """
return ['location', 'date']
def get_informable_slots(self) -> List[str]:
""" Returns a list of all informable slots. """
return ['location', 'date']
def get_mandatory_slots(self) -> List[str]:
""" Returns a list of all mandatory slots. """
return ['location', 'date']
def get_default_inform_slots(self) -> List[str]:
""" Returns a list of all default Inform slots. """
return ['temperature', 'description']
def get_possible_values(self, slot: str) -> List[str]:
""" Returns all possible values for an informable slot
Args:
slot (str): name of the slot
Returns:
a list of strings, each string representing one possible value for
the specified slot.
"""
raise BaseException('none of the slots in this domain has a fixed set of '
'values, so this method should never be called')
def get_primary_key(self) -> str:
""" Returns the slot name that will be used as the 'name' of an entry """
return 'artificial_id'
def _query(self, location, date):
"""if location is None:
location = 'Stuttgart'
if date is None:
date = datetime.now()"""
url = f'http://api.openweathermap.org/data/2.5/forecast?q={location}&APPID={API_KEY}'
try:
f = urlopen(url)
forecasts = json.loads(f.read())['list']
return self._find_closest(forecasts, date)
except BaseException as e:
raise(e)
return None
def _find_closest(self, forecasts, preferred_time):
""" From a list of forecasts, find the one which is closest to the specified time"""
closest_forecast = None
closest_difference = None
for forecast in forecasts:
forecast_time = datetime.fromtimestamp(int(forecast['dt']))
time_difference = abs(preferred_time - forecast_time)
if closest_forecast is None or time_difference < closest_difference:
closest_forecast = forecast
closest_difference = time_difference
else:
return closest_forecast
return None
def get_keyword(self):
return 'weather'
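A minimal usage sketch (assumes a valid OpenWeatherMap API_KEY is set in the module; the date constraint must be a datetime so that _find_closest can compare it):
from datetime import datetime, timedelta

domain = WeatherDomain()
results = domain.find_entities({'location': 'Stuttgart',
                                'date': datetime.now() + timedelta(days=1)})
if results:
    print(results[0]['temperature'], results[0]['description'])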
__init__(self)
special
¶
find_entities(self, constraints, requested_slots=iter(()))
¶
Returns all entities from the data backend that meet the constraints.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
constraints | dict | Slot-value mapping of constraints. If empty, all entities in the database will be returned. | required |
requested_slots | Iterable | list of slots that should be returned in addition to the system requestable slots and the primary key | iter(()) |
Source code in adviser/examples/webapi/weather/domain.py
def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
""" Returns all entities from the data backend that meet the constraints.
Args:
constraints (dict): Slot-value mapping of constraints.
If empty, all entities in the database will be returned.
requested_slots (Iterable): list of slots that should be returned in addition to the
system requestable slots and the primary key
"""
if 'location' in constraints and 'date' in constraints:
forecast = self._query(constraints['location'], constraints['date'])
if forecast is None:
return []
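# the API reports temperatures in Kelvin; convert to whole degrees Celsius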
temperature = int('%.0f' % (float(forecast['main']['temp']) - 273.15))
description = forecast['weather'][0]['description']
result_dict = {
'artificial_id': str(len(self.last_results)),
'temperature': temperature,
'description': description,
'location': constraints['location'],
'date': constraints['date'],
}
if any(True for _ in requested_slots):
cleaned_result_dict = {slot: result_dict[slot] for slot in requested_slots}
else:
cleaned_result_dict = result_dict
self.last_results.append(cleaned_result_dict)
return [cleaned_result_dict]
else:
return []
find_info_about_entity(self, entity_id, requested_slots)
¶
Returns the values (stored in the data backend) of the specified slots for the specified entity.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_id | str | primary key value of the entity | required |
requested_slots | dict | slot-value mapping of constraints | required |
Source code in adviser/examples/webapi/weather/domain.py
def find_info_about_entity(self, entity_id, requested_slots: Iterable):
""" Returns the values (stored in the data backend) of the specified slots for the
specified entity.
Args:
entity_id (str): primary key value of the entity
requested_slots (dict): slot-value mapping of constraints
"""
return [self.last_results[int(entity_id)]]
get_default_inform_slots(self)
¶
get_informable_slots(self)
¶
get_keyword(self)
¶
get_mandatory_slots(self)
¶
get_possible_values(self, slot)
¶
Returns all possible values for an informable slot
Parameters:
Name | Type | Description | Default |
---|---|---|---|
slot | str | name of the slot | required |
Returns:
Type | Description |
---|---|
List[str] | a list of strings, each string representing one possible value for the specified slot. |
Source code in adviser/examples/webapi/weather/domain.py
def get_possible_values(self, slot: str) -> List[str]:
""" Returns all possible values for an informable slot
Args:
slot (str): name of the slot
Returns:
a list of strings, each string representing one possible value for
the specified slot.
"""
raise BaseException('none of the slots in this domain has a fixed set of '
'values, so this method should never be called')
get_primary_key(self)
¶
get_requestable_slots(self)
¶
get_system_requestable_slots(self)
¶
nlg
¶
WeatherNLG (Service)
¶
Simple NLG for the weather domain
Source code in adviser/examples/webapi/weather/nlg.py
class WeatherNLG(Service):
"""Simple NLG for the weather domain"""
def __init__(self, domain, logger=DiasysLogger()):
# only calls super class' constructor
super(WeatherNLG, self).__init__(domain, debug_logger=logger)
@PublishSubscribe(sub_topics=["sys_act"], pub_topics=["sys_utterance"])
def generate_system_utterance(self, sys_act: SysAct = None) -> dict(sys_utterance=str):
"""Main function for generating and publishing the system utterance
Args:
sys_act: the system act for which to create a natural language realisation
Returns:
dict with "sys_utterance" as key and the system utterance as value
"""
if sys_act is None or sys_act.type == SysActionType.Welcome:
return {'sys_utterance': 'Hi! What do you want to know about the weather?'}
if sys_act.type == SysActionType.Bad:
return {'sys_utterance': 'Sorry, I could not understand you.'}
elif sys_act.type == SysActionType.Bye:
return {'sys_utterance': 'Thank you, good bye'}
elif sys_act.type == SysActionType.Request:
slot = list(sys_act.slot_values.keys())[0]
if slot == 'date':
return {'sys_utterance': 'For which day are you looking for the weather?'}
elif slot == 'location':
return {'sys_utterance': 'Which city are you at?'}
else:
assert False, 'Only the date and the location can be requested'
else:
location = sys_act.slot_values['location'][0]
date = sys_act.slot_values['date'][0]
date_str = date.strftime('on %B %-d at %-I %p')
temperature = sys_act.slot_values['temperature'][0]
description = sys_act.slot_values['description'][0]
return {'sys_utterance': f'The weather in {location} {date_str} is {temperature} degrees celsius with {description}.'}
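Outside a running dialog system the branches can be checked directly; a sketch, assuming the Service base class can be instantiated standalone:
nlg = WeatherNLG(domain=WeatherDomain())
print(nlg.generate_system_utterance(sys_act=None))
# expected: {'sys_utterance': 'Hi! What do you want to know about the weather?'}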
__init__(self, domain, logger=DiasysLogger())
special
¶
generate_system_utterance(self, *args, **kwargs)
¶
Source code in adviser/examples/webapi/weather/nlg.py
def delegate(self, *args, **kwargs):
func_inst = getattr(self, func.__name__)
callargs = list(args)
if self in callargs: # remove self when in *args, because already known to function
callargs.remove(self)
result = func(self, *callargs, **kwargs)
if result:
# fix! (user could have multiple "/" characters in topic - only use last one )
domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
result = {key.split("/")[0]: result[key] for key in result}
if func_inst not in self._publish_sockets:
# not a publisher, just normal function
return result
socket = self._publish_sockets[func_inst]
domain = self._domain_name
if socket and result:
# publish messages
for topic in pub_topics:
# for topic in result: # NOTE publish any returned value in dict with it's key as topic
if topic in result:
domain = domain if domain else domains[topic]
topic_domain_str = f"{topic}/{domain}" if domain else topic
if topic in self._pub_topic_domains:
topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
_send_msg(socket, topic_domain_str, result[topic])
if self.debug_logger:
self.debug_logger.info(
f"- (DS): sent message from {func} to topic {topic_domain_str}:\n {result[topic]}")
return result
nlu
¶
WEATHER_DATE_TODAY_REGEXES
¶
WEATHER_DATE_TOMORROW_REGEXES
¶
WEATHER_LOCATION_REGEXES
¶
WeatherNLU (Service)
¶
Very simple NLU for the weather domain.
Source code in adviser/examples/webapi/weather/nlu.py
class WeatherNLU(Service):
"""Very simple NLU for the weather domain."""
def __init__(self, domain, logger=DiasysLogger()):
# only calls super class' constructor
super(WeatherNLU, self).__init__(domain, debug_logger=logger)
@PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
def extract_user_acts(self, user_utterance: str = None) -> dict(user_acts=List[UserAct]):
"""Main function for detecting and publishing user acts.
Args:
user_utterance: the user input string
Returns:
dict with key 'user_acts' and list of user acts as value
"""
user_acts = []
if not user_utterance:
return {'user_acts': None}
user_utterance = ' '.join(user_utterance.lower().split())
for bye in ('bye', 'goodbye', 'byebye', 'seeyou'):
if user_utterance.replace(' ', '').endswith(bye):
return {'user_acts': [UserAct(user_utterance, UserActionType.Bye)]}
# check weather today
for regex in WEATHER_DATE_TODAY_REGEXES:
match = regex.search(user_utterance)
if match:
user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'date', datetime.now()))
break
if len(user_acts) == 0:
for regex in WEATHER_DATE_TOMORROW_REGEXES:
match = regex.search(user_utterance)
if match:
tomorrow = datetime.now() + timedelta(days=1)
date = datetime(tomorrow.year, tomorrow.month, tomorrow.day, hour=15)
user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'date', date))
break
for regex in WEATHER_LOCATION_REGEXES:
match = regex.search(user_utterance)
if match:
user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'location', match.group(1)))
self.debug_logger.dialog_turn("User Actions: %s" % str(user_acts))
return {'user_acts': user_acts}
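A sketch of the expected behaviour, again assuming standalone instantiation works; what is matched depends on the regex constants above:
nlu = WeatherNLU(domain=WeatherDomain())
result = nlu.extract_user_acts(user_utterance='how is the weather in Stuttgart tomorrow')
# result['user_acts'] should contain Inform acts for 'date' (tomorrow, 3 pm) and 'location'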
__init__(self, domain, logger=DiasysLogger())
special
¶
extract_user_acts(self, *args, **kwargs)
¶
Source code in adviser/examples/webapi/weather/nlu.py
def delegate(self, *args, **kwargs):
func_inst = getattr(self, func.__name__)
callargs = list(args)
if self in callargs: # remove self when in *args, because already known to function
callargs.remove(self)
result = func(self, *callargs, **kwargs)
if result:
# fix! (user could have multiple "/" characters in topic - only use last one )
domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
result = {key.split("/")[0]: result[key] for key in result}
if func_inst not in self._publish_sockets:
# not a publisher, just normal function
return result
socket = self._publish_sockets[func_inst]
domain = self._domain_name
if socket and result:
# publish messages
for topic in pub_topics:
# for topic in result: # NOTE publish any returned value in dict with it's key as topic
if topic in result:
domain = domain if domain else domains[topic]
topic_domain_str = f"{topic}/{domain}" if domain else topic
if topic in self._pub_topic_domains:
topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
_send_msg(socket, topic_domain_str, result[topic])
if self.debug_logger:
self.debug_logger.info(
f"- (DS): sent message from {func} to topic {topic_domain_str}:\n {result[topic]}")
return result