要使用python对wordpress网站进行批量文章发布,最简单的方法是使用Python xmlrpc,详细用法可以见这里:http://www.snailtoday.com/archives/13599
不过,有时候碰到一些特殊的需求,使用Python xmlrpc无法满足我们的需求,这时需要直接操作wordpress数据库。
它需要牵涉到wordrpess的下面四张表,
整个逻辑学是这样的:
1.wp_posts生成文章的ID,
2.wp_terms这个表中生成term_id
3.将term_id写入到wp_term_taxonomy这个表中,获得term_taxonomy_id,如果原来的term_id已经在这个表中,那么就不要插入,直接查询它的term_taxonomy_id就可以了。
4.将文章id,term_taxonomy_id写入到wp_term_relationships这张表中,实现文章与标签的连接,一个文章id可以对应多个term_taxonomy_id。
参考代码:
# coding:utf-8 """ 这个脚本本地测试用 """ import pymysql import datetime,time,re,sys,random import requests import json import traceback from google_translate import Translate from my_logging import Mylogging from slugify import slugify #生成slug mylog = Mylogging() con = pymysql.connect(host='localhost',port=3306, user='root', password='', database='stackover_for_test', charset='utf8') cursor = con.cursor() con_server = pymysql.connect(host='localhost',port=3306, user='wp04', password='', database='wp04', charset='utf8') cursor_server = con_server.cursor() import sqlite3 #记录写入的数据 connection=sqlite3.connect('stackoverflow.db') print('Opened database successfully') cur = connection.cursor() cur.execute('CREATE TABLE IF NOT EXISTS posted_to_db(id INTEGER PRIMARY KEY,posted_ID INT,post_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL)') #插入有逗号的内容,用两个单引号替换一个单引号 print('Table created successfully') connection.commit() updatetime=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) def sql_w(sql,val): #向数据库写入数据,并返回最后一条数据记录的id。 cur = con_server.cursor() try: cur.execute(sql,val) tag_id = cur.lastrowid con_server.commit() except Exception as e: con_server.rollback() mylog.write_log("写入数据库报错Error!!!,报错信息为{}".format(e)) tag_id = 0 mylog.write_log(e) mylog.write_log("报错的SQL语句为:{}".format(sql)) # sys.exit() return tag_id def sql_r_num(sql): #查询一条数据,并返回这条数据。 cur = con.cursor() cur.execute(sql) data = cur.fetchone() return data[0] def get_data_from_db(stackoverflow_id): #通过ID从数据库中找到问题的相关数据 cursor.execute('''select * from Posts where id=%s and PostTypeId = 1''' % (stackoverflow_id)) #PostTypeId = 1 代表问题 results = cursor.fetchall() if len(results) > 0: return results else: return None def get_answers_from_db(stackoverflow_id): #通过问题的id,在post表中找到答案的那些记录,并返回。 cursor.execute('''select * from Posts where ParentId = %s''' % (stackoverflow_id)) answer_results = cursor.fetchall() return answer_results def split_tags(tags): #正则拆分tags,将“<svn><tortoisesvn><branch>” 变成['svn','branch']并返回。 req = r'<(.*?)>' #括号内的是要抓取的内容。如果没找到,返回[] content = re.findall(req,tags) #不加[0]是列表 if len(content) >0: return content else: return None def get_tag_result(taglist): """插入tgas,并返回tags的id""" tag_result_list = [] for tag in taglist: tag_id = insert_tag(tag) if tag_id != 0: tag_result_list.append(tag_id) # print("success!") return tag_result_list def get_code(body): #利用正则获取文章中的代码,并存入到一个列表中。 #将文章中的代码替换为<code>1</code>的形式. req_code = r'<code>.*?</code>' req_code = re.compile(req_code,re.S) #多行匹配 mycode = re.findall(req_code,body)#不加[0]是列表 # my_dict = {} num = 1 my_list = [] for i in mycode: new_code = "<code>{}</code>".format(num) my_list.append(i) body = body.replace(i,new_code) num+=1 return my_list, body def recover(my_list,body): #将翻译后的文章,代码替换回来 num = 1 for i in my_list: new_code = "<code>{}</code>".format(num) body = body.replace(new_code,i) num += 1 return body def insert_tag(tag): #先判断ask_tags表中是否已存在tag,不存在则将tag插入到表中并返回插入后的ID,存在则返回tag的ID. cursor_server.execute("select * from wp_terms where name = '%s'" %(tag)) results = cursor_server.fetchone() if results == None: mylog.write_log('数据库中没有此tag的记录') sql_users = '''INSERT INTO wp_terms (name,slug,term_group) VALUES ("%s","%s",%s)''' % ( tag,#name tag,#slug 0,#term_group ) print("xxxxxxxxxxx",sql_users) try: cursor_server.execute(sql_users) tag_id = cursor_server.lastrowid con_server.commit() mylog.write_log('tag写入数据库成功!ID为{}'.format(tag_id)) except Exception as e: con_server.rollback() traceback.print_exc() mylog.write_log('tag写入数据库失败!内容为{}'.format(e)) tag_id = 0 return tag_id else: mylog.write_log("数据库中已经有此tag的记录,id为{}".format(results[0])) return results[0] def insert_wp_term_relationships(articleid,tagid): #将文章id,tagid写入到wp_term_relationships关联表 for tag in tagid: sql ="INSERT INTO wp_term_relationships(object_id,term_taxonomy_id,term_order) VALUES (%s,%s,%s)"%(articleid,tag,0) #最前面的引号要变成双引号,ignore表示忽略重复数据,不过先要设定索引 print("----------",sql) try: cursor_server.execute(sql) tag_id = cursor_server.lastrowid con_server.commit() mylog.write_log('articleid为{}, tagid为{},写入数据库成功!ID为{}'.format(articleid,tag,tag_id)) except: cursor_server.rollback() traceback.print_exc() mylog.write_log('articleid, tagid写入数据库失败!') tag_id = 0 def insert_wp_term_taxonomy(tagid): #将tagid写入到wp_term_taxonomy表 my_list = [] for tag in tagid: #先判断wp_term_taxonomy表中是否已存在tag,不存在则将tag插入到表中并返回插入后的ID,存在则返回tag的ID. cursor_server.execute("select * from wp_term_taxonomy where term_id = '%s'" %(tag)) results = cursor_server.fetchone() if results == None: mylog.write_log('数据库中没有此tag的记录') sql ="INSERT INTO wp_term_taxonomy(term_id,taxonomy,description,parent,count) VALUES (%s,'%s','','0','1')"%(tag,'question_tag') #最前面的引号要变成双引号,ignore表示忽略重复数据,不过先要设定索引 print("----------",sql) try: cursor_server.execute(sql) tag_id = cursor_server.lastrowid con_server.commit() my_list.append(tag_id) mylog.write_log('term_id{},写入数据库成功!ID为{}'.format(tag,tag_id)) except: cursor_server.rollback() traceback.print_exc() mylog.write_log('tagid, tagid写入数据库失败!') tag_id = 0 return my_list def get_id_from_db(question_id): #查看数据库中是否已经存在url cur.execute('''select * from posted_to_db where posted_ID='%s' ''' % (question_id)) results = cur.fetchall() if len(results) > 0: return results else: return None def insert_result_to_db(question_id): #将发布的问题的id写入本地数据库,备查。 sql ="INSERT INTO posted_to_db(posted_ID) VALUES (%s)" % (question_id) # cur = con.cursor() try: cur.execute(sql) tag_id = cur.lastrowid connection.commit() mylog.write_log("将发布成功后的问题id插入本地数据库成功,问题ID为{}".format(question_id)) except Exception as e: connection.rollback() mylog.write_log('id为{}写入数据库失败!'.format(question_id)) mylog.write_log("写入数据库报错Error!!!") mylog.write_log(e) mylog.write_log("报错的SQL语句为:{}".format(sql)) tag_id = 0 # sys.exit() return tag_id def insert_question(db): #将问题写入数据库 for row in db: answerCount =row[2] body = row[3] OwnerUserId = row[13] #作者ID tags = row[17] tags = split_tags(tags) #转化成列表 tag_id_result = get_tag_result(tags) mylog.write_log("文章的tags为{}".format(tags)) body =body.replace('"','\'') body =body.replace("<blockquote>","<blockquote class ='notranslate'>") my_list = get_code(body)[0] body = get_code(body)[1] t1 = Translate(body) body = t1.get_chinese_conetent() body = body.replace("</ a>","</a>") time.sleep(5) body = recover(my_list,body) title = row[18] title = title.replace("('","") title = title.replace(",)","") # title = title.replace('"','\''),#title mylog.write_log("原标题是:{}".format(title)) slug = slugify(title) t2 = Translate(title) title = t2.get_chinese_conetent() mylog.write_log("新标题是:{}".format(title)) answer_results = get_answers_from_db(stackoverflow_id) num = 1 if answer_results: final_answer = "" for row in answer_results: if num < 11: answerCount =row[2] answer = row[3] my_list = get_code(answer)[0] #将文章中的代码块取出来 answer = get_code(answer)[1] #将文章中的body取出来 t3 = Translate(answer) answer = t3.get_chinese_conetent() #将body内容翻译成中文 answer = answer.replace("</ a>","</a>") if len(answer) > 0: answer = recover(my_list,answer) #如果调用翻译接口没有出错,有返回结果,则将body内容中的代码还原回去。 body = body + "<h5>answer</h5>" + answer print("新建的回答内容是:",body) print("*"*180) time.sleep(35) num +=1 mylog.write_log("答案获取完毕!") mylog.write_log("-"*80) # else: if title is not None: sql_ask_questions = '''INSERT INTO wp_posts (post_author,post_date,post_date_gmt,post_content,post_title,post_excerpt,post_status,comment_status,ping_status,post_password,post_name,to_ping,pinged,post_modified,post_modified_gmt,post_content_filtered,post_parent,guid,menu_order,post_type,post_mime_type,comment_count) VALUES (%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' val = ( 1,#post_author str(updatetime),#post_date str(updatetime),#post_date_gmt str(body),#post_content str(title),#post_title '',#post_excerpt 'publish',#post_status 'open',#comment_status 'open',#ping_status '',#post_password str(slug),#post_name '',#to_ping '',#pinged str(updatetime),#post_modified str(updatetime),#post_modified_gmt '',#post_content_filtered 0,#post_parent 'http://localhost/wp04/questions/question//',#guid 0,#menu_order 'question',#post_type '',#post_mime_type 0,#comment_count ) question_id = sql_w(sql_ask_questions,val) #写入问题 print("question id is ..............",question_id) if question_id == 0: print("questio id 为0,不插入啦!!!!!!!!!!!!!!!------------------------------") else: term_taxonomy_id_list = insert_wp_term_taxonomy(tag_id_result) insert_wp_term_relationships(question_id,term_taxonomy_id_list) mylog.write_log("问题写入完毕!") mylog.write_log("写入问题的ID为{}".format(question_id)) mylog.write_log("*"*70) else: question_id = 0 return (title) begin_num = int(sys.argv[1]) end_num = int(sys.argv[2]) for i in range(begin_num,end_num): mylog.write_log("正在处理id为{}的记录......".format(i)) stackoverflow_id = i db_record = get_id_from_db(stackoverflow_id) if db_record is None: mylog.write_log("开始抓取id为{}的数据".format(stackoverflow_id)) db_results = get_data_from_db(stackoverflow_id) if db_results is not None: insert_question_results = insert_question(db_results) else: mylog.write_log("id为{}的数据不存在。".format(i)) else: mylog.write_log('当前id为{}的文章之前已经发布过,略过......'.format(stackoverflow_id)) mylog.write_log('-'*70) connection.close() cursor_server.close() cursor.close()