Skip to content

Examples

qa special

worldknowledge special

domain

WorldKnowledgeDomain (LookupDomain)

Question answering for the world knowledge domain.

Attributes:

Name Type Description
- artificial_id_counter (int

pseudo identifier for each entry

- name_lex (Dict[str,str]

lexicon for matching topic's names to their KG entity

Source code in adviser/examples/qa/worldknowledge/domain.py
class WorldKnowledgeDomain(LookupDomain):
    # Attribute docstrings currently not considered by mkdocstrings -> write plugin?

    """
    Question answering for the world knowledge domain.

    Attributes:
        - artificial_id_counter (int): pseudo identifier for each entry
        - name_lex (Dict[str,str]): lexicon for matching topic's names to their KG entity
    """

    def __init__(self):
        """Calls super class' constructor and loads name lexicon"""

        LookupDomain.__init__(self, 'CSQA', 'World Knowledge')

        self.artificial_id_counter = 1 #int: lexicon for matching topic's names to their KG entity

        self.name_lex = self._init_name_lexicon()
        """Dict[str,str]: lexicon for matching topic's names to their KG entity."""

    def _init_name_lexicon(self):
        with open(os.path.join(get_root_dir(), 'resources', 'ontologies', 'qa', 'name_dict.json')) as f:
            return json.load(f)

    def _find_topic_entities(self, term):
        entities = self.name_lex.get(term, [])
        return [(ent['id'], ent['label']) for ent in entities]

    def _perform_out_query(self, relation, topic):
        url = 'https://query.wikidata.org/sparql'
        query = """ SELECT ?item ?itemLabel
                    WHERE
                    {
                    wd:%s wdt:%s ?item.
                    ?item rdfs:label ?itemLabel.
                    FILTER(LANG(?itemLabel) = 'en').
                    }""" % (topic, relation)
        body = 'query=(%s)' % urllib.parse.quote(query)
        body = {'query': query}
        # print(body, data_points)
        r = requests.post(url, params = {'format': 'json', 'content-type': 'application/sparql-query', 'user-agent': 'Python 3.6.8'}, data=body)
        # print('Status', r.status_code)
        data = r.json()
        return [res['itemLabel']['value'] for res in data['results'] ['bindings']]

    def _perform_in_query(self, relation, topic):
        url = 'https://query.wikidata.org/sparql'
        query = """ SELECT ?item ?itemLabel
                    WHERE
                    {
                    ?item wdt:%s wd:%s.
                    ?item rdfs:label ?itemLabel.
                    FILTER(LANG(?itemLabel) = 'en').
                    }""" % (relation, topic)
        body = 'query=(%s)' % urllib.parse.quote(query)
        body = {'query': query}
        r = requests.post(url, params = {
                'format': 'json',
                'content-type': 'application/sparql-query',
                'user-agent': 'Python 3.6.8'
            }, data=body)
        data = r.json()
        return [res['itemLabel']['value'] for res in data['results'] ['bindings']]

    def find_entities(self, constraints: dict):
        """ Returns all entities from the data backend that meet the constraints.

        Args:
            constraints (dict): slot-value mapping of constraints
        """
        assert 'relation' in constraints
        assert 'topic' in constraints
        assert 'direction' in constraints

        topics = self._find_topic_entities(constraints['topic'])
        if not topics:
            return []

        answers = []
        for topic_id, topic_label in topics:
            answer_ids = []
            if constraints['direction'] == 'out':
                answer_ids = self._perform_out_query(constraints['relation'], topic_id)
                for answer_id in answer_ids:
                    answers.append({
                        'subject': topic_label,
                        'predicate': constraints['relation'],
                        'object': answer_id
                    })
                    self.artificial_id_counter += 1
            else:
                answer_ids = self._perform_in_query(constraints['relation'], topic_id)
                for answer_id in answer_ids:
                    answers.append({
                        'subject': answer_id,
                        'predicate': constraints['relation'],
                        'object': topic_label
                    })
                    self.artificial_id_counter += 1

        return answers

    def find_info_about_entity(self, entity_id, requested_slots: Iterable):
        """ Returns the values (stored in the data backend) of the specified slots for the
            specified entity.

        Args:
            entity_id (str): primary key value of the entity
            requested_slots (dict): slot-value mapping of constraints
        """
        raise BaseException('should not be called')

    def get_domain_name(self):
        return "qa"

    def get_requestable_slots(self) -> List[str]:
        """ Returns a list of all slots requestable by the user. """
        return ['subject', 'predicate', 'object', 'object_type']

    def get_system_requestable_slots(self) -> List[str]:
        """ Returns a list of all slots requestable by the system. """
        return ['relation', 'topic', 'direction']

    def get_informable_slots(self) -> List[str]:
        """ Returns a list of all informable slots. """
        return ['relation', 'topic', 'direction']

    def get_mandatory_slots(self) -> List[str]:
        """ Returns a list of all mandatory slots. """
        return ['relation', 'topic', 'direction']

    def get_possible_values(self, slot: str) -> List[str]:
        """ Returns all possible values for an informable slot

        Args:
            slot (str): name of the slot

        Returns:
            a list of strings, each string representing one possible value for
            the specified slot.
        """
        # 'assert False, "this method should not be called"'
        raise BaseException('should not be called')

    def get_primary_key(self) -> str:
        """ Returns the slot name that will be used as the 'name' of an entry """
        return 'artificial_id'

    def get_keyword(self):
        return 'world knowledge'
__init__(self) special

Calls super class' constructor and loads name lexicon

Source code in adviser/examples/qa/worldknowledge/domain.py
def __init__(self):
    """Calls super class' constructor and loads name lexicon"""

    LookupDomain.__init__(self, 'CSQA', 'World Knowledge')

    self.artificial_id_counter = 1 #int: lexicon for matching topic's names to their KG entity

    self.name_lex = self._init_name_lexicon()
    """Dict[str,str]: lexicon for matching topic's names to their KG entity."""
find_entities(self, constraints)

Returns all entities from the data backend that meet the constraints.

Parameters:

Name Type Description Default
constraints dict

slot-value mapping of constraints

required
Source code in adviser/examples/qa/worldknowledge/domain.py
def find_entities(self, constraints: dict):
    """ Returns all entities from the data backend that meet the constraints.

    Args:
        constraints (dict): slot-value mapping of constraints
    """
    assert 'relation' in constraints
    assert 'topic' in constraints
    assert 'direction' in constraints

    topics = self._find_topic_entities(constraints['topic'])
    if not topics:
        return []

    answers = []
    for topic_id, topic_label in topics:
        answer_ids = []
        if constraints['direction'] == 'out':
            answer_ids = self._perform_out_query(constraints['relation'], topic_id)
            for answer_id in answer_ids:
                answers.append({
                    'subject': topic_label,
                    'predicate': constraints['relation'],
                    'object': answer_id
                })
                self.artificial_id_counter += 1
        else:
            answer_ids = self._perform_in_query(constraints['relation'], topic_id)
            for answer_id in answer_ids:
                answers.append({
                    'subject': answer_id,
                    'predicate': constraints['relation'],
                    'object': topic_label
                })
                self.artificial_id_counter += 1

    return answers
find_info_about_entity(self, entity_id, requested_slots)

Returns the values (stored in the data backend) of the specified slots for the specified entity.

Parameters:

Name Type Description Default
entity_id str

primary key value of the entity

required
requested_slots dict

slot-value mapping of constraints

required
Source code in adviser/examples/qa/worldknowledge/domain.py
def find_info_about_entity(self, entity_id, requested_slots: Iterable):
    """ Returns the values (stored in the data backend) of the specified slots for the
        specified entity.

    Args:
        entity_id (str): primary key value of the entity
        requested_slots (dict): slot-value mapping of constraints
    """
    raise BaseException('should not be called')
get_domain_name(self)

Return the domain name of the current ontology.

Returns:

Type Description
object
Source code in adviser/examples/qa/worldknowledge/domain.py
def get_domain_name(self):
    return "qa"
get_informable_slots(self)

Returns a list of all informable slots.

Source code in adviser/examples/qa/worldknowledge/domain.py
def get_informable_slots(self) -> List[str]:
    """ Returns a list of all informable slots. """
    return ['relation', 'topic', 'direction']
get_keyword(self)
Source code in adviser/examples/qa/worldknowledge/domain.py
def get_keyword(self):
    return 'world knowledge'
get_mandatory_slots(self)

Returns a list of all mandatory slots.

Source code in adviser/examples/qa/worldknowledge/domain.py
def get_mandatory_slots(self) -> List[str]:
    """ Returns a list of all mandatory slots. """
    return ['relation', 'topic', 'direction']
get_possible_values(self, slot)

Returns all possible values for an informable slot

Parameters:

Name Type Description Default
slot str

name of the slot

required

Returns:

Type Description
List[str]

a list of strings, each string representing one possible value for the specified slot.

Source code in adviser/examples/qa/worldknowledge/domain.py
def get_possible_values(self, slot: str) -> List[str]:
    """ Returns all possible values for an informable slot

    Args:
        slot (str): name of the slot

    Returns:
        a list of strings, each string representing one possible value for
        the specified slot.
    """
    # 'assert False, "this method should not be called"'
    raise BaseException('should not be called')
get_primary_key(self)

Returns the slot name that will be used as the 'name' of an entry

Source code in adviser/examples/qa/worldknowledge/domain.py
def get_primary_key(self) -> str:
    """ Returns the slot name that will be used as the 'name' of an entry """
    return 'artificial_id'
get_requestable_slots(self)

Returns a list of all slots requestable by the user.

Source code in adviser/examples/qa/worldknowledge/domain.py
def get_requestable_slots(self) -> List[str]:
    """ Returns a list of all slots requestable by the user. """
    return ['subject', 'predicate', 'object', 'object_type']
get_system_requestable_slots(self)

Returns a list of all slots requestable by the system.

Source code in adviser/examples/qa/worldknowledge/domain.py
def get_system_requestable_slots(self) -> List[str]:
    """ Returns a list of all slots requestable by the system. """
    return ['relation', 'topic', 'direction']
get_root_dir()
Source code in adviser/examples/qa/worldknowledge/domain.py
def get_root_dir():
    return os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

multinlg

Handcrafted (i.e. template-based) Natural Language Generation Module

MultiNLG (HandcraftedNLG)

Extension of the handcrafted NLG by allowing multiple system acts.

This change is necessary for QA, since the policy publishes multiple system acts.

Source code in adviser/examples/qa/worldknowledge/multinlg.py
class MultiNLG(HandcraftedNLG):
    """Extension of the handcrafted NLG by allowing multiple system acts.

    This change is necessary for QA, since the policy publishes multiple system acts.
    """
    def __init__(self, domain: Domain, template_file: str = None, sub_topic_domains: Dict[str, str] = {},
                 logger: DiasysLogger = DiasysLogger(), template_file_german: str = None,
                 language: Language = None):
        # only calls the super class' constructor
        HandcraftedNLG.__init__(self, domain, template_file, sub_topic_domains, logger, template_file_german, language)

    @PublishSubscribe(sub_topics=["sys_acts"], pub_topics=["sys_utterance"])
    def publish_system_utterance(self, sys_acts: List[SysAct] = None) -> dict(sys_utterance=str):
        """Generates the system utterance and publishes it.

        Instead of the HandcraftedNLG class, this class takes multiple system acts and
        applies the templates for each system act separately. Each message is then printed in a
        new line.

        Args:
            sys_acts: The list of system acts as published by the (QA) policy

        Returns:
            a dict containing the system utterance
        """
        message = '\n'.join([self.generate_system_utterance(sys_act) for sys_act in sys_acts])
        return {'sys_utterance': message}
__init__(self, domain, template_file=None, sub_topic_domains={}, logger=<DiasysLogger adviser (NOTSET)>, template_file_german=None, language=None) special
Source code in adviser/examples/qa/worldknowledge/multinlg.py
def __init__(self, domain: Domain, template_file: str = None, sub_topic_domains: Dict[str, str] = {},
             logger: DiasysLogger = DiasysLogger(), template_file_german: str = None,
             language: Language = None):
    # only calls the super class' constructor
    HandcraftedNLG.__init__(self, domain, template_file, sub_topic_domains, logger, template_file_german, language)
publish_system_utterance(self, *args, **kwargs)
Source code in adviser/examples/qa/worldknowledge/multinlg.py
def delegate(self, *args, **kwargs):
    func_inst = getattr(self, func.__name__)

    callargs = list(args)
    if self in callargs:    # remove self when in *args, because already known to function
        callargs.remove(self)
    result = func(self, *callargs, **kwargs)
    if result:
        # fix! (user could have multiple "/" characters in topic - only use last one )
        domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
        result = {key.split("/")[0]: result[key] for key in result}

    if func_inst not in self._publish_sockets:
        # not a publisher, just normal function
        return result

    socket = self._publish_sockets[func_inst]
    domain = self._domain_name
    if socket and result:
        # publish messages
        for topic in pub_topics:
        # for topic in result: # NOTE publish any returned value in dict with it's key as topic
            if topic in result:
                domain = domain if domain else domains[topic]
                topic_domain_str = f"{topic}/{domain}" if domain else topic
                if topic in self._pub_topic_domains:
                    topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
                _send_msg(socket, topic_domain_str, result[topic])
                if self.debug_logger:
                    self.debug_logger.info(
                        f"- (DS): sent message from {func} to topic {topic_domain_str}:\n   {result[topic]}")
    return result

neuralmodels special

director
Classifier (Module)

Neural network for predicting the relation's direction (outgoing or incoming).

The model uses a question encoder to classify the question as one of the two classes "outgoing" or "incoming".

Attributes:

Name Type Description
hidden_dim int

Size of the Bi_LSTM's hidden layer

out_dim int

Size of the output layer (here: 2)

diminisher nn.Module

Fine-tuning embedding layer, good for reducing Bi-LSTM's size

lstm nn.Module

Bi-LSTM for encoding a question

hidden2tag nn.Module

Output layer

Source code in adviser/examples/qa/worldknowledge/neuralmodels/director.py
class Classifier(nn.Module):
    """Neural network for predicting the relation's direction (outgoing or incoming).

    The model uses a question encoder to classify the question as one of the two classes
    "outgoing" or "incoming".

    Attributes:
        hidden_dim (int): Size of the Bi_LSTM's hidden layer
        out_dim (int): Size of the output layer (here: 2)
        diminisher (nn.Module): Fine-tuning embedding layer, good for reducing Bi-LSTM's size
        lstm (nn.Module): Bi-LSTM for encoding a question
        hidden2tag (nn.Module): Output layer
    """

    def __init__(self, emb_dim: int, lstm_out_dim: int, num_classes: int):
        """Initialises all required elements of the neural network.

        Args:
            emb_dim: Output size of the fine-tuning embedding layer
            lstm_out_dim: Output size of the Bi-LSTM
            num_classes: Size of the output layer (in this context, always 2)
        """
        super(Classifier, self).__init__()
        self.hidden_dim = lstm_out_dim
        self.out_dim = num_classes

        self.diminisher = nn.Linear(768, emb_dim)
        self.lstm = nn.LSTM(emb_dim, lstm_out_dim, bidirectional=True)
        self.hidden2tag = nn.Linear(lstm_out_dim*2, self.out_dim)

    def forward(self, embeds: torch.Tensor) -> torch.Tensor:
        """Application of the neural network on a given input question.

        Args:
            embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

        Returns:
            Probabilities of the two classes "incoming" and "outgoing"
        """
        embeds = self.diminisher(embeds)
        lstm_out, _ = self.lstm(embeds)
        tag_space = self.hidden2tag(lstm_out[0])
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores
__init__(self, emb_dim, lstm_out_dim, num_classes) special

Initialises all required elements of the neural network.

Parameters:

Name Type Description Default
emb_dim int

Output size of the fine-tuning embedding layer

required
lstm_out_dim int

Output size of the Bi-LSTM

required
num_classes int

Size of the output layer (in this context, always 2)

required
Source code in adviser/examples/qa/worldknowledge/neuralmodels/director.py
def __init__(self, emb_dim: int, lstm_out_dim: int, num_classes: int):
    """Initialises all required elements of the neural network.

    Args:
        emb_dim: Output size of the fine-tuning embedding layer
        lstm_out_dim: Output size of the Bi-LSTM
        num_classes: Size of the output layer (in this context, always 2)
    """
    super(Classifier, self).__init__()
    self.hidden_dim = lstm_out_dim
    self.out_dim = num_classes

    self.diminisher = nn.Linear(768, emb_dim)
    self.lstm = nn.LSTM(emb_dim, lstm_out_dim, bidirectional=True)
    self.hidden2tag = nn.Linear(lstm_out_dim*2, self.out_dim)
forward(self, embeds)

Application of the neural network on a given input question.

Parameters:

Name Type Description Default
embeds Tensor

Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

required

Returns:

Type Description
Tensor

Probabilities of the two classes "incoming" and "outgoing"

Source code in adviser/examples/qa/worldknowledge/neuralmodels/director.py
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
    """Application of the neural network on a given input question.

    Args:
        embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

    Returns:
        Probabilities of the two classes "incoming" and "outgoing"
    """
    embeds = self.diminisher(embeds)
    lstm_out, _ = self.lstm(embeds)
    tag_space = self.hidden2tag(lstm_out[0])
    tag_scores = F.log_softmax(tag_space, dim=1)
    return tag_scores
simpledot
SimpleDot (Module)

Neural network for predicting the relation of a question.

The simple dot approach compares a question with each possible relation candidate by taking the (simple) dot product between the encoded question and every encoded relation.

Attributes:

Name Type Description
softmax bool

whether or not the scores should be converted to probabilities

hidden int

size of the hidden layer

relations_tensor torch.autograd.Variable

embeddings for all relation descriptions

diminisher nn.Module

Fine-tuning embedding layer, good for reducing Bi-LSTMs' size

lstm_question nn.Module

Bi-LSTM for encoding a question

lstm_relation nn.Module

Bi-LSTM for encoding a relation

Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
class SimpleDot(nn.Module):
    """Neural network for predicting the relation of a question.

    The simple dot approach compares a question with each possible relation candidate by
    taking the (simple) dot product between the encoded question and every encoded relation.

    Attributes:
        softmax (bool): whether or not the scores should be converted to probabilities
        hidden (int): size of the hidden layer
        relations_tensor (torch.autograd.Variable): embeddings for all relation descriptions
        diminisher (nn.Module): Fine-tuning embedding layer, good for reducing Bi-LSTMs' size
        lstm_question (nn.Module): Bi-LSTM for encoding a question
        lstm_relation (nn.Module): Bi-LSTM for encoding a relation
    """

    def __init__(self, emb_dim: int, hidden_dim: int, softmax: bool = True):
        """Initialises all required elements of the neural network.

        Args:
            emb_dim: Output size of the fine-tuning embedding layer
            hidden_dim: Output size of the Bi-LSTM
            softmax: Whether or not a softmax is applied on the output layer
        """
        super(SimpleDot, self).__init__()
        self.softmax = softmax
        self.hidden = hidden_dim

        self.relations_tensor = self._initialise_relations_tensor()
        self.diminisher = nn.Linear(768, emb_dim)
        self.lstm_question = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
        self.lstm_relation = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)

    def _initialise_relations_tensor(self, max_rel_len: int = 30) -> torch.autograd.Variable:
        """Creates a tensor containing word embeddings of all relation descriptions.

        To be processable, the tensor is transformed to the shape
        |token| x |relation| x |embeddings|.

        Keyword Arguments:
            max_rel_len: maximum number of tokens in the relation descriptions

        Returns:
            A tensor containing word embeddings of all relation descriptions
        """
        relations_list = []
        tokens = []
        with open(os.path.join(get_root_dir(), 'resources', 'ontologies', 'qa', 'csqa_relation_embeddings.bin'), 'rb') as f:
            rels = pickle.load(f)
            relations_list = [emb[1] for emb in rels]
            tokens = [emb[0] for emb in rels]

        relations = []  # R x T_R x E
        for relation in relations_list:
            relation.extend([np.zeros(768, dtype='float32')] * (max_rel_len- len(relation)))
            relations.append(relation[:max_rel_len])
        # T_R x R x E
        return torch.autograd.Variable(torch.Tensor(relations).transpose(0,1), requires_grad=False)

    def forward(self, embeds: torch.Tensor) -> torch.Tensor:
        """Application of the neural network on a given input question.

        Args:
            embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

        Returns:
            Probabilities for the relation classes
        """
        embeds = self.diminisher(embeds)
        relations_embeds = self.diminisher(self.relations_tensor)

        question_out, _ = self.lstm_question(embeds)  # T_Q x B x H
        relation_out, _ = self.lstm_relation(relations_embeds)  # T_R x R x H
        last_question_out = question_out[0][:,self.hidden:]  # B x H
        last_relation_out = relation_out[0][:,self.hidden:]  # R x H

        # relation prediction
        rel_scores = torch.matmul(last_question_out, last_relation_out.transpose(0,1))
        if self.softmax:
            rel_scores = F.log_softmax(rel_scores, dim=1)
        return rel_scores
__init__(self, emb_dim, hidden_dim, softmax=True) special

Initialises all required elements of the neural network.

Parameters:

Name Type Description Default
emb_dim int

Output size of the fine-tuning embedding layer

required
hidden_dim int

Output size of the Bi-LSTM

required
softmax bool

Whether or not a softmax is applied on the output layer

True
Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
def __init__(self, emb_dim: int, hidden_dim: int, softmax: bool = True):
    """Initialises all required elements of the neural network.

    Args:
        emb_dim: Output size of the fine-tuning embedding layer
        hidden_dim: Output size of the Bi-LSTM
        softmax: Whether or not a softmax is applied on the output layer
    """
    super(SimpleDot, self).__init__()
    self.softmax = softmax
    self.hidden = hidden_dim

    self.relations_tensor = self._initialise_relations_tensor()
    self.diminisher = nn.Linear(768, emb_dim)
    self.lstm_question = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
    self.lstm_relation = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
forward(self, embeds)

Application of the neural network on a given input question.

Parameters:

Name Type Description Default
embeds Tensor

Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

required

Returns:

Type Description
Tensor

Probabilities for the relation classes

Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
    """Application of the neural network on a given input question.

    Args:
        embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

    Returns:
        Probabilities for the relation classes
    """
    embeds = self.diminisher(embeds)
    relations_embeds = self.diminisher(self.relations_tensor)

    question_out, _ = self.lstm_question(embeds)  # T_Q x B x H
    relation_out, _ = self.lstm_relation(relations_embeds)  # T_R x R x H
    last_question_out = question_out[0][:,self.hidden:]  # B x H
    last_relation_out = relation_out[0][:,self.hidden:]  # R x H

    # relation prediction
    rel_scores = torch.matmul(last_question_out, last_relation_out.transpose(0,1))
    if self.softmax:
        rel_scores = F.log_softmax(rel_scores, dim=1)
    return rel_scores
get_root_dir()
Source code in adviser/examples/qa/worldknowledge/neuralmodels/simpledot.py
def get_root_dir():
    return os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
tagger
TAGS
Tagger (Module)

Neural network for predicting the topic entities.

The model uses a question encoder and classifies each token using the BIO tag set.

Attributes:

Name Type Description
hidden_dim int

Size of the Bi_LSTM's hidden layer

diminisher nn.Module

Fine-tuning embedding layer, good for reducing Bi-LSTM's size

lstm nn.Module

Bi-LSTM

hidden2label nn.Module

Output layer

Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
class Tagger(nn.Module):
    """Neural network for predicting the topic entities.

    The model uses a question encoder and classifies each token using the BIO tag set.

    Attributes:
        hidden_dim (int): Size of the Bi_LSTM's hidden layer
        diminisher (nn.Module): Fine-tuning embedding layer, good for reducing Bi-LSTM's size
        lstm (nn.Module): Bi-LSTM
        hidden2label (nn.Module): Output layer
    """

    def __init__(self, emb_dim: int, hidden_dim: int):
        """Initialises all required elements of the neural network.

        Args:
            emb_dim: Output size of the fine-tuning embedding layer
            hidden_dim: Hidden layer size of the Bi-LSTM
        """
        super(Tagger, self).__init__()
        self.hidden_dim = hidden_dim

        self.diminisher = nn.Linear(768, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
        self.hidden2label = nn.Linear(hidden_dim*2, len(TAGS))

    def forward(self, embeds: torch.Tensor) -> torch.Tensor:
        """Application of the neural network on a given input question.

        Args:
            embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

        Returns:
            Probabilities of each BIO tag for all tokens
        """
        embeds = self.diminisher(embeds)
        lstm_out, _ = self.lstm(embeds)
        label_space = self.hidden2label(lstm_out[1:])
        label_scores = F.log_softmax(label_space, dim=2)
        return label_scores
__init__(self, emb_dim, hidden_dim) special

Initialises all required elements of the neural network.

Parameters:

Name Type Description Default
emb_dim int

Output size of the fine-tuning embedding layer

required
hidden_dim int

Hidden layer size of the Bi-LSTM

required
Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
def __init__(self, emb_dim: int, hidden_dim: int):
    """Initialises all required elements of the neural network.

    Args:
        emb_dim: Output size of the fine-tuning embedding layer
        hidden_dim: Hidden layer size of the Bi-LSTM
    """
    super(Tagger, self).__init__()
    self.hidden_dim = hidden_dim

    self.diminisher = nn.Linear(768, emb_dim)
    self.lstm = nn.LSTM(emb_dim, hidden_dim, bidirectional=True)
    self.hidden2label = nn.Linear(hidden_dim*2, len(TAGS))
forward(self, embeds)

Application of the neural network on a given input question.

Parameters:

Name Type Description Default
embeds Tensor

Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

required

Returns:

Type Description
Tensor

Probabilities of each BIO tag for all tokens

Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
def forward(self, embeds: torch.Tensor) -> torch.Tensor:
    """Application of the neural network on a given input question.

    Args:
        embeds: Tensor containing the embeddings of shape |Token| x |Batch| x |Embedding Size|

    Returns:
        Probabilities of each BIO tag for all tokens
    """
    embeds = self.diminisher(embeds)
    lstm_out, _ = self.lstm(embeds)
    label_space = self.hidden2label(lstm_out[1:])
    label_scores = F.log_softmax(label_space, dim=2)
    return label_scores
extract_entities(tokens, tag_idxs)

Extracts entities using the predicted BIO tags for each token

Parameters:

Name Type Description Default
tokens List[str]

question's tokens

required
tag_idxs List[int]

index of the BIO tag for each token in the question

required

Returns:

Type Description
List[List[str]]

List of entities, i.e. list of connected tokens

Source code in adviser/examples/qa/worldknowledge/neuralmodels/tagger.py
def extract_entities(tokens: List[str], tag_idxs: List[int]) -> List[List[str]]:
    """Extracts entities using the predicted BIO tags for each token

    Arguments:
        tokens: question's tokens
        tag_idxs: index of the BIO tag for each token in the question

    Returns:
        List of entities, i.e. list of connected tokens
    """
    entities = []
    curr_entity = []
    tags = [TAGS[tag_idx] for tag_idx in tag_idxs]
    for i, (token, tag) in enumerate(zip(tokens, tags)):
        if tag == 'I':
            curr_entity.append(token)
            continue
        else:
            if curr_entity:
                entities.append(curr_entity)
                curr_entity = []
            if tag == 'B':
                curr_entity.append(token)
    if curr_entity:
        entities.append(curr_entity)
    return entities

policyqa

QaPolicy (Service)

Policy module for question answering.

Provides a simple rule-based policy for question answering. The QA module assumes that the user acts contain information about relation, topic entities and relation direction. Adequate answers are looked up in the knowledge and published.

The difference to the default HandcraftedPolicy is that no BST is needed and that multiple system acts can be published.

Source code in adviser/examples/qa/worldknowledge/policyqa.py
class QaPolicy(Service):
    """Policy module for question answering.

    Provides a simple rule-based policy for question answering.
    The QA module assumes that the user acts contain information about relation, topic entities
    and relation direction.
    Adequate answers are looked up in the knowledge and published.

    The difference to the default HandcraftedPolicy is that no BST is needed and that multiple
    system acts can be published.
    """
    def __init__(self, domain: LookupDomain, logger: DiasysLogger = DiasysLogger()):
        # only call super class' constructor
        Service.__init__(self, domain=domain, debug_logger=logger)

    @PublishSubscribe(sub_topics=["user_acts"], pub_topics=["sys_acts"])
    def generate_sys_acts(self, user_acts: List[UserAct] = None) -> dict(sys_acts=List[SysAct]):
        """Generates system acts by looking up answers to the given user question.

        Args:
            user_acts: The list of user acts containing information about the predicted relation,
                topic entities and relation direction

        Returns:
            dict with 'sys_acts' as key and list of system acts as value
        """
        if user_acts is None:
            return { 'sys_acts': [SysAct(SysActionType.Welcome)]}
        elif any([user_act.type == UserActionType.Bye for user_act in user_acts]):
            return { 'sys_acts': [SysAct(SysActionType.Bye)] }
        elif not user_acts:
            return { 'sys_acts': [SysAct(SysActionType.Bad)] }

        user_acts = [user_act for user_act in user_acts if user_act.type != UserActionType.SelectDomain]
        if len(user_acts) == 0:
           return { 'sys_acts': [SysAct(SysActionType.Welcome)]} 

        relation = [user_act.value for user_act in user_acts \
            if user_act.type == UserActionType.Inform and user_act.slot == 'relation'][0]
        topics = [user_act.value for user_act in user_acts \
            if user_act.type == UserActionType.Inform and user_act.slot == 'topic']
        direction = [user_act.value for user_act in user_acts \
            if user_act.type == UserActionType.Inform and user_act.slot == 'direction'][0]

        if not topics:
            return { 'sys_acts': [SysAct(SysActionType.Bad)] }

        # currently, short answers are used for world knowledge
        answers = self._get_short_answers(relation, topics, direction)

        sys_acts = [SysAct(SysActionType.InformByName, slot_values=answer) for answer in answers]

        self.debug_logger.dialog_turn("System Action: " + '; '.join(
            [str(sys_act) for sys_act in sys_acts]))
        return {'sys_acts': sys_acts}

    def _get_short_answers(self, relation: str, topics: List[str], direction: str) \
        -> dict(answer=str):
        """Looks up answers and only returns the answer string"""
        answers = []
        for topic in topics:
            triples = self.domain.find_entities({
                'relation': relation,
                'topic': topic,
                'direction': direction
            })
            for triple in triples:
                if direction == 'in':
                    answers.append({'answer': triple['subject']})
                else:
                    answers.append({'answer': triple['object']})
        if not answers:
            answers.append({'answer': 'Sorry, I don\'t know.'})
        return answers

    def _get_triples(self, relation, topics, direction):
        """Looks up answers and stores them as triples"""
        answers = []
        for topic in topics:
            answers.extend(self.domain.find_entities({
                'relation': relation,
                'topic': topic,
                'direction': direction
            }))
        if not answers:
            for topic in topics:
                answers.append({
                    'subject': 'unknown' if direction == 'in' else topic,
                    'predicate': relation,
                    'object': 'unknown' if direction == 'out' else topic
                })
        return answers
__init__(self, domain, logger=<DiasysLogger adviser (NOTSET)>) special
Source code in adviser/examples/qa/worldknowledge/policyqa.py
def __init__(self, domain: LookupDomain, logger: DiasysLogger = DiasysLogger()):
    # only call super class' constructor
    Service.__init__(self, domain=domain, debug_logger=logger)
generate_sys_acts(self, *args, **kwargs)
Source code in adviser/examples/qa/worldknowledge/policyqa.py
def delegate(self, *args, **kwargs):
    func_inst = getattr(self, func.__name__)

    callargs = list(args)
    if self in callargs:    # remove self when in *args, because already known to function
        callargs.remove(self)
    result = func(self, *callargs, **kwargs)
    if result:
        # fix! (user could have multiple "/" characters in topic - only use last one )
        domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
        result = {key.split("/")[0]: result[key] for key in result}

    if func_inst not in self._publish_sockets:
        # not a publisher, just normal function
        return result

    socket = self._publish_sockets[func_inst]
    domain = self._domain_name
    if socket and result:
        # publish messages
        for topic in pub_topics:
        # for topic in result: # NOTE publish any returned value in dict with it's key as topic
            if topic in result:
                domain = domain if domain else domains[topic]
                topic_domain_str = f"{topic}/{domain}" if domain else topic
                if topic in self._pub_topic_domains:
                    topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
                _send_msg(socket, topic_domain_str, result[topic])
                if self.debug_logger:
                    self.debug_logger.info(
                        f"- (DS): sent message from {func} to topic {topic_domain_str}:\n   {result[topic]}")
    return result

semanticparser

QuestionParser (Service)

Semantic parsing module for question answering

Attributes:

Name Type Description
device torch.device

PyTorch device object, either CPU or GPU

nn_relation nn.Module

neural network for relation prediction

nn_entity nn.Module

neural network for topic entity prediction

nn_direction nn.Module

neural network for relation direction prediction

tags List[str]

relation tags

max_seq_len int

maximum number of tokens per question

embedding_creator BertEmbedding

object creating BERT embeddings

Source code in adviser/examples/qa/worldknowledge/semanticparser.py
class QuestionParser(Service):
    """Semantic parsing module for question answering

    Attributes:
        device (torch.device): PyTorch device object, either CPU or GPU
        nn_relation (nn.Module): neural network for relation prediction
        nn_entity (nn.Module): neural network for topic entity prediction
        nn_direction (nn.Module): neural network for relation direction prediction
        tags (List[str]): relation tags
        max_seq_len (int): maximum number of tokens per question
        embedding_creator (BertEmbedding): object creating BERT embeddings
    """

    def __init__(self, domain: LookupDomain, \
        logger: DiasysLogger = DiasysLogger(), device: str = 'cpu', cache_dir: str = None):
        """Creates neural networks for semantic parsing and other required utils

        Args:
            domain: the QA domain
            logger: the logger
            device: PyTorch device name
            cache_dir: the cache directory for transformers' models
        """
        Service.__init__(self, domain=domain, debug_logger=logger)
        self.device = torch.device(device)
        self.nn_relation = self._load_relation_model()
        self.nn_entity = self._load_entity_model()
        self.nn_direction = self._load_direction_model()

        self.tags = self._load_tag_set()

        self.max_seq_len = 40
        self.tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
        self.embedder = BertModel.from_pretrained('bert-base-uncased', cache_dir=cache_dir)

    def _load_relation_model(self):
        model = SimpleDot(100, 400, True).to(self.device)
        model.load_state_dict(torch.load(os.path.join(get_root_dir(), 'resources', 'models', 'qa', 'simpledot.pt'), map_location=self.device))
        model.eval()
        return model

    def _load_entity_model(self):
        model = Tagger(100, 400)
        model.load_state_dict(torch.load(os.path.join(get_root_dir(), 'resources', 'models', 'qa', 'tagger.pt'), map_location=self.device))
        model.eval()
        return model

    def _load_direction_model(self):
        model = Classifier(75, 500, 2).to(self.device)
        model.load_state_dict(torch.load(os.path.join(get_root_dir(), 'resources', 'models', 'qa', 'director.pt'), map_location=self.device))
        model.eval()
        return model

    def _load_tag_set(self):
        csqa_tags = []
        with open(os.path.join(get_root_dir(), 'resources', 'ontologies', 'qa', 'csqa_tags.json')) as f:
            csqa_tags = json.load(f)
        return csqa_tags

    @PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
    def parse_user_utterance(self, user_utterance: str = None) -> dict(user_acts=List[UserAct]):
        """Parses the user utterance.

        Responsible for detecting user acts with their respective slot-values from the user
        utterance by predicting relation, topic entities and the relation's direction.

        Args:
            user_utterance: the last user input as string

        Returns:
            A dictionary with the key "user_acts" and the value containing a list of user actions
        """
        result = {}
        self.user_acts = []

        user_utterance = user_utterance.strip()

        if not user_utterance:
            return {'user_acts': None}
        elif user_utterance.lower().replace(' ', '').endswith('bye'):
            return {'user_acts': [UserAct(user_utterance, UserActionType.Bye)]}

        if self.domain.get_keyword() in user_utterance.lower():
            self.user_acts.append(UserAct(user_utterance, UserActionType.SelectDomain))
            begin_idx = user_utterance.lower().find(self.domain.get_keyword())
            user_utterance = user_utterance.lower().replace(self.domain.get_keyword(), "")
            if len(user_utterance) == 0:
                return {'user_acts': self.user_acts}

        tokens, embeddings = self._preprocess_utterance(user_utterance)

        relation_out = self._predict_relation(embeddings)
        entities_out = self._predict_topic_entities(embeddings)
        direction_out = self._predict_direction(embeddings)

        relation_pred = self._lookup_relation(relation_out)
        entities_pred = extract_entities(tokens, entities_out[:,0])
        direction_pred = self._lookup_direction(direction_out)

        self.user_acts.extend([
            UserAct(user_utterance, UserActionType.Inform, 'relation', relation_pred, 1.0),
            UserAct(user_utterance, UserActionType.Inform, 'direction', direction_pred, 1.0)
        ])
        for t in entities_pred:
            self.user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'topic', self.tokenizer.convert_tokens_to_string(t), 1.0))

        result['user_acts'] = self.user_acts
        self.debug_logger.dialog_turn("User Actions: %s" % str(self.user_acts))
        return result

    def _preprocess_utterance(self, utterance):
        encoded_input = self.tokenizer(utterance, return_tensors='pt', truncation=True, max_length=self.max_seq_len)
        with torch.no_grad():
            embeddings: torch.Tensor = self.embedder(**encoded_input).last_hidden_state
        embeddings = torch.cat((embeddings, embeddings.new_zeros(1,self.max_seq_len - embeddings.size(1),768)), dim=1)
        return self.tokenizer.tokenize(utterance, add_special_tokens=False), embeddings.permute(1,0,2)

    def _predict_relation(self, embeddings):
        with torch.no_grad():
            rel_scores = self.nn_relation(embeddings)
        _, pred_rel = torch.max(rel_scores, 1)
        return pred_rel

    def _lookup_relation(self, prediction):
        return self.tags[int(prediction[0])]

    def _predict_topic_entities(self, embeddings):
        with torch.no_grad():
            ent_scores = self.nn_entity(embeddings)
        _, preds_ent = torch.max(ent_scores, 2)
        return preds_ent

    def _predict_direction(self, embeddings):
        with torch.no_grad():
            tag_scores = self.nn_direction(embeddings)
        _, preds = torch.max(tag_scores, 1)
        return preds

    def _lookup_direction(self, prediction):
        return ['out', 'in'][int(prediction[0])]
__init__(self, domain, logger=<DiasysLogger adviser (NOTSET)>, device='cpu', cache_dir=None) special

Creates neural networks for semantic parsing and other required utils

Parameters:

Name Type Description Default
domain LookupDomain

the QA domain

required
logger DiasysLogger

the logger

<DiasysLogger adviser (NOTSET)>
device str

PyTorch device name

'cpu'
cache_dir str

the cache directory for transformers' models

None
Source code in adviser/examples/qa/worldknowledge/semanticparser.py
def __init__(self, domain: LookupDomain, \
    logger: DiasysLogger = DiasysLogger(), device: str = 'cpu', cache_dir: str = None):
    """Creates neural networks for semantic parsing and other required utils

    Args:
        domain: the QA domain
        logger: the logger
        device: PyTorch device name
        cache_dir: the cache directory for transformers' models
    """
    Service.__init__(self, domain=domain, debug_logger=logger)
    self.device = torch.device(device)
    self.nn_relation = self._load_relation_model()
    self.nn_entity = self._load_entity_model()
    self.nn_direction = self._load_direction_model()

    self.tags = self._load_tag_set()

    self.max_seq_len = 40
    self.tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
    self.embedder = BertModel.from_pretrained('bert-base-uncased', cache_dir=cache_dir)
parse_user_utterance(self, *args, **kwargs)
Source code in adviser/examples/qa/worldknowledge/semanticparser.py
def delegate(self, *args, **kwargs):
    func_inst = getattr(self, func.__name__)

    callargs = list(args)
    if self in callargs:    # remove self when in *args, because already known to function
        callargs.remove(self)
    result = func(self, *callargs, **kwargs)
    if result:
        # fix! (user could have multiple "/" characters in topic - only use last one )
        domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
        result = {key.split("/")[0]: result[key] for key in result}

    if func_inst not in self._publish_sockets:
        # not a publisher, just normal function
        return result

    socket = self._publish_sockets[func_inst]
    domain = self._domain_name
    if socket and result:
        # publish messages
        for topic in pub_topics:
        # for topic in result: # NOTE publish any returned value in dict with it's key as topic
            if topic in result:
                domain = domain if domain else domains[topic]
                topic_domain_str = f"{topic}/{domain}" if domain else topic
                if topic in self._pub_topic_domains:
                    topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
                _send_msg(socket, topic_domain_str, result[topic])
                if self.debug_logger:
                    self.debug_logger.info(
                        f"- (DS): sent message from {func} to topic {topic_domain_str}:\n   {result[topic]}")
    return result
get_root_dir()
Source code in adviser/examples/qa/worldknowledge/semanticparser.py
def get_root_dir():
    return os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

webapi special

mensa special

domain

SLOT_VALUES
MensaDomain (LookupDomain)

Domain for the Mensa API

Attributes:

Name Type Description
parser MensaParser

HTML file parser for dynamically building a pseudo database

last_results List[dict]

Current results which the user might request info about

Source code in adviser/examples/webapi/mensa/domain.py
class MensaDomain(LookupDomain):
    """Domain for the Mensa API

    Attributes:
        parser (MensaParser): HTML file parser for dynamically building a pseudo database
        last_results (List[dict]): Current results which the user might request info about
    """

    def __init__(self):
        LookupDomain.__init__(self, 'MensaAPI', 'Mensa Food')
        self.parser = MensaParser()
        self.last_results = []

    def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
        """ Returns all entities from the data backend that meet the constraints.

        Args:
            constraints (dict): Slot-value mapping of constraints.
                                If empty, all entities in the database will be returned.
            requested_slots (Iterable): list of slots that should be returned in addition to the
                                        system requestable slots and the primary key
        """
        if 'day' in constraints:
            meals = self.parser.get_meals(constraints['day'])
            results = [meal.as_dict() for meal in meals]
            for slot in constraints:
                if slot == 'day':
                    continue
                results = [candidate for candidate in results if candidate[slot] == constraints[slot]]
            for i, result in enumerate(results):
                result['artificial_id'] = i+1
            if list(requested_slots):
                cleaned_results = [{slot: result_dict[slot] for slot in requested_slots} for result_dict in results]
            else:
                cleaned_results = results
            self.last_results = results
            return cleaned_results
        else:
            return []

    def find_info_about_entity(self, entity_id: str, requested_slots: Iterable):
        """ Returns the values (stored in the data backend) of the specified slots for the
            specified entity.

        Args:
            entity_id (str): primary key value of the entity
            requested_slots (dict): slot-value mapping of constraints
        """
        result = {slot: self.last_results[int(entity_id)-1][slot] for slot in requested_slots}
        result['artificial_id'] = entity_id
        return [result]

    def get_requestable_slots(self) -> List[str]:
        """ Returns a list of all slots requestable by the user. """
        return ['name', 'type', 'price', 'allergens', 'vegan', 'vegetarian', 'fish', 'pork']

    def get_system_requestable_slots(self) -> List[str]:
        """ Returns a list of all slots requestable by the system. """
        return ['day', 'type', 'vegan', 'vegetarian', 'fish', 'pork']

    def get_informable_slots(self) -> List[str]:
        """ Returns a list of all informable slots. """
        return ['day', 'type', 'vegan', 'vegetarian', 'fish', 'pork']

    def get_mandatory_slots(self) -> List[str]:
        """ Returns a list of all mandatory slots. """
        return ['day']

    def get_default_inform_slots(self) -> List[str]:
        """ Returns a list of all default Inform slots. """
        return ['name']

    def get_possible_values(self, slot: str) -> List[str]:
        """ Returns all possible values for an informable slot

        Args:
            slot (str): name of the slot

        Returns:
            a list of strings, each string representing one possible value for
            the specified slot.
        """
        assert slot in SLOT_VALUES
        return SLOT_VALUES[slot]

    def get_primary_key(self) -> str:
        """ Returns the slot name that will be used as the 'name' of an entry """
        return 'artificial_id'

    def get_keyword(self):
        return 'mensa'
__init__(self) special
Source code in adviser/examples/webapi/mensa/domain.py
def __init__(self):
    LookupDomain.__init__(self, 'MensaAPI', 'Mensa Food')
    self.parser = MensaParser()
    self.last_results = []
find_entities(self, constraints, requested_slots=<tuple_iterator object at 0x7f1ef2fb4580>)

Returns all entities from the data backend that meet the constraints.

Parameters:

Name Type Description Default
constraints dict

Slot-value mapping of constraints. If empty, all entities in the database will be returned.

required
requested_slots Iterable

list of slots that should be returned in addition to the system requestable slots and the primary key

<tuple_iterator object at 0x7f1ef2fb4580>
Source code in adviser/examples/webapi/mensa/domain.py
def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
    """ Returns all entities from the data backend that meet the constraints.

    Args:
        constraints (dict): Slot-value mapping of constraints.
                            If empty, all entities in the database will be returned.
        requested_slots (Iterable): list of slots that should be returned in addition to the
                                    system requestable slots and the primary key
    """
    if 'day' in constraints:
        meals = self.parser.get_meals(constraints['day'])
        results = [meal.as_dict() for meal in meals]
        for slot in constraints:
            if slot == 'day':
                continue
            results = [candidate for candidate in results if candidate[slot] == constraints[slot]]
        for i, result in enumerate(results):
            result['artificial_id'] = i+1
        if list(requested_slots):
            cleaned_results = [{slot: result_dict[slot] for slot in requested_slots} for result_dict in results]
        else:
            cleaned_results = results
        self.last_results = results
        return cleaned_results
    else:
        return []
find_info_about_entity(self, entity_id, requested_slots)

Returns the values (stored in the data backend) of the specified slots for the specified entity.

Parameters:

Name Type Description Default
entity_id str

primary key value of the entity

required
requested_slots dict

slot-value mapping of constraints

required
Source code in adviser/examples/webapi/mensa/domain.py
def find_info_about_entity(self, entity_id: str, requested_slots: Iterable):
    """ Returns the values (stored in the data backend) of the specified slots for the
        specified entity.

    Args:
        entity_id (str): primary key value of the entity
        requested_slots (dict): slot-value mapping of constraints
    """
    result = {slot: self.last_results[int(entity_id)-1][slot] for slot in requested_slots}
    result['artificial_id'] = entity_id
    return [result]
get_default_inform_slots(self)

Returns a list of all default Inform slots.

Source code in adviser/examples/webapi/mensa/domain.py
def get_default_inform_slots(self) -> List[str]:
    """ Returns a list of all default Inform slots. """
    return ['name']
get_informable_slots(self)

Returns a list of all informable slots.

Source code in adviser/examples/webapi/mensa/domain.py
def get_informable_slots(self) -> List[str]:
    """ Returns a list of all informable slots. """
    return ['day', 'type', 'vegan', 'vegetarian', 'fish', 'pork']
get_keyword(self)
Source code in adviser/examples/webapi/mensa/domain.py
def get_keyword(self):
    return 'mensa'
get_mandatory_slots(self)

Returns a list of all mandatory slots.

Source code in adviser/examples/webapi/mensa/domain.py
def get_mandatory_slots(self) -> List[str]:
    """ Returns a list of all mandatory slots. """
    return ['day']
get_possible_values(self, slot)

Returns all possible values for an informable slot

Parameters:

Name Type Description Default
slot str

name of the slot

required

Returns:

Type Description
List[str]

a list of strings, each string representing one possible value for the specified slot.

Source code in adviser/examples/webapi/mensa/domain.py
def get_possible_values(self, slot: str) -> List[str]:
    """ Returns all possible values for an informable slot

    Args:
        slot (str): name of the slot

    Returns:
        a list of strings, each string representing one possible value for
        the specified slot.
    """
    assert slot in SLOT_VALUES
    return SLOT_VALUES[slot]
get_primary_key(self)

Returns the slot name that will be used as the 'name' of an entry

Source code in adviser/examples/webapi/mensa/domain.py
def get_primary_key(self) -> str:
    """ Returns the slot name that will be used as the 'name' of an entry """
    return 'artificial_id'
get_requestable_slots(self)

Returns a list of all slots requestable by the user.

Source code in adviser/examples/webapi/mensa/domain.py
def get_requestable_slots(self) -> List[str]:
    """ Returns a list of all slots requestable by the user. """
    return ['name', 'type', 'price', 'allergens', 'vegan', 'vegetarian', 'fish', 'pork']
get_system_requestable_slots(self)

Returns a list of all slots requestable by the system.

Source code in adviser/examples/webapi/mensa/domain.py
def get_system_requestable_slots(self) -> List[str]:
    """ Returns a list of all slots requestable by the system. """
    return ['day', 'type', 'vegan', 'vegetarian', 'fish', 'pork']

nlu

MensaNLU (HandcraftedNLU)

Adapted handcrafted NLU for the mensa domain.

The default handcrafted NLU is adapted to automatically add the user act request(name). This is necessary because the name is not the primary key, i.e. it is not printed by default once an element is found. To force the Policy to automatically inform about the name, too, a request for the name is added in each turn.

Source code in adviser/examples/webapi/mensa/nlu.py
class MensaNLU(HandcraftedNLU):
    """Adapted handcrafted NLU for the mensa domain.

    The default handcrafted NLU is adapted to automatically add the user act request(name).
    This is necessary because the name is not the primary key, i.e. it is not printed by default
    once an element is found. To force the Policy to automatically inform about the name, too,
    a request for the name is added in each turn.
    """

    def __init__(self, domain: LookupDomain, logger: DiasysLogger = DiasysLogger()):
        # only calls super class' constructor
        HandcraftedNLU.__init__(self, domain, logger)

    @PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
    def extract_user_acts(self, user_utterance: str = None, sys_act: SysAct = None, beliefstate: BeliefState = None) \
            -> dict(user_acts=List[UserAct]):
        """Original code but adapted to automatically add a request(name) act"""
        result = {}

        # Setting request everything to False at every turn
        self.req_everything = False

        self.user_acts = []

        # slots_requested & slots_informed store slots requested and informed in this turn
        # they are used later for later disambiguation
        self.slots_requested, self.slots_informed = set(), set()
        if user_utterance is not None:
            user_utterance = user_utterance.strip()
            self._match_general_act(user_utterance)
            self._match_domain_specific_act(user_utterance)

        # Solving ambiguities from regexes, especially with requests and informs happening
        # simultaneously on the same slot and two slots taking the same value
        self._disambiguate_co_occurrence(beliefstate)
        self._solve_informable_values()

        # If nothing else has been matched, see if the user chose a domain; otherwise if it's
        # not the first turn, it's a bad act
        if len(self.user_acts) == 0:
            if self.domain.get_keyword() in user_utterance:
                self.user_acts.append(UserAct(text=user_utterance if user_utterance else "",
                                              act_type=UserActionType.SelectDomain))
            elif self.sys_act_info['last_act'] is not None:
                # start of dialogue or no regex matched
                self.user_acts.append(UserAct(text=user_utterance if user_utterance else "",
                                              act_type=UserActionType.Bad))

        self._assign_scores()
        result['user_acts'] = self.user_acts
        self.logger.dialog_turn("User Actions: %s" % str(self.user_acts))
        return result
__init__(self, domain, logger=<DiasysLogger adviser (NOTSET)>) special
Source code in adviser/examples/webapi/mensa/nlu.py
def __init__(self, domain: LookupDomain, logger: DiasysLogger = DiasysLogger()):
    # only calls super class' constructor
    HandcraftedNLU.__init__(self, domain, logger)
extract_user_acts(self, *args, **kwargs)
Source code in adviser/examples/webapi/mensa/nlu.py
def delegate(self, *args, **kwargs):
    func_inst = getattr(self, func.__name__)

    callargs = list(args)
    if self in callargs:    # remove self when in *args, because already known to function
        callargs.remove(self)
    result = func(self, *callargs, **kwargs)
    if result:
        # fix! (user could have multiple "/" characters in topic - only use last one )
        domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
        result = {key.split("/")[0]: result[key] for key in result}

    if func_inst not in self._publish_sockets:
        # not a publisher, just normal function
        return result

    socket = self._publish_sockets[func_inst]
    domain = self._domain_name
    if socket and result:
        # publish messages
        for topic in pub_topics:
        # for topic in result: # NOTE publish any returned value in dict with it's key as topic
            if topic in result:
                domain = domain if domain else domains[topic]
                topic_domain_str = f"{topic}/{domain}" if domain else topic
                if topic in self._pub_topic_domains:
                    topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
                _send_msg(socket, topic_domain_str, result[topic])
                if self.debug_logger:
                    self.debug_logger.info(
                        f"- (DS): sent message from {func} to topic {topic_domain_str}:\n   {result[topic]}")
    return result

parser

Allergen (Enum)

This enum provides the allergens used in the mensa menu.

Source code in adviser/examples/webapi/mensa/parser.py
class Allergen(Enum):
	"""This enum provides the allergens used in the mensa menu."""

	Egg = 'Ei'
	Peanut = 'En'
	Fish = 'Fi'
	Wheat = 'GlW'
	Spelt = 'GlD'
	KhorasanWheat = 'GlKW'
	Rye = 'GlR'
	Barley = 'GlG'
	Millet = 'GlH'
	Shellfishes = 'Kr'
	Lactose = 'La'
	Lupin = 'Lu'
	Almonds = 'NuM'
	Hazelnuts = 'NuH'
	Walnuts = 'NuW'
	Cashews = 'NuC'
	Pecan = 'NuPe'
	BrazilNut = 'NuPa'
	Pistachio = 'NuPi'
	Macadamia = 'NuMa'
	Sesame = 'Se'
	Mustard = 'Sf'
	Celery = 'Sl'
	Soy = 'So'
	Sulfite = 'Sw'
	Mollusca = 'Wt'
Almonds
Barley
BrazilNut
Cashews
Celery
Egg
Fish
Hazelnuts
KhorasanWheat
Lactose
Lupin
Macadamia
Millet
Mollusca
Mustard
Peanut
Pecan
Pistachio
Rye
Sesame
Shellfishes
Soy
Spelt
Sulfite
Walnuts
Wheat
DishType (Enum)

This enum provides the dish types used in the mensa menu.

Source code in adviser/examples/webapi/mensa/parser.py
class DishType(Enum):
	"""This enum provides the dish types used in the mensa menu."""

	Starter = 'starter'
	Buffet = 'buffet'
	MainDish = 'main_dish'
	SideDish = 'side_dish'
	Dessert = 'dessert'

	@staticmethod
	def from_website_name(website_name: str) -> 'DishType':
		"""Converts the type as listed on the website into the type used in the dialog system.

		Args:
			website_name: The name as used in the response to the POST request.

		Returns:
			The corresponding enum member.

		"""


		if website_name == 'STARTER':
			return DishType.Starter
		elif website_name == 'BUFFET':
			return DishType.Buffet
		elif website_name == 'MAIN DISH':
			return DishType.MainDish
		elif website_name == 'SIDE DISH':
			return DishType.SideDish
		elif website_name == 'DESSERT':
			return DishType.Dessert
Buffet
Dessert
MainDish
SideDish
Starter
Location (Enum)

This enum provides the possible mensa locations.

Source code in adviser/examples/webapi/mensa/parser.py
class Location(Enum):
	"""This enum provides the possible mensa locations."""

	STUTTGART_MITTE = 3
	MUSIKHOCHSCHULE = 4
	KUNSTAKADEMIE = 7
	STUTTGART_VAIHINGEN = 2
	LUDWIGSBURG = 1
	FLANDERNSTRASSE = 6
	ESSLINGEN_STADTMITTE = 9
ESSLINGEN_STADTMITTE
FLANDERNSTRASSE
KUNSTAKADEMIE
LUDWIGSBURG
MUSIKHOCHSCHULE
STUTTGART_MITTE
STUTTGART_VAIHINGEN
Meal
Source code in adviser/examples/webapi/mensa/parser.py
class Meal():
	def __init__(self, name: str, day: str, prices: Tuple[float], price_quantity: str,\
		allergens:List[Allergen], vegan: bool, vegetarian: bool, fish: bool, pork: bool,\
			dish_type: DishType):
		"""The class for a  meal consisting of a name and several properties (slot-value pairs).

		Args:
			name: The name of the meal.
			day: The day on which the meal is offered.
			prices: The price for students and guests.
			price_quantity: The unit for which the price is valid.
			allergens: The allergens of this meal.
			vegan: Whether the meal is vegan or not.
			vegetarian: Whether the meal is vegetarian or not.
			fish: Whether the meal contains fish or not.
			pork: Whether the meal contains pork or not.
			dish_type: The type of the dish. (Starter, Buffet, Main Dish, Side Dish or Buffet)

		"""
		self.name = name
		self.day = day
		self.prices = prices
		self.price_quantity = price_quantity
		self.allergens = allergens
		self.vegan = vegan
		self.vegetarian = vegetarian
		self.fish = fish
		self.pork = pork
		self.dish_type = dish_type


	def as_dict(self) -> Dict[str, str]:
		"""A dict representation of the meal."""

		return {
			'name': self.name,
			'day': self.day,
			'type': self.dish_type.value,
			'price': str(self.prices[0]),
			'allergens': ', '.join([allergen.value for allergen in self.allergens]) if\
				self.allergens is not None else 'none',
			'vegan': str(self.vegan).lower(),
			'vegetarian': str(self.vegetarian).lower(),
			'fish': str(self.fish).lower(),
			'pork': str(self.pork).lower()
			}

	def __str__(self) -> str:
		"""The string representation of the meal."""

		return (f"Meal(name={self.name}, day={self.day}, prices={self.prices},\
			price_quantity={self.price_quantity}, "
			f"allergens={self.allergens}, vegan={self.vegan}, vegetarian={self.vegetarian}, "
			f"fish={self.fish}, pork={self.pork}, dish_type={self.dish_type})")

	def __repr__(self) -> str:
		"""The string representation of the meal."""

		return str(self)
__init__(self, name, day, prices, price_quantity, allergens, vegan, vegetarian, fish, pork, dish_type) special

The class for a meal consisting of a name and several properties (slot-value pairs).

Parameters:

Name Type Description Default
name str

The name of the meal.

required
day str

The day on which the meal is offered.

required
prices Tuple[float]

The price for students and guests.

required
price_quantity str

The unit for which the price is valid.

required
allergens List[adviser.examples.webapi.mensa.parser.Allergen]

The allergens of this meal.

required
vegan bool

Whether the meal is vegan or not.

required
vegetarian bool

Whether the meal is vegetarian or not.

required
fish bool

Whether the meal contains fish or not.

required
pork bool

Whether the meal contains pork or not.

required
dish_type DishType

The type of the dish. (Starter, Buffet, Main Dish, Side Dish or Buffet)

required
Source code in adviser/examples/webapi/mensa/parser.py
def __init__(self, name: str, day: str, prices: Tuple[float], price_quantity: str,\
	allergens:List[Allergen], vegan: bool, vegetarian: bool, fish: bool, pork: bool,\
		dish_type: DishType):
	"""The class for a  meal consisting of a name and several properties (slot-value pairs).

	Args:
		name: The name of the meal.
		day: The day on which the meal is offered.
		prices: The price for students and guests.
		price_quantity: The unit for which the price is valid.
		allergens: The allergens of this meal.
		vegan: Whether the meal is vegan or not.
		vegetarian: Whether the meal is vegetarian or not.
		fish: Whether the meal contains fish or not.
		pork: Whether the meal contains pork or not.
		dish_type: The type of the dish. (Starter, Buffet, Main Dish, Side Dish or Buffet)

	"""
	self.name = name
	self.day = day
	self.prices = prices
	self.price_quantity = price_quantity
	self.allergens = allergens
	self.vegan = vegan
	self.vegetarian = vegetarian
	self.fish = fish
	self.pork = pork
	self.dish_type = dish_type
__repr__(self) special

The string representation of the meal.

Source code in adviser/examples/webapi/mensa/parser.py
def __repr__(self) -> str:
	"""The string representation of the meal."""

	return str(self)
__str__(self) special

The string representation of the meal.

Source code in adviser/examples/webapi/mensa/parser.py
def __str__(self) -> str:
	"""The string representation of the meal."""

	return (f"Meal(name={self.name}, day={self.day}, prices={self.prices},\
		price_quantity={self.price_quantity}, "
		f"allergens={self.allergens}, vegan={self.vegan}, vegetarian={self.vegetarian}, "
		f"fish={self.fish}, pork={self.pork}, dish_type={self.dish_type})")
as_dict(self)

A dict representation of the meal.

Source code in adviser/examples/webapi/mensa/parser.py
def as_dict(self) -> Dict[str, str]:
	"""A dict representation of the meal."""

	return {
		'name': self.name,
		'day': self.day,
		'type': self.dish_type.value,
		'price': str(self.prices[0]),
		'allergens': ', '.join([allergen.value for allergen in self.allergens]) if\
			self.allergens is not None else 'none',
		'vegan': str(self.vegan).lower(),
		'vegetarian': str(self.vegetarian).lower(),
		'fish': str(self.fish).lower(),
		'pork': str(self.pork).lower()
		}
MensaParser
Source code in adviser/examples/webapi/mensa/parser.py
class MensaParser():
	def __init__(self, cache: bool = True):
		"""
		The class to issue post requests and parse the response. Will also take care of caching the
		parser's results.

		Args:
			cache (bool): Whether to cache results or not.

		"""

		#: dict of str: storgae to cache parsed meals
		self.storage = {}
		self.cache = cache

	def _parse(self, date: datetime.datetime) -> List[Meal]:
		"""
		Issues a request for the given date. The response will be parsed and a list of meals
		returned.

        Args:
            date: The date for which the data will be parsed.

        Returns:
            :obj:`list` of Meal: List of parsed meals

        """

		date_str = date.strftime('%Y-%m-%d')
		date_next_week_str = (date + datetime.timedelta(days=7)).strftime('%Y-%m-%d')

		data = {
		'func': 'make_spl',
		# currently we stick with only one mensa location
		'locId': Location.STUTTGART_VAIHINGEN.value,
		'date': date_str,
		'lang': 'en',
		'startThisWeek': date_str,
		'startNextWeek': date_next_week_str
		}

		# issue post request
		response = requests.post('https://sws2.maxmanager.xyz/inc/ajax-php_konnektor.inc.php',\
			headers={}, cookies={}, data=data)
		tree = html.fromstring(response.content.decode(response.encoding))
		meals = [self._parse_meal(meal, date.strftime('%A')) for meal in\
			tree.xpath('//div[contains(@class, "splMeal")]')]

		if self.storage:
			self.storage[date] = meals

		return meals

	def _parse_meal(self, meal: html.HtmlElement, day: str) -> Meal:
		"""Parse all necessary properties of a meal from html.

		Args:
			meal: The html.HtmlElement which will be parsed.
			day: The day for which this meal is valid.
		"""

		# name
		name = meal.xpath('./div[1]/span/text()')[0].strip()
		# price & quantity
		div_price =\
			meal.xpath('./div[4]/div[1]/text()[preceding-sibling::br or following-sibling::br]')
		prices = re.search(r'(\d*,\d*).*?(\d*,\d*)', div_price[0]).groups()
		# substitute comma by dot to correctly parse float
		prices = tuple(map(lambda price: float(price.replace(',','.')), prices))
		if len(div_price) > 1:
			price_quantity = re.search(r'\(per (\d*.?)\)', div_price[1]).group(1)
		else:
			price_quantity = "plate"
		# allergens
		allergens = meal.xpath('./div[3]/div[1]/div[contains(@class, "azn") and\
			not(contains(@class, "ptr")) and\
				not(contains(./preceding-sibling::div/@class, "hidden"))]/text()')
		if len(allergens) > 8:
			# allergens are included
			allergens = [Allergen(allergen) for allergen in allergens[0].strip().strip('()')\
				.split(', ') if allergen != '' and allergen not in map(str, list(range(1,12)))]
		else:
			# there are no allergens
			allergens = None
		# some tags / binary slots
		tags = meal.xpath('./div[4]/div[2]/img/@title')
		vegan = 'vegan' in tags
		vegetarian = 'vegetarian' in tags
		fish = 'MSC (MSC-C-51632)' in tags
		pork = 'pork' in tags or 'beef/pork' in tags
		dish_type = meal.xpath('./preceding-sibling::div[contains(@class, "gruppenkopf")][1]\
			/div[contains(@class, "gruppenname")]/text()')[0]

		return Meal(name, day, prices, price_quantity, allergens, vegan, vegetarian, fish, pork,\
			DishType.from_website_name(dish_type))

	def _parse_date(self, date: str) -> datetime.datetime:
		"""Parse the given string as date. Allowed is a date given as Y-m-d or one of today,
		tomorrow and monday-sunday.

		Raises:
			ParseDateError: If ``date`` could not be parsed.
		"""

		try:
			# try to parse date
			date = datetime.datetime.strptime(date, '%Y-%m-%d')
		except ValueError:
			# cover some specific cases, e.g. today, tomorrow, wednesday
			weekdays =\
				['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
			if date == 'today':
				date = datetime.datetime.today()
			elif date == 'tomorrow':
				date = datetime.datetime.today() + datetime.timedelta(days=1)
			elif date.lower() in weekdays:
				today = datetime.datetime.today().weekday()
				weekday = weekdays.index(date.lower())
				if weekday <= today:
					# if today is named shall we consider the weekday next week instead? (<= vs. <)
					weekday += 7
				date = datetime.datetime.today() + datetime.timedelta(days=weekday-today)
			else:
				raise ParseDateError

		return date

	def get_meals(self, date: str, use_cache: bool = True) -> List[Meal]:
		"""
		Gets the meals for a specified day by either looking them up in the cache or by issuing and
		parsing a post request.

		Args:
            date (str): The date for which the data will be returned.
				Can be a string in the format 'Y-m-d' or one of today, tomorrow and monday-sunday.
            use_cache (bool): If False will always query the server instead of using the cache.

        Returns:
            :obj:`list` of Meal: List of meals for specified date
		"""

		date = self._parse_date(date)
		if use_cache and date.date() in self.storage:
			# NOTE data could be deprecated
			return self.storage[date.date()]
		else:
			# issue request to server
			return self._parse(date)
__init__(self, cache=True) special

The class to issue post requests and parse the response. Will also take care of caching the parser's results.

Parameters:

Name Type Description Default
cache bool

Whether to cache results or not.

True
Source code in adviser/examples/webapi/mensa/parser.py
def __init__(self, cache: bool = True):
	"""
	The class to issue post requests and parse the response. Will also take care of caching the
	parser's results.

	Args:
		cache (bool): Whether to cache results or not.

	"""

	#: dict of str: storgae to cache parsed meals
	self.storage = {}
	self.cache = cache
get_meals(self, date, use_cache=True)
    Gets the meals for a specified day by either looking them up in the cache or by issuing and
    parsing a post request.

    Args:
date (str): The date for which the data will be returned.
                    Can be a string in the format 'Y-m-d' or one of today, tomorrow and monday-sunday.
use_cache (bool): If False will always query the server instead of using the cache.

Returns:

Type Description

obj:list of Meal: List of meals for specified date

Source code in adviser/examples/webapi/mensa/parser.py
	def get_meals(self, date: str, use_cache: bool = True) -> List[Meal]:
		"""
		Gets the meals for a specified day by either looking them up in the cache or by issuing and
		parsing a post request.

		Args:
            date (str): The date for which the data will be returned.
				Can be a string in the format 'Y-m-d' or one of today, tomorrow and monday-sunday.
            use_cache (bool): If False will always query the server instead of using the cache.

        Returns:
            :obj:`list` of Meal: List of meals for specified date
		"""

		date = self._parse_date(date)
		if use_cache and date.date() in self.storage:
			# NOTE data could be deprecated
			return self.storage[date.date()]
		else:
			# issue request to server
			return self._parse(date)
ParseDateError (Exception)

This exception is raised when the date cannot be parsed.

Source code in adviser/examples/webapi/mensa/parser.py
class ParseDateError(Exception):
	"""This exception is raised when the date cannot be parsed."""
	pass

weather special

domain

API_KEY
WeatherDomain (LookupDomain)

Domain for the Weather API.

Attributes:

Name Type Description
last_results List[dict]

Current results which the user might request info about

Source code in adviser/examples/webapi/weather/domain.py
class WeatherDomain(LookupDomain):
    """Domain for the Weather API.

    Attributes:
        last_results (List[dict]): Current results which the user might request info about
    """

    def __init__(self):
        LookupDomain.__init__(self, 'WeatherAPI', 'Weather')

        self.last_results = []

    def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
        """ Returns all entities from the data backend that meet the constraints.

        Args:
            constraints (dict): Slot-value mapping of constraints.
                                If empty, all entities in the database will be returned.
            requested_slots (Iterable): list of slots that should be returned in addition to the
                                        system requestable slots and the primary key
        """
        if 'location' in constraints and 'date' in constraints:

            forecast = self._query(constraints['location'], constraints['date'])
            if forecast is None:
                return []

            temperature = int('%.0f' % (float(forecast['main']['temp']) - 273.15))
            description = forecast['weather'][0]['description']

            result_dict = {
                'artificial_id': str(len(self.last_results)),
                'temperature': temperature,
                'description': description,
                'location': constraints['location'],
                'date': constraints['date'],
            }
            if any(True for _ in requested_slots):
                cleaned_result_dict = {slot: result_dict[slot] for slot in requested_slots}
            else:
                cleaned_result_dict = result_dict
            self.last_results.append(cleaned_result_dict)
            return [cleaned_result_dict]
        else:
            return []

    def find_info_about_entity(self, entity_id, requested_slots: Iterable):
        """ Returns the values (stored in the data backend) of the specified slots for the
            specified entity.

        Args:
            entity_id (str): primary key value of the entity
            requested_slots (dict): slot-value mapping of constraints
        """
        return [self.last_results[int(entity_id)]]

    def get_requestable_slots(self) -> List[str]:
        """ Returns a list of all slots requestable by the user. """
        return ['temperature', 'description']

    def get_system_requestable_slots(self) -> List[str]:
        """ Returns a list of all slots requestable by the system. """
        return ['location', 'date']

    def get_informable_slots(self) -> List[str]:
        """ Returns a list of all informable slots. """
        return ['location', 'date']

    def get_mandatory_slots(self) -> List[str]:
        """ Returns a list of all mandatory slots. """
        return ['location', 'date']

    def get_default_inform_slots(self) -> List[str]:
        """ Returns a list of all default Inform slots. """
        return ['temperature', 'description']

    def get_possible_values(self, slot: str) -> List[str]:
        """ Returns all possible values for an informable slot

        Args:
            slot (str): name of the slot

        Returns:
            a list of strings, each string representing one possible value for
            the specified slot.
         """
        raise BaseException('all slots in this domain do not have a fixed set of '
                            'values, so this method should never be called')

    def get_primary_key(self) -> str:
        """ Returns the slot name that will be used as the 'name' of an entry """
        return 'artificial_id'

    def _query(self, location, date):
        """if location is None:
            location = 'Stuttgart'
        if date is None:
            date = datetime.now()"""

        url = f'http://api.openweathermap.org/data/2.5/forecast?q={location}&APPID={API_KEY}'
        try:
            f = urlopen(url)
            forecasts = json.loads(f.read())['list']
            return self._find_closest(forecasts, date)
        except BaseException as e:
            raise(e)
            return None

    def _find_closest(self, forecasts, preferred_time):
        """ From a list of forecasts, find the one which is closest to the specified time"""
        closest_forecast = None
        closest_difference = None
        for forecast in forecasts:
            forecast_time = datetime.fromtimestamp(int(forecast['dt']))
            time_difference = abs(preferred_time - forecast_time)
            if closest_forecast is None or time_difference < closest_difference:
                closest_forecast = forecast
                closest_difference = time_difference
            else:
                return closest_forecast
        return None

    def get_keyword(self):
        return 'weather'
__init__(self) special
Source code in adviser/examples/webapi/weather/domain.py
def __init__(self):
    LookupDomain.__init__(self, 'WeatherAPI', 'Weather')

    self.last_results = []
find_entities(self, constraints, requested_slots=<tuple_iterator object at 0x7f1ef2506700>)

Returns all entities from the data backend that meet the constraints.

Parameters:

Name Type Description Default
constraints dict

Slot-value mapping of constraints. If empty, all entities in the database will be returned.

required
requested_slots Iterable

list of slots that should be returned in addition to the system requestable slots and the primary key

<tuple_iterator object at 0x7f1ef2506700>
Source code in adviser/examples/webapi/weather/domain.py
def find_entities(self, constraints: dict, requested_slots: Iterable = iter(())):
    """ Returns all entities from the data backend that meet the constraints.

    Args:
        constraints (dict): Slot-value mapping of constraints.
                            If empty, all entities in the database will be returned.
        requested_slots (Iterable): list of slots that should be returned in addition to the
                                    system requestable slots and the primary key
    """
    if 'location' in constraints and 'date' in constraints:

        forecast = self._query(constraints['location'], constraints['date'])
        if forecast is None:
            return []

        temperature = int('%.0f' % (float(forecast['main']['temp']) - 273.15))
        description = forecast['weather'][0]['description']

        result_dict = {
            'artificial_id': str(len(self.last_results)),
            'temperature': temperature,
            'description': description,
            'location': constraints['location'],
            'date': constraints['date'],
        }
        if any(True for _ in requested_slots):
            cleaned_result_dict = {slot: result_dict[slot] for slot in requested_slots}
        else:
            cleaned_result_dict = result_dict
        self.last_results.append(cleaned_result_dict)
        return [cleaned_result_dict]
    else:
        return []
find_info_about_entity(self, entity_id, requested_slots)

Returns the values (stored in the data backend) of the specified slots for the specified entity.

Parameters:

Name Type Description Default
entity_id str

primary key value of the entity

required
requested_slots dict

slot-value mapping of constraints

required
Source code in adviser/examples/webapi/weather/domain.py
def find_info_about_entity(self, entity_id, requested_slots: Iterable):
    """ Returns the values (stored in the data backend) of the specified slots for the
        specified entity.

    Args:
        entity_id (str): primary key value of the entity
        requested_slots (dict): slot-value mapping of constraints
    """
    return [self.last_results[int(entity_id)]]
get_default_inform_slots(self)

Returns a list of all default Inform slots.

Source code in adviser/examples/webapi/weather/domain.py
def get_default_inform_slots(self) -> List[str]:
    """ Returns a list of all default Inform slots. """
    return ['temperature', 'description']
get_informable_slots(self)

Returns a list of all informable slots.

Source code in adviser/examples/webapi/weather/domain.py
def get_informable_slots(self) -> List[str]:
    """ Returns a list of all informable slots. """
    return ['location', 'date']
get_keyword(self)
Source code in adviser/examples/webapi/weather/domain.py
def get_keyword(self):
    return 'weather'
get_mandatory_slots(self)

Returns a list of all mandatory slots.

Source code in adviser/examples/webapi/weather/domain.py
def get_mandatory_slots(self) -> List[str]:
    """ Returns a list of all mandatory slots. """
    return ['location', 'date']
get_possible_values(self, slot)

Returns all possible values for an informable slot

Parameters:

Name Type Description Default
slot str

name of the slot

required

Returns:

Type Description
List[str]

a list of strings, each string representing one possible value for the specified slot.

Source code in adviser/examples/webapi/weather/domain.py
def get_possible_values(self, slot: str) -> List[str]:
    """ Returns all possible values for an informable slot

    Args:
        slot (str): name of the slot

    Returns:
        a list of strings, each string representing one possible value for
        the specified slot.
     """
    raise BaseException('all slots in this domain do not have a fixed set of '
                        'values, so this method should never be called')
get_primary_key(self)

Returns the slot name that will be used as the 'name' of an entry

Source code in adviser/examples/webapi/weather/domain.py
def get_primary_key(self) -> str:
    """ Returns the slot name that will be used as the 'name' of an entry """
    return 'artificial_id'
get_requestable_slots(self)

Returns a list of all slots requestable by the user.

Source code in adviser/examples/webapi/weather/domain.py
def get_requestable_slots(self) -> List[str]:
    """ Returns a list of all slots requestable by the user. """
    return ['temperature', 'description']
get_system_requestable_slots(self)

Returns a list of all slots requestable by the system.

Source code in adviser/examples/webapi/weather/domain.py
def get_system_requestable_slots(self) -> List[str]:
    """ Returns a list of all slots requestable by the system. """
    return ['location', 'date']

nlg

WeatherNLG (Service)

Simple NLG for the weather domain

Source code in adviser/examples/webapi/weather/nlg.py
class WeatherNLG(Service):
    """Simple NLG for the weather domain"""

    def __init__(self, domain, logger=DiasysLogger()):
        # only calls super class' constructor
        super(WeatherNLG, self).__init__(domain, debug_logger=logger)

    @PublishSubscribe(sub_topics=["sys_act"], pub_topics=["sys_utterance"])
    def generate_system_utterance(self, sys_act: SysAct = None) -> dict(sys_utterance=str):
        """Main function for generating and publishing the system utterance

        Args:
            sys_act: the system act for which to create a natural language realisation

        Returns:
            dict with "sys_utterance" as key and the system utterance as value
        """

        if sys_act is None or sys_act.type == SysActionType.Welcome:
            return {'sys_utterance': 'Hi! What do you want to know about the weather?'}

        if sys_act.type == SysActionType.Bad:
            return {'sys_utterance': 'Sorry, I could not understand you.'}
        elif sys_act.type == SysActionType.Bye:
            return {'sys_utterance': 'Thank you, good bye'}
        elif sys_act.type == SysActionType.Request:
            slot = list(sys_act.slot_values.keys())[0]
            if slot == 'date':
                return {'sys_utterance': 'For which day are you looking for the weather?'}
            elif slot == 'location':
                return {'sys_utterance': 'Which city are you at?'}
            else:
                assert False, 'Only the date and the location can be requested'
        else:
            location = sys_act.slot_values['location'][0]
            date = sys_act.slot_values['date'][0]
            date_str = date.strftime('on %B %-d at %-I %p')
            temperature = sys_act.slot_values['temperature'][0]
            description = sys_act.slot_values['description'][0]
            return {'sys_utterance': f'The weather in {location} {date_str} is {temperature} degrees celsius with {description}.'}
__init__(self, domain, logger=<DiasysLogger adviser (NOTSET)>) special
Source code in adviser/examples/webapi/weather/nlg.py
def __init__(self, domain, logger=DiasysLogger()):
    # only calls super class' constructor
    super(WeatherNLG, self).__init__(domain, debug_logger=logger)
generate_system_utterance(self, *args, **kwargs)
Source code in adviser/examples/webapi/weather/nlg.py
def delegate(self, *args, **kwargs):
    func_inst = getattr(self, func.__name__)

    callargs = list(args)
    if self in callargs:    # remove self when in *args, because already known to function
        callargs.remove(self)
    result = func(self, *callargs, **kwargs)
    if result:
        # fix! (user could have multiple "/" characters in topic - only use last one )
        domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
        result = {key.split("/")[0]: result[key] for key in result}

    if func_inst not in self._publish_sockets:
        # not a publisher, just normal function
        return result

    socket = self._publish_sockets[func_inst]
    domain = self._domain_name
    if socket and result:
        # publish messages
        for topic in pub_topics:
        # for topic in result: # NOTE publish any returned value in dict with it's key as topic
            if topic in result:
                domain = domain if domain else domains[topic]
                topic_domain_str = f"{topic}/{domain}" if domain else topic
                if topic in self._pub_topic_domains:
                    topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
                _send_msg(socket, topic_domain_str, result[topic])
                if self.debug_logger:
                    self.debug_logger.info(
                        f"- (DS): sent message from {func} to topic {topic_domain_str}:\n   {result[topic]}")
    return result

nlu

WEATHER_DATE_TODAY_REGEXES
WEATHER_DATE_TOMORROW_REGEXES
WEATHER_LOCATION_REGEXES
WeatherNLU (Service)

Very simple NLU for the weather domain.

Source code in adviser/examples/webapi/weather/nlu.py
class WeatherNLU(Service):
    """Very simple NLU for the weather domain."""

    def __init__(self, domain, logger=DiasysLogger()):
        # only calls super class' constructor
        super(WeatherNLU, self).__init__(domain, debug_logger=logger)

    @PublishSubscribe(sub_topics=["user_utterance"], pub_topics=["user_acts"])
    def extract_user_acts(self, user_utterance: str = None) -> dict(user_acts=List[UserAct]):
        """Main function for detecting and publishing user acts.

        Args:
            user_utterance: the user input string

        Returns:
            dict with key 'user_acts' and list of user acts as value
        """
        user_acts = []
        if not user_utterance:
            return {'user_acts': None}
        user_utterance = ' '.join(user_utterance.lower().split())

        for bye in ('bye', 'goodbye', 'byebye', 'seeyou'):
            if user_utterance.replace(' ', '').endswith(bye):
                return {'user_acts': [UserAct(user_utterance, UserActionType.Bye)]}

        # check weather today
        for regex in WEATHER_DATE_TODAY_REGEXES:
            match = regex.search(user_utterance)
            if match:
                user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'date', datetime.now()))
                break
        if len(user_acts) == 0:
            for regex in WEATHER_DATE_TOMORROW_REGEXES:
                match = regex.search(user_utterance)
                if match:
                    tomorrow = datetime.now() + timedelta(days=1)
                    date = datetime(tomorrow.year, tomorrow.month, tomorrow.day, hour=15)
                    user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'date', date))
                    break
        for regex in WEATHER_LOCATION_REGEXES:
            match = regex.search(user_utterance)
            if match:
                user_acts.append(UserAct(user_utterance, UserActionType.Inform, 'location', match.group(1)))

        self.debug_logger.dialog_turn("User Actions: %s" % str(user_acts))
        return {'user_acts': user_acts}
__init__(self, domain, logger=<DiasysLogger adviser (NOTSET)>) special
Source code in adviser/examples/webapi/weather/nlu.py
def __init__(self, domain, logger=DiasysLogger()):
    # only calls super class' constructor
    super(WeatherNLU, self).__init__(domain, debug_logger=logger)
extract_user_acts(self, *args, **kwargs)
Source code in adviser/examples/webapi/weather/nlu.py
def delegate(self, *args, **kwargs):
    func_inst = getattr(self, func.__name__)

    callargs = list(args)
    if self in callargs:    # remove self when in *args, because already known to function
        callargs.remove(self)
    result = func(self, *callargs, **kwargs)
    if result:
        # fix! (user could have multiple "/" characters in topic - only use last one )
        domains = {res.split("/")[0]: res.split("/")[1] if "/" in res else "" for res in result}
        result = {key.split("/")[0]: result[key] for key in result}

    if func_inst not in self._publish_sockets:
        # not a publisher, just normal function
        return result

    socket = self._publish_sockets[func_inst]
    domain = self._domain_name
    if socket and result:
        # publish messages
        for topic in pub_topics:
        # for topic in result: # NOTE publish any returned value in dict with it's key as topic
            if topic in result:
                domain = domain if domain else domains[topic]
                topic_domain_str = f"{topic}/{domain}" if domain else topic
                if topic in self._pub_topic_domains:
                    topic_domain_str = f"{topic}/{self._pub_topic_domains[topic]}" if self._pub_topic_domains[topic] else topic
                _send_msg(socket, topic_domain_str, result[topic])
                if self.debug_logger:
                    self.debug_logger.info(
                        f"- (DS): sent message from {func} to topic {topic_domain_str}:\n   {result[topic]}")
    return result