运营需要查询近15天和近30天的订单信息并定期发送至,数据在kibana里,所以需要从elasticsearch5里查询,于是我用python3开始搞来搞去,15天没问题,30天出问题了,订单数量大于10000条,返回报错,于是开始百度。
百度上的回答全是复制粘贴,真特么坑。。。
于是我发现elasticseatch5最大只能返回10000条数据,大于10000条数据就要改参数。
例如:
# 修改索引查询最大只能返回10000条数据的限制, 此时修改为300万。 PUT grade/_settings { "index.max_result_window": 3000000 }
又例如:
# 在创建索引的时候加上 { "settings": { "index": { "max_result_window": 10000000000 } } }
但这些并不适合我,因为elasticsearch我不会用,我小白运维,不想改参数,在创建索引时加参数有不适合我,还有加一些奇奇怪怪的参数的,也都不好使,于是我就请教了公司的大佬;
大佬说:
elasticsearch里有自增id,也有创建时间,可以先指定时间范围然后分页查询500条,id倒叙,取第一个id,这个id是结束ID;
PS:这里可以判断上一句话里的查询结果,如果小于500条,就代表一次性取完了,就可以return了。
然后指定时间范围分页查询500条,id正叙,取第一个id,这个id是开始ID;
得到这两个id,就可以进行id范围查询,每次查询完成加起始id加500,然后继续查询,直到返回结果小于500条。
数据已脱敏,可能会看起来怪怪的
main.py
# coding: utf-8 import csv,sys,pymysql,copy,os,time from elasticsearch5 import Elasticsearch from pymysql.cursors import DictCursor from pub import sendEmail,mail_HTML,shell_ini,time_ini,es_sql,es_ini base_dir = sys.path[0] + "/data/" es = Elasticsearch( # es连接设置 ['********'], http_auth=('********', '********'), port=9200, use_ssl=False ) result = es.search() def main(index=None,keys=None,filename=None,day=1,title=None,wheres={},other=False): # 主函数 各种操作的集合地 if not index:return("Please input index!") if not keys:return("Please input keys!") if not filename:return("Please input filename!") print(filename) days = time_ini(date=day) # 获取日期用于创建目录 dir = base_dir + days["start_day"] + "_" + days["stop_day"] + "/" # 带日期的绝对路径 os.makedirs(dir, mode=493, exist_ok=True) # 创建目录 mode 权限 八进制 493=755 511=777 time_data = time_ini(day=day) # 获取毫秒级时间戳 sql_data = es_sql(keys=keys,time=time_data,wheres=wheres) # 获取处理好的es语句 resp = es_ini(type="search",index=index,connect=es,data=sql_data) # 获取es查询到的数据 new_title = copy.deepcopy(keys) # 从keys深拷贝数据做csv标题 if title: # 判断是否手动添加title for i in title: # 循环添加title new_title.append(i) # 判断是否指定csv标题 csv_filename = dir + filename # csv文件的绝对路径 用于判断是否有数据 file_csv = open(csv_filename, 'a', encoding='utf_8_sig') # 打开一个文件 csv_writer = csv.writer(file_csv) # 实例化csv if not os.path.getsize(csv_filename): # 判断文件是否有数据 有的话不用再次写入标题 csv_writer.writerow(new_title) # 写入csv标题 for i in resp: # 开始循环数据 tmp_list = [] # csv直接收一个列表参数 for k in keys: # 开始循环key k = i["_source"].get(k) # 从数据中得到单个key tmp_list.append(k) # 将单个key写入到临时列表 if index == "****": # 特殊操作 微调 用于添加字段 where_name = wheres["match_phrase"]["tablename"]["query"] if where_name == "****": tmp_list.append(2) elif where_name == "****": tmp_list.append(1) # print(index,tmp_list[0]) csv_writer.writerow(tmp_list) # 循环keys结束后 得到全部到key 开始写入csv文件 writerow 只接收受一个列表 print("==========") file_csv.close() # 关闭csv 一个py文件中多次操作csv 可能导致文件写入无数据 怀疑是文件句柄的原因 def ****(day=1): # 单个表的操作 keys = [ 'id', '****', '****', ] main(index="****",keys=keys,filename="****.csv",day=day) def ****(day=1): # 单个表的操作 keys = [ 'id', '****', '****', ] main(index="****",keys=keys,filename="****.csv",day=day) def ****(day=1): # 单个表的操作 keys = [ 'id', '****', '****', ] wheres = { # 短句匹配 "match_phrase": { "tablename": { "query": "****" } } } main(index="****",keys=keys,filename="****.csv",day=day,wheres=wheres,title=["src"]) def ****(day=1): # 单个表的操作 keys = [ 'id', '****', '****', ] wheres = { # 短句匹配 "match_phrase": { "tablename": { "query": "****" } } } main(index="****",keys=keys,filename="****.csv",day=day,wheres=wheres,title=["src"]) def ****(day=1): # 单个表的操作 keys = [ 'id', '****', '****', ] main(index="****",keys=keys,filename="****.csv",day=day) def zip_ini(day=1): # 压缩文件 days = time_ini(date=day) day_dir = days["start_day"] + "_" + days["stop_day"] yum_command = "yum -y install zip unzip" # 压缩命令 if not shell_ini(yum_command).get(0): print("ZIP Package Installation Failed.") # 安装zip包 zip_command = "cd {} && zip -Dr {}.zip ./{}".format(base_dir,day_dir,day_dir) zip_com = shell_ini(zip_command).get(0) # get(0) 如果有0的key为真 否则返回None if zip_com: print("压缩成功.") zip_dir = base_dir + day_dir ls_command = "ls {}".format(zip_dir) ls_com = shell_ini(ls_command).get(0) if ls_com: resp_data = {} for i in ls_com: file_dir = zip_dir + "/" + i file_count = int(len(open(file_dir,'r').readlines())) - 1 # 统计文件行数 -1去掉标题 resp_data[i] = file_count return({base_dir + day_dir + ".zip":resp_data}) # 返回字典 {绝对路径的文件名:{表名:行数}} else: return("error of if zip.") else: print("压缩失败.") tmp_str = "" for i in shell_ini(zip_command).values(): for ii in i:tmp_str += ii print(tmp_str) return(None) def sendmail(day=1): ****(day=day) ****(day=day) ****(day=day) ****(day=day) ****(day=day) ret = zip_ini(day) user_mail_list = ["运营的邮箱@xxxx.com"] for k,v in ret.items(): sendEmail(user_mail_list,"title",htmlText=mail_HTML(data=v),appenfile=k) if __name__ == '__main__': start_time = int(time.time()) try: if len(sys.argv) == 2: day = sys.argv[1] print("day=",day) sendmail(day=int(day)) print("Mail Sent.") else: print("Unknown Operation.") except: print("Unknown Error.") end_time = int(time.time()) print("Time: ",end_time - start_time)
pub.py
# encoding:utf-8 import time import datetime import subprocess import os from datetime import timedelta def time_ini(day=None,date=None): # 获取毫秒级时间戳 返回字典 开始时间和结束时间 if day: fn = day elif date: fn = date new_str_date = datetime.date.today() old_str_date = new_str_date - timedelta(days=fn) if date:return({"start_day":str(old_str_date),"stop_day":str(new_str_date)}) new_unix_date = round(time.mktime(new_str_date.timetuple()) * 1000) old_unix_date = round(time.mktime(old_str_date.timetuple()) * 1000) return({"start_time":old_unix_date,"stop_time":new_unix_date}) def es_ini(type=None,index=None,connect=None,data=None): # es 查询操作 返还原始数据 """ type = string("search") data = { "query":{ # 查询语句 "range":{ # 范围查询 "created_at":{ # 指定字段 "gte":1672545600000, # gte 大于或等于;gt 大于 "lte":1672549999999, # lte 小于或等于;lt 小于 "format":"epoch_millis" # epoch_millis 毫秒级时间戳;epoch_second 秒级时间戳 } } } } index = index connect = elasticsearch instantiation """ if not connect:return("Please input elasticsearch instantiation!") if type == "search": ret_list = [] # return 返回列表 data.update({"sort":[{"id":{"order":"desc"}}]}) # 更改es语句用于获取 结束id result = connect.search(index=index,body=data) # 倒序返回数据500条 result_list = result["hits"]["hits"] # 获取 数据列表 end_id = int(result_list[0]["sort"][0]) # 获取 结束id if len(result_list) < 500:return(result_list) # 判断返回数据条数 如果小于500 表示只有这些 直接return data.update({"sort":[{"id":{"order":"asc"}}]}) # 更改es语句用于获取 开始id result = connect.search(index=index,body=data) # 正序返回数据500条 ret_list += result["hits"]["hits"] # 获取 数据列表 for i in range(1,40): # 最多返回2万条数据 print("当前条数",i*500,"+") start_id = int(ret_list[-1]["sort"][0]) # return 列表存的数据是正序 获取最后一条数据的id作为start_id es_range = { # 更改查询es 从 时间范围查询 变为 id 范围查询 "range":{ # 范围查询 "id":{ # 指定字段 "gte":start_id + 1, # 用了大于等于 所以需要+1 防止重复;gte 大于或等于;gt 大于 "lte":end_id, # lte 小于或等于;lt 小于 "format":"epoch_millis" # epoch_millis 毫秒级时间戳;epoch_second 秒级时间戳 } } } data["query"]["bool"]["must"][-1] = es_range # 更改es range查询条件 result = connect.search(index=index,body=data) result_list = result["hits"]["hits"] ret_list += result_list if len(result_list) < 500: print("总条数",len(ret_list)) return(ret_list) # 返回列表 return(None) # for 循环出错 else: return("Unknown operation!") def shell_ini(command=None): if not command: return("Please input command.") res = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=300, executable='/bin/bash') tmp_list = [] for i in res.stdout.replace("\n",",").strip(",").split(","): tmp_list.append(i.strip("\t")) return({res.returncode:tmp_list}) def sendEmail(user_list=["默认邮箱"],subject=None,htmlText=None,plainText=None,appenfile=None): import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart strTo = ','.join(user_list) server = "smtp.exmail.qq.com" #设置服务器 user = "****" #用户名 passwd = "****" #口令 strFrom = '****' # 设定root信息 msgRoot = MIMEMultipart('related') msgRoot['Subject'] = subject msgRoot['From'] = strFrom msgRoot['To'] = strTo msgAlternative = MIMEMultipart('alternative') msgRoot.attach(msgAlternative) #设定纯文本信息 if plainText: msgText = MIMEText(plainText, 'plain', 'utf-8') msgAlternative.attach(msgText) #设定HTML信息 if htmlText: msgText = MIMEText(htmlText, 'html', 'utf-8') msgAlternative.attach(msgText) #添加附件 if appenfile: filename = os.path.split(appenfile) att1 = MIMEText(open(appenfile, 'rb').read(), 'base64', 'utf-8') att1["Content-Type"] = 'application/octet-stream' # 这里的filename可以任意写,写什么名字,邮件中显示什么名字 att1["Content-Disposition"] = 'attachment; filename="{}"'.format(filename[-1]) msgAlternative.attach(att1) #发送邮件 smtp = smtplib.SMTP() smtp.connect(server) smtp.login(user, passwd) smtp.sendmail(strFrom,user_list,msgRoot.as_string()) smtp.quit() def mail_HTML(data): html = """ <table border="1"> <tr> <th>表名称</th> <th>数量</th> <th>描述</th> </tr> """ for k,v in data.items(): msg = """ <tr> <td>{tablename}</td> <td>{count}</td> <td>{desc}</td> </tr> """ c = "" if k == "****.csv": c = "****" elif k == "****.csv": c = "****" elif k == "****.csv": c = "****" elif k == "****.csv": c = "****" msg = msg.format( tablename = k, count = v, desc = c, ) html += msg html += "</table>" return html def es_sql(keys=None,time=None,wheres={}): # 查询语句 传入需要查询的key和毫秒级时间 if not keys: return("Please input keys.") elif not time: return("Plesae input time") else: data = { "version":True, # 显示版本 "from":0, # 分页 "size":500, # 分页 "sort":[ # 排序 { "id":{ # 默认 desc 递减;asc 递增 "order":"asc" } } ], "query":{ # 查询语句 "bool":{ # 布尔查询 "must":[ # 布尔查询下的文档匹配查询;返回的文档必须满足 must 子句的条件,并且参与计算分值 { "match_all":{ # 匹配所有文档,给它们的_score都是1.0;_score可以通过boost参数改变: } }, wheres , { "range":{ # 范围查询 "created_at":{ # 指定字段 "gte":time["start_time"], # gte 大于或等于;gt 大于 "lte":time["stop_time"], # lte 小于或等于;lt 小于 "format":"epoch_millis" # epoch_millis 毫秒级时间戳;epoch_second 秒级时间戳 } } } ], "must_not":[ # 布尔查询下的文档匹配查询;不会计算分数,并且会使用缓存。 ] } }, "_source":{ # 默认 includes/excludes 只显示/不显示;当你不需要看到整个文档时,单个字段可以从 _source 字段提取和通过 get 或者 search 请求返回 "includes":keys, "excludes":[ ] }, # "aggs":{ # 聚合查询 # "uid":{ # 聚合名字 任意 # "terms":{ # 度量运算的方式(划分桶的方式) # "field":"id", # 度量运算的字段(划分桶的字段) # "min_doc_count":1, # 过滤文档数量小于1的 # }, # "aggs":{ # "last_opentime":{ # "max":{ # "field": "last_opentime" # } # } # } # } # }, "stored_fields":[ # 用于存储没有经过任何分析(without any analysis)的字段值,以实现在查询时能得到这些原始值. "*" ], "script_fields":{ # 基于不同的字段定制一些属性 }, # "docvalue_fields":[ # 将文档字段以文档值机制保存的值返回;文档值机制是非text类型字段支持的一种在硬盘中保存字段原始值的机制 # "created_at", # "last_opentime", # "updated_at" # ], } return(data)
elasticsearch 真好玩 我是真不会 太难了。
写了我一个礼拜,事已至此,实现了15天和30天查询数据并发送邮件到运营邮箱了,可以交工了。