运营需要查询近15天和近30天的订单信息并定期发送至,数据在kibana里,所以需要从elasticsearch5里查询,于是我用python3开始搞来搞去,15天没问题,30天出问题了,订单数量大于10000条,返回报错,于是开始百度。
百度上的回答全是复制粘贴,真特么坑。。。
于是我发现elasticseatch5最大只能返回10000条数据,大于10000条数据就要改参数。
例如:
# 修改索引查询最大只能返回10000条数据的限制, 此时修改为300万。
PUT grade/_settings
{
"index.max_result_window": 3000000
}
又例如:
# 在创建索引的时候加上
{
"settings": {
"index": {
"max_result_window": 10000000000
}
}
}
但这些并不适合我,因为elasticsearch我不会用,我小白运维,不想改参数,在创建索引时加参数有不适合我,还有加一些奇奇怪怪的参数的,也都不好使,于是我就请教了公司的大佬;
大佬说:
elasticsearch里有自增id,也有创建时间,可以先指定时间范围然后分页查询500条,id倒叙,取第一个id,这个id是结束ID;
PS:这里可以判断上一句话里的查询结果,如果小于500条,就代表一次性取完了,就可以return了。
然后指定时间范围分页查询500条,id正叙,取第一个id,这个id是开始ID;
得到这两个id,就可以进行id范围查询,每次查询完成加起始id加500,然后继续查询,直到返回结果小于500条。
数据已脱敏,可能会看起来怪怪的
main.py
# coding: utf-8
import csv,sys,pymysql,copy,os,time
from elasticsearch5 import Elasticsearch
from pymysql.cursors import DictCursor
from pub import sendEmail,mail_HTML,shell_ini,time_ini,es_sql,es_ini
base_dir = sys.path[0] + "/data/"
es = Elasticsearch( # es连接设置
['********'],
http_auth=('********', '********'),
port=9200,
use_ssl=False
)
result = es.search()
def main(index=None,keys=None,filename=None,day=1,title=None,wheres={},other=False): # 主函数 各种操作的集合地
if not index:return("Please input index!")
if not keys:return("Please input keys!")
if not filename:return("Please input filename!")
print(filename)
days = time_ini(date=day) # 获取日期用于创建目录
dir = base_dir + days["start_day"] + "_" + days["stop_day"] + "/" # 带日期的绝对路径
os.makedirs(dir, mode=493, exist_ok=True) # 创建目录 mode 权限 八进制 493=755 511=777
time_data = time_ini(day=day) # 获取毫秒级时间戳
sql_data = es_sql(keys=keys,time=time_data,wheres=wheres) # 获取处理好的es语句
resp = es_ini(type="search",index=index,connect=es,data=sql_data) # 获取es查询到的数据
new_title = copy.deepcopy(keys) # 从keys深拷贝数据做csv标题
if title: # 判断是否手动添加title
for i in title: # 循环添加title
new_title.append(i) # 判断是否指定csv标题
csv_filename = dir + filename # csv文件的绝对路径 用于判断是否有数据
file_csv = open(csv_filename, 'a', encoding='utf_8_sig') # 打开一个文件
csv_writer = csv.writer(file_csv) # 实例化csv
if not os.path.getsize(csv_filename): # 判断文件是否有数据 有的话不用再次写入标题
csv_writer.writerow(new_title) # 写入csv标题
for i in resp: # 开始循环数据
tmp_list = [] # csv直接收一个列表参数
for k in keys: # 开始循环key
k = i["_source"].get(k) # 从数据中得到单个key
tmp_list.append(k) # 将单个key写入到临时列表
if index == "****": # 特殊操作 微调 用于添加字段
where_name = wheres["match_phrase"]["tablename"]["query"]
if where_name == "****":
tmp_list.append(2)
elif where_name == "****":
tmp_list.append(1)
# print(index,tmp_list[0])
csv_writer.writerow(tmp_list) # 循环keys结束后 得到全部到key 开始写入csv文件 writerow 只接收受一个列表
print("==========")
file_csv.close() # 关闭csv 一个py文件中多次操作csv 可能导致文件写入无数据 怀疑是文件句柄的原因
def ****(day=1): # 单个表的操作
keys = [
'id',
'****',
'****',
]
main(index="****",keys=keys,filename="****.csv",day=day)
def ****(day=1): # 单个表的操作
keys = [
'id',
'****',
'****',
]
main(index="****",keys=keys,filename="****.csv",day=day)
def ****(day=1): # 单个表的操作
keys = [
'id',
'****',
'****',
]
wheres = { # 短句匹配
"match_phrase": {
"tablename": {
"query": "****"
}
}
}
main(index="****",keys=keys,filename="****.csv",day=day,wheres=wheres,title=["src"])
def ****(day=1): # 单个表的操作
keys = [
'id',
'****',
'****',
]
wheres = { # 短句匹配
"match_phrase": {
"tablename": {
"query": "****"
}
}
}
main(index="****",keys=keys,filename="****.csv",day=day,wheres=wheres,title=["src"])
def ****(day=1): # 单个表的操作
keys = [
'id',
'****',
'****',
]
main(index="****",keys=keys,filename="****.csv",day=day)
def zip_ini(day=1): # 压缩文件
days = time_ini(date=day)
day_dir = days["start_day"] + "_" + days["stop_day"]
yum_command = "yum -y install zip unzip" # 压缩命令
if not shell_ini(yum_command).get(0): print("ZIP Package Installation Failed.") # 安装zip包
zip_command = "cd {} && zip -Dr {}.zip ./{}".format(base_dir,day_dir,day_dir)
zip_com = shell_ini(zip_command).get(0) # get(0) 如果有0的key为真 否则返回None
if zip_com:
print("压缩成功.")
zip_dir = base_dir + day_dir
ls_command = "ls {}".format(zip_dir)
ls_com = shell_ini(ls_command).get(0)
if ls_com:
resp_data = {}
for i in ls_com:
file_dir = zip_dir + "/" + i
file_count = int(len(open(file_dir,'r').readlines())) - 1 # 统计文件行数 -1去掉标题
resp_data[i] = file_count
return({base_dir + day_dir + ".zip":resp_data}) # 返回字典 {绝对路径的文件名:{表名:行数}}
else:
return("error of if zip.")
else:
print("压缩失败.")
tmp_str = ""
for i in shell_ini(zip_command).values():
for ii in i:tmp_str += ii
print(tmp_str)
return(None)
def sendmail(day=1):
****(day=day)
****(day=day)
****(day=day)
****(day=day)
****(day=day)
ret = zip_ini(day)
user_mail_list = ["运营的邮箱@xxxx.com"]
for k,v in ret.items():
sendEmail(user_mail_list,"title",htmlText=mail_HTML(data=v),appenfile=k)
if __name__ == '__main__':
start_time = int(time.time())
try:
if len(sys.argv) == 2:
day = sys.argv[1]
print("day=",day)
sendmail(day=int(day))
print("Mail Sent.")
else:
print("Unknown Operation.")
except:
print("Unknown Error.")
end_time = int(time.time())
print("Time: ",end_time - start_time)
pub.py
# encoding:utf-8
import time
import datetime
import subprocess
import os
from datetime import timedelta
def time_ini(day=None,date=None): # 获取毫秒级时间戳 返回字典 开始时间和结束时间
if day:
fn = day
elif date:
fn = date
new_str_date = datetime.date.today()
old_str_date = new_str_date - timedelta(days=fn)
if date:return({"start_day":str(old_str_date),"stop_day":str(new_str_date)})
new_unix_date = round(time.mktime(new_str_date.timetuple()) * 1000)
old_unix_date = round(time.mktime(old_str_date.timetuple()) * 1000)
return({"start_time":old_unix_date,"stop_time":new_unix_date})
def es_ini(type=None,index=None,connect=None,data=None): # es 查询操作 返还原始数据
"""
type = string("search")
data = {
"query":{ # 查询语句
"range":{ # 范围查询
"created_at":{ # 指定字段
"gte":1672545600000, # gte 大于或等于;gt 大于
"lte":1672549999999, # lte 小于或等于;lt 小于
"format":"epoch_millis" # epoch_millis 毫秒级时间戳;epoch_second 秒级时间戳
}
}
}
}
index = index
connect = elasticsearch instantiation
"""
if not connect:return("Please input elasticsearch instantiation!")
if type == "search":
ret_list = [] # return 返回列表
data.update({"sort":[{"id":{"order":"desc"}}]}) # 更改es语句用于获取 结束id
result = connect.search(index=index,body=data) # 倒序返回数据500条
result_list = result["hits"]["hits"] # 获取 数据列表
end_id = int(result_list[0]["sort"][0]) # 获取 结束id
if len(result_list) < 500:return(result_list) # 判断返回数据条数 如果小于500 表示只有这些 直接return
data.update({"sort":[{"id":{"order":"asc"}}]}) # 更改es语句用于获取 开始id
result = connect.search(index=index,body=data) # 正序返回数据500条
ret_list += result["hits"]["hits"] # 获取 数据列表
for i in range(1,40): # 最多返回2万条数据
print("当前条数",i*500,"+")
start_id = int(ret_list[-1]["sort"][0]) # return 列表存的数据是正序 获取最后一条数据的id作为start_id
es_range = { # 更改查询es 从 时间范围查询 变为 id 范围查询
"range":{ # 范围查询
"id":{ # 指定字段
"gte":start_id + 1, # 用了大于等于 所以需要+1 防止重复;gte 大于或等于;gt 大于
"lte":end_id, # lte 小于或等于;lt 小于
"format":"epoch_millis" # epoch_millis 毫秒级时间戳;epoch_second 秒级时间戳
}
}
}
data["query"]["bool"]["must"][-1] = es_range # 更改es range查询条件
result = connect.search(index=index,body=data)
result_list = result["hits"]["hits"]
ret_list += result_list
if len(result_list) < 500:
print("总条数",len(ret_list))
return(ret_list) # 返回列表
return(None) # for 循环出错
else:
return("Unknown operation!")
def shell_ini(command=None):
if not command:
return("Please input command.")
res = subprocess.run(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
timeout=300,
executable='/bin/bash')
tmp_list = []
for i in res.stdout.replace("\n",",").strip(",").split(","):
tmp_list.append(i.strip("\t"))
return({res.returncode:tmp_list})
def sendEmail(user_list=["默认邮箱"],subject=None,htmlText=None,plainText=None,appenfile=None):
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
strTo = ','.join(user_list)
server = "smtp.exmail.qq.com" #设置服务器
user = "****" #用户名
passwd = "****" #口令
strFrom = '****'
# 设定root信息
msgRoot = MIMEMultipart('related')
msgRoot['Subject'] = subject
msgRoot['From'] = strFrom
msgRoot['To'] = strTo
msgAlternative = MIMEMultipart('alternative')
msgRoot.attach(msgAlternative)
#设定纯文本信息
if plainText:
msgText = MIMEText(plainText, 'plain', 'utf-8')
msgAlternative.attach(msgText)
#设定HTML信息
if htmlText:
msgText = MIMEText(htmlText, 'html', 'utf-8')
msgAlternative.attach(msgText)
#添加附件
if appenfile:
filename = os.path.split(appenfile)
att1 = MIMEText(open(appenfile, 'rb').read(), 'base64', 'utf-8')
att1["Content-Type"] = 'application/octet-stream'
# 这里的filename可以任意写,写什么名字,邮件中显示什么名字
att1["Content-Disposition"] = 'attachment; filename="{}"'.format(filename[-1])
msgAlternative.attach(att1)
#发送邮件
smtp = smtplib.SMTP()
smtp.connect(server)
smtp.login(user, passwd)
smtp.sendmail(strFrom,user_list,msgRoot.as_string())
smtp.quit()
def mail_HTML(data):
html = """
<table border="1">
<tr>
<th>表名称</th>
<th>数量</th>
<th>描述</th>
</tr>
"""
for k,v in data.items():
msg = """
<tr>
<td>{tablename}</td>
<td>{count}</td>
<td>{desc}</td>
</tr>
"""
c = ""
if k == "****.csv":
c = "****"
elif k == "****.csv":
c = "****"
elif k == "****.csv":
c = "****"
elif k == "****.csv":
c = "****"
msg = msg.format(
tablename = k,
count = v,
desc = c,
)
html += msg
html += "</table>"
return html
def es_sql(keys=None,time=None,wheres={}): # 查询语句 传入需要查询的key和毫秒级时间
if not keys:
return("Please input keys.")
elif not time:
return("Plesae input time")
else:
data = {
"version":True, # 显示版本
"from":0, # 分页
"size":500, # 分页
"sort":[ # 排序
{
"id":{ # 默认 desc 递减;asc 递增
"order":"asc"
}
}
],
"query":{ # 查询语句
"bool":{ # 布尔查询
"must":[ # 布尔查询下的文档匹配查询;返回的文档必须满足 must 子句的条件,并且参与计算分值
{
"match_all":{ # 匹配所有文档,给它们的_score都是1.0;_score可以通过boost参数改变:
}
},
wheres
,
{
"range":{ # 范围查询
"created_at":{ # 指定字段
"gte":time["start_time"], # gte 大于或等于;gt 大于
"lte":time["stop_time"], # lte 小于或等于;lt 小于
"format":"epoch_millis" # epoch_millis 毫秒级时间戳;epoch_second 秒级时间戳
}
}
}
],
"must_not":[ # 布尔查询下的文档匹配查询;不会计算分数,并且会使用缓存。
]
}
},
"_source":{ # 默认 includes/excludes 只显示/不显示;当你不需要看到整个文档时,单个字段可以从 _source 字段提取和通过 get 或者 search 请求返回
"includes":keys,
"excludes":[
]
},
# "aggs":{ # 聚合查询
# "uid":{ # 聚合名字 任意
# "terms":{ # 度量运算的方式(划分桶的方式)
# "field":"id", # 度量运算的字段(划分桶的字段)
# "min_doc_count":1, # 过滤文档数量小于1的
# },
# "aggs":{
# "last_opentime":{
# "max":{
# "field": "last_opentime"
# }
# }
# }
# }
# },
"stored_fields":[ # 用于存储没有经过任何分析(without any analysis)的字段值,以实现在查询时能得到这些原始值.
"*"
],
"script_fields":{ # 基于不同的字段定制一些属性
},
# "docvalue_fields":[ # 将文档字段以文档值机制保存的值返回;文档值机制是非text类型字段支持的一种在硬盘中保存字段原始值的机制
# "created_at",
# "last_opentime",
# "updated_at"
# ],
}
return(data)
elasticsearch 真好玩 我是真不会 太难了。
写了我一个礼拜,事已至此,实现了15天和30天查询数据并发送邮件到运营邮箱了,可以交工了。