添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

如何用Elasticsearch按@timestamp对分页的日志进行排序?

1 人关注

我的目标是按时间戳对我从Elasticsearch收到的数百万条日志进行排序。

例子日志。

{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:00:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:01:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:02:09.000Z"}
{"realIp": "192.168.0.2", "@timestamp": "2020-12-06T02:04:09.000Z"}

不幸的是,我无法将所有的日志从Elastic中整理出来。看来我必须自己来做。

我曾尝试过的办法是将数据整理出弹性。

es = Search(index="somelogs-*").using(client).params(preserve_order=True)
for hit in es.scan():
    print(hit['@timestamp'])

另一种方法。

notifications = (es
    .query("range", **{
        "@timestamp": {
            'gte': 'now-48h',
            'lt' : 'now'
    .sort("@timestamp")
    .scan()

所以我在寻找一种方法,可以自己对这些日志进行排序,或者直接通过Elasticsearch进行排序。目前,我把所有的数据都保存在本地的 "logs.json "中,在我看来,我必须自己对其进行分类。

python
python-3.x
elasticsearch
elasticsearch-dsl
elasticsearch-dsl-py
oki doki
oki doki
发布于 2020-12-23
2 个回答
Gino Mempin
Gino Mempin
发布于 2021-01-04
已采纳
0 人赞同

你肯定应该让Elasticsearch做排序,然后把已经排序的数据返回给你。

问题是,你在使用 .scan() .它使用Elasticsearch的扫描/滚动API,不幸的是,它只在每个页面/片断上应用排序参数,而不是整个搜索结果。这一点在 elasticsearch-dsl关于分页的文档 :

Pagination

如果你想访问你的查询所匹配的所有文件,你可以 使用扫描方法,该方法使用scan/scroll elasticsearch API。

for hit in s.scan():
    print(hit.title)

注意,在这种情况下,结果不会被排序。

(强调是我的)

使用分页法绝对是一个选择,特别是当你有一个"millions of logs"正如你所说。有一个search_after pagination API:

Search after

你可以使用search_after参数来检索下一页的 检索下一页,使用一组排序值从上一页开始。
要获得第一页的结果,提交一个带有sort的搜索请求。 搜索响应包括一个由sort组成的数组。 每一个命中的值。
要获得下一页的结果,请使用最后一次命中的sort值作为search_after参数,重新运行之前的搜索。...搜索的querysort参数必须保持不变。如果提供,from参数必须是0(默认)或-1
你可以重复这个过程以获得更多的结果页。

(省略了原始的JSON请求,因为我将在下面展示一个Python的样本)

这里有一个例子,如何用用于Python的elasticsearch-dsl.请注意,我限制了fields和结果的数量,以使其更容易测试。这里的重要部分是sortextra(search_after=)

search = Search(using=client, index='some-index')
# The main query
search = search.extra(size=100)
search = search.query('range', **{'@timestamp': {'gte': '2020-12-29T09:00', 'lt': '2020-12-29T09:59'}})
search = search.source(fields=('@timestamp', ))
search = search.sort({
    '@timestamp': {
        'order': 'desc'
# Store all the results (it would be better to be wrap all this in a generator to be performant)
hits = []
# Get the 1st page
results = search.execute()
hits.extend(results.hits)
total = results.hits.total
print(f'Expecting {total}')
# Get the next pages
# Real use-case condition should be "until total" or "until no more results.hits"
while len(hits) < 1000:  
    print(f'Now have {len(hits)}')
    last_hit_sort_id = hits[-1].meta.sort[0]
    search = search.extra(search_after=[last_hit_sort_id])
    results = search.execute()
    hits.extend(results.hits)
with open('results.txt', 'w') as out:
    for hit in hits:
        out.write(f'{hit["@timestamp"]}\n')

这将导致一个已经分类了日期。

# 1st 10 lines
2020-12-29T09:58:57.749Z
2020-12-29T09:58:55.736Z
2020-12-29T09:58:53.627Z
2020-12-29T09:58:52.738Z
2020-12-29T09:58:47.221Z
2020-12-29T09:58:45.676Z
2020-12-29T09:58:44.523Z
2020-12-29T09:58:43.541Z
2020-12-29T09:58:40.116Z
2020-12-29T09:58:38.206Z
# 250-260
2020-12-29T09:50:31.117Z
2020-12-29T09:50:27.754Z
2020-12-29T09:50:25.738Z
2020-12-29T09:50:23.601Z
2020-12-29T09:50:17.736Z
2020-12-29T09:50:15.753Z
2020-12-29T09:50:14.491Z
2020-12-29T09:50:13.555Z
2020-12-29T09:50:07.721Z
2020-12-29T09:50:05.744Z
2020-12-29T09:50:03.630Z 
# 675-685
2020-12-29T09:43:30.609Z
2020-12-29T09:43:30.608Z
2020-12-29T09:43:30.602Z
2020-12-29T09:43:30.570Z
2020-12-29T09:43:30.568Z
2020-12-29T09:43:30.529Z
2020-12-29T09:43:30.475Z
2020-12-29T09:43:30.474Z
2020-12-29T09:43:30.468Z
2020-12-29T09:43:30.418Z
2020-12-29T09:43:30.417Z
# 840-850
2020-12-29T09:43:27.953Z
2020-12-29T09:43:27.929Z
2020-12-29T09:43:27.927Z
2020-12-29T09:43:27.920Z
2020-12-29T09:43:27.897Z
2020-12-29T09:43:27.895Z
2020-12-29T09:43:27.886Z
2020-12-29T09:43:27.861Z
2020-12-29T09:43:27.860Z
2020-12-29T09:43:27.853Z
2020-12-29T09:43:27.828Z
# Last 3
2020-12-29T09:43:25.878Z
2020-12-29T09:43:25.876Z
2020-12-29T09:43:25.869Z 

在使用上有一些考虑search_after正如API文档中所讨论的那样。

  • Use a Point In Time or PIT parameter

    如果在这些请求之间发生刷新,你的结果的顺序可能会改变,导致不同页面的结果不一致。为了防止这种情况,你可以创建一个point in time (PIT)来保留你的搜索中的当前索引状态。

  • You need to first make a POST request to get a PIT ID
  • Then add an extra 'pit': {'id':xxxx, 'keep_alive':5m} parameter to every request
  • Make sure to use the PIT ID from the last response
  • Use a tiebreaker

    我们建议你在你的排序中包括一个分界线字段。这个分界线字段应该为每个文件包含一个唯一的值。如果你不包括一个分界线字段,你的分页结果可能会错过或重复点击。

  • This would depend on your Document schema
    # Add some ID as a tiebreaker to the `sort` call
    search = search.sort(
        {'@timestamp': {
            'order': 'desc'
        {'some.id': {
            'order': 'desc'
    # Include both the sort ID and the some.ID in `search_after`
    last_hit_sort_id, last_hit_route_id = hits[-1].meta.sort
    search = search.extra(search_after=[last_hit_sort_id, last_hit_route_id])
    

    谢谢你Gino Mempin。它的作用!

    但我也想明白了,一个简单的改变也能起到同样的作用。

    通过添加.params(preserve_order=True)elasticsearch将对所有数据进行排序。

    es = Search(index="somelog-*").using(client)
    notifications = (es
        .query("range", **{
            "@timestamp": {
                'gte': 'now-48h',
                'lt' : 'now'
    
  •