# -*- coding: utf-8 -*-
# Copyright (C) 2009 emijrp
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse, codecs, re, time
from datetime import datetime
import os, sys
sys.path.append(os.path.split(os.getcwd())[0])
from wikipedia import Page, Site, output as display, stopme
import pagegenerators as pg, query as api
# Transliteration table: each key is a group of accented/special letters and
# the value is the plain ASCII text that replaces any letter of the group.
# simplify_chars() lowercases a character before looking it up, so keys must
# only contain lowercase forms (an uppercase key can never be matched).
pairs = {
    u"àáâäãăǎąåā": "a",
    u"æǣ": "ae",
    u"ḃɓ": "b",
    u"çćčĉċ": "c",
    u"đḍďḋð": "d",
    u"èéêëẽēę": "e",
    u"ḟƒ": "f",
    u"ĝġģğ": "g",
    u"ĥħ": "h",
    u"ìíîïīį": "i",
    # LATIN SMALL LIGATURE IJ, escaped so it cannot be confused with the two
    # plain letters "i" + "j" (which must never be treated as diacritics).
    u"\u0133": "ij",
    u"ĵ": "j",
    u"ķ": "k",
    u"ŀļḷḹľł": "l",
    u"ñńň": "n",
    u"òóôöõøōǫ": "o",
    u"œ": "oe",
    u"ṗ": "p",
    u"ŗřṛṝ": "r",
    u"şṡšŝ": "s",
    u"ß": "sz",
    u"ţṫṭ": "t",
    # LATIN SMALL LETTER THORN; the lowercase form is required because the
    # lookup is done on the lowercased character.
    u"\u00fe": "th",
    u"ùúûüŭūų": "u",
    u"ẁŵẅƿ": "w",
    u"ýỳŷÿȳỹ": "y",
    u"źžż": "z",
}
# All special characters known to the table, concatenated; used for fast
# membership tests and interpolated into a regex character class.
diacritics = "".join(pairs.keys())
def simplify_chars(string):
    """Return ``string`` with the characters listed in ``pairs`` replaced.

    Each character is lowercased for the lookup; when the original character
    differed from its lowercase form the replacement (or the character
    itself) is uppercased again.  Finally the sequence "l·l" is collapsed
    to "ll".
    """
    pieces = []
    for char in unicode(string):
        lowered = char.lower()
        was_upper = char != lowered
        if lowered in diacritics:
            # Find the group that holds this character and take its
            # replacement text.
            for group in pairs:
                if lowered in group:
                    char = pairs[group]
                    break
        if was_upper:
            char = char.upper()
        pieces.append(char)
    simplified = "".join(pieces)
    return simplified.replace(u"l·l", "ll")
def timedelta(td):
    """Describe the time elapsed between the Unix timestamp ``td`` and now.

    Returns a tuple ``(text, days, hours, minutes, seconds)`` where ``text``
    lists the non-zero components, e.g. ``"1 d 2 h 5 s"``.  When less than a
    second has elapsed the text is ``"0 s <N> ms"`` with ``N`` being whole
    milliseconds.
    """
    elapsed = datetime.now() - datetime.fromtimestamp(td)
    # timedelta only exposes days/seconds/microseconds, so split the seconds.
    hours, remainder = divmod(elapsed.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    parts = []
    if elapsed.days:
        parts.append("%i d" % elapsed.days)
    if hours:
        parts.append("%i h" % hours)
    if minutes:
        parts.append("%i m" % minutes)
    if seconds:
        parts.append("%i s" % seconds)
    result = " ".join(parts)
    if not result:
        # Convert microseconds to real milliseconds; stripping trailing zeros
        # from the microsecond count would turn 500000 us into "5 ms".
        result = "0 s %i ms" % (elapsed.microseconds // 1000)
    return result, elapsed.days, hours, minutes, seconds
def get_filename(filename="wikipage2"):
    """Build the path of a temporary dump file.

    The result is ``<dir>/<lang><filename>.log`` where ``<lang>`` is
    ``args.lang`` and ``<dir>`` is ``args.path`` when given.  Inside a
    user-supplied path starting with "*", every "*/" is replaced by the
    current working directory.  Without ``args.path`` the directory defaults
    to ``/home/<user>/temp/``, the user being taken from the third component
    of ``sys.path[0]``.
    """
    path = args.path
    if not path:
        # Only derive the user when it is really needed, and do not crash
        # when the script does not live under /<x>/<user>/...
        try:
            user = sys.path[0].split("/")[2]
        except IndexError:
            user = ""
        if user:
            path = "/home/%(u)s/temp/" % {"u": user}
        else:
            # Fallback: "temp" inside the current user's home directory.
            path = os.path.join(os.path.expanduser("~"), "temp")
    elif path.startswith("*"):
        path = path.replace("*/", "%s/" % os.getcwd())
    if not path.endswith("/"):
        path = "%s/" % path
    return "%(p)s%(l)s%(f)s.log" % {"l": args.lang, "p": path, "f": filename}
def get_sql(query, filename="wikipage2"):
    """Run ``query`` with the ``mysql`` client and return its output lines.

    The client connects to ``<lang>wiki-p.db.toolserver.org``, selects the
    ``<lang>wiki_p`` database and its standard output is written to the file
    returned by ``get_filename(filename)``.  That file is then read back as
    UTF-8 and returned as a list of lines (the first line is the column
    header printed by the client).

    A failure of the client is reported on stderr; the (possibly empty)
    output file is still read and returned.
    """
    # Local import: keeps this block self-contained.
    import subprocess

    target = get_filename(filename)
    # Argument list instead of a shell string: no quoting problems with the
    # query text or with paths containing spaces.
    command = [
        "mysql",
        "-h", "%swiki-p.db.toolserver.org" % args.lang,
        "-e", "use %swiki_p;%s" % (args.lang, query),
    ]
    with open(target, "wb") as dump:
        try:
            status = subprocess.call(command, stdout=dump)
        except OSError as error:
            # e.g. the mysql client is not installed
            status = error
    if status != 0:
        sys.stderr.write(
            "mysql failed (%s) while running: %s\n" % (status, query)
        )
    with codecs.open(target, 'r', encoding="utf-8") as dump:
        return dump.readlines()
def _read_cached_titles(path):
    """Return the set of titles stored in the dump file ``path``."""
    with codecs.open(path, 'r', encoding="utf-8") as cache:
        lines = cache.readlines()
    # The first line is the column header written by the SQL client.
    return set(line.strip().replace("_", " ") for line in lines[1:])


def load_from_cache():
    """Load page and redirect titles from the temporary dump files.

    Reads ``get_filename()`` for the pages and ``get_filename("redirs")``
    for the redirects, turning underscores into spaces.

    Returns a tuple ``(pages, redirects)`` of sets of titles.
    """
    pages = _read_cached_titles(get_filename())
    redirects = _read_cached_titles(get_filename("redirs"))
    return pages, redirects
def load_from_toolserver():
    """Load page and redirect titles of namespace 0 through SQL queries.

    Each query is run with ``get_sql`` (the redirects are dumped to the
    "redirs" file).  Underscores in titles are turned into spaces.

    Returns a tuple ``(pages, redirects)`` of sets of titles.
    """
    debug("*** toolserver method ***")

    debug('Cargando paginas de %swiki' % args.lang)
    lines = get_sql("select page_title from page where page_namespace=0 and page_is_redirect=0;")
    # The first line is the column header of the SQL output, skip it.
    pages = set(line.strip().replace("_", " ") for line in lines[1:])
    debug(
        'Cargadas %i paginas de un total de %i [de %swiki]' % (
            len(pages), max(len(lines) - 1, 0), args.lang
        )
    )

    # Use debug() and the selected language, as the pages message does.
    debug('Cargando redirecciones de %swiki' % args.lang)
    lines = get_sql("select page_title from page where page_namespace=0 and page_is_redirect=1;", "redirs")
    # The first line is the column header of the SQL output, skip it.
    redirects = set(line.strip().replace("_", " ") for line in lines[1:])
    debug(
        'Cargadas %i redirecciones de un total de %i [de %swiki]' % (
            len(redirects), max(len(lines) - 1, 0), args.lang
        )
    )
    return pages, redirects
def _load_titles_via_api(filterredir):
    """Return the titles listed by ``list=allpages`` between begin and end.

    ``filterredir`` is passed as ``apfilterredir`` ("nonredirects" or
    "redirects").  The range is ``args.begin`` .. ``args.end`` in namespace 0.
    """
    site = Site(args.lang, "wikipedia")
    params = {
        "action": "query",
        "list": "allpages",
        "apfrom": args.begin,
        "apto": args.end,
        "apnamespace": 0,
        "apfilterredir": filterredir,
        "aplimit": "max",
    }
    titles = set()
    while True:
        data = api.GetData(params, site)
        for entry in data['query']['allpages']:
            titles.add(entry['title'])
        continuation = data.get("query-continue", {}).get("allpages")
        if not continuation:
            break
        # Feed back whatever continuation parameter the server returned
        # (apcontinue, or apfrom on older servers) into the next request.
        params.update(continuation)
    return titles


def load_using_API():
    """Load page and redirect titles through the MediaWiki API.

    Returns a tuple ``(pages, redirects)`` of sets of titles.
    """
    debug("*** API method ***")

    debug('Cargando paginas de %swiki' % args.lang)
    pages = _load_titles_via_api("nonredirects")
    debug('Cargadas %i paginas [de %swiki]' % (len(pages), args.lang))

    debug('Cargando redirecciones de %swiki' % args.lang)
    redirects = _load_titles_via_api("redirects")
    debug('Cargadas %i redirecciones [de %swiki]' % (len(redirects), args.lang))
    return pages, redirects
def _titles_from_generator(includeredirects, loading_message):
    """Collect titles from ``pg.AllpagesPageGenerator`` up to ``args.end``.

    The generator starts at ``args.begin``; ``includeredirects`` is passed
    through unchanged.  ``loading_message`` is reported with ``debug`` once
    the generator has been created.  The title ``args.end`` itself is not
    included.
    """
    gen = pg.AllpagesPageGenerator(
        start=args.begin, includeredirects=includeredirects, site=Site(args.lang, "wikipedia")
    )
    debug(loading_message)
    # NOTE(review): assumes the generator yields pages sorted by their stored
    # title (underscores instead of spaces) - confirm against pagegenerators.
    end_key = args.end.replace(" ", "_")
    titles = set()
    for page in gen:
        title = page.title()
        # Stop at the first title at or past the end bound, so the bound also
        # works when no page is named exactly ``args.end``.
        if title.replace(" ", "_") >= end_key:
            break
        titles.add(title)
    return titles


def load_from_pywikilib():
    """Load page and redirect titles with the pywikipedia page generators.

    Returns a tuple ``(pages, redirects)`` of sets of titles.
    """
    debug("*** pywikilib method ***")
    pages = _titles_from_generator(
        False, 'Cargando paginas de %swiki' % args.lang
    )
    debug('Cargadas %i paginas [de %swiki]' % (len(pages), args.lang))
    redirects = _titles_from_generator(
        "only", 'Cargando redirecciones de %swiki' % args.lang
    )
    debug('Cargadas %i redirecciones [de %swiki]' % (len(redirects), args.lang))
    return pages, redirects
def filter_redirects(pages, redirects):
    """Create unaccented redirects for redirect titles with diacritics.

    For every title in ``redirects`` made only of letters, known diacritics,
    digits, hyphens, dots and spaces, the simplified title is computed with
    ``simplify_chars``.  When it differs and is in neither ``pages`` nor
    ``redirects``, and the wiki confirms that the source is a redirect and
    the simplified title does not exist, a redirect to the same target is
    reported and, with ``--edit`` and without ``--test``, saved.
    """
    site = Site(args.lang, 'wikipedia')
    # The whole title must consist of the allowed characters, so titles with
    # parentheses such as "A (desambiguacion)" or "Pi (pelicula)" are skipped.
    # The "u" flag makes the case-insensitive match work for accented letters.
    allowed_title = re.compile(u"(?iu)^[a-z%s0-9\\-\\. ]+$" % diacritics)
    candidates = set()
    for redir in redirects:
        if not allowed_title.search(redir):
            continue
        nredir = simplify_chars(redir)
        if redir == nredir or nredir in pages or nredir in redirects:
            continue
        candidates.add(redir)
        if len(candidates) % 50 == 0:
            debug(u"%i %s" % (len(candidates), redir))
        redir_page = Page(site, redir)
        nredir_page = Page(site, nredir)
        if redir_page.isRedirectPage() and not nredir_page.exists():
            output = u"#REDIRECT [[%s]]" % redir_page.getRedirectTarget().title()
            debug(output)
            if args.edit and not args.test:
                # Save the redirect text itself (not the output function).
                nredir_page.put(output, u"BOT - %s" % output)
def debug(string):
    """Show ``string`` unless quiet mode is on; test mode always shows it."""
    silenced = args.quiet and not args.test
    if silenced:
        return
    display(string)
def main():
    """Load the titles with the selected method and process the redirects.

    The loading method depends on the command line: ``--cache`` (temporary
    files, falling back to the SQL queries when they are missing),
    ``--pgen``, ``--api`` or, by default, the SQL queries.  With ``--remove``
    the temporary files are deleted at the end.
    """
    started = time.time()
    display(u"[\3{lightyellow}%s\3{default}] Empezamos." % time.strftime("%H:%M:%S"))
    if args.cache:
        # Both dump files are read by load_from_cache(), so require both.
        cache_files = (get_filename(), get_filename("redirs"))
        if all(os.path.exists(path) for path in cache_files):
            pages, redirects = load_from_cache()
        else:
            debug("El fichero temporal no existe, iniciando la consulta SQL...")
            pages, redirects = load_from_toolserver()
    elif args.piwikimedia:
        pages, redirects = load_from_pywikilib()
    elif args.use_api:
        pages, redirects = load_using_API()
    else:
        pages, redirects = load_from_toolserver()
    display(u"[\3{lightpurple}%s\3{default}] OK. Se ha tardado: %s." % (time.strftime("%H:%M:%S"), timedelta(started)[0]))
    filter_redirects(pages, redirects)
    if args.remove:
        # Delete without going through a shell; missing files are skipped.
        for path in (get_filename(), get_filename("redirs")):
            if os.path.exists(path):
                os.remove(path)
if __name__ == '__main__':
    def _decode_arg(value):
        """Convert a command line value to unicode.

        ``unicode(value)`` decodes as ASCII and therefore rejects titles such
        as "ÿ"; decode with the system encoding instead, then with UTF-8.
        """
        if isinstance(value, unicode):
            return value
        try:
            return value.decode(sys.getfilesystemencoding() or "utf-8")
        except UnicodeDecodeError:
            return value.decode("utf-8")

    parser = argparse.ArgumentParser(
        description="Crea redirecciones sin acentuación de artículos que contengan diacríticas en su título.",
        usage="%(prog)s [--lang <lang>] [--begin <A>] [--end <M>] [--path </home/emijrp/temporal/>] [--api|--cache|--pgen] [--remove]"
    )
    parser.add_argument("--lang", "-l", default="es", help="Idioma del proyecto. (Opcional, por defecto: '%(default)s'.)", metavar="es")
    parser.add_argument("--begin", "-b", default="!", type=_decode_arg, help="Primer artículo", metavar="!")
    parser.add_argument("--end", "-e", default=u"ÿ", type=_decode_arg, help="Último artículo", metavar="ÿ")
    parser.add_argument("--limit", "-x", default=None, type=int, help="limita el número de ediciones, útil en modo de pruebas")
    parser.add_argument("--pgen", "-g", dest="piwikimedia", action="store_true", default=False, help="usar método de pagegenerator, no recomendable, es el más lento y el que más recursos consume.")
    parser.add_argument("--api", "-a", dest="use_api", action="store_true", default=False, help="usar API, recomendable si no se dispone de acceso al toolserver.")
    parser.add_argument("--cache", "-C", action="store_true", default=False, help="usar caché (ficheros temporales, solo para toolserver)")
    parser.add_argument("--edit", "-E", action="store_true", default=False, help="editar, imprescindible para que el bot realice los cambios")
    parser.add_argument("--remove", "-R", action="store_true", default=False, help="eliminar archivos temporales (solo para toolserver))")
    parser.add_argument("--path", "-H", default=None, help="ruta fichero (solo para toolserver; por defecto: /home/{USER}/temp/)", metavar="/home/{USER}/temp/")
    parser.add_argument("--quiet", "-Q", action="store_true", default=False, help="anula la información adicional durante del desarrollo del programa.")
    parser.add_argument("--test", "-T", action="store_true", default=False, help="activar modo pruebas (no permite editar y muestra toda la información adicional.)")
    # Module-level name: the functions of this script read the options
    # through the global ``args``.
    args = parser.parse_args()
    try:
        main()
    except KeyboardInterrupt:
        display("Cancelled by user...")
    finally:
        # Always run the framework's shutdown hook, even after an error.
        stopme()