User:InfoboxBot/wikipedia edit pages clean.py

From WikiProjectMed
Jump to navigation Jump to search

The following code was developed for Python 3.6. It may or may not work on previous versions of Python (untested), and I have not tested to see if it works correctly on other operating systems. It is designed to work in conjunction with a pre-existing MySQL database containing scraped infobox data from enwiki within a single discrete table per infobox parent template (which I may sometimes refer to as a infobox "category" even though it's not quite accurate terminology — the reason for this is because I match multiple older infobox template names that are now merged into and redirect to the parent infobox template), and those tables are populated by several other entirely different scripts that I've written, all of which are not currently open source or available for public viewing (I may eventually make the whole project properly & fully open source, but for now I'm keeping it closed-source outside of the core page editing code).

The following is a static snapshot in time of the bot's core page editing code, which is continually evolving and changes to at least a limited extent with every individual specific task executed. This is complete in that this was the entirety of the program at one point in time for one specific series of edits, but it will never be perfectly current — I will only periodically update this page with new versions of the code, not every single time it changes!

NOTE: As of v1.2b the bot now supports seeing & editing multiple infoboxes of the specified category on a single page. Previously it was unable to see or edit anything but the first infobox of the specified category on a single page. The bot is currently still limited to only modifying one infobox per page edit even if multiple infoboxes on the page need to be changed. I am currently uncertain if I will be implementing the ability for the bot to make changes to more than one infobox on a page in a single page edit, as this functionality appears to be fairly complicated to implement and doesn't really seem to provide benefits proportional to the amount of dev time that it'd take. Another bit of somewhat-related functionality that I am uncertain if I will be able to implement is the ability to specify & handle multiple requested changes to infobox values in such a way that any articles that meet the criteria for more than one of the set of requested changes to be made (for a single one of the infoboxes on said article) can have all of the requested changes that it meets the criteria for made in a single page edit. This particular functionality could be implemented as a full shared-priority set of changes to be made to all pages matching the criteria, as a set of conditional additional lower-priority changes to be made when possible in addition to the (single) main change, or as some sort of blend of both of those potential implementations. Neither of these two potential future features are particularly urgent or important — the first one is just something that'd be nice to have but definitely isn't required, while the second one is something that I would really like to implement but can do without if I need to.

import urllib.request
import urllib
import json
import requests
import time
import glob, os
import mwparserfromhell
import re
import pymysql.cursors
import sys

# Set up session object for all requests
s = requests.Session() 
# NOTE: All cookies received will be stored in the session object

headers = {
	'User-Agent': 'enwiki InfoboxBot power/dam infobox editor by Garzfoth, v1.2b' # UPDATE WITH CHANGES
}

######################################################################################
# ==> REMEMBER TO UPDATE THE DATABASE STRING IF CHANGING THE TEMPLATE SET <==
######################################################################################
database = "infobox_power_station"
#database = "infobox_dam"
database_host = "localhost"
database_user = "" # CONFIDENTIAL (REMOVED)
database_password = "" # CONFIDENTIAL (REMOVED)
database_charset = "utf8mb4"

login_username = "" # CONFIDENTIAL (REMOVED)
login_botpassword = "" # CONFIDENTIAL (REMOVED)

######################################################################################
# ==> REMEMBER TO SET THE KEY / PREVALUE / VALUE / LIMIT <==
######################################################################################
ib_key = "th_fuel_primary"
ib_prevalue = "Natural gas"
ib_value = "[[Natural gas]]"
sql_limit = 5

if sql_limit > 0:
	sql_post = " LIMIT "+str(sql_limit)
else:
	sql_post = ""

connection = pymysql.connect(host=database_host,
							user=database_user,
							password=database_password,
							db=database,
							charset=database_charset,
							cursorclass=pymysql.cursors.DictCursor)
try:
	with connection.cursor() as cursor:
		# Execute SQL
		######################################################################################
		# ==> REMEMBER TO UPDATE THE SQL QUERY IF NECESSARY <==
		######################################################################################
		sql = "SELECT pageid, title, tpl_n FROM `data` WHERE `key` = '"+ib_key+"' AND `value` = '"+ib_prevalue+"'"+sql_post
		cursor.execute(sql)
		result = cursor.fetchall()
finally:
	connection.close()

######################################################################################
# ==> REMEMBER TO UPDATE THE EDIT SUMMARY IF NECESSARY <==
######################################################################################
editsummary = "Automated edit: fixing infobox parameter \""+ib_key+"\""


print("----------")
def print_summary():
	print("database: "+database)
	print("SQL: "+sql)
	print("ib_key: "+ib_key)
	print("ib_prevalue: "+ib_prevalue)
	print("ib_value: "+ib_value)
	print("sql_limit: "+str(sql_limit))
	print("Transformation: "+ib_prevalue+" => "+ib_value)
	print("Edit summary: "+editsummary)
print_summary()
print("----------")
print("If the above values are incorrect, please press Ctrl+C within 10 seconds...")
'''n = 10 # seconds
n = n * 2 # half-second increments
for i in range(n):
	sys.stdout.write('\r')
	j = (i + 1) / n
	sys.stdout.write("[%-20s] %d%%" % ('='*int(20*j), 100*j))
	sys.stdout.flush()
	time.sleep(0.5) # half-second increments
sys.stdout.write("\n")
sys.stdout.flush()'''

no = 10 # seconds
increments = 0.5 # half-second increments
nf = no * (1 / increments)
n = int(nf)
for i in range(1, n):
	sys.stdout.write('\r')
	j = (i + 1) / n
	remaining = no - (i * increments)
	sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*int(20*j), remaining, 100*j))
	sys.stdout.flush()
	time.sleep(increments)
sys.stdout.write('\r')
sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*20, 0.00, 100))
sys.stdout.write("\n")
sys.stdout.flush()
print("Main program starting...")
print("----------")

# Start login process
# Acquire login token
query = {
	"action": "query",
	"format": "json",
	"meta": "tokens",
	"type": "login"
}
encodedquery = urllib.parse.urlencode(query)
baseurl = "https://en.wikipedia.org/w/api.php?"
#print(baseurl+encodedquery)
login1 = s.get(baseurl+encodedquery, headers=headers)
print("Login #1 (login token): " + login1.json()["query"]["tokens"]["logintoken"])
# login1["token"]
# Log in
query = {
	"action": "login",
	"format": "json",
	"lgname": login_username
}
querypost = {
	"action": "login",
	"format": "json",
	"lgname": login_username,
	"lgpassword": login_botpassword,
	"lgtoken": login1.json()["query"]["tokens"]["logintoken"]
}
encodedquery = urllib.parse.urlencode(query)
#print(baseurl+encodedquery)
login2 = s.post("https://en.wikipedia.org/w/api.php", data=querypost, headers=headers)
#print("Login #2: " + login2.json()["login"]["result"])
print(login2.json())
print("----------")

def allow_bots(text, user):
	user = user.lower().strip()
	text = mwparserfromhell.parse(text)
	for tl in text.filter_templates():
		if tl.name.matches(['bots', 'nobots']):
			break
	else:
		return True
	for param in tl.params:
		bots = [x.lower().strip() for x in param.value.split(",")]
		if param.name == 'allow':
			if ''.join(bots) == 'none': return False
			for bot in bots:
				if bot in (user, 'all'):
					return True
		elif param.name == 'deny':
			if ''.join(bots) == 'none': return True
			for bot in bots:
				if bot in (user, 'all'):
					return False
	if (tl.name.matches('nobots') and len(tl.params) == 0):
		return False
	return True

iteration = 0
result_count = len(result)

for item in result:
	iteration = iteration + 1
	print("["+str(iteration)+"/"+str(result_count)+"]: " + item["title"] + " - " + str(item["pageid"]) + " - #" + str(item["tpl_n"]))
	# Acquire content/timestamp for page to edit
	query = {
		"action": "query",
		"format": "json",
		"curtimestamp": 1,
		"prop": "revisions",
		"pageids": item["pageid"],
		"rvprop": "content|timestamp"
	}
	encodedquery = urllib.parse.urlencode(query)
	#print(baseurl+encodedquery)
	response = s.get(baseurl+encodedquery, headers=headers)
	if allow_bots(response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["*"], "InfoboxBot"):
		print("Bot edits allowed!")
		wikicode = mwparserfromhell.parse(response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["*"])
		templates = wikicode.filter_templates()
		#tpl = next(x for x in templates if x.startswith("{{Infobox power station") or x.startswith("{{infobox power station") or x.startswith("{{Infobox power plant") or x.startswith("{{infobox power plant") or x.startswith("{{Infobox wind farm") or x.startswith("{{infobox wind farm") or x.startswith("{{Infobox nuclear power station") or x.startswith("{{infobox nuclear power station"))
		if (database == "infobox_power_station"):
			#tpl = next(x for x in templates if x.name.matches(["Infobox power station", "Infobox power plant", "Infobox wind farm", "Infobox nuclear power station"]))
			ibx_match_array = ["Infobox power station", "Infobox power plant", "Infobox wind farm", "Infobox nuclear power station"]
		elif (database == "infobox_dam"):
			#tpl = next(x for x in templates if x.name.matches(["Infobox dam", "Infobox hydroelectric power station"]))
			ibx_match_array = ["Infobox dam", "Infobox hydroelectric power station"]
		tpl_iter = 0
		for tpl in templates:
			if tpl.name.matches(ibx_match_array):
				if (tpl_iter == item["tpl_n"]):
					# This is the one we want to edit
					#length = len(tpl.params)
					#print("Params found: "+str(length))
					tpl.add(ib_key, ib_value)
					#print(str(wikicode))
					# response.json()["curtimestamp"]
					# response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["timestamp"]
					# Obtain CSRF token
					query = {
						"action": "query",
						"format": "json",
						"meta": "tokens",
						"type": "csrf"
					}
					encodedquery = urllib.parse.urlencode(query)
					csrf = s.get(baseurl+encodedquery, headers=headers)
					# Make the edit (but first, some useful comments)
					# https://www.mediawiki.org/wiki/API:Assert
					# assert = "user" / "bot" ==> request will fail if user is not logged in / if user is not a bot
					# https://www.mediawiki.org/wiki/API:Edit
					# summary = "edit summary"
					# minor = 1 ==> marks the edit as minor
					# notminor = 1 ==> "If set, don't mark the edit as minor, even if you have the "Mark all my edits minor by default" preference enabled"
					# bot = 1 ==> marks the edit as bot-made
					querypost = {
						"action": "edit",
						"assert": "user",
						"format": "json",
						"pageid": item["pageid"],
						"text": str(wikicode),
						"summary": editsummary,
						"minor": 1,
						"basetimestamp": response.json()["query"]["pages"][str(item["pageid"])]["revisions"][0]["timestamp"],
						"starttimestamp": response.json()["curtimestamp"],
						"nocreate": 1,
						"watchlist": "nochange",
						"token": csrf.json()["query"]["tokens"]["csrftoken"]
					}
					finalresult = s.post("https://en.wikipedia.org/w/api.php", data=querypost, headers=headers)
					print(finalresult)
					# Connect to the database
					connection = pymysql.connect(host=database_host,
												user=database_user,
												password=database_password,
												db=database,
												charset=database_charset,
												cursorclass=pymysql.cursors.DictCursor)
					try:
						with connection.cursor() as cursor:
							# Update the record
							sql = "UPDATE `data` SET `value` = %s WHERE `pageid` = %s AND `title` = %s AND `tpl_n` = %s AND `key` = %s"
							cursor.execute(sql, (ib_value, item["pageid"], item["title"], item["tpl_n"], ib_key))
						# connection is not autocommit by default. So you must commit to save your changes.
						connection.commit()
					finally:
						connection.close()
					print("Updated in DB.")
					if iteration != result_count:
						# Let's not kill the Wikipedia API
						#time.sleep(10)
						no = 10 # seconds
						increments = 0.5 # half-second increments
						nf = no * (1 / increments)
						n = int(nf)
						for i in range(1, n):
							sys.stdout.write('\r')
							j = (i + 1) / n
							remaining = no - (i * increments)
							sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*int(20*j), remaining, 100*j))
							sys.stdout.flush()
							time.sleep(increments)
						sys.stdout.write('\r')
						sys.stdout.write("[%-20s] wait %.2f seconds (%d%%)" % ('='*20, 0.00, 100))
						sys.stdout.write("\n")
						sys.stdout.flush()
					break
				tpl_iter += 1
	else:
		print("Bot edits not allowed!")
print("----------")
print("Done!")
print("----------")
print_summary()
print("----------")