# -*- coding:Utf-8 -*-
import os
import time
import datetime
#from collections import defaultdict
#from operator import itemgetter
from django.conf import settings
from sulci.utils import load_file, save_to_file
from sulci.thesaurus import Trigger, Descriptor
from sulci.textmining import SemanticalTagger
from sulci.textutils import tokenize_text
from sulci.rules_templates import LemmatizerTemplateGenerator, RuleTemplate,\
ContextualTemplateGenerator, LexicalTemplateGenerator
from sulci import content_model, content_manager
from sulci.log import sulci_logger
[docs]class SemanticalTrainer(object):
"""
Create and update triggers. And make triggertodescription ponderation.
"""
PENDING_EXT = ".pdg"
VALID_EXT = ".trg"
def __init__(self, thesaurus, pos_tagger, mode="full"):
self.thesaurus = thesaurus
self.pos_tagger = pos_tagger
self.mode = mode
[docs] def begin(self):
"""
Make one trigger for each descriptor of the thesaurus.
Have to be called one time at the begining, and that's all.
"""
# TODO Add aliases...
for d in self.thesaurus:
t = Trigger.objects.create(original=unicode(d))
t.connect(d, 1)
[docs] def setup_socket_master(self):
"""
Configure the sockets for the master trainer.
"""
import zmq
# This is a socket load-balanced to every workers
# listening the canal.
context = zmq.Context()
self.reqsocket = context.socket(zmq.XREQ)
self.reqsocket.bind("ipc:///tmp/sulci.action")
# This is a publisher socket, used to distribute data.
# No response is expected.
self.pubsocket = zmq.Socket(zmq.Context(), zmq.PUB)
self.pubsocket.bind("ipc:///tmp/sulci.apply")
[docs] def setup_socket_slave(self):
"""
Configure sockets for the workers (slaves).
"""
import zmq
context = zmq.Context()
# Socket to receive messages on
self.repsocket = context.socket(zmq.XREP)
self.repsocket.connect("ipc:///tmp/sulci.action")
# This is the subscriber socket. Its used to subscribe to a canal
# to receive data.
self.subsocket = zmq.Socket(zmq.Context(), zmq.SUB)
self.subsocket.connect("ipc:///tmp/sulci.apply")
self.subpoller = zmq.Poller()
self.subpoller.register(self.subsocket, zmq.POLLIN)
# self.reppoller = zmq.Poller()
# self.reppoller.register(self.repsocket, zmq.POLLIN)
self.subsocket.setsockopt(zmq.SUBSCRIBE, "")
[docs] def do(self, *args):
if self.mode == "slave":
self.slave()
else:
t_init = time.time()
if self.mode == "master":
self.setup_socket_master()
print "MASTER -- ready"
qs = content_manager.all().filter(pk__gt=705969).order_by("id")
# qs = content_manager.all().order_by("id")
if self.mode == "master":
qs = qs.only("id")
# We make it by step, to limit RAM consuming
step = 1000
forloop_remaining = total = len(qs)
for loop in range(len(qs) / step + 1):
_from = loop*step
_to = _from + step
current_qs = qs[_from:_to]
print "MASTER -- Processing qs from %d to %d" % (_from, _to)
for a in current_qs:
if self.mode == "master":
self.reqsocket.send_multipart([str(a.pk)])
# We are training all objects, but without slaves.
else:
self.train(a)
if self.mode == "master":
# Waiting for answers
# The need is also to send the "stop" action only at the end
for idx in current_qs:
status = self.reqsocket.recv()
# Calculating ETA
forloop_remaining -= 1
t_now = time.time()
t_diff = t_now - t_init
time_remaining = (t_diff / (total - forloop_remaining) * forloop_remaining)
ETA = datetime.datetime.now() + datetime.timedelta(0,time_remaining)
# Using print, as I launch this huge script
# with python -O (so not __debug__, so no log)
print "MASTER -- %s -- %s remaining to process -- ETA : %s" %\
(status, forloop_remaining, ETA.strftime("%a %d %R"))
if self.mode == "master":
self.pubsocket.send(" stop")
[docs] def slave(self):
self.setup_socket_slave()
print "SLAVE %s -- ready" % os.getpid()
while True:
if self.subpoller.poll(0):
# This is subscription mode
# Used just to stop the process
# All the subprocess receive the same socket
action = self.subsocket.recv()[1:]
if action == "stop": return
# This is the REP mode
# We receive an action to do, with a pk
idx, pk = self.repsocket.recv_multipart()
print "SLAVE %s processing pk %s" % (os.getpid(), pk)
self.train(int(pk))
self.repsocket.send_multipart([idx, "Processed pk %s" % pk])
[docs] def train(self, inst):
"""
For the moment, human defined descriptors are a string with "," separator.
"""
if isinstance(inst, int):
#We guess we have a pk here
inst = content_model.objects.get(pk=inst)
text = getattr(inst, settings.SULCI_CLI_CONTENT_PROPERTY)
descriptors = getattr(inst, settings.SULCI_CLI_KEYWORDS_PROPERTY)
# hack
if descriptors == "" or text == "": return
validated_descriptors = set()
# Retrieve descriptors
for d in descriptors.split(","):
d = d.strip().replace(u"’", u"'")
if not d == "":
# We create the descriptor not in thesaurus for now
# because descriptors in article and thesaurus are not
# always matching. Will be improved.
dsc, created = Descriptor.objects.get_or_create(name=d)
# Retrieve the primeval value
dsc = dsc.primeval
validated_descriptors.add(dsc)
if created:
sulci_logger.info(u"Lairning descriptor not in thesaurus : %s" % unicode(dsc), "RED")
# Retrieve keytentities :
try:
S = SemanticalTagger(text, self.thesaurus, self.pos_tagger)
S.deduplicate_keyentities() # During lairning, try to filter
except ValueError:
# SemanticalTagger raise ValueError if text is empty
return
current_triggers = set()
for ke in S.keyentities:
# Retrieve or create triggers
t, created = Trigger.objects.get_or_create(original=unicode(ke))
current_triggers.add(t)
t.count += 1
t.save()
# t.current_score = ke.trigger_score
# For now, only create all the relations
for d in validated_descriptors:
for t in current_triggers:
t.connect(d, 1)
# log(u"Connecting %s and %s" % (t, d), "WHITE")
# log(u"Current triggers", "WHITE")
# log([unicode(d) for d in current_triggers], "YELLOW")
# log(u"Descriptors validated by human", "WHITE")
# log([unicode(d) for d in validated_descriptors], "YELLOW")
# #Descriptors calculated by SemanticalTagger
# calculated_descriptors = set(d for d, value in S.descriptors)
# log(u"Descriptors calculated", "WHITE")
# log([unicode(d) for d in calculated_descriptors], "YELLOW")
# #Descriptors that where tagged by humans, but not calculated
# false_negative = validated_descriptors.difference(calculated_descriptors)
# #Descriptors that where not tagged by humans, but where calculated
# false_positive = calculated_descriptors.difference(validated_descriptors)
# #Validated descriptors that where also calculated
# true_positive = calculated_descriptors.intersection(validated_descriptors)
#
# for d in true_positive:
# for t in current_triggers:
# if d in t:
# t.connect(d, t.current_score)#trust the relation
# log(u"Adding 2 to connection %s - %s" % (t, d), "YELLOW")
#
# for d in false_positive:
# for t in current_triggers:
# if d in t:
# t.connect(d, -t.current_score)#untrust the relation
# log(u"Removing 1 to connection %s - %s" % (t, d), "BLUE")
#
# for d in false_negative:
# for t in current_triggers:
# t.connect(d, t.current_score)#guess the relation
# log(u"Connecting %s and %s" % (t, d), "WHITE")
[docs] def clean_connections(self):
"""
Delete all the connection where score < 0.
"""
# We maybe have to clean connections where pondered_weigth is < 0.1.
Trigger.clean_all_connections()
class RuleTrainer(object):
    """
    Main trainer class for rules based trainers, for factorisation.

    Subclasses must provide `template_generator`, `attr_name`, `tokens`,
    `samples` and `mode`, and implement the hooks `get_template_instance`,
    `select_one_rule`, `log_error` and `pretrain`.
    """

    def do(self):
        """
        Entry point: run the training, locally or distributed.
        """
        if self.mode == "slave":
            self.slave()
        elif self.mode == "master":
            self.setup_socket_master()
            self.train()
            # Leading space: the slaves strip the first byte on recv.
            self.pubsocket.send(" stop")
        else:
            self.train()

    def setup_socket_master(self):
        """
        Configure the sockets for the master trainer.
        """
        import zmq
        # This is a socket load-balanced to every worker
        # listening to the canal.
        self.reqsocket = zmq.Socket(zmq.Context(), zmq.XREQ)
        self.reqsocket.bind("ipc:///tmp/sulci.action")
        # This is a publisher socket, used to distribute data.
        # No response is expected.
        self.pubsocket = zmq.Socket(zmq.Context(), zmq.PUB)
        self.pubsocket.bind("ipc:///tmp/sulci.apply")

    def setup_socket_slave(self):
        """
        Configure sockets for the workers (slaves).
        """
        import zmq
        self.repsocket = zmq.Socket(zmq.Context(), zmq.XREP)
        self.repsocket.connect("ipc:///tmp/sulci.action")
        # This is the subscriber socket. It's used to subscribe to a canal
        # to receive data.
        self.subsocket = zmq.Socket(zmq.Context(), zmq.SUB)
        self.subsocket.connect("ipc:///tmp/sulci.apply")
        self.subpoller = zmq.Poller()
        self.subpoller.register(self.subsocket, zmq.POLLIN)
        self.reppoller = zmq.Poller()
        self.reppoller.register(self.repsocket, zmq.POLLIN)
        self.subsocket.setsockopt(zmq.SUBSCRIBE, "")

    def slave(self):
        """
        Worker loop: apply published rules to the local tokens and score
        rule candidates on request, until "stop" is published.
        """
        self.setup_socket_slave()
        while True:
            if self.subpoller.poll(0):
                rule = self.subsocket.recv()[1:]
                if rule == "stop":
                    return
                rule = rule.decode("utf-8")
                template = self.get_template_instance(rule)
                # Apply the rule to the tokens
                sulci_logger.info(u"Applying rule %s" % rule, "RED")
                template.apply_rule(self.tokens, rule)
            if self.reppoller.poll(0):
                idx, action, rule = self.repsocket.recv_multipart()
                _, good, bad = self.test_rule(rule.decode("utf-8"))
                self.repsocket.send_multipart([idx, rule, str(good), str(bad)])

    def test_rules(self, rules_candidates):
        """
        Score each rule candidate; return a list of (rule, good, bad).

        In master mode, the scoring is delegated to the slaves.
        """
        pondered_rules = []
        if self.mode == "master":
            # Send order
            for rule in rules_candidates:
                self.reqsocket.send_multipart(["check", rule.encode("utf-8")])
            # Receive results
            for rule in rules_candidates:
                resp = self.reqsocket.recv_multipart()
                r, good, bad = resp
                pondered_rules.append((r.decode("utf-8"), int(good), int(bad)))
                sulci_logger.info(u"Received rule %s" % r.decode("utf-8"), "MAGENTA")
            sulci_logger.info(u"All rules are received from slaves")
        else:
            for rule in rules_candidates:
                pondered_rules.append(self.test_rule(rule))
        return pondered_rules

    def pretrain(self):
        """
        Trainer specific training session preparation. Hook for subclasses.
        """
        pass

    def log_error(self, token):
        """Log one remaining error. Hook for subclasses."""
        pass

    def get_template_instance(self, tpl):
        """Return a template instance for `tpl`. Hook for subclasses."""
        pass

    def select_one_rule(self, rules):
        """Select the best rule from pondered rules. Hook for subclasses."""
        pass

    def train(self):
        """
        Main factorized train method.
        """
        # We have to apply rules one after another to all objects
        sulci_logger.info("Begin of training session.", "WHITE", True)
        final_rules = []
        errors = self.get_errors()
        while errors:
            run_applied_rule = False
            sulci_logger.info("%d errors for now..." % len(errors), "RED", True)
            for token_with_error in errors[:]:
                rules_candidates = []
                self.log_error(token_with_error)
                # Make rules candidates
                for tpl, _ in self.template_generator.register.items():
                    template = self.get_template_instance(tpl)
                    rules_candidates += template.make_rules(token_with_error)
                # Test the rules
                pondered_rules = self.test_rules(rules_candidates)
                # Select one rule
                rule_candidate, score = self.select_one_rule(pondered_rules)
                # BUGFIX: final_rules stores (rule, score) tuples, so the
                # previous test `rule_candidate in final_rules` could never
                # match and duplicates slipped through; compare rules only.
                # How to calculate the score min?
                if rule_candidate and not any(rule_candidate == r for r, _ in final_rules):
                    template = self.get_template_instance(rule_candidate)
                    final_rules.append((rule_candidate, score))
                    # Apply the rule to the tokens
                    sulci_logger.info(u"Applying rule %s (%s)" % (rule_candidate, score), "RED")
                    template.apply_rule(self.tokens, rule_candidate)
                    if self.mode == "master":
                        # Send the rule to apply (leading space is
                        # stripped by the slaves on recv).
                        self.pubsocket.send(" %s" % rule_candidate.encode("utf-8"))
                    run_applied_rule = True
                    # We have applied a rule, we can try another run
                    errors = self.get_errors()
                    break  # break the for
            else:  # No rule applied for this error
                    # We don't want to reprocess this error another time
                    # unless the sample (so the context) has changed.
                    token_with_error.sample.set_trained_position(token_with_error.position)
            if run_applied_rule:
                continue  # go back to while
            errors = None  # Nothing applied, we stop here.
        self.display_errors()
        self.template_generator.export(final_rules)

    def test_rule(self, rule):
        """
        Count how many tokens `rule` fixes (good) and breaks (bad).
        """
        template = self.get_template_instance(rule)
        bad = 0
        good = 0
        for ttk in self.tokens:
            test = template.test_rule(ttk, rule)
            if test == 1:
                good += 1
            elif test == -1:
                bad += 1
        sulci_logger.info(u"%s g: %d b : %d" % (rule, good, bad), "GRAY")
        return rule, good, bad

    def display_errors(self):
        """
        Display errors remaining in the current step.
        """
        remaining_errors = self.get_errors()
        errors_count = len(remaining_errors)
        total_words = len(self.tokens)
        sulci_logger.info(u"Remaining %d errors (%f %% of %d total words)" %
                          (errors_count, 100.0 * errors_count / total_words, total_words), "RED")
        for r_error in remaining_errors:
            self.log_error(r_error)

    def get_errors(self):
        """
        Retrieve tokens where tag != verified_tag.
        """
        final = []
        for sample in self.samples:
            # We don't take into account samples yet processed and not modified.
            final += sample.get_errors(self.attr_name)
        return final
class LemmatizerTrainer(RuleTrainer):
    """
    Train the Lemmatizer.
    """
    template_generator = LemmatizerTemplateGenerator
    attr_name = "lemme"  # Name of the attribute we test on

    def __init__(self, lemmatizer, mode="full"):
        self.lemmatizer = lemmatizer
        self.mode = mode
        self.tokens = self.lemmatizer.tokens
        self.samples = self.lemmatizer.samples
        self.pretrain()

    def pretrain(self):
        """
        We need to have the right (verified) tags before training lemmas.
        """
        for token in self.tokens:
            token.tag = token.verified_tag

    def log_error(self, token):
        """Log a token whose computed lemma differs from the verified one."""
        sulci_logger.info(u"Error : %s, lemmatized %s instead of %s" \
                          % (unicode(token), token.lemme, token.verified_lemme), "WHITE")

    def get_template_instance(self, tpl):
        """Return the lemmatizer template instance for `tpl`."""
        template, _ = self.template_generator.get_instance(tpl)
        return template

    def select_one_rule(self, rules):
        """
        Having a set of rules candidates for correcting some error, select
        the one correcting the most cases, and creating the fewest errors.
        """
        return RuleTemplate.select_one(rules, len(self.tokens))
class POSTrainer(RuleTrainer):
    """
    Pos Tagger trainer.
    """
    attr_name = "tag"  # Name of the attribute we test on

    def __init__(self, tagger, corpus, mode="full"):
        self.tagger = tagger
        self.corpus = corpus
        # pretrain() only reads self.tagger and self.corpus, so it can
        # run before tokens/samples are aliased below.
        self.pretrain()
        self.tokens = self.corpus.tokens
        self.samples = self.corpus.samples
        self.mode = mode

    def log_error(self, token):
        """Log a token whose computed tag differs from the verified one."""
        sulci_logger.info(u"Error : %s, tagged %s instead of %s" \
                          % (unicode(token), token.tag, token.verified_tag), "WHITE")

    def get_template_instance(self, tpl):
        """Return the template instance for `tpl` (POS templates need the lexicon)."""
        template, _ = self.template_generator.get_instance(tpl, lexicon=self.tagger.lexicon)
        return template

    def select_one_rule(self, rules):
        """Select the best rule, with a minimum score threshold of 3."""
        return RuleTemplate.select_one(rules, len(self.corpus), 3)

    def pretrain(self):
        """Tag the whole corpus with the current tagger before training."""
        self.tagger.tag_all(self.corpus.tokens)
class LexicalTrainer(POSTrainer):
    """
    Train the lexical rules of the POS tagger.
    """
    template_generator = LexicalTemplateGenerator

    def pretrain(self):
        """
        Tag the tokens, but without using POS rules, as we are training them.
        """
        self.tagger.tag_all(self.corpus.tokens, lexical=False, contextual=False)

    def get_errors(self):
        """
        We don't care about tokens in the Lexicon, for the lexical trainer.
        """
        final = []
        for sample in self.corpus.samples:
            # We don't take into account samples yet processed and not modified.
            final += [t for t in sample.get_errors() if t not in self.tagger.lexicon]
        return final
class ContextualTrainer(POSTrainer):
    """
    Train the contextual rules of the POS tagger.
    """
    template_generator = ContextualTemplateGenerator

    def pretrain(self):
        """
        Tag the tokens, but without using the contextual rules, as we are
        training them.
        """
        self.tagger.tag_all(self.corpus.tokens, contextual=False)