Wikiproyecto:Bots/Repositorio/redirecciones-redirecciones.py

Summary

actualizar · discusión · código desprotegido  

Información de fichero
  • Nombre del fichero: redirecciones-redirecciones.py
  • Lenguaje: Python
  • Estado: no protegido
Detalles de edición
  • Detalles:
Script de BOTijo (disc. · contr. · bloq.) para crear redirecciones sin acentos (a partir de otras redirecciones). Originalmente tarea033.py
# -*- coding: utf-8 -*-

# Copyright (C) 2009 emijrp
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse, codecs, re, time
from datetime import datetime

import os, sys
sys.path.append(os.path.split(os.getcwd())[0])

from wikipedia import Page, Site, output as display, stopme
import pagegenerators as pg, query as api

pairs={
	u"àáâäãăǎąåā": "a", u'æǣ': "ae",
	u'ḃɓ': "b",
	u'çćčćĉċ': "c",
	u'đḍďḋð': "d",
	u'èéêëẽēę': "e",
	u'ḟƒ': "f",
	u'ĝġģğ': "g",
	u'ĥħ': "h",
	u'ìíîïīį': "i", u'ij': "ij",
	u'ĵ': "j",
	u'ķ': "k",
	u'ŀļḷḹľł': "l",
	u'ñńň': "n",
	u'òóôöõøōǫ': "o",
	u'œ': "oe",
	u'ṗ': "p",
	u'ŗřṛṝ': "r",
	u'şṡšŝ': "s", u'ß': "sz",
	u'ţṫṭ': "t",
	u'Þ': "th",
	u'ùúûüŭūų': "u",
	u'ẁŵẅƿ': "w",
	u'ýỳŷÿȳỹ': "y",
	u'źžż': "z"
}
diacritics = "".join(pairs.keys())

def simplify_chars(string):
	word=""
	for ch in unicode(string):
		is_upper = ch != ch.lower()
		if ch.lower() in diacritics:
			for keys in pairs:
				if ch.lower() in keys:
					ch = pairs[keys]
					break
		if is_upper: ch=ch.upper()
		word += ch
	word=word.replace(u"l·l","ll")
	#word = re.sub("\W","!", word)
	return word

def timedelta(td):
	#get the timedelta obejct and returns also hours, minutes and seconds
	#by accessing to .seconds atribute.
	td = datetime.now()-datetime.fromtimestamp(td)
	hours, remainder = divmod(td.seconds, 3600)
	minutes, seconds = divmod(remainder, 60)
	result = "%s%s%s%s" %(
		"%i d" %  td.days if td.days else "",
		" %i h" % hours if hours else "",
		" %i m" % minutes if minutes else "",
		" %i s" % seconds if seconds else "",
	)
	if not result: result ="0 s %s ms" % str(td.microseconds).rstrip("0")
	return result.strip(), td.days, hours, minutes, seconds

def get_filename(filename="wikipage2"):
	user = sys.path[0].split("/")[2]
	if not args.path:
		path = "/home/%(u)s/temp/" % {"u": user}
	else:
		path = args.path
	if path.startswith("*"):
		path = path.replace("*/", "%s/" % os.getcwd())
	if not path.endswith("/"):
		path = "%s/" % path
	return "%(p)s%(l)s%(f)s.log" % {"l":args.lang, "p": path, "f": filename}

def get_sql(query, filename="wikipage2"):
	fdata = {
		"l": args.lang,
		"p": get_filename(filename),
		"q": query,
		"f": filename
	}
	os.system(
		"""mysql -h %(l)swiki-p.db.toolserver.org -e"""
		""" "use %(l)swiki_p;%(q)s" """
		"""> %(p)s""" % fdata
	)
	f=codecs.open(get_filename(filename), 'r', encoding="utf-8")
	lines = f.readlines()
	f.close()
	return lines

def load_from_cache():
	pages = set()
	f=codecs.open(get_filename(), 'r', encoding="utf-8")
	lines = f.readlines()
	f.close()
	for line in lines[1:]:
		#saltamos la primera linea q es la descripcion de las columnas de la sql
		pages.add(line[:-1].strip().replace("_"," "))

	redirects = set()
	f=codecs.open(get_filename("redirs"), 'r', encoding="utf-8")
	lines = f.readlines()
	f.close()
	for line in lines[1:]:
		#saltamos la primera linea q es la descripcion de las columnas de la sql
		pages.add(line[:-1].strip().replace("_"," "))

def load_from_toolserver():
	debug("*** toolserver method ***")

	pages = set()
	debug('Cargando paginas de %swiki' % args.lang)
	lines = get_sql("select page_title from page where page_namespace=0 and page_is_redirect=0;")
	for line in lines[1:]:
		#saltamos la primera linea q es la descripcion de las columnas de la sql
		pages.add(line[:-1].strip().replace("_"," "))
	debug(
		'Cargadas %i paginas de un total de %i [de %swiki]' % (
			len(pages), len(lines)-1, args.lang
		)
	)

	redirects = set()
	print 'Cargando redirecciones de eswiki'
	lines = get_sql("select page_title from page where page_namespace=0 and page_is_redirect=1;", "redirs")
	for line in lines[1:]:
		#saltamos la primera linea q es la descripcion de las columnas de la sql
		redirects.add(line[:-1].strip().replace("_"," "))
	debug(
		'Cargadas %i redirecciones de un total de %i [de %swiki]'  % (
			len(redirects), len(lines)-1, args.lang
		)
	)
	return pages, redirects

def load_using_API():
	debug("*** API method ***")

	#pages
	pages = set()
	debug('Cargando paginas de %swiki' % args.lang)
	params = {
		"action": "query",
		"list": "allpages",
		"apfrom": args.begin,
		"apto": args.end,
		"apnamescpace": 0,
		"apfilterredir": "nonredirects",
		"aplimit": "max"
	}
	next=True
	while next:
		data = api.GetData(params, Site(args.lang, "wikipedia"))
		next = data.has_key("query-continue") and data['query-continue']['allpages'].has_key('apcontinue')
		for page in data['query']['allpages']:
			pages.add(page['title'])
		if next:
			params['apcontinue'] = data['query-continue']['allpages']['apcontinue']
	debug('Cargadas %i paginas [de %swiki]' % (len(pages), args.lang))

	#redirects
	redirects = set()
	debug('Cargando redirecciones de %swiki' % args.lang)
	params = {
		"action": "query",
		"list": "allpages",
		"apfrom": args.begin,
		"apto": args.end,
		"apnamescpace": 0,
		"apfilterredir": "redirects",
		"aplimit": "max"
	}
	next=True
	while next:
		data = api.GetData(params, Site(args.lang, "wikipedia"))
		next = data.has_key("query-continue") and data['query-continue']['allpages'].has_key('apcontinue')
		for redir in data['query']['allpages']:
			redirects.add(redir['title'])
		if next:
			params['apcontinue'] = data['query-continue']['allpages']['apcontinue']
	debug('Cargadas %i paginas [de %swiki]' % (len(redirects), args.lang))
	return pages, redirects

def load_from_pywikilib():
	debug("*** pywikilib method ***")
	pages = set()
	gen = pg.AllpagesPageGenerator(
		start=args.begin, includeredirects=False, site=Site(args.lang, "wikipedia")
	)
	debug('Cargando paginas de %swiki' % args.lang)
	for page in gen:
		if page.title() == args.end: break
		pages.add(page.title())
	debug('Cargadas %i paginas [de %swiki]' % (len(pages), args.lang))

	redirects = set()
	gen = pg.AllpagesPageGenerator(
		start=args.begin, includeredirects="only", site=Site(args.lang, "wikipedia")
	)
	debug('Cargando redirecciones de %swiki' % args.lang)
	for redir in gen:
		if redir.title() == args.end: break
		redirects.add(redir.title())
	debug('Cargadas %i redirecciones [de %swiki]' % (len(redirects), args.lang))
	return pages, redirects

def filter_redirects(pages, redirects):
	filter = set()
	for redir in redirects:
		if re.search(ur"(?i)[a-z%s0-9\-\. ]" % diacritics, redir): #no meter (    ), A (desambiguacion) Pi (pelicula)
			nredir = simplify_chars(redir)
			if redir != nredir and nredir not in pages and nredir not in redirects:
				filter.add(redir)
				if len(filter) % 50 == 0:
					debug(u"%i %s" % (len(filter), redir))

				redir_page=Page(Site(args.lang, 'wikipedia'), redir)
				nredir_page=Page(Site(args.lang, 'wikipedia'), nredir)
				if redir_page.isRedirectPage() and not nredir_page.exists():
					output=u"#REDIRECT [[%s]]" % redir_page.getRedirectTarget().title()
					debug(output)
					if args.edit and not args.test: nredir_page.put(display, u"BOT - %s" % display)

def debug(string):
	if args.test or not args.quiet: display(string)

def main():
	t=time.time()
	display(u"[\3{lightyellow}%s\3{default}] Empezamos." % time.strftime("%H:%M:%S"))
	if args.cache:
		if os.path.exists(get_filename()):
			pages, redirects = load_from_cache()
		else:
			debug("El fichero temporal no existe, iniciando la consulta SQL...")
			pages, redirects = load_from_toolserver()
	elif args.piwikimedia:
		pages, redirects = load_from_pywikilib()
	elif args.use_api:
		pages, redirects = load_using_API()
	else:
		pages, redirects = load_from_toolserver()
	display(u"[\3{lightpurple}%s\3{default}] OK. Se ha tardado: %s." % (time.strftime("%H:%M:%S"), timedelta(t)[0]))
	filter_redirects(pages, redirects)
	if args.remove:
		os.system("rm %s" % get_filename())
		os.system("rm %s" % get_filename("redirs"))

if __name__ == '__main__':
	parser = argparse.ArgumentParser(
		description="Crea redirecciones sin acentuación de artículos que contengan diacríticas en su título.",
		usage="%(prog)s [--lang <lang>] [--begin <A>] [--end <M>] [--path </home/emijrp/temporal/>] [--api|--cache|--pgen] [--remove]"
	)
	parser.add_argument("--lang", "-l", default="es", help="Idioma del proyecto. (Opcional, por defecto: '%(default)s'.)", metavar="es")
	parser.add_argument("--begin", "-b", default="!", type=unicode, help="Primer artículo", metavar="!")
	parser.add_argument("--end", "-e", default=u"ÿ", type=unicode, help="Último artículo", metavar="ÿ")
	parser.add_argument("--limit", "-x", default=None, type=int, help="limita el número de ediciones, útil en modo de pruebas")
	parser.add_argument("--pgen", "-g", dest="piwikimedia", action="store_true", default=False, help="usar método de pagegenerator, no recomendable, es el más lento y el que más recursos consume.")
	parser.add_argument("--api", "-a", dest="use_api", action="store_true", default=False, help="usar API, recomendable si no se dispone de acceso al toolserver.")
	parser.add_argument("--cache", "-C", action="store_true", default=False, help="usar caché (ficheros temporales, solo para toolserver)")
	parser.add_argument("--edit", "-E", action="store_true", default=False, help="editar, imprescindible para que el bot realice los cambios")
	parser.add_argument("--remove", "-R", action="store_true", default=False, help="eliminar archivos temporales (solo para toolserver))")
	parser.add_argument("--path", "-H", default=None, help="ruta fichero (solo para toolserver; por defecto: /home/{USER}/temp/)", metavar="/home/{USER}/temp/")
	parser.add_argument("--quiet", "-Q", action="store_true", default=False, help="anula la información adicional durante del desarrollo del programa.")
	parser.add_argument("--test", "-T", action="store_true", default=False, help="activar modo pruebas (no permite editar y muestra toda la información adicional.)")
	args = parser.parse_args()
	try:
		main()
	except KeyboardInterrupt:
		display("Cancelled by user...")
	finally:
		stopme()