SpringCould的学习笔记
黑马SpringCould的学习笔记
黑马学成在线项目v3.1讲义 - 飞书云文档 (feishu.cn)
一、ffmpeg
FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。
1、安装
Download FFmpeg
下载完后将文件夹bin添加到系统变量path中
显示版本即可
2、简单使用
将1a.mp4文件转成1a.avi
进入cmd输入
当然,你可以修改后缀名。将视频转成需要的格式
3、通过java打开应用
1 2 3 4 5 6 7 8 @Test public void test () throws IOException { ProcessBuilder builder = new ProcessBuilder (); builder.command("E:\\Program Files\\Tencent\\QQNT\\QQ.exe" ); builder.redirectErrorStream(true ); Process p = builder.start(); }
4、Mp4VideoUtil
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 public class Mp4VideoUtil extends VideoUtil { String ffmpeg_path = "D:\\Program Files\\ffmpeg-20180227-fa0c9d6-win64-static\\bin\\ffmpeg.exe" ; String video_path = "D:\\BaiduNetdiskDownload\\test1.avi" ; String mp4_name = "test1.mp4" ; String mp4folder_path = "D:/BaiduNetdiskDownload/Movies/test1/" ; public Mp4VideoUtil (String ffmpeg_path, String video_path, String mp4_name, String mp4folder_path) { super (ffmpeg_path); this .ffmpeg_path = ffmpeg_path; this .video_path = video_path; this .mp4_name = mp4_name; this .mp4folder_path = mp4folder_path; } private void clear_mp4 (String mp4_path) { File mp4File = new File (mp4_path); if (mp4File.exists() && mp4File.isFile()){ mp4File.delete(); } } public String generateMp4 () { clear_mp4(mp4folder_path); List<String> commend = new ArrayList <String>(); commend.add(ffmpeg_path); commend.add("-i" ); commend.add(video_path); commend.add("-c:v" ); commend.add("libx264" ); commend.add("-y" ); commend.add("-s" ); commend.add("1280x720" ); commend.add("-pix_fmt" ); commend.add("yuv420p" ); commend.add("-b:a" ); commend.add("63k" ); commend.add("-b:v" ); commend.add("753k" ); commend.add("-r" ); commend.add("18" ); commend.add(mp4folder_path ); String outstring = null ; try { ProcessBuilder builder = new ProcessBuilder (); builder.command(commend); builder.redirectErrorStream(true ); Process p = builder.start(); outstring = waitFor(p); } catch (Exception ex) { ex.printStackTrace(); } Boolean check_video_time = this .check_video_time(video_path, mp4folder_path); if (!check_video_time){ return outstring; }else { return "success" ; } } public static void main (String[] args) throws IOException { String ffmpeg_path = "D:\\tools\\ffmpeg\\ffmpeg.exe" ; String video_path = "C:\\Users\\Lenovo\\Desktop\\upload\\1b.avi" ; String mp4_name = "nacos01.mp4" ; String mp4_path = "C:\\Users\\Lenovo\\Desktop\\upload\\1b.mp4" ; Mp4VideoUtil videoUtil = new Mp4VideoUtil (ffmpeg_path,video_path,mp4_name,mp4_path); String s = videoUtil.generateMp4(); System.out.println(s); } }
修改main函数中的一些参数以测试
二、XXL-JOB
1、分布式任务调度
对于上面ffmpeg视频转码可以认为是一个任务的执行。那么对于多个任务,如何高效处理呢?
使用多线程:充分利用单机资源
分布式加多线程:在多线程基础上,利用多台计算机。
我们可以先思考一下下面业务场景的解决方案:
每隔24小时执行数据备份任务。
12306网站会根据车次不同,设置几个时间点分批次放票。
某财务系统需要在每天上午10点前结算前一天的账单数据,统计汇总。
商品成功发货后,需要向客户发送短信提醒。
1、简单实现间隔时间执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main (String[] args) { final long timeInterval = 1000 ; Runnable runnable = new Runnable () { public void run () { while (true ) { try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread thread = new Thread (runnable); thread.start(); }
2、Timer方式实现
1 2 3 4 5 6 7 8 9 public static void main (String[] args) { Timer timer = new Timer (); timer.schedule(new TimerTask (){ @Override public void run () { } }, 1000 , 2000 ); }
3、ScheduledExecutor方式实现
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String [] agrs) { ScheduledExecutorService service = Executors.newScheduledThreadPool(10 ); service.scheduleAtFixedRate( new Runnable () { @Override public void run () { System.out.println("todo something" ); } }, 1 , 2 , TimeUnit.SECONDS); }
4、第三方Quartz方式实现
项目地址:https://github.com/quartz-scheduler/quartz
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String [] agrs) throws SchedulerException { SchedulerFactory schedulerFactory = new StdSchedulerFactory (); Scheduler scheduler = schedulerFactory.getScheduler(); JobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class); jobDetailBuilder.withIdentity("jobName" ,"jobGroupName" ); JobDetail jobDetail = jobDetailBuilder.build(); CronTrigger trigger = TriggerBuilder.newTrigger() .withIdentity("triggerName" , "triggerGroupName" ) .startNow() .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?" )) .build(); scheduler.scheduleJob(jobDetail,trigger); scheduler.start(); } public class MyJob implements Job { @Override public void execute (JobExecutionContext jobExecutionContext) { System.out.println("todo something" ); } }
什么是分布式任务调度?
就是部署多个分布式实例,在任务调度方面采用分布式任务调度。一个服务部署多个冗余实例。
2、xxlJob
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。
官网:https://www.xuxueli.com/xxl-job/
文档:https://www.xuxueli.com/xxl-job/#《分布式任务调度平台XXL-JOB》
分为调度中心 、任务执行器 、任务
3、安装与导入
1、首先下载XXL-JOB
GitHub:https://github.com/xuxueli/xxl-job 这里使用2.3.1
2、下载后以项目打开
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
3、根据doc官方文档中的快速入门构建
在/xxl-job/doc/db/tables_xxl_job.sql中执行sql脚本,创建数据库表
运行core
配置admin的配置文件
编译源码成为jar包然后运行
也可以直接通过docker镜像搭建
1 2 3 4 5 6 7 8 docker run -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:{指定版本} /** * 如需自定义 mysql 等配置,可通过 "-e PARAMS" 指定,参数格式 PARAMS="--key=value --key2=value2" ; * 配置项参考文件:/xxl-job/xxl-job-admin/src/main/resources/application.properties * 如需自定义 JVM内存参数 等配置,可通过 "-e JAVA_OPTS" 指定,参数格式 JAVA_OPTS="-Xmx512m" ; */ docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai" -p 8080:8080 -v /tmp:/data/applogs --name xxl-job-admin -d xuxueli/xxl-job-admin:{指定版本}
通过docker运行后http://192.168.101.65:8088/xxl-job-admin
默认账号密码admin/123456
任务调度中心新增执行器
在任务工程中绑定xxl-job
导入依赖
1 2 3 4 5 <dependency > <groupId > com.xuxueli</groupId > <artifactId > xxl-job-core</artifactId > </dependency >
在nacos下的media-service-dev.yaml下配置xxl-job
1 2 3 4 5 6 7 8 9 10 11 12 xxl: job: admin: addresses: http://192.168.101.65:8088/xxl-job-admin executor: appname: media-process-service address: ip: port: 9999 logpath: /data/applogs/xxl-job/jobhandler logretentiondays: 30 accessToken: default_token
在media-service的config中添加xxl-job config代码(配置完后成为执行器)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor () { logger.info(">>>>>>>>>>> xxl-job config init." ); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor (); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays); return xxlJobSpringExecutor; } }
配置成功后在对应执行器可以看到连接成功
自定义执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); @XxlJob("demoJobHandler") public void demoJobHandler () throws Exception { XxlJobHelper.log("XXL-JOB, Hello World." ); for (int i = 0 ; i < 5 ; i++) { System.out.println(i); XxlJobHelper.log("beat at:" + i); TimeUnit.SECONDS.sleep(2 ); } } }
添加下任务
bean模式:在项目工程中编写执行器的任务代码。
GLUE模式:将任务代码编写在调度中心。
打开任务位于的项目,可以看到任务按照时间间隔执行
4、分片广播
前面学习了xxl-job的基础使用。接下来展示如何进行分布式任务处理
我们会启动多个执行器组成一个集群,去执行任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 高级配置: - 路由策略:当执行器集群部署时,提供丰富的路由策略,包括; FIRST (第一个):固定选择第一个机器; LAST (最后一个):固定选择最后一个机器; ROUND(轮询):; RANDOM(随机):随机选择在线的机器; CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举; LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举; FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务; - 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。 - 调度过期策略: - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间; - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间; - 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略; 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行; 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败; 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务; - 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务; - 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
1、添加分片任务
2、分配任务模版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @XxlJob("shardingJobHandler") public void shardingJobHandler () throws Exception { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); logger.info("分片参数:当前分片序号 = {}, 总分片数 = {}" , shardIndex, shardTotal); for (int i = 0 ; i < shardTotal; i++) { if (i == shardIndex) { logger.info("第 {} 片, 命中分片开始处理" , i); } else { logger.info("第 {} 片, 忽略" , i); } } }
3、启动多个实例。注意:实例端口号,与连接到任务调度中心的端口号都不能冲突
1 -Dserver.port=8082 -Dxxl.job.executor.port=9982
总分片数: 启动实例数量
我们可以根据当前分片序号,分配给它需要处理的任务。当某个实例执行器挂掉了,会动态调节总分片数减一
5、实例
配置好执行器、分片广播任务。
根据不同的任务参数取出任务
1 2 @Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}") List<MediaProcess> selectListByShardIndex (@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count) ;
处理对应的视频转码,更新URL。更新一系列数据库数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 @Component @Slf4j public class VideoTask { @Autowired MediaFileService mediaFileService; @Autowired MediaFileProcessService mediaFileProcessService; @Value("${videoprocess.ffmpegpath}") private String ffmpegpath; @XxlJob("videoJobHandler") public void shardingJobHandler () throws Exception { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); int processors = Runtime.getRuntime().availableProcessors(); List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors); int size = mediaProcessList.size(); log.debug("取到的视频处理任务数:" ,size); if (size<=0 ){ return ; } ExecutorService executorService = Executors.newFixedThreadPool(size); CountDownLatch countDownLatch = new CountDownLatch (size); mediaProcessList.forEach(mediaProcess -> { executorService.execute(()->{ try { Long taskId = mediaProcess.getId(); boolean b = mediaFileProcessService.startTask(taskId); if (!b){ log.debug("抢占任务失败,任务id:{}" ,taskId); return ; } String bucket = mediaProcess.getBucket(); String fileId = mediaProcess.getFileId(); String objectName = mediaProcess.getFilePath(); File file = mediaFileService.downloadFileFromMinIO(bucket, objectName); if (file == null ){ log.debug("下载视频出错,任务id:{},bucket:{},objectName:{}" ,taskId,bucket,objectName); mediaFileProcessService.saveProcessFinishStatus(taskId,"3" ,fileId,null ,"下载视频到本地失败" ); return ; } String video_path = file.getAbsolutePath(); String mp4_name = fileId+".mp4" ; File tempFile = null ; try { tempFile = File.createTempFile("minio" , ".mp4" ); } catch (IOException e) { log.error("创建临时文件异常:{}" ,e.getMessage()); mediaFileProcessService.saveProcessFinishStatus(taskId,"3" ,fileId,null ,"创建临时文件异常" ); return ; } String mp4_path = tempFile.getAbsolutePath(); Mp4VideoUtil videoUtil = new Mp4VideoUtil (ffmpegpath,video_path,mp4_name,mp4_path); String result = videoUtil.generateMp4(); if (!result.equals("success" )){ log.debug("视频转码失败,原因:{},bucket:{},objectName:{}" ,result,bucket,objectName); mediaFileProcessService.saveProcessFinishStatus(taskId,"3" ,fileId,null ,result); return ; } objectName = objectName.substring(0 , objectName.lastIndexOf("." )); objectName += ".mp4" ; boolean b1 = mediaFileService.addMediaFilesToMinIO(tempFile.getAbsolutePath(), "video/mp4" , bucket, objectName); if (!b1){ log.debug("上传MP4到minio失败,taskId:{}" ,taskId); mediaFileProcessService.saveProcessFinishStatus(taskId,"3" ,fileId,null ,"上传mp4到minio失败" ); return ; } String url = "/" + bucket + "/" + getFilePath(fileId, ".mp4" ); mediaFileProcessService.saveProcessFinishStatus(taskId,"2" ,fileId,url,"保存任务状态成功" ); }finally { countDownLatch.countDown(); } }); }); countDownLatch.await(30 , TimeUnit.MINUTES); } private String getFilePath (String fileMd5,String fileExt) { return fileMd5.substring(0 ,1 ) + "/" + fileMd5.substring(1 ,2 ) + "/" + fileMd5 + "/" +fileMd5 +fileExt; } }
任务补偿机制 : 视频处理过程线程挂掉了,导致视频状态一直处理中。应该设置任务执行期限是处理一个视频的最大时间,比如定为30分钟,通过任务的启动时间去判断任务是否超过执行期限。
达到最大失败次数: 当达到最大次数的时候,可以设计程序提醒人工,进行人工处理,让其自行转码视频。
分块文件清理: 设置一个定时任务。定时清理超时还未成功上传的分块文件。
三、Freemarker
1、模版引擎
1、浏览器请求web服务器
2、服务器渲染页面,渲染的过程就是向jsp页面(模板)内填充数据(模型)。
3、服务器将渲染生成的页面返回给浏览器。
所以模板引擎就是:模板+数据=输出,Jsp页面就是模板,页面中嵌入的jsp标签就是数据,两者相结合输出html网页。
常见模版引擎有: Jsp、Freemarker、Thymeleaf 、Velocity 等
这里介绍Freemarker官方地址:http://freemarker.foofun.cn/
2、使用
1、引入依赖
1 2 3 4 5 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-freemarker</artifactId > </dependency >
2、Nacos添加共用配置freemarker-config-dev.yaml
1 2 3 4 5 6 7 8 9 10 11 spring: freemarker: enabled: true cache: false settings: template_update_delay: 0 suffix: .ftl charset: UTF-8 template-loader-path: classpath:/templates/ resources: add-mappings: false
3、项目拉取共用配置
4、resources下创建templates目录,添加test.ftl模板文件
1 2 3 4 5 6 7 8 9 10 <!DOCTYPE html > <html > <head > <meta charset ="utf-8" > <title > Hello World!</title > </head > <body > Hello ${name}! </body > </html >
编辑controller方法,准备数据
1 2 3 4 5 6 7 8 9 10 11 12 13 @Controller public class FreemarkerController { @GetMapping("/testfreemarker") public ModelAndView test () { ModelAndView modelAndView = new ModelAndView (); modelAndView.addObject("name" ,"小明" ); modelAndView.setViewName("test" ); return modelAndView; } }
5、启动工程,访问对应接口: http://localhost:63040/content/testfreemarker
3、数据填充
1、数据模型
1 2 3 4 5 6 7 8 9 @Data public class CoursePreviewDto { private CourseBaseInfoDto courseBase; private List<TeachplanDto> teachplans; }
2、接口设计
1 2 3 4 5 6 7 8 9 10 11 12 @GetMapping("/coursepreview/{courseId}") public ModelAndView preview (@PathVariable("courseId") Long courseId) { ModelAndView modelAndView = new ModelAndView (); CoursePreviewDto coursePreviewInfo = coursePublishService.getCoursePreviewInfo(courseId); modelAndView.addObject("model" ,coursePreviewInfo); modelAndView.setViewName("course_template" ); return modelAndView; }
3、数据填充规则
http://freemarker.foofun.cn/ref_directives.html
4、额外介绍
这里介绍一下黑马学成在线这个项目静态资源是如何访问
1、本地hosts文件原理
在我们访问一个网址的时候,会优先读取本机hosts文件是否存储了对应的host规则:
1 127.0.0.1 www.51xuecheng.cn 51xuecheng.cn ucenter.51xuecheng.cn teacher.51xuecheng.cn file.51xuecheng.cn
2、nginx的配置文件
这里是nginx的反向代理。当我们访问nginx监听的 http://www.51xuecheng.cn/static/js/querystring/index.js 静态文件时
nginx会帮我们反向代理到真实存储这些静态文件的位置:D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/js/
并替换、补充路径: D:/itcast2022/xc_edu3.0/code_1/xc-ui-pc-static-portal/js/querystring/index.js
达到反向代理: 不了解资源真实地址。
只要配置好,访问nginx配置的网址就会自动反向代理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 http { include mime.types; default_type application/octet-stream; client_max_body_size 100M ; client_body_buffer_size 128k ; sendfile on ; keepalive_timeout 65 ; server { listen 80 ; server_name www.51xuecheng.cn localhost; ssi on ; ssi_silent_errors on ; location / { alias D:/itcast2022/xc_edu3.0 /code_1/xc-ui-pc-static-portal/; index index.html index.htm; } location /static/img/ { alias D:/itcast2022/xc_edu3.0 /code_1/xc-ui-pc-static-portal/img/; } location /static/css/ { alias D:/itcast2022/xc_edu3.0 /code_1/xc-ui-pc-static-portal/css/; } location /static/js/ { alias D:/itcast2022/xc_edu3.0 /code_1/xc-ui-pc-static-portal/js/; } location /static/plugins/ { alias D:/itcast2022/xc_edu3.0 /code_1/xc-ui-pc-static-portal/plugins/; add_header Access-Control-Allow-Origin http://ucenter.51xuecheng.cn; add_header Access-Control-Allow-Credentials true ; add_header Access-Control-Allow-Methods GET; } location /plugins/ { alias D:/itcast2022/xc_edu3.0 /code_1/xc-ui-pc-static-portal/plugins/; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } } }
继续介绍学成在线文件服务器如何访问的
1、添加本地host
1 127.0 .0 .1 www.51xuecheng.cn file.51xuecheng.cn
2、nginx配置文件
访问对应网址 http://file.51xuecheng.cn/video/c/7/c7e11b5f1520d4aca1a5d0ef9275ca0c/c7e11b5f1520d4aca1a5d0ef9275ca0c.mp4
自动替换成 http://192.168.101.65:9000/video/c/7/c7e11b5f1520d4aca1a5d0ef9275ca0c/c7e11b5f1520d4aca1a5d0ef9275ca0c.mp4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 upstream fileserver{ server 192.168.101.65:9000 weight=10 ; } server { listen 80 ; server_name file.51xuecheng.cn; ssi on ; ssi_silent_errors on ; location /video { proxy_pass http://fileserver; } location /mediafiles { proxy_pass http://fileserver; } }
如果http://fileserver后有别的路径则会将/video替换,否则不会替换。
例如下面几个等价:
1 2 3 4 5 6 7 8 9 10 11 location /video2 { proxy_pass http://fileserver/video; } location /video { proxy_pass http://fileserver/video; } location /video { proxy_pass http://fileserver; }
其他使用以上介绍的规则即可套用。
5、页面静态化
1、调用的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private void generateCourseHtml (MqMessage mqMessage,long courseId) { Long taskId = mqMessage.getId(); MqMessageService mqMessageService = this .getMqMessageService(); int stageOne = mqMessageService.getStageOne(taskId); if (stageOne > 0 ){ log.debug("课程静态化任务完成,无需处理" ); return ; } File file = coursePublishService.generateCourseHtml(courseId); if (file == null ){ XueChengPlusException.cast("生成的静态页面为空" ); } coursePublishService.uploadCourseHtml(courseId,file); mqMessageService.completedStageOne(taskId); }
2、coursePublishService.generateCourseHtml方法定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public File generateCourseHtml (Long courseId) { Configuration configuration = new Configuration (Configuration.getVersion()); File file = null ; try { String classpath = this .getClass().getResource("/" ).getPath(); configuration.setDirectoryForTemplateLoading(new File (classpath+"/templates/" )); configuration.setDefaultEncoding("utf-8" ); Template template = configuration.getTemplate("course_template.ftl" ); CoursePreviewDto coursePreviewDto = this .getCoursePreviewInfo(courseId); HashMap<String, Object> map = new HashMap <>(); map.put("model" ,coursePreviewDto); String html = FreeMarkerTemplateUtils.processTemplateIntoString(template, map); InputStream inputStream = IOUtils.toInputStream(html, "utf-8" ); file = File.createTempFile("coursepublish" ,".html" ); FileOutputStream fileOutputStream = new FileOutputStream (file); IOUtils.copy(inputStream,fileOutputStream); inputStream.close(); fileOutputStream.close(); }catch (Exception e){ log.error("页面静态化出现问题:课程id:{}" ,courseId,e); e.printStackTrace(); } return file; }
3、upload远程调用媒资服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void uploadCourseHtml (Long courseId, File file) { try { MultipartFile multipartFile = MultipartSupportConfig.getMultipartFile(file); String upload = mediaServiceClient.upload(multipartFile, "course/" +courseId+".html" ); if (upload == null ){ log.debug("远程调用走降级的逻辑,得到上传的结果为空,课程id:{}" ,courseId); XueChengPlusException.cast("上传静态文件中存在异常" ); } }catch ( Exception e){ e.printStackTrace(); XueChengPlusException.cast("上传静态文件中存在异常" ); } }
四、分布式事务
1、CAP理论
1 2 3 4 begin transaction; commit transation;
远程调用由于网络问题会失效,导致分布式事务问题。
CAP :
Consistency : 一致性,不管用户访问哪一个节点,得到的数据都是最新的。
Availability : 可用性,任何时候都能访问到可用的节点。
Partition tolerance :分区容忍性,由于网络错误导致的请求中断,消息丢失,但系统任然对外提供服务
对于分布式系统: 必须满足P,C 、A冲突,只可满足一个,所以是AP or CP(强调一致性、可用性)。
2、BASE理论
在实际开发中,我们的场景一开始符合AP,但是最终会同步满足CP。 虽然AP舍弃了C一致性,但是最终还是达到了一致。也就满足了最终一致性。
BASE :
Basically Available(基本可用):出现问题时保证核心服务可用
Soft state(软状态): 可以存在中间状态,比如打印机打印,提示正在打印,需要时间打印,但是最终是会打印成功的。
Eventually consistent (最终一致性): 退款后资金不会马上到账,而是一段时间后到账,舍弃了强一致性,满足最终一致性
实现CP :使用Seata的AT模式、TCC模式等。
实现AP :使用消息队列、任务调度方式(将课程信息由数据库同步到elasticsearch、MinIO、redis等等中)
五、消息处理SDK
学成在线中使用了自定义的消息处理SDK,达到复用的目的:
消息处理中,通过抽象的形式,执行某一个需要我们自己填充的方法
保证任务幂等性: 任务执行完成后会从消息表删除,如果消息的状态是完成或不存在消息表中则不用执行。
保证任务不重复执行: 任务调度采用分片广播,根据分片参数去获取任务,另外阻塞调度策略为丢弃任务。由于是信息同步类任务,所以重复执行也没关系,不采用抢占任务的方式。
定义数据表多个字段state,每个字段代表一个阶段完成
消息数据库表服务类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public interface MqMessageService extends IService <MqMessage> { public List<MqMessage> getMessageList (int shardIndex, int shardTotal, String messageType,int count) ; public int completed (long id) ; public int completedStageOne (long id) ; public int completedStageTwo (long id) ; public int completedStageThree (long id) ; public int completedStageFour (long id) ; public int getStageOne (long id) ; public int getStageTwo (long id) ; public int getStageThree (long id) ; public int getStageFour (long id) ; }
消息SDK提供消息处理抽象类:
只需要实现这个抽象方法execute(),在里面写我们任务具体逻辑。 执行process方法中会自动执行execute()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Slf4j @Data public abstract class MessageProcessAbstract { @Autowired MqMessageService mqMessageService; public abstract boolean execute (MqMessage mqMessage) ; public void process (int shardIndex, int shardTotal, String messageType,int count,long timeout) { try { List<MqMessage> messageList = mqMessageService.getMessageList(shardIndex, shardTotal,messageType, count); int size = messageList.size(); log.debug("取出待处理消息" +size+"条" ); if (size<=0 ){ return ; } ExecutorService threadPool = Executors.newFixedThreadPool(size); CountDownLatch countDownLatch = new CountDownLatch (size); messageList.forEach(message -> { threadPool.execute(() -> { log.debug("开始任务:{}" ,message); try { boolean result = execute(message); if (result){ log.debug("任务执行成功:{})" ,message); int completed = mqMessageService.completed(message.getId()); if (completed>0 ){ log.debug("任务执行成功:{}" ,message); }else { log.debug("任务执行失败:{}" ,message); } } } catch (Exception e) { e.printStackTrace(); log.debug("任务出现异常:{},任务:{}" ,e.getMessage(),message); } countDownLatch.countDown(); log.debug("结束任务:{}" ,message); }); }); countDownLatch.await(timeout,TimeUnit.SECONDS); System.out.println("结束...." ); } catch (InterruptedException e) { e.printStackTrace(); } } }
1、使用
调用消息服务类添加消息
1 2 3 4 5 6 private void saveCoursePublishMessage (Long courseId) { MqMessage mqMessage = mqMessageService.addMessage("course_publish" , String.valueOf(courseId), null , null ); if (mqMessage==null ){ XueChengPlusException.cast(CommonError.UNKOWN_ERROR); } }
开启任务调度xxljob,并且配置成功后.
任务调度不会识别消息数据表是否有新消息,只会执行process方法。process中会判断是否有未处理完的消息并且分配给不同的节点执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 @Slf4j @Component public class CoursePublishTask extends MessageProcessAbstract { @XxlJob("CoursePublishJobHandler") public void coursePublishJobHandler () throws Exception{ int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); process(shardIndex,shardTotal,"course_publish" ,30 ,60 ); } @Override public boolean execute (MqMessage mqMessage) { Long courseId = Long.valueOf(mqMessage.getBusinessKey1()); generateCourseHtml(mqMessage,courseId); saveCourseIndex(mqMessage,courseId); return true ; } private void generateCourseHtml (MqMessage mqMessage,long courseId) { Long taskId = mqMessage.getId(); MqMessageService mqMessageService = this .getMqMessageService(); int stageOne = mqMessageService.getStageOne(taskId); if (stageOne > 0 ){ log.debug("课程静态化任务完成,无需处理" ); return ; } File file = coursePublishService.generateCourseHtml(courseId); if (file == null ){ XueChengPlusException.cast("生成的静态页面为空" ); } coursePublishService.uploadCourseHtml(courseId,file); mqMessageService.completedStageOne(taskId); } public void saveCourseIndex (MqMessage mqMessage,long courseId) { Long taskId = mqMessage.getId(); MqMessageService mqMessageService = this .getMqMessageService(); int stageTwo = mqMessageService.getStageTwo(taskId); if (stageTwo >0 ){ log.debug("课程信息已写入,无需执行。。。" ); return ; } CoursePublish coursePublish = coursePublishMapper.selectById(courseId); CourseIndex courseIndex = new CourseIndex (); BeanUtils.copyProperties(coursePublish,courseIndex); Boolean add = searchServiceClient.add(courseIndex); if (!add){ XueChengPlusException.cast("远程调用添加索引失败" ); } mqMessageService.completedStageTwo(taskId); } }