SpringCloud(二)

SpringCould的学习笔记

黑马SpringCould的学习笔记

‍‌⁠‍‍‌‬‍‬‌⁠‬‬‍‬‍‍‬‬‬黑马学成在线项目v3.1讲义 - 飞书云文档 (feishu.cn)

一、ffmpeg

FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。

1、安装

Download FFmpeg

25

下载完后将文件夹bin添加到系统变量path中

27

1
ffmpeg -version

显示版本即可

26

2、简单使用

将1a.mp4文件转成1a.avi

进入cmd输入

1
ffmpeg -i 1a.mp4 1a.avi

当然,你可以修改后缀名。将视频转成需要的格式

3、通过java打开应用

1
2
3
4
5
6
7
8
@Test
public void test() throws IOException {
ProcessBuilder builder = new ProcessBuilder();
builder.command("E:\\Program Files\\Tencent\\QQNT\\QQ.exe");
//将标准输入流和错误输入流合并,通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p = builder.start();
}

4、Mp4VideoUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class Mp4VideoUtil extends VideoUtil {

String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe";//ffmpeg的安装位置
String video_path = "D:\\BaiduNetdiskDownload\\test1.avi";
String mp4_name = "test1.mp4";
String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/";
public Mp4VideoUtil(String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path){
super(ffmpeg_path);
this.ffmpeg_path = ffmpeg_path;
this.video_path = video_path;
this.mp4_name = mp4_name;
this.mp4folder_path = mp4folder_path;
}
//清除已生成的mp4
private void clear_mp4(String mp4_path){
//删除原来已经生成的m3u8及ts文件
File mp4File = new File(mp4_path);
if(mp4File.exists() && mp4File.isFile()){
mp4File.delete();
}
}
/**
* 视频编码,生成mp4文件
* @return 成功返回success,失败返回控制台日志
*/
public String generateMp4(){
//清除已生成的mp4
// clear_mp4(mp4folder_path+mp4_name);
clear_mp4(mp4folder_path);
/*
ffmpeg.exe -i lucene.avi -c:v libx264 -s 1280x720 -pix_fmt yuv420p -b:a 63k -b:v 753k -r 18 .\lucene.mp4
*/
List<String> commend = new ArrayList<String>();
//commend.add("D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe");
commend.add(ffmpeg_path);
commend.add("-i");
// commend.add("D:\\BaiduNetdiskDownload\\test1.avi");
commend.add(video_path);
commend.add("-c:v");
commend.add("libx264");
commend.add("-y");//覆盖输出文件
commend.add("-s");
commend.add("1280x720");
commend.add("-pix_fmt");
commend.add("yuv420p");
commend.add("-b:a");
commend.add("63k");
commend.add("-b:v");
commend.add("753k");
commend.add("-r");
commend.add("18");
// commend.add(mp4folder_path + mp4_name );
commend.add(mp4folder_path );
String outstring = null;
try {
ProcessBuilder builder = new ProcessBuilder();
builder.command(commend);
//将标准输入流和错误输入流合并,通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p = builder.start();
outstring = waitFor(p);

} catch (Exception ex) {

ex.printStackTrace();

}
// Boolean check_video_time = this.check_video_time(video_path, mp4folder_path + mp4_name);
Boolean check_video_time = this.check_video_time(video_path, mp4folder_path);
if(!check_video_time){
return outstring;
}else{
return "success";
}
}

public static void main(String[] args) throws IOException {
//ffmpeg的路径
String ffmpeg_path = "D:\\tools\\ffmpeg\\ffmpeg.exe";//ffmpeg的安装位置
//源avi视频的路径
String video_path = "C:\\Users\\Lenovo\\Desktop\\upload\\1b.avi";
//转换后mp4文件的名称
String mp4_name = "nacos01.mp4";
//转换后mp4文件的路径
String mp4_path = "C:\\Users\\Lenovo\\Desktop\\upload\\1b.mp4";
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String s = videoUtil.generateMp4();
System.out.println(s);
}
}

修改main函数中的一些参数以测试

二、XXL-JOB

1、分布式任务调度

对于上面ffmpeg视频转码可以认为是一个任务的执行。那么对于多个任务,如何高效处理呢?

  1. 使用多线程:充分利用单机资源

  2. 分布式加多线程:在多线程基础上,利用多台计算机。

我们可以先思考一下下面业务场景的解决方案:

​ 每隔24小时执行数据备份任务。

​ 12306网站会根据车次不同,设置几个时间点分批次放票。

​ 某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。

​ 商品成功发货后,需要向客户发送短信提醒。

1、简单实现间隔时间执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {    
//任务执行间隔时间
final long timeInterval = 1000;
Runnable runnable = new Runnable() {
public void run() {
while (true) {
//TODO:something
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread = new Thread(runnable);
thread.start();
}

2、Timer方式实现

1
2
3
4
5
6
7
8
9
public static void main(String[] args){  
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
//TODO:something
}
}, 1000, 2000); //1秒后开始调度,每2秒执行一次
}

3、ScheduledExecutor方式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String [] agrs){
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
//TODO:something
System.out.println("todo something");
}
}, 1, // 首次执行的延迟时间(1秒)
2, // 每次执行之间的时间间隔(2秒)
TimeUnit.SECONDS);
}

4、第三方Quartz方式实现

项目地址:https://github.com/quartz-scheduler/quartz

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String [] agrs) throws SchedulerException {
//创建一个Scheduler
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler scheduler = schedulerFactory.getScheduler();
//创建JobDetail
JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);
jobDetailBuilder.withIdentity("jobName","jobGroupName");
JobDetail jobDetail = jobDetailBuilder.build();
//创建触发的CronTrigger 支持按日历调度
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity("triggerName", "triggerGroupName")
.startNow()
.withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?"))
.build();
scheduler.scheduleJob(jobDetail,trigger);
scheduler.start();
}

public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext){
System.out.println("todo something");
}
}

什么是分布式任务调度?

就是部署多个分布式实例,在任务调度方面采用分布式任务调度。一个服务部署多个冗余实例。

2、xxlJob

XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。

官网:https://www.xuxueli.com/xxl-job/

文档:https://www.xuxueli.com/xxl-job/#《分布式任务调度平台XXL-JOB》

分为调度中心任务执行器任务

28

3、安装与导入

1、首先下载XXL-JOB

GitHub:https://github.com/xuxueli/xxl-job 这里使用2.3.1

2、下载后以项目打开

29

xxl-job-admin:调度中心

xxl-job-core:公共依赖

xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)

​ :xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;

​ :xxl-job-executor-sample-frameless:无框架版本;

3、根据doc官方文档中的快速入门构建

  1. 在/xxl-job/doc/db/tables_xxl_job.sql中执行sql脚本,创建数据库表

  2. 运行core

    1. 配置admin的配置文件

    2. 编译源码成为jar包然后运行

    3. 也可以直接通过docker镜像搭建

      1
      2
      3
      4
      5
      6
      7
      8
      docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin  -d xuxueli/xxl-job-admin:{指定版本}

      /**
      * 如需自定义 mysql 等配置,可通过 "-e PARAMS" 指定,参数格式 PARAMS="--key=value --key2=value2"
      * 配置项参考文件:/xxl-job/xxl-job-admin/src/main/resources/application.properties
      * 如需自定义 JVM内存参数 等配置,可通过 "-e JAVA_OPTS" 指定,参数格式 JAVA_OPTS="-Xmx512m"
      */
      docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:{指定版本}
    4. 通过docker运行后http://192.168.101.65:8088/xxl-job-admin

      默认账号密码admin/123456

      30

  3. 任务调度中心新增执行器

    31

  4. 在任务工程中绑定xxl-job

    1. 导入依赖

      1
      2
      3
      4
      5
      <!--        任务调度-->
      <dependency>
      <groupId>com.xuxueli</groupId>
      <artifactId>xxl-job-core</artifactId>
      </dependency>
    2. 在nacos下的media-service-dev.yaml下配置xxl-job

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      xxl:
      job:
      admin:
      addresses: http://192.168.101.65:8088/xxl-job-admin
      executor:
      appname: media-process-service #执行器的应用名
      address:
      ip:
      port: 9999 #执行器启动的端口
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
      accessToken: default_token
    3. 在media-service的config中添加xxl-job config代码(配置完后成为执行器)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      @Configuration
      public class XxlJobConfig {
      private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

      //读取nacos的对应共享配置文件信息
      @Value("${xxl.job.admin.addresses}")
      private String adminAddresses;

      @Value("${xxl.job.accessToken}")
      private String accessToken;

      @Value("${xxl.job.executor.appname}")
      private String appname;

      @Value("${xxl.job.executor.address}")
      private String address;

      @Value("${xxl.job.executor.ip}")
      private String ip;

      @Value("${xxl.job.executor.port}")
      private int port;

      @Value("${xxl.job.executor.logpath}")
      private String logPath;

      @Value("${xxl.job.executor.logretentiondays}")
      private int logRetentionDays;


      @Bean
      public XxlJobSpringExecutor xxlJobExecutor() {
      logger.info(">>>>>>>>>>> xxl-job config init.");
      XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
      xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
      xxlJobSpringExecutor.setAppname(appname);
      xxlJobSpringExecutor.setAddress(address);
      xxlJobSpringExecutor.setIp(ip);
      xxlJobSpringExecutor.setPort(port);
      xxlJobSpringExecutor.setAccessToken(accessToken);
      xxlJobSpringExecutor.setLogPath(logPath);
      xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

      return xxlJobSpringExecutor;
      }

      }
    4. 配置成功后在对应执行器可以看到连接成功

      32

  5. 自定义执行任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Component
    public class SampleXxlJob {
    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

    /**
    * 1、简单任务示例(Bean模式)
    */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
    XxlJobHelper.log("XXL-JOB, Hello World.");

    for (int i = 0; i < 5; i++) {
    System.out.println(i);
    XxlJobHelper.log("beat at:" + i);
    TimeUnit.SECONDS.sleep(2);
    }
    // default success
    }
    }
  6. 添加下任务

    33

    34

    bean模式:在项目工程中编写执行器的任务代码。

    GLUE模式:将任务代码编写在调度中心。

  7. 打开任务位于的项目,可以看到任务按照时间间隔执行

4、分片广播

前面学习了xxl-job的基础使用。接下来展示如何进行分布式任务处理

我们会启动多个执行器组成一个集群,去执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
高级配置:
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
- 调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
  • 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;

  • 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。

1、添加分片任务

35

2、分配任务模版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

logger.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i == shardIndex) {
logger.info("第 {} 片, 命中分片开始处理", i);
} else {
logger.info("第 {} 片, 忽略", i);
}
}

}

3、启动多个实例。注意:实例端口号,与连接到任务调度中心的端口号都不能冲突

1
-Dserver.port=8082 -Dxxl.job.executor.port=9982

36

37

总分片数: 启动实例数量

我们可以根据当前分片序号,分配给它需要处理的任务。当某个实例执行器挂掉了,会动态调节总分片数减一

5、实例

  1. 配置好执行器、分片广播任务。

  2. 根据不同的任务参数取出任务

    1
    2
    @Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")
    List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);
  3. 处理对应的视频转码,更新URL。更新一系列数据库数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@Component
@Slf4j
public class VideoTask {

@Autowired
MediaFileService mediaFileService;

@Autowired
MediaFileProcessService mediaFileProcessService;

@Value("${videoprocess.ffmpegpath}")
private String ffmpegpath;

/**
* 2、分片广播任务
*/
@XxlJob("videoJobHandler")
public void shardingJobHandler() throws Exception {

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
//cpu核心数
int processors = Runtime.getRuntime().availableProcessors();
//查询待处理的任务
List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);

//任务数量
int size = mediaProcessList.size();
log.debug("取到的视频处理任务数:",size);
if(size<=0){
return;
}
//创建一个线程池
ExecutorService executorService = Executors.newFixedThreadPool(size);

//使用一个计数器
CountDownLatch countDownLatch = new CountDownLatch(size);

mediaProcessList.forEach(mediaProcess -> {
executorService.execute(()->{
try{
//任务id
Long taskId = mediaProcess.getId();

//开启任务
boolean b = mediaFileProcessService.startTask(taskId);
if(!b){
log.debug("抢占任务失败,任务id:{}",taskId);
return;
}
//执行视频转码

//桶
String bucket = mediaProcess.getBucket();
//文件的id就是MD5
String fileId = mediaProcess.getFileId();

String objectName = mediaProcess.getFilePath();
//下载minIO的视频到本地
File file = mediaFileService.downloadFileFromMinIO(bucket, objectName);
if(file == null){
log.debug("下载视频出错,任务id:{},bucket:{},objectName:{}",taskId,bucket,objectName);
//保存任务处理失败的结果
mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"下载视频到本地失败");
return;
}


//源avi视频的路径
String video_path = file.getAbsolutePath();
//转换后mp4文件的名称
String mp4_name = fileId+".mp4";
//转换后mp4文件的路径
//先创建一个临时文件,作为转换后的文件
File tempFile = null;
try {
tempFile = File.createTempFile("minio", ".mp4");
} catch (IOException e) {
log.error("创建临时文件异常:{}",e.getMessage());
mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"创建临时文件异常");
return;
}
String mp4_path = tempFile.getAbsolutePath();
//创建工具类对象
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath,video_path,mp4_name,mp4_path);
//开始视频转换,成功将返回success
String result = videoUtil.generateMp4();
if (!result.equals("success")){
//MP4文件的url
log.debug("视频转码失败,原因:{},bucket:{},objectName:{}",result,bucket,objectName);
//保存任务状态为失败
mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,result);
return;
}


//对象名objectName改后缀
objectName = objectName.substring(0, objectName.lastIndexOf("."));
objectName += ".mp4";

//上传到minIO
boolean b1 = mediaFileService.addMediaFilesToMinIO(tempFile.getAbsolutePath(), "video/mp4", bucket, objectName);
if(!b1){
log.debug("上传MP4到minio失败,taskId:{}",taskId);
mediaFileProcessService.saveProcessFinishStatus(taskId,"3",fileId,null,"上传mp4到minio失败");
return;
}

//MP4文件的url

String url = "/" + bucket + "/" + getFilePath(fileId, ".mp4");
//保存任务处理结果
//保存任务状态为成功
mediaFileProcessService.saveProcessFinishStatus(taskId,"2",fileId,url,"保存任务状态成功");
}finally {
//计数器减去1
countDownLatch.countDown();
}
});
});
//阻塞,最多等待30分钟,阻塞最多等待一定时间都接触阻塞
countDownLatch.await(30, TimeUnit.MINUTES);
}
private String getFilePath(String fileMd5,String fileExt){
return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
}

}

任务补偿机制: 视频处理过程线程挂掉了,导致视频状态一直处理中。应该设置任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。

达到最大失败次数: 当达到最大次数的时候,可以设计程序提醒人工,进行人工处理,让其自行转码视频。

分块文件清理: 设置一个定时任务。定时清理超时还未成功上传的分块文件。

三、Freemarker

1、模版引擎

1、浏览器请求web服务器

2、服务器渲染页面,渲染的过程就是向jsp页面(模板)内填充数据(模型)。

3、服务器将渲染生成的页面返回给浏览器。

所以模板引擎就是:模板+数据=输出,Jsp页面就是模板,页面中嵌入的jsp标签就是数据,两者相结合输出html网页。

常见模版引擎有: Jsp、Freemarker、Thymeleaf 、Velocity 等

这里介绍Freemarker官方地址:http://freemarker.foofun.cn/

2、使用

1、引入依赖

1
2
3
4
5
<!-- Spring Boot 对结果视图 Freemarker 集成 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>

2、Nacos添加共用配置freemarker-config-dev.yaml

1
2
3
4
5
6
7
8
9
10
11
spring:
freemarker:
enabled: true
cache: false #关闭模板缓存,方便测试
settings:
template_update_delay: 0
suffix: .ftl #页面模板后缀名
charset: UTF-8
template-loader-path: classpath:/templates/ #页面模板位置(默认为 classpath:/templates/)
resources:
add-mappings: false #关闭项目中的静态资源映射(static、resources文件夹下的资源)

3、项目拉取共用配置

38

4、resources下创建templates目录,添加test.ftl模板文件

1
2
3
4
5
6
7
8
9
10
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello World!</title>
</head>
<body>
Hello ${name}!
</body>
</html>

编辑controller方法,准备数据

1
2
3
4
5
6
7
8
9
10
11
12
13
@Controller
public class FreemarkerController {

@GetMapping("/testfreemarker")
public ModelAndView test(){
ModelAndView modelAndView = new ModelAndView();
//设置模型数据
modelAndView.addObject("name","小明");
//设置模板名称
modelAndView.setViewName("test");
return modelAndView;
}
}

5、启动工程,访问对应接口: http://localhost:63040/content/testfreemarker

39

3、数据填充

1、数据模型

1
2
3
4
5
6
7
8
9
@Data
public class CoursePreviewDto {
//课程基本信息、课程营销信息
private CourseBaseInfoDto courseBase;

//课程计划信息
private List<TeachplanDto> teachplans;
//课程师资信息
}

2、接口设计

1
2
3
4
5
6
7
8
9
10
11
12
@GetMapping("/coursepreview/{courseId}")
public ModelAndView preview(@PathVariable("courseId") Long courseId){
ModelAndView modelAndView = new ModelAndView();
//查询模型数据
CoursePreviewDto coursePreviewInfo = coursePublishService.getCoursePreviewInfo(courseId);

modelAndView.addObject("model",coursePreviewInfo);

modelAndView.setViewName("course_template");

return modelAndView;
}

3、数据填充规则

http://freemarker.foofun.cn/ref_directives.html

40

4、额外介绍

这里介绍一下黑马学成在线这个项目静态资源是如何访问

1、本地hosts文件原理

在我们访问一个网址的时候,会优先读取本机hosts文件是否存储了对应的host规则:

1
127.0.0.1 www.51xuecheng.cn 51xuecheng.cn ucenter.51xuecheng.cn teacher.51xuecheng.cn file.51xuecheng.cn

2、nginx的配置文件

这里是nginx的反向代理。当我们访问nginx监听的 http://www.51xuecheng.cn/static/js/querystring/index.js 静态文件时

nginx会帮我们反向代理到真实存储这些静态文件的位置:D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/js/

并替换、补充路径: D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/js/querystring/index.js

达到反向代理: 不了解资源真实地址。

只要配置好,访问nginx配置的网址就会自动反向代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
http {
include mime.types;
default_type application/octet-stream;
client_max_body_size 100M; # 设置客户端请求体最大值
client_body_buffer_size 128k; # 设置请求体缓存区大小

sendfile on;

keepalive_timeout 65;

server {
listen 80;
server_name www.51xuecheng.cn localhost;

ssi on;
ssi_silent_errors on;

location / {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/;
index index.html index.htm;
}
#静态资源
location /static/img/ {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/img/;
}
location /static/css/ {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/css/;
}
location /static/js/ {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/js/;
}
location /static/plugins/ {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/plugins/;
add_header Access-Control-Allow-Origin http://ucenter.51xuecheng.cn;
add_header Access-Control-Allow-Credentials true;
add_header Access-Control-Allow-Methods GET;
}
location /plugins/ {
alias D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/plugins/;
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}

}
}

继续介绍学成在线文件服务器如何访问的

1、添加本地host

1
127.0.0.1 www.51xuecheng.cn file.51xuecheng.cn

2、nginx配置文件

访问对应网址 http://file.51xuecheng.cn/video/c/7/c7e11b5f1520d4aca1a5d0ef9275ca0c/c7e11b5f1520d4aca1a5d0ef9275ca0c.mp4

自动替换成 http://192.168.101.65:9000/video/c/7/c7e11b5f1520d4aca1a5d0ef9275ca0c/c7e11b5f1520d4aca1a5d0ef9275ca0c.mp4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#文件服务
upstream fileserver{
server 192.168.101.65:9000 weight=10;
}
server {
listen 80;
server_name file.51xuecheng.cn;
#charset koi8-r;
ssi on;
ssi_silent_errors on;
#access_log logs/host.access.log main;
location /video {
proxy_pass http://fileserver;
}

location /mediafiles {
proxy_pass http://fileserver;
}
}

如果http://fileserver后有别的路径则会将/video替换,否则不会替换。

例如下面几个等价:

1
2
3
4
5
6
7
8
9
10
11
location /video2 {
proxy_pass http://fileserver/video;
}

location /video {
proxy_pass http://fileserver/video;
}

location /video {
proxy_pass http://fileserver;
}

其他使用以上介绍的规则即可套用。

5、页面静态化

1、调用的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void generateCourseHtml(MqMessage mqMessage,long courseId){

Long taskId = mqMessage.getId();

MqMessageService mqMessageService = this.getMqMessageService();

//任务幂等性处理
//查询数据库取出该阶段的执行状态
int stageOne = mqMessageService.getStageOne(taskId);
if(stageOne > 0){
log.debug("课程静态化任务完成,无需处理");
return;
}

//开始课程静态化,生成html页面
File file = coursePublishService.generateCourseHtml(courseId);
if(file == null){
//存在问题
XueChengPlusException.cast("生成的静态页面为空");
}

//将html上传到minio
coursePublishService.uploadCourseHtml(courseId,file);

//设置任务的状态为完成
mqMessageService.completedStageOne(taskId);

}

2、coursePublishService.generateCourseHtml方法定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public File generateCourseHtml(Long courseId) {
Configuration configuration = new Configuration(Configuration.getVersion());
File file = null;
try{
//拿到classpath路径
String classpath = this.getClass().getResource("/").getPath();
//指定模版的路径
configuration.setDirectoryForTemplateLoading(new File(classpath+"/templates/"));
//指定编码
configuration.setDefaultEncoding("utf-8");
//得到模版
Template template = configuration.getTemplate("course_template.ftl");

//准备数据
CoursePreviewDto coursePreviewDto = this.getCoursePreviewInfo(courseId);
HashMap<String, Object> map = new HashMap<>();
map.put("model",coursePreviewDto);

//填写模版和数据
String html = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);


//得到输入流
InputStream inputStream = IOUtils.toInputStream(html, "utf-8");


//文件
file = File.createTempFile("coursepublish",".html");

//输出文件
FileOutputStream fileOutputStream = new FileOutputStream(file);



//使用流将html写入文件
IOUtils.copy(inputStream,fileOutputStream);


//关闭流
inputStream.close();
fileOutputStream.close();
}catch (Exception e){
log.error("页面静态化出现问题:课程id:{}",courseId,e);
e.printStackTrace();
}

return file;
}

3、upload远程调用媒资服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void uploadCourseHtml(Long courseId, File file){

try{
//文件上传到minio中调用媒资服务
//将file类型转成multipart
MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(file);
String upload = mediaServiceClient.upload(multipartFile, "course/"+courseId+".html");
if(upload == null){
log.debug("远程调用走降级的逻辑,得到上传的结果为空,课程id:{}",courseId);
XueChengPlusException.cast("上传静态文件中存在异常");
}
}catch ( Exception e){
e.printStackTrace();
XueChengPlusException.cast("上传静态文件中存在异常");
}
}

四、分布式事务

1、CAP理论

  • 本地事务: 数据库事务

  • 分布式事务: 事务的操作由多个服务通过网络合作完成。网络存在不可到达性

1
2
3
4
begin transaction; 
//1.本地数据库操作:张三减少金额
//2.远程调用:让李四增加金额
commit transation;

远程调用由于网络问题会失效,导致分布式事务问题。

CAP

  • Consistency : 一致性,不管用户访问哪一个节点,得到的数据都是最新的。

  • Availability : 可用性,任何时候都能访问到可用的节点。

  • Partition tolerance :分区容忍性,由于网络错误导致的请求中断,消息丢失,但系统任然对外提供服务

对于分布式系统: 必须满足P,C 、A冲突,只可满足一个,所以是AP or CP(强调一致性、可用性)。

2、BASE理论

在实际开发中,我们的场景一开始符合AP,但是最终会同步满足CP。 虽然AP舍弃了C一致性,但是最终还是达到了一致。也就满足了最终一致性。

BASE

  • Basically Available(基本可用):出现问题时保证核心服务可用

  • Soft state(软状态): 可以存在中间状态,比如打印机打印,提示正在打印,需要时间打印,但是最终是会打印成功的。

  • Eventually consistent (最终一致性): 退款后资金不会马上到账,而是一段时间后到账,舍弃了强一致性,满足最终一致性

实现CP:使用Seata的AT模式、TCC模式等。

实现AP:使用消息队列、任务调度方式(将课程信息由数据库同步到elasticsearch、MinIO、redis等等中)

五、消息处理SDK

学成在线中使用了自定义的消息处理SDK,达到复用的目的:

  • 消息处理中,通过抽象的形式,执行某一个需要我们自己填充的方法

  • 保证任务幂等性: 任务执行完成后会从消息表删除,如果消息的状态是完成或不存在消息表中则不用执行。

  • 保证任务不重复执行: 任务调度采用分片广播,根据分片参数去获取任务,另外阻塞调度策略为丢弃任务。由于是信息同步类任务,所以重复执行也没关系,不采用抢占任务的方式。

  • 定义数据表多个字段state,每个字段代表一个阶段完成

消息数据库表服务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public interface MqMessageService extends IService<MqMessage> {

/**
* @description 扫描消息表记录,采用与扫描视频处理表相同的思路
* @param shardIndex 分片序号,shardTotal 分片总数,count 扫描记录数
* @return java.util.List 消息记录
*/
public List<MqMessage> getMessageList(int shardIndex, int shardTotal, String messageType,int count);

/**
* @description 完成任务
* @param id 消息id
* @return int 更新成功:1
* @author Mr.M
* @date 2022/9/21 20:49
*/
public int completed(long id);

/**
* @description 完成阶段任务
* @param id 消息id
* @return int 更新成功:1
* @author Mr.M
* @date 2022/9/21 20:49
*/
public int completedStageOne(long id);
public int completedStageTwo(long id);
public int completedStageThree(long id);
public int completedStageFour(long id);

/**
* @description 查询阶段状态
* @param id
* @return int
* @author Mr.M
* @date 2022/9/21 20:54
*/
public int getStageOne(long id);
public int getStageTwo(long id);
public int getStageThree(long id);
public int getStageFour(long id);

}

消息SDK提供消息处理抽象类:

只需要实现这个抽象方法execute(),在里面写我们任务具体逻辑。 执行process方法中会自动执行execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Slf4j
@Data
public abstract class MessageProcessAbstract {

@Autowired
MqMessageService mqMessageService;


/**
* @param mqMessage 执行任务内容
* @return boolean true:处理成功,false处理失败
* @description 任务处理
*/
public abstract boolean execute(MqMessage mqMessage);


/**
* @description 扫描消息表多线程执行任务
* @param shardIndex 分片序号,shardTotal 分片总数, messageType 消息类型, count 一次取出任务总数,timeout 预估任务执行时间,到此时间如果任务还没有结束则强制结束 单位秒
*/
public void process(int shardIndex, int shardTotal, String messageType,int count,long timeout) {

try {
//扫描消息表获取任务清单
List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count);
//任务个数
int size = messageList.size();
log.debug("取出待处理消息"+size+"条");
if(size<=0){
return ;
}

//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
//计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
messageList.forEach(message -> {
threadPool.execute(() -> {
log.debug("开始任务:{}",message);
//处理任务
try {
boolean result = execute(message);
if(result){
log.debug("任务执行成功:{})",message);
//更新任务状态,删除消息表记录,添加到历史表
int completed = mqMessageService.completed(message.getId());
if (completed>0){
log.debug("任务执行成功:{}",message);
}else{
log.debug("任务执行失败:{}",message);
}
}
} catch (Exception e) {
e.printStackTrace();
log.debug("任务出现异常:{},任务:{}",e.getMessage(),message);
}
//计数
countDownLatch.countDown();
log.debug("结束任务:{}",message);

});
});

//等待,给一个充裕的超时时间,防止无限等待,到达超时时间还没有处理完成则结束任务
countDownLatch.await(timeout,TimeUnit.SECONDS);
System.out.println("结束....");
} catch (InterruptedException e) {
e.printStackTrace();

}

}

}

1、使用

调用消息服务类添加消息

1
2
3
4
5
6
private void saveCoursePublishMessage(Long courseId){
MqMessage mqMessage = mqMessageService.addMessage("course_publish", String.valueOf(courseId), null, null);
if(mqMessage==null){
XueChengPlusException.cast(CommonError.UNKOWN_ERROR);
}
}

开启任务调度xxljob,并且配置成功后.

任务调度不会识别消息数据表是否有新消息,只会执行process方法。process中会判断是否有未处理完的消息并且分配给不同的节点执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@Slf4j
@Component
public class CoursePublishTask extends MessageProcessAbstract {

//部分重要代码
//任务调度入口
@XxlJob("CoursePublishJobHandler")
public void coursePublishJobHandler()throws Exception{
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

process(shardIndex,shardTotal,"course_publish",30,60);

}

//执行发布任务的逻辑方法,如果此方法抛出异常,表示任务执行失败
@Override
public boolean execute(MqMessage mqMessage) {
//从mqMessage拿到课程id
Long courseId = Long.valueOf(mqMessage.getBusinessKey1());
//课程静态化上传minio
generateCourseHtml(mqMessage,courseId);
//向elasticSearch写索引数据
saveCourseIndex(mqMessage,courseId);
//向redis向数据

//放回true表示完成
return true;
}

private void generateCourseHtml(MqMessage mqMessage,long courseId){

Long taskId = mqMessage.getId();

MqMessageService mqMessageService = this.getMqMessageService();

//任务幂等性处理
//查询数据库取出该阶段的执行状态
int stageOne = mqMessageService.getStageOne(taskId);
if(stageOne > 0){
log.debug("课程静态化任务完成,无需处理");
return;
}

//开始课程静态化,生成html页面
File file = coursePublishService.generateCourseHtml(courseId);
if(file == null){
//存在问题
XueChengPlusException.cast("生成的静态页面为空");
}

//将html上传到minio
coursePublishService.uploadCourseHtml(courseId,file);

//设置任务的状态为完成
mqMessageService.completedStageOne(taskId);

}

//保存课程索引信息
public void saveCourseIndex(MqMessage mqMessage,long courseId){
//任务id
Long taskId = mqMessage.getId();
MqMessageService mqMessageService = this.getMqMessageService();
//取出第二个阶段状态
int stageTwo = mqMessageService.getStageTwo(taskId);

//任务幂等性
if(stageTwo >0){
log.debug("课程信息已写入,无需执行。。。");
return;
}


//从课程发布表中查询课程信息
CoursePublish coursePublish = coursePublishMapper.selectById(courseId);
CourseIndex courseIndex = new CourseIndex();
BeanUtils.copyProperties(coursePublish,courseIndex);
//查询课程信息,调用搜索服务添加索引
Boolean add = searchServiceClient.add(courseIndex);
if(!add){
XueChengPlusException.cast("远程调用添加索引失败");
}
//完成本阶段的任务
mqMessageService.completedStageTwo(taskId);
}
}