看啥推荐读物
专栏名称: zhangdianp
服务端开发
目录
相关文章推荐
今天看啥  ›  专栏  ›  zhangdianp

Elasticsearch 如何做更好的数据建模

zhangdianp  · 掘金  ·  · 2020-02-22 04:21
阅读 249

Elasticsearch 如何做更好的数据建模

相对关系型数据库,我们知道 Elasticsearch 有很多优点:高性能、可扩展、近实时搜索、支持大数据量的数据分析。然后它不是万能的,他并没有对处理索引实体之间的关系给出很好的解决方法,不像关系型数据库那样使用范式来规范你的数据。所以如何更好的在 Elasticsearch 中进行数据建模非常重要。

关系数据的管理

我们以「电影」和「演员」的的关系举例来说明 Elasticsearch 中如何实现关系管理,一部电影中会存在多个演员,我们可以通过以下几种方式实现这种关系数据的管理:

movie 有两个属性:title,actors

actor 有两个属性:first_name, last_name
复制代码
普通对象
PUT my_movies
{
  "mappings" : {
      "properties" : {
        "actors" : {
          "properties" : {
            "first_name" : {"type" : "keyword"},
            "last_name" : {"type" : "keyword"}
          }
        },
        "title" : {
          "type" : "text",
          "fields" : {
            "keyword" : {"type" : "keyword","ignore_above" : 256}
          }
        }
      }
  }
}
复制代码
  • 更新 actor 信息时,需要同时更新 movie 的信息,对于更新频繁的需求场景,性能较差
  • 作为字符串存储无法实现真正的关联查询,比如我们要查询具有 「actor.first_name = a」 且 「actor.last_name = b」演员的电影,一个演员的 「actor.first_name = a」 而另外一个演员的 「actor.last_name = b」的电影也会被查询出来
  • 数据存在冗余,同一个 actor 在不同的 movie 里 会被存储多份数据
  • 读取性能最佳,不需要关联查询
Nested 对象
PUT my_movies
{
    "mappings" : {
      "properties" : {
        "actors" : {
          "type": "nested", //指定 actors 是一个 Nested 对象,默认情况下type="object"
          "properties" : {
            "first_name" : {"type" : "keyword"},
            "last_name" : {"type" : "keyword"}
          }},
        "title" : {
          "type" : "text",
          "fields" : {"keyword":{"type":"keyword","ignore_above":256}}
        }
      }
    }
}
复制代码
  • 只需要将对象的类型 type 设置为 "nested" 便可以定义成 Nested 对象
  • Nested 文档中每个 actor 会被保存在独立的 Lucene 文档中,在查询时和根文档作 join
  • 每个嵌套对象是独立索引的,这样查询中字段的相关性可以得以保证:
//对于查询「actor.first_name = a」 且 「actor.last_name = b」演员的电影就不会出现问题
POST my_movies/_search
{
  "query": {
      "nested": { //嵌套对象被隐藏在独立的文档中,查询时必须使用 nested 查询,否则无法查询到
        "path": "actors", //必须指定 path,因为一个索引中可能存在多个 nest 字段
        "query": {
          "bool": {
            "must": [
              {"match": {"actors.first_name": "a"}},
              {"match": {"actors.last_name": "b"}}
            ]
          }
        }
      }
   }
}

复制代码
  • Nest 对象还支持排序 nested sorting 和聚合 Nested Aggregation
  • 对于嵌套对象的增删改仍然需要重新索引整个文档,因此对于更新频繁的场景,性能较差
  • 嵌套文档查询返回的是整个文档,而不是匹配的嵌套文档
  • 相关参数:
index.mapping.nested_fields.limit:设置每个 nested 对象中可以拥有字段个数上限,默认是 50
index.mapping.nested_objects.limit:设置每个文档可以拥有的 nested 对象最大个数,默认是 10000
复制代码
Parent/Child 对象
PUT my_movies
{
  "mappings": {
    "properties": {
      "movie_comments_relation": { //属于 my_movies 的一个字段,该字段是 join 类型用于指定父子文档的关系
        "type": "join",     //指明 join 类型
        "relations": {      //声明 Parent/Child 的关系
          "movie": "actor" //movie 是 Parent 的名称, actor 是 Child 的名称
        }
      },
      "content": {"type": "text"},
      "title": {"type": "keyword"}
    }
  }
}
复制代码
  • 父文档和子文档是两个独立文档,但是存在同一个索引中,也就是一个索引中既有父文档,也有子文档
  • Parent/Child 对象通过 join 把两个文档关联起来,真正实现文档的一对多的关系
  • 父文档和子文档必须存储在同一个分片中,所以在对 Child 文档进行「增删改读」时必须提供 routing 参数,父子文档的映射关系维护在 Doc Values 中
  • 一个索引中只能存在一个 join 类型的字段,也就是只能建一种 Parent/Child 对象关系
  • Parent/Child 的主要优点:
- 更新父文档时,不需要重新索引对应的子文档
- 创建、修改、删除子文档,不会影响父文档和其它子文档,适用于对于子文档较多或者更新频率较频繁的场景
- 子文档可以作为搜索结果独立返回
复制代码
  • 索引父文档和子文档
#索引 ID=movie1 的父文档
PUT my_movies/_doc/movie1
{
  "title":"黑客帝国",
  "blog_comments_relation":{  //对于父文档的创建这里也可以直接缩略写成 "blog_comments_relation": "movie"
    "name":"movie"   //通过 blog_comments_relation.name = movie 来指定这是在创建一个父文档
  }
}

#索引子文档
PUT my_movies/_doc/actor?routing=movie1 //加上 routing 为了让父文档和子文档索引在同一个分片上,确保查询 join 的性能,routing 参数必须传
{
  "first_name":"Jack",
  "last_name":"Moble",
  "blog_comments_relation":{
    "name":"actor",     //指定当前索引是一个子文档
    "parent":"movie1"    //他的父文档的 ID 是 movie1
  }
}
复制代码
  • Parent/Child 的查询
# 直接根据子文档的 ID 是拿不到子文档的信息的,必须添加 routing 参数,指定他对应的父文档的 ID
# 根据 Parent Id 查询父文档对应的子文档
POST my_movies/_search
{
  "query": {
    "parent_id": {
      "type": "movie",
      "id": "movie1"
    }
  }
}

# Has Child 查询,根据子文档的一些信息,返回所属于的父文档
POST my_movies/_search
{
  "query": {
    "has_child": {
      "type": "actor",
      "query" : {//查询子文档的 first_name 等于 "Jack" 的所有父文档
        "match": {"first_name" : "Jack"}
       }
    }
  }
}

# Has Parent 查询,根据父文档的一些信息,返回相关的子文档信息
POST my_movies/_search
{
  "query": {
    "has_parent": {
      "parent_type": "movie",
      "query" : {
        "match": {"title" : "Learning Hadoop"}
       }
    }
  }
}
复制代码
应用端关联
  • 可以通过在业务逻辑中进行关联处理,模拟关系型数据库的关联关系
  • 将 movie 和 actor 分别存储在两个索引中,然后在 actor 中添加一个字段表示父文档的 ID 进行关联
  • 应用端关联需要在查询中,可能往往需要两次查询,消耗一定性能,但是查询处理简单,实现方便

索引的重建和更新

模型的扩展性和稳定性非常重要,如果没有定义好,后期随着需求的变动,可能会出现频繁的索引重建问题,那么什么情况下需要重建索引呢?

- Mapping 信息发生变更:字段类型、分词器、字典更新等
- Setting 信息发生变更:主分片数变更等
- 集群内、集群间的数据迁移
复制代码

Elasticsearch 提供了 Update By Query 和 Reindex 两种方式进行索引的更新和重建:

Update By Query
  • Update By Query 在现有的索引上进行重建,适用于新增加一个字段的场景
//将 Dynamic 属性设置为 false,表示 mapping 信息不会动态更改,即使新增加了一个字段,也不会被索引,仅仅存储在 _source 中
PUT test
{
  "mappings": {
    "dynamic": false, 
    "properties": {
      "text": {"type": "text"}
    }
  }
}
// 新增加的字段 flag 不会被索引查询
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
// 可以通过 _update_by_query 进行索引重建,使 flag 字段可以被索引到
POST test/_update_by_query?refresh&conflicts=proceed
复制代码
  • Update By Query 版本冲突问题
- 使用 Update By Query 进行文档更新时会首先做一个快照并记录下版本号,如果在更新过程中有新的数据插入就会引起版本冲突
- 默认情况下,如果有一个文档在更新时有版本冲突,那么整个更新就会失败,但是已经更新的文档无法回退
- 可以将参数 conficts 设置为 proceed,在更新文档时遇到版本冲突不会中止更新
复制代码
  • 可以同时对多个索引进行 Update By Query
POST twitter,blog/_update_by_query
复制代码
  • 通过 routing 参数更新指定分片(shard)的索引
POST twitter/_update_by_query?routing=1
复制代码
  • Update By Query 采用滚动更新逻辑,默认每次 1000 个文档,可以通过 scroll_size 修改
POST twitter/_update_by_query?scroll_size=100
复制代码
  • Update By Query 可以使用 pipeline 对文档进行预处理
PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST twitter/_update_by_query?pipeline=set-foo
复制代码
  • Task API:由于索引更新可能比较耗时,ES 通过 Task API 提供了异步的方式来获取更新进度
# 通过 wait_for_completion = false 设置异步更新,此时会返回一个 taskId
POST twitter/_update_by_query?wait_for_completion=false

# 通过 taskId 可以直接获取到更新进度
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619
复制代码
ReIndex API

ES 不允许在原有 Mapping 上对已有数据的字段类型进行修改,只能重新创建的新索引,然后再设置正确的字段类型,再重新导入数据,这个时候就需要用到 ReIndex API。

# 和 Update By Query 一样,可以通过参数 wait_for_completion=false 异步的方式获取进度
# 和 Update By Query 一样,可以通过参数 conflicts=proceed 来控制遇到版本冲突继续执行
POST  _reindex?wait_for_completion=false&conflicts=proceed 
{
  "source": {
    "index": "blogs"
  },
  "dest": {
    "index": "blogs_fix""op_type": "create" //如果 dest 中文档存在可能会导致版本冲突,这时可以加 op_type = create,表示只有文档不存在的时候才会写入
  }
}
复制代码
  • 什么情况下会用到 ReIndex API
- 修改索引的主分片数
- 改变字段中的 Mapping 字段类型
- 集群内数据迁移,跨集群数据迁移
复制代码
  • 使用 ReIndex API 必须将 _source 字段设置为 true
  • ReIndex API 还支持跨集群重建索引,可以实现数据的迁移
# 目标源需要添加白名单,表示允许访问的地址:reindex.remote.whitelist: "otherhost:9200"
POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200", //集群的地址
      "username": "user",
      "password": "pass"
    },
    "index": "source",
    "query": {  // test 字段 是 data 的文档都重建索引
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}
复制代码
  • max_docs:可以通过 max_docs 参数来限制每次重建索引的文档数
POST _reindex
{
  "max_docs": 1,
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}
复制代码
  • 可以将多个索引文件重建到一个目标索引
POST _reindex
{
  "source": {
    "index": ["twitter", "blog"]
  },
  "dest": {
    "index": "all_together"
  }
}
复制代码
  • 可以只选择部分字段进行索引重建
POST _reindex
{
  "source": {
    "index": "twitter",
    "_source": ["user", "_doc"]  # 只重建每个文档的 user 和 _doc 字段
  },
  "dest": {
    "index": "new_twitter"
  }
}
复制代码
  • 可以通过脚本来修改文档的元信息来进行 reindex
POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
}
复制代码

Ingest Pipeline

在前面的文章 Elasticsearch 分布式原理以及相关读写逻辑 中,我们了解了 Elasticsearch 中节点的分类,其实还有一类节点叫 Ingest Pipeline Node:

  • Ingest Pipeline Node 具有预处理数据的能力,可以拦截 Index 或 Bulk API 的请求,对数据进行转换,并重新返回给 Index 或者 Bulk API
  • 默认情况下,每个节点都是一个 Ingest Node,可以通过参数 node.injest=false 禁止
  • Ingest Pipeline 在某些情况下可以使我们无需要 Logstash,就可以对数据进行预处理
  • Ingest Pipeline 使用 Pipeline & Processor 的方式对通过的数据按照管道数据进行加工
  • 每个 Processor 是对加工行为的抽象封装,ES 提供了很多内置的 Processor,也可以支持插件定义自己的 Processor
  • 内置的 Processor主要有下面几种:
- Split Processor:将字符串值分成一个数组
- Remove/Rename Processor: 移除一个重命名字段
- Append:新增加一个字段
- Convert:数据类型转换
- Date/JSON: 日期格式转换
- Date Index Name Processor: 将通过该处理器的文档,分配到指定时间格式的索引中
- Fail Processor: 异常处理
- Foreach Proccesor: 对每个数组里面的字段进行处理
- Grok Processor:日志的日期格式切割
- Gsub / Join / Split: 字符串替换、数组转字符串,字符串转数组
- Lowercase / Upcase: 大小写转换
复制代码
  • 如何使用 Ingest Pipeline:
# 通过 _ingest/pipeline/_simulate 接口模拟检查 Processor 工作是否正常
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [ #可以定义多个 Processor
      {
        "split": {         # 使用 split Processor
          "field": "tags", # 对字段 field 字段进行预处理
          "separator": "," # 按照逗号进行切分
        }
      }
    ]
  },
  "docs": [  # 要处理的文档
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You konw, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
}


# 新生成一个名字叫 blog_pipeline 的 Pipeline
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [  # 一个 Pipeline 可以有多个 processor, 管道处理
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },

      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
}

# 测试该 pipeline 是否可以正常使用
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
}

# 使用 pipeline 更新数据
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Introducing cloud computering",
  "tags": "openstack,k8s",
  "content": "You konw, for cloud"
}

# 使用 blog_pipeline 在 update_by_query 时对数据进行更新操作,只更新哪些没有 field = views 文档
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "views"
                }
            }
        }
    }
}
复制代码
  • Ingest Node VS Logstash
Logstash Ingest Node
数据输入和输出 支持从不同数据源读取写入不同数据源 只能从 ES REST API 获取数据并写入 ES
数据缓存 实现了简单的数据队列,支持重写 不支持缓存
数据处理 支持大量的插件,也支持定制化开发 支持内置插件和定制化开发
配置和使用 需要独立部署,增加了一定的架构复杂度 无需额外部署

Painless Script

  • Painless 是专门为 Elasticsearch 设计的脚本语言,是 Elasticsearch 默认的脚本语言
  • Painless 可以在 Elasticsearch 中作为 inline 脚本直接使用,也可以存储起来后续被多次查询使用
  • Painless Script 在性能方面比其他脚本要快好几倍
  • 扩展了 Java 的语法,支持所有 Java 的数据类型及 Java API 子集
  • Painless Script 具有安全、支持显示类型和动态定义类型等特性
  • Painless 主要有以下用途:
- 更新、删除、数据聚合等操作
- 对返回的字段进行计算
- 对文档的算分进行处理
- 在 Ingest Pipeline 中执行脚本
- 在 Reindex API, Update By Query 中对数据进行处理
复制代码
  • stored script:
# 保存脚本在 Cluster State
POST _scripts/update_views
{
  "script":{
    "lang": "painless",
    "source": "ctx._source.views += params.new_views"
  }
}
复制代码
  • 脚本缓存:脚本的编译开销非常大,因此 ES 会将脚本编译后缓存在 Cache 中
- Inline Scripts 和 Store Scripts 都会被缓存
- 默认缓存 100 个脚本
- script.cache.max_size 设置最大缓存数
- script.cache.expire 设置缓存超时时间
- script.max_compilations_rate: 默认 5 分钟最多执行 75 次编译
复制代码

另外如果进一步了解 Painness Script 请移步 官方文档

store field VS _source

将数据存储在 ES 中主要有「search」 和 「retrieve」两个用途:

  • search:文本的搜索,我们不知道具体有哪些信息,不知道具体的文档 ID,只是根据关键字去倒排索引中查询
  • retrieve:根据 ID 来获取存储的原始数据

其中 「search」 可以通过倒排索引实现全文的检索功能,而 「retrieve」则需要通过 store field 或者 _source 来实现。

什么是 _source
  • 我们在索引文档时,ES 会同时将文档的原始 JSON 数据存储在 _source 字段中
  • _source 字段本身不会被索引,因此不能搜索,主要为了在搜索其它字段时返回原始的 JSON 数据
  • 如果你不想存储 _source 字段,那么就可以设置 _source = false,同时下面的功能将无法进行支持:
- update,update_by_query, reindex 相关的 API
- 高亮功能
- 搜索的时候获取不到原始的 JSON 数据
复制代码
  • 如果你只是想存储原始 JSON 里的部分字段,那么可以进行 include 或者 exclude
PUT logs
{
  "mappings": {
    "_source": {
      "includes": [
        "*.count",
        "meta.*"
      ],
      "excludes": [
        "meta.description",
        "meta.other.*"
      ]
    }
  }
}
复制代码
  • 在搜索和查询时,如果只是想获取部分原始字段,可以通过 _source 字段获取
# 首先会解析整个 _source,然后抽取出部分字段返回
GET /_search
{
    "_source": [ "obj1.*", "obj2.*" ],
    "query" : {
        "term" : { "user" : "kimchy" }
    }
}
复制代码
什么是 field store
  • 通过给某个字段设置 store 属性,可以对该字段进行原始数据的单独存储
PUT my_index
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "store": true 
      },
      "date": {
        "type": "date",
        "store": true 
      },
      "content": {
        "type": "text"
      }
    }
  }
}
复制代码
  • store 属性默认没有开启
  • 查询时通过 stored_fields 获取需要的原始数据
GET my_index/_search
{
  "stored_fields": [ "title", "date" ] 
}
复制代码
如何正确的存储原始数据
  • 如果有些特别大的字段,只是为了检索使用,那么可以选择不存储在 _source 字段里,减少磁盘占用以及 retrieve 时 JSON 的解析和抽取字段的消耗
  • 如果存在特别大的字段且 retrieve 频率较高,可以将其 store 属性设置为 true,这样可以单独解析,不影响其它字段
  • 同一个索引中,不建议多个字段都设置 store 属性,因为每个字段的获取都需要消耗一次 IO,而 _source 只需要消耗一次 IO
  • 大部分情况不建议设置 store 属性,因为 _source 已经可以满足大部分需求且性能也较快
  • 关闭 _source 属性将丢失很多功能,需要慎重选择

如何更好的建模

建模是对真实世界抽取描述的一种工具和方法,上面我们介绍了在 Elasticsearch 中建模用到的一些概念和工具,这里我们总结下在建模过程中需要考虑和注意哪些细节。

字段类型的选择
  • text: 用于全文本字段,文本会被分词索引,用于需要分词搜索的场景,一般不建议进行聚合分析和排序,如果需要聚合和排序则要开启:fielddata = true
  • keyword: 用于 id、枚举等不需要分词的场景,适用于精确匹配的场景,并且默认支持排序和聚合
  • 多字段类型;如果我们对某个文档既有分词搜索的场景也有精确匹配搜索的场景,那么可以给 text 类型加一个子字段
PUT /employees/
{
  "mappings" : {
      "properties" : {
        "age" : {
          "type" : "integer"
        },
        "job" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword", # text 类型加一个子类型 keyword
              "ignore_above" : 50
            }
          }
        },
      }
   }
}

# 通过子字段 keword 可以按照整个文本进行聚合
POST employees/_search
{
  "size": 0,
  "aggs": {
    "jobs": {
      "terms": {
        "field":"job.keyword"
      }
    }
  }
}

# 通过子字段 keyword 可以对文本进行精确匹配搜索
POST /employees/_search
{
  "query": {
    "term": {
      "job.keyword": {
        "value": "XHDK-A-1293-#fJ3"
      }
    }
  }
}

复制代码
  • 数值类型:尽量选择贴近需求的类型,太大可能会导致性能问题,太小可能会随着业务量的增加后期不够使用
  • 日期/布尔:日期和布尔一般选择时不需要太多考虑,我们很容易进行选择
字段属性设置
  • 是否需要索引、排序和聚合分析:如果只是为了存储数据,可以设置 enabled=false
  • 是否需要排序和聚合:根据是否需要排序和聚合的场景选择性设置 「doc_values 和 fielddata」 两个属性
  • 是否需要索引:通过设置字段 index 属性选择是否开启索引功能,关闭时无法被搜索,但是还是可以支持聚合排序,数据保存在 _source 中
  • eager_global_ordinals:对于更新和聚合查询教频繁 keyword 类型字段,可以设置 eager_global_ordinals = true,全局设置 ordinals 映射提高查询性能
  • 原始数据如何存储:请看上文的 「store field vs _source」讲解
  • 倒排索引要存储哪些数据:合理设置 index_option 的值,可以有效提高倒排索引的性能
  • 是否需要相关性算分:可以设置参数 norms 进行关闭,norms 开启后会存储很多算分因子用来计算相关性算分,浪费大量的存储空间
其它建模优化建议
  • Index Alias: 可以通过 Index Alias 将应用和索引名字解耦,无需修改名称,无需停机,实现无缝 Reindex
  • Index Template:通过设置索引模板,规范索引创建的流程
  • 避免使用过多的字段,可以通过 index.mapping.total_fields.limit 限制最大字段数
1)业务不容易维护 
2)Mapping 信息保存在 Cluster State 中,会对集群性能有影响
3)删除和修改数据需要 Reindex
复制代码
  • 生产环境尽量不要打开 Dynamic 属性,事先定义好字段属性,字段动态变动造成难以维护
  • 尽量避免使用正则和模糊匹配查询,查询性能很差
  • 尽量避免空值引起的聚合分析不准确,设置字段的 null_value 属性,或者可以通过聚合查询时 missing 属性修改这种情况
  • 为索引的 Mapping 文件加入 Meta 信息,可以更好的进行版本管理,将 Mapping 文件上传到 git 进行管理
PUT softwares/
{
  "mappings": {
    "_meta": {
      "software_version_mapping": "1.0"
    }
  }
}
复制代码
  • Kibana 目前暂不支持 nested 类型 和 parent /child 类型,在关联对象建模时要做一定的取舍

参考文献




原文地址:访问原文地址
快照地址: 访问文章快照