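Introduction to Elasticsearch
What is Elasticsearch?
Elasticsearch is an open-source, distributed, RESTful search and analytics engine that can handle a growing range of use cases.
About this article
This article covers writing geo data into Elasticsearch (ES) and running spatial queries and aggregations against it. The ES version used is 7.3.1.
Data preparation
In QGIS, use the fishnet (grid) tool to cut the area of interest into a grid, and export the grid as GeoJSON.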
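If QGIS is not at hand, a comparable grid can be generated in a few lines of Python. The sketch below is a rough stand-in for the fishnet tool, not its implementation: `make_fishnet` is a hypothetical helper that assumes plain lon/lat degrees (EPSG:4326) and square cells, and it gives each cell the `id` property that the insertion code reads.

```python
import math


def make_fishnet(min_lon, min_lat, max_lon, max_lat, cell_size):
    """Hypothetical helper: split a lon/lat extent into square cells.

    Returns a GeoJSON FeatureCollection of Polygon features, each with an "id" property.
    Cells at the upper/right edge may extend past the extent if it is not a multiple of cell_size.
    """
    # Small epsilon so floating-point noise does not add an extra, empty column/row
    cols = math.ceil((max_lon - min_lon) / cell_size - 1e-9)
    rows = math.ceil((max_lat - min_lat) / cell_size - 1e-9)
    features = []
    cell_id = 0
    for r in range(rows):
        for c in range(cols):
            x0 = min_lon + c * cell_size
            y0 = min_lat + r * cell_size
            x1 = x0 + cell_size
            y1 = y0 + cell_size
            # GeoJSON rings are [lon, lat], counter-clockwise, and closed (first point == last point)
            ring = [[x0, y0], [x1, y0], [x1, y1], [x0, y1], [x0, y0]]
            features.append({
                "type": "Feature",
                "properties": {"id": cell_id},
                "geometry": {"type": "Polygon", "coordinates": [ring]},
            })
            cell_id += 1
    return {"type": "FeatureCollection", "features": features}
```

The result can be written with `json.dump(make_fishnet(...), f)` and fed to the insertion code in the same way as a QGIS export.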
Create the index and set the mapping
from elasticsearch import Elasticsearch


def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
    # Delete the old index; ignore 404 (index missing) and 400
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")
    # Default mapping: a geo_shape field for the grid geometry and a numeric id
    if my_mapping is None:
        my_mapping = {
            "properties": {
                "location": {"type": "geo_shape"},
                "id": {"type": "long"}
            }
        }
    # Ignore 400, which is returned when the index already exists
    create_index = es.indices.create(index=index_name, ignore=400)
    # ES 7.x still accepts a typed mapping when include_type_name=True (deprecated)
    mapping_index = es.indices.put_mapping(index=index_name, doc_type=doc_type_name, body=my_mapping, include_type_name=True)
    print("create_index")
    if create_index.get("acknowledged") is not True or mapping_index.get("acknowledged") is not True:
        print("Index creation failed...")
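The mapping above declares `location` as `geo_shape`. The point index used later (`gis_point`) is queried with `geo_distance` and aggregated with `geohash_grid`, `geotile_grid`, `geo_centroid` and `geo_bounds`; as far as I can tell from the 7.3 documentation, these only operate on `geo_point` fields, so that index presumably needs a `geo_point` mapping. A minimal sketch of typeless (7.x-style) create-index bodies for both cases; `index_body` is a hypothetical helper name.

```python
def index_body(geometry="shape"):
    """Hypothetical helper: typeless ES 7.x create-index body for the grid data.

    geometry="shape" maps "location" as geo_shape (polygons, envelope queries);
    geometry="point" maps it as geo_point (geo_distance, geohash/geotile grids, centroid, bounds).
    """
    field_types = {"shape": "geo_shape", "point": "geo_point"}
    if geometry not in field_types:
        raise ValueError("geometry must be 'shape' or 'point'")
    return {
        "mappings": {
            "properties": {
                "location": {"type": field_types[geometry]},
                "id": {"type": "long"},
            }
        }
    }
```

With this, index creation and mapping happen in one call, e.g. `es.indices.create(index="gis_point", body=index_body("point"))`, and no document type is involved.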
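Data insertion
Data is written with multiprocessing and elasticsearch.helpers.bulk. The features are split into batches of roughly ten thousand (9,500 in the code below), the leftover features form a final batch, and the batches are then written in parallel by several worker processes. 4,731,254 point features and 4,731,254 polygon features were written this way. Multiple CPU cores, an SSD, and a suitable bulk batch size all speed up indexing noticeably; with these measures, more than four million points or polygons can be written in about three minutes.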
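import json
import multiprocessing

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

ip = "127.0.0.1:9200"
BATCH_SIZE = 9500  # number of documents per bulk batch


def mp_worker(actions):
    # Each process opens its own client; clients should not be shared across processes
    es = Elasticsearch(hosts=[ip], timeout=5000)
    # Every action already carries its _index, so no index argument is needed here
    success, _ = bulk(es, actions, raise_on_error=True)
    return success


def mp_handler(input_file, index_name, doc_type_name="en"):
    with open(input_file, 'rb') as f:
        data = json.load(f)
    features = data["features"]
    del data
    batches = []
    actions = []
    count = 0
    for feature in features:
        action = {
            "_index": index_name,
            "_type": doc_type_name,
            "_source": {
                "id": feature["properties"]["id"],
                # Geometry of a fishnet grid cell, passed through as GeoJSON
                "location": {
                    "type": "polygon",
                    "coordinates": feature["geometry"]["coordinates"]
                }
            }
        }
        actions.append(action)
        # Once a batch is full, set it aside and start a new one
        if len(actions) == BATCH_SIZE:
            batches.append(actions)
            count += len(actions)
            actions = []
    # The remaining documents form the last batch
    if actions:
        batches.append(actions)
        count += len(actions)
    del features
    print('read all %s data ' % count)
    written = 0
    # Write the batches in parallel with 4 worker processes
    with multiprocessing.Pool(4) as p:
        for result in p.imap(mp_worker, batches):
            written += result
    print('write all %s data ' % written)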
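The handler above hard-codes polygon geometry and builds batches with a counter. A sketch of two small helpers that separate those concerns; both names are hypothetical. `feature_to_action` turns a GeoJSON point into a `[lon, lat]` array (suitable for a `geo_point` field) and passes other geometries through as GeoJSON for a `geo_shape` field, using typeless 7.x actions. `chunked` yields fixed-size batches plus a final remainder batch.

```python
from itertools import islice


def feature_to_action(feature, index_name):
    """Hypothetical helper: build one bulk action (typeless, ES 7.x style) from a GeoJSON feature."""
    geometry = feature["geometry"]
    if geometry["type"] == "Point":
        # geo_point array order is [lon, lat], the same as GeoJSON
        location = geometry["coordinates"][:2]
    else:
        # geo_shape accepts GeoJSON; lower-case the type name ("Polygon" -> "polygon")
        location = {"type": geometry["type"].lower(), "coordinates": geometry["coordinates"]}
    return {
        "_index": index_name,
        "_source": {"id": feature["properties"]["id"], "location": location},
    }


def chunked(items, size):
    """Yield lists of `size` items; the last list holds whatever is left over."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch
```

Inside `mp_handler`, the batching loop could then shrink to `batches = list(chunked((feature_to_action(f, index_name) for f in features), BATCH_SIZE))`.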
GEO (point): selecting points within n km, and bounding-box selection
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time

starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"
# Select points within n km of a location (here 9 km)
_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": {
                "geo_distance": {
                    "distance": "9km",
                    "location": {
                        "lat": 18.1098857850465471,
                        "lon": 109.1271036098896730
                    }
                }
            }
        }
    }
}
# Bounding-box selection
# Note: top_left.lon must be west of bottom_right.lon; otherwise ES treats
# the box as crossing the 180° meridian
# _body = {
#     "query": {
#         "geo_bounding_box": {
#             "location": {
#                 "top_left": {
#                     "lat": 18.4748659238899933,
#                     "lon": 109.0007435371629470
#                 },
#                 "bottom_right": {
#                     "lat": 18.1098857850465471,
#                     "lon": 105.1271036098896730
#                 }
#             }
#         }
#     }
# }
es = Elasticsearch(hosts=[ip], timeout=5000)
# scan uses the scroll API to stream every matching document
scanResp = scan(es, query=_body, scroll="10m", index=_index, timeout="10m")
for resp in scanResp:
    print(resp)
endtime = time.time()
print(endtime - starttime)
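To spot-check the hits of a `geo_distance` filter on the client side, the great-circle (haversine) distance can be recomputed. This is a sketch under an assumption: it uses a mean Earth radius of 6,371,008.7714 m, which I believe is the constant behind the `arc` distance type, so results should agree to within rounding but are not guaranteed to match ES exactly. `haversine_m` and `within_distance` are hypothetical names.

```python
import math

EARTH_RADIUS_M = 6371008.7714  # assumed mean Earth radius in metres


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two lat/lon points (haversine formula)."""
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = p2 - p1
    dl = math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def within_distance(points, lat, lon, km):
    """Client-side analogue of a geo_distance filter: keep (lat, lon) points within km of the origin."""
    limit = km * 1000.0
    return [p for p in points if haversine_m(lat, lon, p[0], p[1]) <= limit]
```

For example, `within_distance(points, 18.1098857850465471, 109.1271036098896730, 9)` mirrors the 9 km query above for a list of `(lat, lon)` tuples.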
GEO (shape): bounding-box selection
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time

starttime = time.time()
_index = "gis"
_doc_type = "20190823"
ip = "127.0.0.1:9200"
# envelope format, [[minlon,maxlat],[maxlon,minlat]]
_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": {
                "geo_shape": {
                    "location": {
                        "shape": {
                            "type": "envelope",
                            "coordinates": [[108.987103609889, 18.474865923889993], [109.003537162947, 18.40988578504]]
                        },
                        "relation": "within"
                    }
                }
            }
        }
    }
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
for resp in scanResp:
    print(resp)
endtime = time.time()
print(endtime - starttime)
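The envelope corner order (`[[minlon, maxlat], [maxlon, minlat]]`) differs from the `(xmin, ymin, xmax, ymax)` order used by PostGIS's `ST_MakEnvelope`-style functions, which is easy to get wrong. A sketch of a hypothetical `envelope_query` helper that takes the PostGIS-style order and builds the ES body. It drops the `match_all` clause, since a filter-only `bool` query returns the same hits (only the scores differ), and it does not handle boxes that cross the 180° meridian.

```python
def envelope_query(field, min_lon, min_lat, max_lon, max_lat, relation="within"):
    """Hypothetical helper: geo_shape envelope filter built from (xmin, ymin, xmax, ymax)."""
    if min_lon > max_lon or min_lat > max_lat:
        raise ValueError("expected min_lon <= max_lon and min_lat <= max_lat")
    return {
        "query": {
            "bool": {
                "filter": {
                    "geo_shape": {
                        field: {
                            "shape": {
                                "type": "envelope",
                                # upper-left corner first, then lower-right corner
                                "coordinates": [[min_lon, max_lat], [max_lon, min_lat]],
                            },
                            "relation": relation,
                        }
                    }
                }
            }
        }
    }
```

For instance, `envelope_query("geom", -117.987103609889, 33.40988578504, -117.003537162947, 33.494865923889993, "intersects")` takes the same arguments as the PostGIS envelope in the comparison section.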
GEO (point): distance aggregation
from elasticsearch import Elasticsearch
import time

starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"
# Distance aggregation: count points in rings around an origin ("lat,lon"); ranges are in metres
_body = {
    "aggs": {
        "rings_around_amsterdam": {
            "geo_distance": {
                "field": "location",
                "origin": "18.1098857850465471,109.1271036098896730",
                "ranges": [
                    {"to": 100000},
                    {"from": 100000, "to": 300000},
                    {"from": 300000}
                ]
            }
        }
    }
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['rings_around_amsterdam']['buckets']:
    print(i)
endtime = time.time()
print(endtime - starttime)
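The ring buckets follow the usual range-aggregation convention: `from` is inclusive, `to` is exclusive, and a missing bound is open-ended. A minimal sketch of that bucketing applied to precomputed distances in metres; `ring_counts` is a hypothetical helper, shown only to make the boundary behaviour concrete.

```python
def ring_counts(distances_m, ranges):
    """Hypothetical helper: count distances per range ("from" inclusive, "to" exclusive)."""
    counts = []
    for r in ranges:
        lo = r.get("from", float("-inf"))
        hi = r.get("to", float("inf"))
        counts.append(sum(1 for d in distances_m if lo <= d < hi))
    return counts
```

With the ranges from the query above, a point exactly 100,000 m away lands in the second ring, not the first.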
Centroid aggregation
_body = {
    "aggs": {
        "centroid": {
            "geo_centroid": {
                "field": "location"
            }
        }
    }
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations'])
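As I understand it, `geo_centroid` on a `geo_point` field averages the latitudes and longitudes of the matching points. The sketch below reproduces that for a list of `(lat, lon)` tuples so the aggregation result can be sanity-checked; `centroid` is a hypothetical helper, the output shape only imitates the ES response, and plain averaging is not meaningful for data that straddles the 180° meridian.

```python
import math


def centroid(points):
    """Hypothetical helper: arithmetic mean of latitudes and longitudes for (lat, lon) points."""
    if not points:
        return None
    lat = math.fsum(p[0] for p in points) / len(points)
    lon = math.fsum(p[1] for p in points) / len(points)
    return {"location": {"lat": lat, "lon": lon}, "count": len(points)}
```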
Bounds aggregation
_body = {
    "aggs": {
        "viewport": {
            "geo_bounds": {
                "field": "location"
            }
        }
    }
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations']['viewport'])
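The `geo_bounds` result is a box given by its `top_left` and `bottom_right` corners. A sketch of the same computation for `(lat, lon)` tuples, shaped like the `viewport` output; `bounds` is a hypothetical helper. It ignores the dateline: ES may return a box that wraps across the 180° meridian (the `wrap_longitude` option), which this sketch never does.

```python
def bounds(points):
    """Hypothetical helper: bounding box of (lat, lon) points, shaped like a geo_bounds result."""
    if not points:
        return None
    lats = [p[0] for p in points]
    lons = [p[1] for p in points]
    return {
        "bounds": {
            # north-west corner
            "top_left": {"lat": max(lats), "lon": min(lons)},
            # south-east corner
            "bottom_right": {"lat": min(lats), "lon": max(lons)},
        }
    }
```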
Geohash grid aggregation
## Low-precision aggregation; precision is the geohash length (number of characters)
_body = {
    "aggregations": {
        "large-grid": {
            "geohash_grid": {
                "field": "location",
                "precision": 3
            }
        }
    }
}
# High-precision aggregation: a bounding-box filter wrapping a geohash_grid aggregation
# Note: top_left.lon must be west of bottom_right.lon; otherwise ES treats
# the box as crossing the 180° meridian
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9971036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geohash_grid": {
#                         "field": "location",
#                         "precision": 7
#                     }
#                 }
#             }
#         }
#     }
# }
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)
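Each `geohash_grid` bucket key is a geohash of length `precision`: points that share the first `precision` characters fall into the same cell. A minimal sketch of standard geohash encoding, which alternately bisects the longitude and latitude ranges and packs every 5 bits into one base-32 character. `geohash_encode` is a hypothetical helper written from the public geohash scheme, not taken from the ES code base.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_encode(lat, lon, precision=12):
    """Hypothetical helper: encode a lat/lon point as a geohash of `precision` characters."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits = 0
    n_bits = 0
    use_lon = True  # bits are interleaved, starting with longitude
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits <<= 1
                lat_hi = mid
        use_lon = not use_lon
        n_bits += 1
        # every 5 bits become one base-32 character
        if n_bits == 5:
            chars.append(BASE32[bits])
            bits = 0
            n_bits = 0
    return "".join(chars)
```

Truncating `geohash_encode(lat, lon, 12)` to 3 characters gives the bucket a point falls into in the low-precision query above.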
Tile grid aggregation
# Low-precision tile aggregation; precision is the zoom level
_body = {
    "aggregations": {
        "large-grid": {
            "geotile_grid": {
                "field": "location",
                "precision": 8
            }
        }
    }
}
# High-precision tile aggregation: a bounding-box filter wrapping a geotile_grid aggregation
# Note: top_left.lon must be west of bottom_right.lon; otherwise ES treats
# the box as crossing the 180° meridian
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9991036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geotile_grid": {
#                         "field": "location",
#                         "precision": 18
#                     }
#                 }
#             }
#         }
#     }
# }
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)
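`geotile_grid` bucket keys have the form `"zoom/x/y"` and refer to Web Mercator map tiles. The sketch below computes that key with the common slippy-map tile formula, which I assume matches the ES tiling; `geotile_key` is a hypothetical helper. Latitudes are clamped to the Web Mercator limit of about ±85.0511°.

```python
import math

MAX_LAT = 85.0511287798  # Web Mercator latitude limit


def geotile_key(lat, lon, zoom):
    """Hypothetical helper: "zoom/x/y" key of the Web Mercator tile containing a lat/lon point."""
    n = 2 ** zoom
    lat = max(min(lat, MAX_LAT), -MAX_LAT)
    lat_rad = math.radians(lat)
    x = math.floor((lon + 180.0) / 360.0 * n)
    y = math.floor((1.0 - math.log(math.tan(lat_rad) + 1.0 / math.cos(lat_rad)) / math.pi) / 2.0 * n)
    # Clamp so that lon = 180 and the clamped poles still land on a valid tile
    x = min(max(x, 0), n - 1)
    y = min(max(y, 0), n - 1)
    return "%d/%d/%d" % (zoom, x, y)
```

Tile `y` grows southwards, so at zoom 1 the north-west quarter of the world is `1/0/0`.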
Comparing the same functionality in Elasticsearch and PostGIS
PostGIS nearest-point query
SELECT id,geom, ST_DistanceSphere(geom,'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry)
FROM h5
ORDER BY geom <->
'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry
LIMIT 1
Elasticsearch nearest-point query
from elasticsearch import Elasticsearch
import time

starttime = time.time()
_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"
# Sort by arc distance to the query point and keep only the closest hit
_body = {
    "sort": [
        {
            "_geo_distance": {
                "unit": "m",
                "order": "asc",
                # [lon, lat] order, as in GeoJSON
                "location": [
                    109.1681036098896730,
                    18.1299957850465471
                ],
                "distance_type": "arc",
                "mode": "min",
                "ignore_unmapped": True
            }
        }
    ],
    "from": 0,
    "size": 1,
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            }
        }
    }
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
# The sort value of each hit is its distance in metres
for hit in scanResp['hits']['hits']:
    print(hit['_source'], hit['sort'][0])
endtime = time.time()
print(endtime - starttime)
PostGIS range query
select id,geom,fid FROM public."California"
where
ST_Intersects(geom,ST_MakeEnvelope(-117.987103609889,33.40988578504,-117.003537162947,33.494865923889993, 4326))=true
The same envelope in Elasticsearch's [[minlon, maxlat], [maxlon, minlat]] order: [-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]
Elasticsearch range query
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
import time

starttime = time.time()
_index = "gis_california"
ip = "127.0.0.1:9200"
# envelope format, [[minlon,maxlat],[maxlon,minlat]]
_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": {
                "geo_shape": {
                    "geom": {
                        "shape": {
                            "type": "envelope",
                            "coordinates": [[-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]]
                        },
                        "relation": "INTERSECTS"
                    }
                }
            }
        }
    }
}
es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
# Count the matching documents
i = 0
for resp in scanResp:
    i = i + 1
print(i)
endtime = time.time()
print(endtime - starttime)
PostGIS performed better in both scenarios.
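The timings above come from a single run measured with `time.time()`, which is sensitive to cold caches and one-off pauses. One way to make such a comparison steadier is to discard a warm-up run and report the median of several runs. A sketch of a hypothetical `benchmark` helper; the ES `scan` loop and the PostGIS query would each be wrapped in a zero-argument function and passed in.

```python
import statistics
import time


def benchmark(fn, repeat=5, warmup=1):
    """Hypothetical helper: run fn() warmup + repeat times, return the median wall-clock seconds."""
    for _ in range(warmup):
        fn()  # warm-up runs are not timed (caches, connection setup)
    timings = []
    for _ in range(repeat):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)
```

For example, `benchmark(lambda: sum(1 for _ in scan(es, query=_body, index=_index)))` times a complete scroll through the result set.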