Elasticsearch跨集群备份

概述

目前7.0以上版本跨集群备份的方式有很多,例如elasticsearch-dump,reindex,snapshot,logstash。

方案对比

方案 elasticsearch-dump reindex snapshot logstash
基本原理 逻辑备份,类似mysqldump将数据一条一条导出后再执行导入 reindex 是 Elasticsearch 提供的一个 API 接口,可以把数据从一个集群迁移到另外一个集群 从源集群通过Snapshot API 创建数据快照,然后在目标集群中进行恢复 从一个集群中读取数据然后写入到另一个集群
网络要求 集群间互导需要网络互通,先导出文件再通过文件导入集群则不需要网络互通 网络需要互通 无网络互通要求 网络需要互通
迁移速度 一般
适合场景 适用于数据量小的场景 适用于数据量大,在线迁移数据的场景 适用于数据量大,接受离线数据迁移的场景 适用于数据量一般,近实时数据传输
配置复杂度 中等 简单 复杂 中等

reindex介绍

这里介绍其中一种比较简单的方式。
使用官方提供的reindex API就可以实现,但前提是两个ES集群是网络互通的。

_reindex不会尝试设置目标索引。它不会复制源索引的设置信息。您应该在运行reindex操作之前设置目标索引,包括设置映射,分片数,副本等。

源ES集群的版本是7.8.0,目标ES的版本是7.13.0

在目标ES的kibana上执行命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
POST _reindex?requests_per_second=500&wait_for_completion=false
{
"source": {
"remote": {
"host": "http://10.0.20.13:9200",
"username": "xiaoming",
"password": "eqIR1CGhM8lQ"
},
"index": "scene_model",
"_source": ["title","publish_time","total_pv","biz_type"],
"size":500,
"query": {
"bool": {
"must": [
{
"range": {
"publish_time": {
"gte": "2021-01-01"
}
}
},
{
"range": {
"total_pv": {
"gt": 20
}
}
},
{
"term": {
"biz_type": {
"value": "0"
}
}
}
]
}
}

},
"dest": {
"index": "scene_model_tmp"
}
}

就可以将源机器上的scene_model的部分数据备份到目标机器上。

部分代码解释

_reindex
表示此次使用Elasticsearch reindex API的方式进行索引数据的备份。

requests_per_second=500
表示每秒请求查询多少文档

wait_for_completion=false
这个设置可以让ES在后台执行此操作,而不是在页面上等待任务的执行完成。如果不设置这个参数,对于要执行很长时间的任务,一会儿就会返回错误信息,比如502或者其它的一些报错信息。

source 和 dest
分别指定源ES集群和目标ES的一些配置信息。

source 中的remote
指定源服务器的ip,用户名和密码(如果有的话)。

source中的index
指定从源ES集群的哪个索引备份数据。

source中的_source字段
备份的时候需要备份哪些字段。默认会备份所有的字段,但是有时候只需要关注几个字段,其它的字段不需要,就通过_source字段指定那些需要的字段就行了。

source中的 size 字段
默认情况下reindex一个批次会请求1000条数据,你可以使用size这个参数修改批次的大小。参考链接

source中的query 子句
可以设定查询条件,只有符合查询条件的数据才会备份。
dest中的index指定目标索引。

查询和终止任务

在请求elasticsearch api 发出后有时候运行请求后想停止,需要用到两个api

1
GET  _tasks?detailed=true&actions=*reindex //查询正在运行的任务

actions 参数就是你要查询的动作,*代表所有动作

1
POST _tasks/RnT2C85FQIqfi4zRGyfJMw:50571743/_cancel  //取消任务

RnT2C85FQIqfi4zRGyfJMw 代表:第一条api返回的node字段
50571743代表:第一条api返回的id字段

参考文档
1 https://blog.csdn.net/cr7258/article/details/114957725
2 https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs-reindex.html

hadoop集群RM和ZK异常的解决

修改了Flume的配置,然后需要将有过期配置的服务重启了一下,然后集群上有些服务就异常了。

Yarn服务异常: 2个ResourceManager同时处于standby状态

首先yarn服务异常,2个ResourceManager (分别为hadoop101和hadoop102) 同时处于standby状态。然后根据stackoverflow上的解释,使用如下命令

1
yarn resourcemanager -format-state-store

在hadoop101上以hdfs用户执行,过了1分钟,问题仍旧没有解决,2个ResourceManager仍然同时处于standby状态。

Read More

Parse_url函数 (Spark SQL)

from https://docs.microsoft.com/zh-cn/azure/databricks/sql/language-manual/functions/parse_url

从 url 中提取一部分。

语法

1
parse_url(url, partToExtract [, key] )

参数

  • url:一个 STRING 表达式。
  • partToExpract:一个 STRING 表达式。
  • key:一个 STRING 表达式。

    返回

    字符串。

partToExtract 必须是以下各项之一:

  • ‘HOST’
  • ‘PATH’
  • ‘QUERY’
  • ‘REF’
  • ‘PROTOCOL’
  • ‘FILE’
  • ‘AUTHORITY’
  • ‘USERINFO’
    key 区分大小写。

如果未找到请求的 partToExtract 或 key,则返回 NULL。

示例

SQL

1
2
3
4
5
6
SELECT parse_url('http://spark.apache.org/path?query=1', 'HOST');
spark.apache.org
SELECT parse_url('http://spark.apache.org/path?query=1', 'QUERY');
query=1
SELECT parse_url('http://spark.apache.org/path?query=1', 'QUERY', 'query');
1

Clickouse新特性调研 Release v21.11, 2021-11-09

向后兼容变化

  • 改变了SQL/JSON函数中json_path和json参数的位置。
  • 移除了MergeTree表的write_final_mark设置,这个值总是为true,新的版本会兼容所有的表,不需要其他的设置。
  • 移除了bayesAB函数。
  • 如果你已经开始使用clickhouse-keeper 特性,可以看一下这条内容。现在ClickHouse Keeper快照默认使用ZSTD编码的方式进行压缩,而不再是LZ4块压缩,可以通过compress_snapshots_with_zstd_format 设置来关闭此特性(在所有的集群副本上保持设置一致)。一般都是向后兼容的,但是也有可能出现不兼容的情况,当一个新节点向不能读取ZSTD格式快照的旧节点发送快照的时候(在恢复的时候可能会发生)。

Read More

Tornado 报错TypeError: Object of Type Datetime Is Not JSON Serializable

在使用Tornado从mysql查询出来数据,然后返回通过self.write(result)给前端,但是如果数据中包含日期时间类型的字段,比如timestamp类型,就会报TypeError: Object of Type Datetime Is Not JSON Serializable错误。

这个问题其实很好解决,看tornado的源码就可以知道,self.write()属于web.py中的方法,这个方法中有一段如下的代码

1
2
3
if isinstance(chunk, dict):
chunk = escape.json_encode(chunk)
self.set_header("Content-Type", "application/json; charset=UTF-8")

其中escape.json_encode(chunk)方法中调用了
json.dumps(value).replace("</", "<\\/")

这就知道如何做了,改变json.dumps对时间日期字段的解析方式就可以了,先定义一个解码类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import json
from decimal import Decimal
from datetime import datetime,date

class DateEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(o, date):
return o.strftime("%Y-%m-%d")
elif isinstance(o, Decimal):
return float(o)
else:
return json.JSONEncoder.default(self, o)

然后将 def json_encode(value: Any) -> str: 方法的实现改为

1
return json.dumps(value,cls=DateEncoder).replace("</", "<\\/")

就可以了,重启一下服务,就可以了。

这种方式看似可以,实则有个大坑,会导致很多接口调用参数传递失败,还是不建议这么做,如果真的要修改字符字段的类型,在程序中修改吧。

Axios CORS跨域问题解决

前端通过使用axios请求后台服务,默认是不带cookie上报的,要想带上,需要进行如下设置:

1
axios.defaults.withCredentials = true

然后后端服务也要做些调整,比如Tornado需要设置

1
2
self.set_header("Access-Control-Allow-Credentials", "true")
self.set_header("Access-Control-Allow-Origin",origin)

如果不想在程序中设置的话,可以调整一下nginx的配置,比如某个location下增加如下配置:

1
2
3
4
5
6
add_header 'Access-Control-Allow-Credentials' "true";
add_header 'Access-Control-Allow-Origin' "$http_origin";
add_header 'Access-Control-Allow-Headers' '*';
add_header 'Access-Control-Max-Age' 1000;
add_header 'Access-Control-Allow-Methods' 'POST, GET, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'authorization, Authorization, Content-Type, Access-Control-Allow-Origin, Access-Control-Allow-Headers, X-Requested-By, Access-Control-Allow-Methods';

Maven项目打包

Maven项目打包的方式有多种,我常用的就是使用assembly的方式进行打包。

1 首先需要在pom项目中加入如下依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
    ...
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.3</version>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>

</project>

2 使用如下命令进行编译打包

1
mvn assembly:assembly

Nginx日志输出cookie的值

一 输出全部cookie的信息

1
2
3
4
5
log_format data_log '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent" $cookie_C_I $http_x_forwarded_for'
'"$http_cookie"'
'"$upstream_addr" "$upstream_status" "$upstream_response_time" "$request_time" ';

通过$http_cookie就可以将请求的全部cookie获得。

二 输出单个cookie

输出单个cookie也很简单,只需要为cookie key加上$cookie_前缀就可以了,例如有一个cookie的key为_tracker_user_id_,那么在nginx中可以通过$cookie__tracker_user_id_就可以获取到了。

1
2
3
4
5
log_format data_log '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent" $cookie_C_I $http_x_forwarded_for'
'"$cookie__tracker_user_id_"'
'"$upstream_addr" "$upstream_status" "$upstream_response_time" "$request_time"';

Kafka Cmake(原kafka Manager)的使用

lastest_version

下载压缩包

wget https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip

解压cmak

unzip cmak-3.0.0.5.zip

修改conf/application.conf

cmak.zkhosts=”hadoop101.eqxiu.com:2181” 改为真实的zk地址

下载open jdk11

wget https://download.java.net/openjdk/jdk11/ri/openjdk-11+28_linux-x64_bin.tar.gz

解压 open jdk11

tar -zxvf openjdk-11+28_linux-x64_bin.tar.gz

修改bin/cmak启动脚本

在文件的最上边加上JAVA_HOME的路径,比如:

1
JAVA_HOME=/data/software/jdk-11

启动cmak

1
./bin/cmak -Dhttp.port=10010

在浏览器上访问

1
http://your-ip-address:10010

参考
https://github.com/yahoo/CMAK
https://cloud.tencent.com/developer/article/1651137