mirror of
https://gitee.com/orangeform/orange-admin.git
synced 2026-01-18 02:56:30 +08:00
commit
This commit is contained in:
@@ -0,0 +1,14 @@
|
||||
package com.orange.demo.common.sequence.config;
|
||||
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
|
||||
/**
|
||||
* common-sequence模块的自动配置引导类。
|
||||
*
|
||||
* @author Orange Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
@EnableConfigurationProperties({IdGeneratorProperties.class})
|
||||
public class IdGeneratorAutoConfigure {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.orange.demo.common.sequence.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* common-sequence模块的配置类。
|
||||
*
|
||||
* @author Orange Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
@Data
|
||||
@ConfigurationProperties(prefix = "sequence")
|
||||
public class IdGeneratorProperties {
|
||||
|
||||
/**
|
||||
* 是否使用基于美团Leaf的分布式Id生成器。
|
||||
*/
|
||||
private Boolean advanceIdGenerator = false;
|
||||
/**
|
||||
* 基础版生成器所需的WorkNode参数值。仅当advanceIdGenerator为false时生效。
|
||||
*/
|
||||
private Integer snowflakeWorkNode = 1;
|
||||
/**
|
||||
* zk的地址。多个ip和端口之间逗号分隔。仅当advanceIdGenerator为true时生效。
|
||||
* 如:10.1.1.2:2181;10.1.1.3:2181。
|
||||
*/
|
||||
private String zkAddress;
|
||||
/**
|
||||
* 用于识别同一主机(ip相同)不同服务的端口号。与本机的ip一起构成zk中标识不同服务实例的key值。
|
||||
* 仅当advanceIdGenerator为true时生效。
|
||||
*/
|
||||
private Integer idPort;
|
||||
/**
|
||||
* zk中生成WorkNode的路径。不同的业务可以使用不同的路径,以免冲突。
|
||||
* 仅当advanceIdGenerator为true时生效。
|
||||
*/
|
||||
private String zkPath;
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.orange.demo.common.sequence.generator;
|
||||
|
||||
import cn.hutool.core.lang.Snowflake;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
|
||||
/**
|
||||
* 基础版snowflake计算工具类。
|
||||
* 和SnowflakeIdGenerator相比,相同点是均为基于Snowflake算法的生成器。不同点在于当前类的
|
||||
* WorkNodeId是通过配置文件静态指定的。而SnowflakeIdGenerator的WorkNodeId是由zk生成的。
|
||||
*
|
||||
* @author Orange Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
public class BasicIdGenerator implements MyIdGenerator {
|
||||
|
||||
private Snowflake snowflake;
|
||||
|
||||
/**
|
||||
* 构造函数。
|
||||
*
|
||||
* @param workNode 工作节点。
|
||||
*/
|
||||
public BasicIdGenerator(Integer workNode) {
|
||||
snowflake = IdUtil.createSnowflake(workNode, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取基于Snowflake算法的数值型Id。
|
||||
* 由于底层实现为synchronized方法,因此计算过程串行化,且线程安全。
|
||||
*
|
||||
* @return 计算后的全局唯一Id。
|
||||
*/
|
||||
@Override
|
||||
public long nextLongId() {
|
||||
return this.snowflake.nextId();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取基于Snowflake算法的字符串Id。
|
||||
* 由于底层实现为synchronized方法,因此计算过程串行化,且线程安全。
|
||||
*
|
||||
* @return 计算后的全局唯一Id。
|
||||
*/
|
||||
@Override
|
||||
public String nextStringId() {
|
||||
return this.snowflake.nextIdStr();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.orange.demo.common.sequence.generator;
|
||||
|
||||
/**
|
||||
* 分布式Id生成器的统一接口。
|
||||
*
|
||||
* @author Orange Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
public interface MyIdGenerator {
|
||||
|
||||
/**
|
||||
* 获取数值型分布式Id。
|
||||
*
|
||||
* @return 生成后的Id。
|
||||
*/
|
||||
long nextLongId();
|
||||
|
||||
/**
|
||||
* 获取字符型分布式Id。
|
||||
*
|
||||
* @return 生成后的Id。
|
||||
*/
|
||||
String nextStringId();
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
package com.orange.demo.common.sequence.generator;
|
||||
|
||||
import cn.hutool.core.thread.ThreadUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import com.orange.demo.common.core.util.IpUtil;
|
||||
import com.google.common.base.Preconditions;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* Snowflake Id生成器。该实现完全copy美团的leaf。
|
||||
*
|
||||
* @author MeiTuan.Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
@Slf4j
|
||||
public class SnowflakeIdGenerator implements MyIdGenerator {
|
||||
private static final long TWEPOCH = 1288834974657L;
|
||||
private static final long WORKER_ID_BITS = 10L;
|
||||
/**
|
||||
* 最大能够分配的workerid =1023
|
||||
*/
|
||||
private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
|
||||
private static final long SEQUENCE_BITS = 12L;
|
||||
private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
|
||||
private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);
|
||||
private long workerId;
|
||||
private long sequence = 0L;
|
||||
private long lastTimestamp = -1L;
|
||||
private static final Random RANDOM = RandomUtil.getRandom();
|
||||
|
||||
/**
|
||||
* @param zkAddress zk地址
|
||||
* @param idPort 用于识别相同ip内不同服务的端口号。仅作为标识用,不会对该端口进行监听。
|
||||
*/
|
||||
public SnowflakeIdGenerator(String zkAddress, int idPort, String zkPath) {
|
||||
Preconditions.checkArgument(
|
||||
timeGen() > TWEPOCH, "Snowflake not support twepoch greater than currentTime");
|
||||
final String ip = IpUtil.getFirstLocalIpAddress();
|
||||
SnowflakeZookeeperHolder holder =
|
||||
new SnowflakeZookeeperHolder(ip, String.valueOf(idPort), zkAddress, zkPath);
|
||||
log.info("twepoch:{} ,ip:{} ,zkAddress:{} port:{}", TWEPOCH, ip, zkAddress, idPort);
|
||||
boolean initFlag = holder.init();
|
||||
Preconditions.checkArgument(initFlag, "Snowflake Id Gen is not init ok");
|
||||
workerId = holder.getWorkerId();
|
||||
log.info("START SUCCESS USE ZK WORKERID-{}", workerId);
|
||||
Preconditions.checkArgument(
|
||||
workerId >= 0 && workerId <= MAX_WORKER_ID, "WorkerId must (>= 0 and <=> 1023");
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取字符型分布式Id。
|
||||
*
|
||||
* @return 生成后的Id。
|
||||
*/
|
||||
@Override
|
||||
public synchronized String nextStringId() {
|
||||
return String.valueOf(this.nextLongId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取数值型分布式Id。
|
||||
*
|
||||
* @return 生成后的Id。
|
||||
*/
|
||||
@Override
|
||||
public synchronized long nextLongId() {
|
||||
long timestamp = timeGen();
|
||||
int maxGap = 10;
|
||||
if (timestamp < lastTimestamp) {
|
||||
long offset = lastTimestamp - timestamp;
|
||||
if (offset <= maxGap) {
|
||||
if (!ThreadUtil.sleep(offset << 1)) {
|
||||
log.error("Thread is interrupted while synchronizing to LastTimeStamp.");
|
||||
throw new SnowflakeGenerateException(
|
||||
"Thread is interrupted while synchronizing to LastTimeStamp..");
|
||||
}
|
||||
timestamp = timeGen();
|
||||
if (timestamp < lastTimestamp) {
|
||||
log.error("CurrentTime is less than LastTimeStamp too much (> 10ms) after synchronized.");
|
||||
throw new SnowflakeGenerateException(
|
||||
"CurrentTime is less than LastTimeStamp too much (> 10ms) after synchronized.");
|
||||
}
|
||||
} else {
|
||||
log.error("CurrentTime is less than LastTimeStamp too much (> 10ms).");
|
||||
throw new SnowflakeGenerateException(
|
||||
"CurrentTime is less than LastTimeStamp too much (> 10ms).");
|
||||
}
|
||||
}
|
||||
if (lastTimestamp == timestamp) {
|
||||
sequence = (sequence + 1) & SEQUENCE_MASK;
|
||||
if (sequence == 0) {
|
||||
//seq 为0的时候表示是下一毫秒时间开始对seq做随机
|
||||
sequence = RANDOM.nextInt(100);
|
||||
timestamp = tilNextMillis(lastTimestamp);
|
||||
}
|
||||
} else {
|
||||
//如果是新的ms开始
|
||||
sequence = RANDOM.nextInt(100);
|
||||
}
|
||||
lastTimestamp = timestamp;
|
||||
return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT) | (workerId << SEQUENCE_BITS) | sequence;
|
||||
}
|
||||
|
||||
protected long tilNextMillis(long lastTimestamp) {
|
||||
long timestamp = timeGen();
|
||||
while (timestamp <= lastTimestamp) {
|
||||
timestamp = timeGen();
|
||||
}
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
protected long timeGen() {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public static class SnowflakeGenerateException extends RuntimeException {
|
||||
public SnowflakeGenerateException(String msg, Throwable e) {
|
||||
super(msg, e);
|
||||
}
|
||||
public SnowflakeGenerateException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,231 @@
|
||||
package com.orange.demo.common.sequence.generator;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.curator.RetryPolicy;
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.RetryUntilElapsed;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Snowflake Id生成器所依赖的zk工具类。该实现完全copy美团的leaf。
|
||||
*
|
||||
* @author MeiTuan.Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
@Slf4j
|
||||
@Data
|
||||
public class SnowflakeZookeeperHolder {
|
||||
private static final int CONNECTION_TIMEOUT_MS = 10000;
|
||||
private static final int SESSION_TIMEOUT_MS = 6000;
|
||||
private String zkAddressNode = null;
|
||||
private String listenAddress;
|
||||
private int workerId;
|
||||
private String ip;
|
||||
private String port;
|
||||
private String connectionString;
|
||||
private String pathForever;
|
||||
private final String cachePath;
|
||||
private long lastUpdateTime;
|
||||
|
||||
public SnowflakeZookeeperHolder(String ip, String port, String connectionString, String zkPath) {
|
||||
this.ip = ip;
|
||||
this.port = port;
|
||||
this.listenAddress = ip + ":" + port;
|
||||
this.connectionString = connectionString;
|
||||
this.pathForever = "/snowflake/" + zkPath + "/forever";
|
||||
this.cachePath = System.getProperty("java.io.tmpdir")
|
||||
+ File.separator + zkPath + "/leafconf/{port}/workerID.properties";
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化zk中的持久化SEQUENTIAL节点数据。如果不存在就创建新的,存在则引用原有的。
|
||||
*
|
||||
* @return true初始化成功,否则失败。
|
||||
*/
|
||||
public boolean init() {
|
||||
try {
|
||||
CuratorFramework curator = createWithOptions(connectionString,
|
||||
new RetryUntilElapsed(1000, 4));
|
||||
curator.start();
|
||||
Stat stat = curator.checkExists().forPath(pathForever);
|
||||
if (stat == null) {
|
||||
//不存在根节点,机器第一次启动,创建/snowflake/ip:port-000000000,并上传数据
|
||||
zkAddressNode = createNode(curator);
|
||||
//worker id 默认是0
|
||||
updateLocalWorkerId(workerId);
|
||||
//定时上报本机时间给forever节点
|
||||
scheduledUploadData(curator, zkAddressNode);
|
||||
return true;
|
||||
} else {
|
||||
//ip:port->00001
|
||||
Map<String, Integer> nodeMap = Maps.newHashMap();
|
||||
//ip:port->(ipport-000001)
|
||||
Map<String, String> realNode = Maps.newHashMap();
|
||||
//存在根节点,先检查是否有属于自己的根节点
|
||||
List<String> keys = curator.getChildren().forPath(pathForever);
|
||||
for (String key : keys) {
|
||||
String[] nodeKey = key.split("-");
|
||||
realNode.put(nodeKey[0], key);
|
||||
nodeMap.put(nodeKey[0], Integer.parseInt(nodeKey[1]));
|
||||
}
|
||||
Integer workerid = nodeMap.get(listenAddress);
|
||||
if (workerid != null) {
|
||||
//有自己的节点,zk_AddressNode=ip:port
|
||||
zkAddressNode = pathForever + "/" + realNode.get(listenAddress);
|
||||
//启动worder时使用会使用
|
||||
workerId = workerid;
|
||||
if (!checkInitTimeStamp(curator, zkAddressNode)) {
|
||||
throw new CheckLastTimeException(
|
||||
"Init timestamp check error,forever node timestamp greater than this node time");
|
||||
}
|
||||
} else {
|
||||
//表示新启动的节点,创建持久节点 ,不用check时间
|
||||
String newNode = createNode(curator);
|
||||
zkAddressNode = newNode;
|
||||
String[] nodeKey = newNode.split("-");
|
||||
workerId = Integer.parseInt(nodeKey[1]);
|
||||
}
|
||||
doService(curator);
|
||||
updateLocalWorkerId(workerId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Start node ERROR", e);
|
||||
try {
|
||||
Properties properties = new Properties();
|
||||
properties.load(new FileInputStream(new File(cachePath.replace("{port}", port + ""))));
|
||||
workerId = Integer.parseInt(properties.getProperty("workerId"));
|
||||
log.warn("START FAILED ,use local node file properties workerID-{}", workerId);
|
||||
} catch (Exception e1) {
|
||||
log.error("Read file error ", e1);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void doService(CuratorFramework curator) {
|
||||
scheduledUploadData(curator, zkAddressNode);
|
||||
}
|
||||
|
||||
private void scheduledUploadData(final CuratorFramework curator, final String zkAddressNode) {
|
||||
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
|
||||
new BasicThreadFactory.Builder().namingPattern("schedule-upload-time-%d").daemon(true).build());
|
||||
executorService.scheduleWithFixedDelay(() ->
|
||||
updateNewData(curator, zkAddressNode), 1, 3, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
private boolean checkInitTimeStamp(CuratorFramework curator, String zkAddressNode) throws Exception {
|
||||
byte[] bytes = curator.getData().forPath(zkAddressNode);
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Endpoint endPoint = mapper.readValue(new String(bytes), Endpoint.class);
|
||||
//该节点的时间不能小于最后一次上报的时间
|
||||
return endPoint.getTimestamp() <= System.currentTimeMillis();
|
||||
}
|
||||
|
||||
private String createNode(CuratorFramework curator) throws Exception {
|
||||
try {
|
||||
return curator.create()
|
||||
.creatingParentsIfNeeded()
|
||||
.withMode(CreateMode.PERSISTENT_SEQUENTIAL)
|
||||
.forPath(pathForever + "/" + listenAddress + "-", buildData().getBytes());
|
||||
} catch (Exception e) {
|
||||
log.error("create node error msg {} ", e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private void updateNewData(CuratorFramework curator, String path) {
|
||||
try {
|
||||
if (System.currentTimeMillis() < lastUpdateTime) {
|
||||
return;
|
||||
}
|
||||
curator.setData().forPath(path, buildData().getBytes());
|
||||
lastUpdateTime = System.currentTimeMillis();
|
||||
} catch (Exception e) {
|
||||
log.info("update init data error path is {} error is {}", path, e);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildData() throws JsonProcessingException {
|
||||
Endpoint endpoint = new Endpoint(ip, port, System.currentTimeMillis());
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.writeValueAsString(endpoint);
|
||||
}
|
||||
|
||||
private void updateLocalWorkerId(int workId) {
|
||||
File leafConfFile = new File(cachePath.replace("{port}", port));
|
||||
boolean exists = leafConfFile.exists();
|
||||
log.info("file exists status is {}", exists);
|
||||
if (exists) {
|
||||
try {
|
||||
FileUtils.writeStringToFile(
|
||||
leafConfFile, "workId=" + workId, StandardCharsets.UTF_8, false);
|
||||
log.info("update file cache workId is {}", workId);
|
||||
} catch (IOException e) {
|
||||
log.error("update file cache error ", e);
|
||||
}
|
||||
} else {
|
||||
//不存在文件,父目录页肯定不存在
|
||||
try {
|
||||
boolean mkdirs = leafConfFile.getParentFile().mkdirs();
|
||||
log.info("init local file cache create parent dis status is {}, worker id is {}", mkdirs, workId);
|
||||
if (mkdirs) {
|
||||
if (leafConfFile.createNewFile()) {
|
||||
FileUtils.writeStringToFile(
|
||||
leafConfFile, "workId=" + workId, StandardCharsets.UTF_8, false);
|
||||
log.info("local file cache workId is {}", workId);
|
||||
}
|
||||
} else {
|
||||
log.warn("create parent dir error===");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.warn("craete workId conf file error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy) {
|
||||
return CuratorFrameworkFactory.builder().connectString(connectionString)
|
||||
.retryPolicy(retryPolicy)
|
||||
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
|
||||
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 上报数据结构
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
static class Endpoint {
|
||||
private String ip;
|
||||
private String port;
|
||||
private long timestamp;
|
||||
}
|
||||
|
||||
public static class CheckLastTimeException extends RuntimeException {
|
||||
public CheckLastTimeException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.orange.demo.common.sequence.wrapper;
|
||||
|
||||
import com.orange.demo.common.sequence.config.IdGeneratorProperties;
|
||||
import com.orange.demo.common.sequence.generator.BasicIdGenerator;
|
||||
import com.orange.demo.common.sequence.generator.MyIdGenerator;
|
||||
import com.orange.demo.common.sequence.generator.SnowflakeIdGenerator;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
/**
|
||||
* 分布式Id生成器的封装类。该对象可根据配置选择不同的生成器实现类。
|
||||
*
|
||||
* @author Orange Team
|
||||
* @date 2020-08-08
|
||||
*/
|
||||
@Component
|
||||
public class IdGeneratorWrapper {
|
||||
|
||||
@Autowired
|
||||
private IdGeneratorProperties properties;
|
||||
/**
|
||||
* Id生成器接口对象。
|
||||
*/
|
||||
private MyIdGenerator idGenerator;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
if (properties.getAdvanceIdGenerator()) {
|
||||
idGenerator = new SnowflakeIdGenerator(
|
||||
properties.getZkAddress(), properties.getIdPort(), properties.getZkPath());
|
||||
} else {
|
||||
idGenerator = new BasicIdGenerator(properties.getSnowflakeWorkNode());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取基于Snowflake算法的数值型Id。
|
||||
* 由于底层实现为synchronized方法,因此计算过程串行化,且线程安全。
|
||||
*
|
||||
* @return 计算后的全局唯一Id。
|
||||
*/
|
||||
public long nextLongId() {
|
||||
return idGenerator.nextLongId();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取基于Snowflake算法的字符串Id。
|
||||
* 由于底层实现为synchronized方法,因此计算过程串行化,且线程安全。
|
||||
*
|
||||
* @return 计算后的全局唯一Id。
|
||||
*/
|
||||
public String nextStringId() {
|
||||
return idGenerator.nextStringId();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.orange.demo.common.sequence.config.IdGeneratorAutoConfigure
|
||||
Reference in New Issue
Block a user