万码学说
在求职的道路上,简历是我们展示个人能力、经验和潜力最直观也是最重要的工具之一。1、五大常见错误,千万要小心!(1)一份简历,应聘多样岗位不同岗位对雇员的要求是不一样的。通用的简历,无法突出你与目标岗位的关联性,反而会降低竞争力。要基于每个不同岗位的需求,调整简历内容,突出与该职位相关的技能和经验。 (2)模板滥用,配色缺乏审美高颜值的简历是求职的有力敲门砖,但滥用模板可能会导致配色混乱、排版杂乱,甚至是逻辑混乱。 简历应选择简洁、专业的模板,并保持一致的配色方案。同时,注意选择合适的颜色和排版,以给HR营造专业、清晰的印象。 (3)个人信息,不必过于暴露虽然简历需要提供足够的个人信息,但不要暴露过多不必要的隐私信息,比如:婚姻状况、家庭情况、身份证号等。要注意保护个人隐私,只提供与求职相关的信息,比如:联系方式、教育背景、科研经历、志愿活动和工作经验等。(4)经历杂乱,能力不够聚焦大家可能会有很多个人经历,但呈现在简历中,应该有舍有得、清晰明了、有逻辑地呈现。要避免杂乱地列举经历,基于目标岗位的能力要求,选择恰当的个人经历,并用相应的话术突出在这个经历中你的某几项与目标岗位需求匹配的能力,且需要为每个经历提供简洁的描述,强调你做事的结果和贡献。 (5)篇幅过长,错字病句标点如果简历篇幅过长并且逻辑性不强,可能会导致关键信息被埋没。要突出关键信息,避免在简历中包含冗长的段落和不必要的细节。一般情况,应届毕业生用一整页A4纸,大约写800-1000字即可。 此外,拼写、错字、病句、标点错误,在简历中非常常见,这是非常严重的低级错误,很容易被雇主判断为态度问题。 写完简历之后,最好出声音读一遍,仔细校对。这些小错误,可能会给雇主留下不重视、不专业、不踏实、不仔细、不够关注细节的不好印象。 2、如何能让自己的简历脱颖而出?(1)STAR原则S(Situation)交代事件的背景,为什么会去做这件事?T(Task)自己承担的任务是什么,怎样在事情的背景下明确自己的任务。A(Action)自己都完成了些什么,为了完成任务,做了哪些事,为什么要这么做。R(Result)最终获得了何种成果?行动中收获了什么,有没有完成目标。我们来看一个例子: 大家是不是觉得写到这里就完成了呢?如果希望提高简历通过率,我们还可以继续对内容进行优化,不仅让内容符合STAR原则,还可以让内容更具备条理性。以下是进一步的优化结果: 那么STAR原则只能用在简历经历的描述吗?当然不是!在面试时,面试官可能就某段经历,请应聘者展开讲述。很多同学依然会使用STAR原则来描述,但这样的描述平铺直叙,缺乏吸引力。此时完全可以直奔主题,对个人经历进行先抑后扬的故事化重组。 这个时候STAR原则就可以转为CAR原则(Chagllenge挑战、Action行动、Result成果)。例如:在XX基金公司任职期间,作为技术主要负责人我曾遇到一次重大挑战,当时公司B端微信小程序版本大迭代,不仅需要参与并把控前端开发事务,而且还需要整体对接后端开发人员及产品业务人员(Challenge挑战),于是我们几个部门负责人就一起商量,使用微信小程序开发框架,搭建项目架构,根据设计和产品需求,参与需求分析和功能设计,参与后端开发人员进行接口对接,实现数据的交互和展示(Action行动),最后完善小程序各项功能,提升用户体验,用户满意度比原来提升 12%(Result成果)。我觉得这件事既锻炼了我的团队协作能力,更让我对自己的潜力有十足的信心。(2)利用视觉化的方法A.数据与经历的可视化 在简历中使用数据体现经历,可以更加直观地展示经验和能力。 比如,你运营了一个公众号,就可以写,在校期间,运营什么公众号几个月,发表原创文章多少,原创字数累计多少,浏览量多少,用户新增多少,用户完读率多少,公转私多少,还可以进一步增加同比环比数据。一定要学会挖掘数据化的成果,这不仅可以让HR更快地了解你的成就,还能使你的简历更具备说服力。B. 图像化的处理在设计简历时,合理运用颜色、图标和排版,能够使你的简历更具吸引力。但要注意保持简洁和专业,避免过多的花哨元素。一般是选择一个主色调,并在整个简历中保持一致,以营造统一的视觉效果。整个页面所有色彩元素最多不要超过三种。此外,图标可以用来突出你的技能或特长。 比如,如果你擅长多种编程语言,可以用相应的图标来表示每种语言的熟练程度。这样,HR可以在短时间内快速了解你的技能结构。C. 有效利用模板选择一个干净、简洁但又有个性的模板,能够帮助你更好地展示个人信息。要适度地进行个性化调整,以突出你的独特之处。 注意!不要完全套用模板哦~需要构建自己的简历逻辑,将你的职业优势关键信息放置在显眼位置,并按照你自己的思路排布简历,不要简单地把自己的信息直接安到某个模板里。排列逻辑可以是时间轴倒叙,最近的工作、实习放在最前面,依次往后排,方便雇主尽快清晰的看到你现在的经历与匹配度;可以是按类别,比如按照实习、校园工作、科研经历、志愿活动这样排列;也可以是按能力排列,比如按照专业能力、综合能力这样排列。这样做,HR在浏览你的简历时,可以快速获取到你想要传达的最重要的信息。以上这些你都学会了吗?赶快按照上面的步骤和方法修改你的简历吧!如果你对优化简历仍有疑问,可以寻找万码小助手的帮助~ 祝大家在求职过程中取得成功!
绝对勇士
1 作用不同1.2 映射?展平?map 只执行映射flatMap 既执行映射,也执行展平什么叫只能执行映射?我理解是把一个数据执行一个方法,转换成另外一个数据。举个例子:mapper 函数把输入的字符串转换成大写。map()方法执行这个 mapper 函数。Function<String, String > mapper = String::toUpperCase; Flux<String> inFlux = Flux.just("hello", ".", "com"); Flux<String> outFlux = inFlux.map(mapper); // reactor 测试包提供的测试方法 StepVerifier.create(outFlux) .expectNext("HELLO", ".", "COM") .expectComplete() .verify();什么叫展平?mapper 函数把字符串转成大写,然后分割成一个一个字符。Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split("")); Flux<String> inFlux = Flux.just("hello", ".", "com"); // 这里只能使用 flatMap,因为参数是 Function<T, Publisher<V>> 形式 Flux<String> outFlux = inFlux.flatMap(mapper); List<String> output = new ArrayList<>(); outFlux.subscribe(output::add); // 输出 [H, E, L, L, O, ., C, O, M] System.out.println(output); 请注意,由于来自不同来源的数据项交错,它们在输出中的顺序可能与我们在输入中看到的不同。1.2 同步?异步?map 是同步的,非阻塞的,1-1(1个输入对应1个输出) 对象转换的;flatMap 是异步的,非阻塞的,1-N(1个输入对应任意个输出) 对象转换的;当流被订阅(subscribe)之后,映射器对输入流中的元素执行必要的转换(执行上述 mapper 操作)。这些元素中的每一个都可以转换为多个数据项,然后用于创建新的流。一旦一个由 Publisher 实例表示的新流准备就绪,flatMap 就会急切地订阅。operator 不会等待发布者完成,会继续下一个流的处理,这意味着订阅是非阻塞的。同时也说明 flatMap() 是异步的。由于管道同时处理所有派生流,因此它们的数据项可能随时进入。结果就是原有的顺序丢失。如果项目的顺序很重要,请考虑改用 flatMapSequential 运算符。2 方法签名的区别很明显2.1 方法签名map 参数是 Function<T, U> ,返回是 Flux<U>flatMap 参数是 Function<T, Publisher<V>> 返回是 Flux<V>举例:这里只能使用 flatMap,因为参数是 Function<T, Publisher<V>> 形式Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split("")); Flux<String> inFlux = Flux.just("hello", ".", "com"); // 这里只能使用 flatMap,因为参数是 Function<T, Publisher<V>> 形式 Flux<String> outFlux = inFlux.flatMap(mapper);这里只能使用 map,因为参数是 Function<String, String >Function<String, String > mapper = String::toUpperCase; Flux<String> inFlux = Flux.just("hello", ".", "com"); // 这里只能使用 map,因为参数是 Function<String, String > Flux<String> outFlux = inFlux.map(mapper);此外,看方法签名,可以看出,可以给 map() 传参 Function<T, Publisher<V>>,按照方法签名,它会返回Flux<Publisher<V>>,但它不知道如何处理 Publishers。比如下面的代码:编译不会报错,但是不知道后续怎么处理。Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split("")); Flux<String> inFlux = Flux.just("hello", ".", "com"); Flux<Publisher<String>> map = inFlux.map(mapper);下面的例子来源于 stackoverflow:使用 map 方法会产生 Mono<Mono<T>>,而使用 flatMap 会产生 Mono<T>。使用 map() 就是给 map 传参了Function<T, Publisher<V>>,它返回的也是 Mono<Publisher<V>>。// Signature of the HttpClient.get method Mono<JsonObject> get(String url); // The two urls to call String firstUserUrl = "my-api/first-user"; String userDetailsUrl = "my-api/users/details/"; // needs the id at the end // Example with map Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl). map(user -> HttpClient.get(userDetailsUrl + user.getId())); // This results with a Mono<Mono<...>> because HttpClient.get(...) // returns a Mono // Same example with flatMap Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl). flatMap(user -> HttpClient.get(userDetailsUrl + user.getId())); // Now the result has the type we expected2.3 返回map() 返回一个值的流flatMap() 返回一个流值的流Flux<String> stringFlux = Flux.just("hello word!"); Function<String, Publisher<String>> mapper = s -> Flux.just(s.toUpperCase().split("")); // 使用 flatMap() 返回的是 FluxFlatMap. Flux<String> flatMapFlux = stringFlux.flatMap(mapper); // 使用 map() 返回的是 FluxMapFuseable Flux<String> mapFlux = stringFlux.map(s -> s);flatMapFlux 类型是 FluxFlatMap;也就是说,使用 flatMap() 返回的是 FluxFlatMap.mapFlux 类型是 FluxMapFuseable。也就是说,使用 map() 返回的是 FluxMapFuseableFluxMapFuseable 是什么?FluxFlatMap 是什么?FluxFlatMap 和 FluxMapFuseable 是什么区别?各位看官可以一起讨论!参考链接:baeldung: Project Reactor: map() vs flatMap()csdn: map VS flatmapgeeksforgeeks: Difference Between map() And flatMap() In Java StreamstackOverFlow: map vs flatMap in reactor
绝对勇士
Reactor 整合 Resilence4j1 引入 pom 包<dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-all</artifactId> </dependency> <dependency> <groupId>io.github.resilience4j</groupId> <artifactId>resilience4j-spring-boot2</artifactId> </dependency>2 配置说明2.1 限流 ratelimiter两个限流配置:backendA 1s 中最多允许 10 次请求;backendB 每 500ms 最多允许 6 次请求。resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s配置属性默认值描述timeoutDuration5【s】一个线程等待许可的默认等待时间limitRefreshPeriod500【ns】限制刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值limitForPeriod50一个 limitRefreshPeriod (周期)允许访问的数量(许可数量)2.2 重试 retry注意指定需要重试的异常,不是所有的异常重试都有效。比如 DB 相关校验异常,如唯一约束等,重试也不会成功的。重试配置:resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException配置属性默认值描述maxAttempts3最大重试次数(包括第一次)waitDuration500【ms】两次重试之间的等待间隔intervalFunctionnumOfAttempts -> waitDuration修改失败后等待间隔的函数。默认情况下,等待时间是个常量。retryOnResultPredicateresult->false配置一个判断结果是否应该重试的 predicate 函数。如果结果应该重试,Predicate 必须返回 true,否则它必须返回 false。retryExceptionPredicatethrowable -> true和 retryOnResultPredicate 类似,如果要重试,Predicate 必须返回true,否则返回 false。retryExceptions空需要重试的异常类型列表ignoreExceptions空不需要重试的异常类型列表failAfterMaxAttemptsfalse当重试达到配置的 maxAttempts 并且结果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值intervalBiFunction(numOfAttempts, Either<throwable, result>) -> waitDuration根据 maxAttempts 和结果或异常修改失败后等待间隔时间的函数。与 intervalFunction 一起使用时会抛出 IllegalStateException。2.3 超时 TimeLimiter超时配置:resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false超时配置比较简单,主要是配置 timeoutDuration 也就是超时的时间。cancelRunningFuture 的意思是:是否应该在运行的 Future 调用 cancel 去掉调用。2.4 断路器 circuitbreaker断路器有几种状态:关闭、打开、半开。注意:打开,意味着不能访问,会迅速失败。CircuitBreaker 使用滑动窗口来存储和汇总调用结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。基于计数的滑动窗口聚合最后 N 次调用的结果。基于时间的滑动窗口聚合了最后 N 秒的调用结果。断路器配置:resilience4j.circuitbreaker: instances: backendA: // 健康指标参数,非断路器属性 registerHealthIndicator: true slidingWindowSize: 100 配置属性默认值描述slidingWindowSize100记录断路器关闭状态下(可以访问的情况下)的调用的滑动窗口大小failureRateThreshold50(百分比)当失败比例超过 failureRateThreshold 的时候,断路器会打开,并开始短路呼叫slowCallDurationThreshold60000【ms】请求被定义为慢请求的阈值slowCallRateThreshold100(百分比)慢请求百分比大于等于该值时,打开断路器开关permittedNumberOfCalls10半开状态下允许通过的请求数maxWaitDurationInHalfOpenState0配置最大等待持续时间,该持续时间控制断路器在切换到打开之前可以保持在半开状态的最长时间。值 0 表示断路器将在 HalfOpen 状态下无限等待,直到所有允许的调用都已完成。2.5 壁仓 bulkheadresilience4j 提供了两种实现壁仓的方法:SemaphoreBulkhead 使用 Semaphore 实现FixedThreadPoolBulkhead 使用有界队列和固定线程池实现resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 12.5.1 SemaphoreBulkhead配置属性默认值描述maxConcurrentCalls25允许的并发执行的数量maxWaitDuration0尝试进入饱和隔板时线程应被阻止的最长时间2.5.2 FixedThreadPoolBulkhead配置属性默认值描述maxThreadPoolSizeRuntime.getRuntime().availableProcessors()线程池最大线程个数coreThreadPoolSizeRuntime.getRuntime().availableProcessors()-1线程池核心线程个数queueCapacity100线程池队列容量keepAliveDuration20【ms】线程数超过核心线程数之后,空余线程在终止之前等待的最长时间3 使用3.1 配置在 application.yml 文件中添加以下 resilience4j 配置:resilience4j.circuitbreaker: instances: backendA: registerHealthIndicator: true slidingWindowSize: 100 resilience4j.retry: instances: backendA: maxAttempts: 3 waitDuration: 10s enableExponentialBackoff: true exponentialBackoffMultiplier: 2 retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException backendB: maxAttempts: 3 waitDuration: 10s retryExceptions: - org.springframework.web.client.HttpServerErrorException - java.io.IOException resilience4j.bulkhead: instances: backendA: maxConcurrentCalls: 10 backendB: maxWaitDuration: 10ms maxConcurrentCalls: 20 resilience4j.thread-pool-bulkhead: instances: backendC: maxThreadPoolSize: 1 coreThreadPoolSize: 1 queueCapacity: 1 resilience4j.ratelimiter: instances: backendA: limitForPeriod: 10 limitRefreshPeriod: 1s timeoutDuration: 10ms registerHealthIndicator: true eventConsumerBufferSize: 100 backendB: limitForPeriod: 6 limitRefreshPeriod: 500ms timeoutDuration: 3s resilience4j.timelimiter: instances: backendA: timeoutDuration: 2s cancelRunningFuture: true backendB: timeoutDuration: 1s cancelRunningFuture: false3.2 使用注解实现直接在需要限流的方法上增加注解@RateLimiter 实现限流;增加注解@Retry 实现重试;增加注解 @CircuitBreaker 熔断;增加注解 @Bulkhead 实现壁仓。name 属性中分别填写限流器、重试、熔断、壁仓组件的名字。@Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA") @Retry(name = "backendA") @RateLimiter(name = "backendA") public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } @Bulkhead(name = "backendA") @CircuitBreaker(name = "backendA")//最多支持10个并发量 @Retry(name = "backendA")//使用 backendA 重试器,如果抛出 IOException 会重试三次。 @RateLimiter(name = "backendA")// 限流 10 Qps public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); } 注意:以上所有组件,都支持自定义。
摸鱼校尉
Webshell简介Webshell通常指以JSP、ASP、 PHP等网页脚本文件形式存在的一种服务器可执行文件,一般带有文件操作、命令执行功能,是一种网页后门。攻击者在入侵网站后,通常会将Webshell后门文件与网站服务器Web目录下正常的网页文件混在一起,使用浏览器或专用客户端进行连接,从而得到一个服务器操作环境,以达到控制网站服务器的目的。Webshell检测手段基于流量的Webshell检测基于流量的Webshel检测方便部署,我们可通过流量镜像直接分析原始信息。基于payload的行为分析,我们不仅可对已知的Webshell进行检测,还可识别出未知的、伪装性强的Webshell,对Webshell的访问特征 (IP/UA/Cookie) 、payload特征、 path特征、 时间特征等进行关联分析,以时间为索引,可还原攻击事件。基于文件的Webshell检测我们通过检测文件是否加密(混淆处理) ,创建Webshell样本hash库,可对比分析可疑文件。对文件的创建时间、修改时间、文件权限等进行检测,以确认是否为Webshell。基于日志的Webshell检测对常见的多种日志进行分析,可帮助我们有效识别Webshell的上传行为等。通过综合分析,可回溯整个攻击过程。Webshell应急响应指南如何判断被植入了Webshell?网页被篡改,或在网站中发现非管理员设置的内容;出现攻击者恶意篡改网页或网页被植入暗链的现象;安全设备报警,或被上级部门通报遭遇Webshell等。一. Webshell排查利用Webshell扫描工具(如D盾)对应用部署目录进行扫描,如网站D: \WWW\目录,或者将当前网站目录文件与此前备份文件进行比对,查看是否存在新增的不一致内容,确定是否包含Webshell相关信息, 并确定Webshel位置及创建时间。然后利用文本文件打开,进一步分析发现可疑内容。在Linux系统中也可以用命令://搜索目录下适配当前应用的网页文件,查看内容是否有Webshell特征 find ./ type f -name "*.jsp" | xargs grep "exec(" find ./ type f -name "*.php" | xargs grep "eval(" find ./ type f -name "*.asp" | xargs grep "execute(" find ./ type f -name "*.aspx" | xargs grep "eval(" ] //对于免杀Webshell,可以查看是否使用编码 find ./ type f -name "*.php" | xargs grep "base64_decode" 二. 确定入侵时间根据异常现象发生时间,结合网站目录中Webshell文件的创建时间,可大致定位事件发生的时间段。以便后续依据此时间进行溯源分析、追踪攻击者的活动路径。三. Web日志分析需要对Web日志进行分析,以查找攻击路径及失陷原因,常见Web中间件默认地址WindowsApache apache\logs\error.log apache\logs\access.log IIS C:\inetpub\logs\LogFiles C:\WINDOWS\system32\LogFiles Tomcat tomcat\access_logLinuxApache /etc/httpd/logs/access_log /var/log/httpd/access_log Nginx /usr/local/nginx/logs在Linux日志排查时,为方便日志检索及溯源分析,列举了常用日志检索命令定位具体的IP地址或文件名find . access_log | grep xargs ip find . access_log | grep xargs filename查看页面访问前10的IP地址cat access.log | cut -f1 -d " " | sort | uniq -c | sort -k 1 -r | head -10查看页面访问前10的URL地址cat access.log | cut -f4 -d " " | sort | uniq -c | sort -k 1 -r | head -10四. 漏洞分析通过日志分析发现的问题,针对攻击者的活动路径,可排查网站中存在的漏洞,并进行分许五. 漏洞复现复现攻击者的攻击路径六. 清除Webshell并修复漏洞处置时先断网,清理发现的 Webshell如果网站被挂黑链或者被篡改首页,那么应删除篡改内容,同时务必审计源码,保证源码中不存在恶意添加的内容在系统排查后,及时清理系统中隐藏的后门及攻击者操作的内容,存在 rootkit 后门,则建议重装系统对排查过程中发现的漏利用点进行修补,必要时可以做黑盒渗透测试,全面发现应用漏洞待上述操作处置完成,重新恢复网站运行七. Webshell防御方法配置必要的防火墙,并开启防火墙策略,防止暴露不必要的服务为攻击者提供利用条件对服务器进行安全加固,例如,关闭远程桌面功能、定期更换密码、禁止使用最高权限用户运行程序、使用 HTTPS 加密协议等加强权限管理,对敏感目录进行权限设置,限制上传目录的脚本执行权限,不允许配置执行权限安装 Webshell 检测工具,根据检测结果对已发现的可疑 Webshell 痕立即隔离查杀,并排查漏洞排查程序存在的漏洞,并及时修补漏洞时常备份数据库等重要文件防止糟糕的事情发生重装系统需要保持日常维护,并注意服务器中是否有来历不明的可执行脚本文件8.采用白名单机制上传文件,不在白名单内的一律禁止上传,上传目录权限遵循最小权限原则
绝对勇士
Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。1 创建项目使用 start.spring.io/ 创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC然后导入 Reactor 包<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency>2 集成 H2 数据库application.properties 文件中添加 H2 数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。server.port=8081 ################ H2 数据库 基础配置 ############## spring.datasource.driverClassName=org.h2.Driver spring.datasource.url=jdbc:h2:~/user spring.datasource.username=sa spring.datasource.password= spring.jpa.database=h2 spring.jpa.hibernate.ddl-auto=update spring.h2.console.path=/h2-console spring.h2.console.enable=true3 创建测试类3.1 user 实体建立简单数据操作实体 User。import lombok.Data; import lombok.NoArgsConstructor; import javax.persistence.*; /** * @Author: prepared * @Date: 2022/8/29 21:40 */ @Data @NoArgsConstructor @Table(name = "t_user") @Entity public class User { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String userName; private int age; private String sex; public User(String userName, int age, String sex) { this.userName = userName; this.age = age; this.sex = sex; } } 3.2 UserRepository数据模型层使用 JPA 框架。import com.prepared.user.domain.User; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; /** * @Author: prepared * @Date: 2022/8/29 21:45 */ @Repository public interface UserRepository extends JpaRepository<User, Long> { } 3.3 UserServiceservice 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。doOnError 监控异常情况;doFinally 监控整体执行情况,如:耗时、调用量监控等。import com.prepared.user.dao.UserRepository; import com.prepared.user.domain.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.List; /** * @Author: prepared * @Date: 2022/8/29 21:45 */ @Service public class UserService { private Logger logger = LoggerFactory.getLogger(UserService.class); @Resource private UserRepository userRepository; public Mono<Boolean> save(User user) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.save(user) != null; }) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("save.user.error, user={}, e", user, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime); }); } public Mono<User> findById(Long id) { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.getReferenceById(id); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("findById.user.error, id={}, e", id, e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime); }); } public Mono<List<User>> list() { long startTime = System.currentTimeMillis(); return Mono.fromSupplier(() -> { return userRepository.findAll(); }).doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } public Flux<User> listFlux() { long startTime = System.currentTimeMillis(); return Flux.fromIterable(userRepository.findAll()) .doOnError(e -> { // 打印异常日志&增加监控(自行处理) logger.error("list.user.error, e", e); }) .doFinally(e -> { // 耗时 & 整体健康 logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime); }); } } 3.4 UserControllercontroller 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。返回List可以使用Mono<List<User>> ,也可以使用 Flux<User>。Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素Flux<T> 是一个标准的 Publisher<T>,表示为发出 0 到 N 个元素的异步序列import com.prepared.user.domain.User; import com.prepared.user.service.UserService; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; /** * @Author: prepared * @Date: 2022/8/29 21:47 */ @RestController public class UserController { @Resource private UserService userService; @RequestMapping("/add") public Mono<Boolean> add() { User user = new User("xiaoming", 10, "F"); return userService.save(user) ; } @RequestMapping("/list") public Mono<List<User>> list() { return userService.list(); } } @RequestMapping("/listFlux") public Flux<User> listFlux() { return userService.listFlux(); }3.5 SpringReactorApplication 添加注解支持Application 启动类添加注解 @EnableJpaRepositoriesimport org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; /** * Hello world! */ @SpringBootApplication @EnableJpaRepositories public class SpringReactorApplication { public static void main(String[] args) { SpringApplication.run(SpringReactorApplication.class, args); } }测试启动项目,访问 localhost:8081/add,正常返回 true。查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。后台日志:2022-09-05 20:13:17.385 INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService : list.user.time=181, 执行了 UserService list() 方法的 doFinnally 代码块,打印耗时日志。总结响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?Future 的 get() 方法;Reactor 中的 block() 方法,subcribe() 方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度。下一篇,讲解如何将熔断、限流框架 resilience4j 整合到项目中,敬请期待。
绝对勇士
1 reactor 出现的背景、初衷和要达到什么样的目标Reactor 项目始于 2012 年。 经过长时间的内部孵化,于 2013 年发布 Reactor 1.x 版本。 Reactor 1 在各种架构下都能成功部署,包括开源的(如 Meltdown)和商业的(如 Pivotal RTI)。2014年,通过与一些新兴的响应式数据流规范合作,重新设计并于 2015 年 4 月发布 Reactor 2.0 版本。1.1 阻塞浪费资源互联网企业基本上都有着大量的用户,即使当代硬件的性能已经提升了很多,但是性能问题一直是互联网企业不能忽略的一个问题。通常有两种方式来提升应用的性能:使用更多的线程和硬件资源达到并行化。这也是很多企业采用的方式;在当前使用的资源上寻求更高效的处理。这在全球经济下行的背景下,是一种成本更低的方式;1.2 异步能拯救一切嘛?通过编写异步非阻塞的代码,可以将执行切换到使用了相同底层资源的另一活动任务上,然后在异步完成之后返回到当前任务。提升资源利用率。java 提供了两种编写异步(异步不一定非阻塞)代码的方式。Callbacks:不立即返回对象,但是提供了一个 callback 参数,当结果可返回时调用。Future:这也是现在大部分程序员在使用的方式。异步方法会立即返回一个 Future。Future 对象对获取该值进行了包装,这个对象可以一直轮询知道返回(除非设置了超时时间)。例如,ExecutorService 使用 Future 对象执行 Callable 任务。这些技术都有自己的问题:callback 不好组合,编写有难度,且很容易导致代码难以阅读和维护。Future 比callback好很多,但是也有自己的问题。调用 get() 方法会阻塞;缺乏对多值和高级错误处理的支持。1.3 从命令式到响应式作为响应式编程方向上的第一步,Microsoft在.NET生态中创建了响应式(Rx)扩展库。然后RxJava实现了JVM上的响应式编程。随着时间的推移,通过Reactive Streams的努力,一套基于JVM为响应式库定义接口与交互规则的标准规范Reactive Streams 出现了。其接口已经集成到了Java9中的 Flow 类下。响应式旨在解决上述 JVM 提供的异步方式的缺点,同时关注了其他一些方面:组合型和易读性数据作为 流 操作,有着丰富的操作符在订阅之前什么都不会发生(有什么优点?)背压,消费者可以向生产者发送信号表示发布速率太快与并发无关的高阶抽象reactor 是响应式编程的一种实现。现代应用程序需要处理大量并发请求并处理大量数据。标准的阻塞代码不再足以满足这些要求。反应式设计模式是一种基于事件的架构方法,用于异步处理来自单个或多个服务处理程序的大量并发服务请求。Project Reactor 基于这种模式,并有一个明确而雄心勃勃的目标,即在 JVM 上构建非阻塞、反应式应用程序。2 reactor 优势和劣势分别是什么优势异步非阻塞代码可读性高背压 解决消息的消费可能比生产慢。劣势对于非响应式 java 开发者来说,学习曲线陡峭。debug 难度高3 reactor 的适用场景创建事件驱动程序;亚马逊等大型在线购物平台的通知服务为银行业提供庞大的交易处理服务股票价格同时变动的股票交易业务4 reactor 组成部分和关键节点4.1 Mono一种生成数据流的方式。包含0-1个结果的异步序列。Mono.just(1);4.2 Flux另一种生成数据流的方式。包含0-N个结果的异步序列。Flux.just(1, 2, 3, 4)5 底层原理与关键实现生产者-消费者模式?迭代模式?6 其他竞品技术lxdd.gitbook.io/spring-webf…Spring WebfluxRxJavaSpring WebFlux (project-reactor) 和 RxJava2+ 都是响应式流的实现。Spring 正在使用 project-reactor,因此它得到了更多的支持、广告和更大的社区,所以用它的人比较多。6.1 Spring WebfluxSpring Webflux 是一个使用响应式库创建 web 服务的框架。它的主要目标是确保低资源使用(即线程数量少)的高可伸缩性。在底层,它使用 Project Reactor,但是,你也可以将它与 RxJava (或任何其他的响应流实现)一起使用,它甚至可以与 Kotlin 协程一起工作。换句话说, Reactor 是一个基础响应式包,Spring WebFlux 是一个框架,这个框架默认使用 Reactor,但是可以使用 RxJava,也可以使用 Kotlin 等其他响应式包。Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。反应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版中添加的。它是完全非阻塞的,支持 Reactive Streams 背压,并且可以在 Netty、Undertow 和 Servlet 3.1+ 容器等服务器上运行。Hello World 级示例:blog.csdn.net/get_set/art…6.2 RxJava2ReactiveX 结合了观察者模式、迭代器模式和函数式编程的最佳思想。它扩展了观察器模式,以支持数据序列和/或事件,并添加了操作符,允许您以声明的方式将序列组合在一起,同时抽象出诸如低级线程、同步、线程安全、并发数据结构和非阻塞I/O等问题。一般来说,RxJava 支持基于 JDK8- 的项目,project Reactor 支持 JDK8 +。但是对于初学者来说,你可以先学习 RxJava。Project Reactor 可以弥补 RxJava 的缺点,更适合后端开发。RxJava 有太多的问题,如果你不能很好地使用它,可能会导致内存溢出。但最后,如果你想很好地使用 Spring 5.2+,你需要学习 RxJava->Reactor->NIO->Netty->Reactor Netty。6.3 Reactor VS RxJavaRxJava 和 Reactor 是一些非常著名的库,用于与任何应用程序的后端相关的一些开发。Rxjava 支持的项目大多与 JDK8 相关,而 Reactor 则与所有与 JDK8 + 相关的项目相关。RxJava产生了许多可能导致内存相关问题的问题,但是当与 spring 5.2+ 一起使用时,它会变得非常好。reactor 通常被称为反应式编程范式,它主要涉及用于操作的反应式流 API,并使整个 API 流活动。www.educba.com/rxjava-vs-r…1、github地址:github.com/reactor/rea…2、官方文档:easywheelsoft.github.io/reactor-cor…3、segmentfault.com/a/119000001…4、www.infoq.com/articles/re…5、Spring Webflux : www.baeldung.com/spring-webf…6、Java Spring WebFlux vs RxJava:stackoverflow.com/questions/5…
摸鱼校尉
系统基本信息排查Windows系统排查命令行输入 msinfo32 ,就会打开系统信息窗口,可以显示本地计算机的硬件资源、组件、软件环境、正在运行的任务、服务、系统驱动程序、加载模块、启动程序等。重点关注以下几个位置即可Linux系统排查CPU信息命令行输入 lscpu 命令,查看CPU相关信息操作系统信息命令行输入 uname -a ,查看当前操作系统信息命令行速录 cat /proc/version 命令,查看Linux版本载入模块排查命令行速录 lsmod 命令,查看Linux已载入模块信息用户排查攻击者会采用的方法主要有如下几种:直接建立一个新的账户(有时是为了混淆视听,账户名称与系统常用名称相似)激活一个系统中的默认账户,但这个账户是不经常使用的建立一个隐藏账户 (在 Windows 系统中,一般在账户名称最后加$)Windows系统用户排查关于Windows系统的用户,用户组管理可以参考这篇文章:应急响应之系统排查方法_世界尽头与你的博客排查所有账户黑客创建的某些隐藏账户通过dos指令无法发现, 但是当我们查看注册表时就可以发现通过注册表检查是否存在账户名为“xxx$”或修改注册表创建的隐藏账户,再检查是否存在可疑账户,并进行禁用注册表路径:给SAM目录添加当前用户可读写属性HKEY_LOCAL_MACHINE\SAM\SAM\Domains\Account\Users\Names 同时,在此项下导出所有以 00000 开头的项,将所有导出的项与 000001F4 (该项对应Administrator用户)导出内容做比较,若其中的 F 值相同,则表示可能为克隆账户命令行输入 wmic useraccount get name,SID 查看系统中的用户信息Linux用户排查命令行输入 cat /etc/passwd 查看所有用户信息用户名:密码:用户ID:组ID:用户说明:家目录:登陆之后shell最后显示的 /bin/bash 表示该用户可登录 ; sbin/nologin 不可登录root账户排查输入命令 awk -F: '{if($3==0) print $1}' /etc/passwd 可查询可登录账户 UID 为0的账户,root是 UID为0的可登录账户,如果出现其他为 0 的账户,就要重点排查查看所有可登录账户 命令行输入 cat /etc/passwd | grep '/bin/bash' 或者 cat /etc/passwd | grep '/bin/zsh'查看用户错误的登录信息命令行输入 lastb 可查看显示用户错误的登录列表,包括错误的登录方法、IP 地址、时间等查看所有用户最后登录信息命令行输入 lastlog 排查空口令账户命令行输入 awk -F: 'length($2)==0 {print $1}' /etc/shadow如果有用户是空口令就会显示出来启动项排查启动项是系统开机时在前台或者后台运行的程序,攻击者有可能通过启动项使用病毒后门等实现持久化控制。Windows系统启动项排查任务管理器命令行输入 msconfig注册表排查很多病毒木马通过注册表实现持久化驻留开机启动项排查:命令行输入 regedit 即可注册表中重点关注以下三个:HKEY_CURRENT_USER\software\micorsoft\windows\currentversion\run HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\Run HKEY_LOCAL_MACHINE\Software\Microsoft\Windows\CurrentVersion\Runonce查看是否存在命名异常的启动项目,是则取消勾选命名异常的启动项目,并到命令中显示的路径删除文件。Linux系统启动项排查命令行输入 ls -alt /etc/init.d计划任务排查Windows系统计划任务排查计划任务程序库命令行输入 taskschd.msc 可以查看任务的名称,状态,触发器等信息Powershell命令查看Powershell命令行输入 Get-ScheduledTask 可以查看计划任务的路径,名称,状态Powershell命令行输入 schtasksLinux计划任务排查查看当前的计划任务 crontab -l查询到一个挖矿恶意程序的任务计划设置,其会每隔 12 分钟远程下载恶意网站上的 crontab.sh 脚本文件日志排查Windows系统日志排查日志位置在Windows系统中,日志文件包括:系统日志、安全性日志及应用程序日志,其位置如下。在Windows 2000专业版/Windows XP/Windows Server 2003系统中:系统日志的位置为: C:\WINDOWS\System32\config\SysEvent.evt 安全性日志的位置为: C:\WINDOWS\System32\config\SecEvent.evt 应用程序日志的位置为: C:\WINNT\System32\config\AppEvent.evt在Windows Vista/Windows7/Windows8/Windows10/Windows Server2008及以上版本系统中:系统日志的位置为: %SystemRoot%\System32\Winevt\Logs\System.evtx 安全性日志的位置为: %SystemRoot%\System32\Winevt\Logs\Security.evtx 应用程序日志的位置为: %SystemRoot%\System32\Winevt\Logs\Application.evtx系统日志系统日志主要是指 Windows 系统中的各个组件在运行中产生的各种事件。这些事件一般可以分为:系统中各种驱动程序Q在运行中出现的重大问题、操作系统的多种组件在运行中出现的重大问题及应用软件在运行中出现的重大问题等。这些重大问题主要包括重要教据的丢失、错误,以及系统产生的崩溃行为等例如:事件ID=10016安全性日志安全性日志主要记录了各种与安全相关的事件。构成该日志的内容主要包括:各种登录与退出系统的成功或不成功的信息,对系统中各种重要资源进行的各种操作,如对系统文件进行的创建、删除、更改等操作应用程序日志应用程序日志主要记录各种应用程序所产生的各类事件。例如,系统中 SQL Server 数据库程序在受到暴力破解攻击时,日志中会有相关记录 日志常用ID登录相关事件ID日志筛选器分析日志第三方日志分析工具:FullEventLogView Event Log ExplorerLinux系统日志排查日志位置 /var/log可以使用 cat 命令查看 /var/log/wtmp:记录登录进入、退出、数据交换、关机和重启,即 last。 /var/log/cron:记录与定时任务相关的日志信息。 /var/log/messages:记录系统启动后的信息和错误日志。 /var/log/apache2/access.log:记录 Apache 的访问日志。 /var/log/auth.log:记录系统授权信息,包括用户登录和使用的权限机制等。 /var/log/userlog:记录所有等级用户信息的日志。 /var/log/xferlog(vsftpd.log):记录 Linux FTP 日志。 /var/log/lastlog:记录登录的用户,可以使用命令 lastlog 查看。 /var/log/secure:记录大多数应用输入的账号与密码,以及登录成功与否。 /var/log/faillog:记录登录系统不成功的账号信息。其它应用程序位置IIS日志位置%SystemDrive%\inetpub\logs\LogFiles %SystemRoot%\System32\LogFiles\W3SVC1 %SystemDrive%\inetpub\logs\LogFiles\W3SVC1 %SystemDrive%\Windows\System32\LogFiles\HTTPERRApache日志位置/var/log/httpd/access.log /var/log/apache/access.log /var/log/apache2/access.log /var/log/httpd-access.logNginx日志位置默认在 /usr/local/nginx/Togs 目录下,access.log 代表访问日志error.log 代表错误日志。若没有在默认路径下,则可以到nginx.conf 配置文件中香找Tomcat 日志的位置:默认在 TOMCAT HOME/Logs/ 目录下,有 catalina.out、catalina.YYYY-MM- DD.og、localhost.YYYY-MM-DD.og.ocalhost access log.YYYY-MM-DD.txt、host-manager.YYYY-MM-DD.g、manager.YYYY-MM-DD.log 等几类日志WebLogic 日志的位置:在默认情况下,WebLogic 有三种日志,分别是 access og、server log 和 domain logaccess.log $MW_HOME\user_projects\domains\<domain_name>\servers\<server_name>\logs\access.log server.log $MW_HOME\user_projects\domains\<domain_name>\servers\<server_name>\logs\<server_name>.log domain.log $MW_HOME\user_projects\domains\<domain_name>\servers\<adminserver_name>\logs\<domain_name>.log进程排查Windows系统进程排查对于 Windows 系统中的进程排查,主要是找到恶意进程的 PID、程席路径,有时还需要找到 PPID (PID 的父进程)及程序加载的DLL。对于进程的排查,一般有如下几种方法。任务管理器tasklist命令查询命令行输入 tasklist 可以显示计算机上的所有进程命令行输入 tasklist /m 可显示进程加载DLL情况tasklist /m ntdll.dll 查看特定DLL调用的进程netstat可以显示网络连接信息命令行输入命令 netstat -ano | findstr "ESTABLISHED" 定位可疑的ESTABLISHEDPID=12306有大量的网络连接命令行输入 tasklist | find "12306" 查看具体的程序Linux进程排查命令行输入 netstat 网络连接命令,分析可疑端口,可疑IP,可疑PIDPID=2963存在恶意外链的情况根据 PID 的值,利用 ls -alt /proc/PID 命令,可查看其对应的可执行程序也可以利用 Lsof -D PID 命令,查看进程所打开的文件。使用 Lsof -p 2963 命令,可查看 PID 为2963 的进程所打开的文件,发现文件mbrn 为可疑文件命令行输入指令,杀死进程 kill -9 PID kill -9 2535命令行输入指令,删除文件 rm -rf filename rm -rf mbrn如果 root 用户都无法删除相关文件,那么很可能是因为该文件被加上了 i 属性。使用 Lsatter filename 命令,可查看文件属性,然后使用 chattr -i filename 命令,可移除 i 属性,进而删除文件。也有的进程因为存在守护进程而无法删除,我们可以先把进程挂起,查杀守护进程后,再返回将进程删除有时攻击者会隐藏进程,需要学会查看隐藏进程,一次输入以下指令即可查看ps -ef awkprint sort -n uniq >1 ls /proc sort -n uniq >2 diff 1 2服务排查Windows服务排查命令行输入 services.msc ,之后会打开服务窗口,查看所有服务项,名称,描述,状态Linux服务排查命令行输入 chkconfig --list 查看系统运行的服务其中,0、1、2、3、4、5、6 表示等级,具体含义如下:1 表示单用户模式,2 表示无网络连接的多用户命令行模式;3 表示有网络连接的多用户命令行模式4表示不可用;5 表示带图形界面的多用户模式:6 表示重新启动。命令行输入 service --status-all 可以查看所有服务的状态
绝对勇士
引言在现代的分布式系统中,缓存是提高性能和扩展性的重要组成部分之一。Redis 是一个开源、内存中的数据结构存储系统,可以用作数据库、缓存和消息中间件。而 WebFlux 是 Spring 框架提供的响应式编程模型,在处理高并发和大数据量的情况下具有很好的性能和扩展性。本文将介绍如何使用 Reactor 和 WebFlux 集成 Redis,利用其响应式特性来处理缓存操作。1. 环境准备首先,我们需要在项目的 pom.xml 文件中添加对 Spring WebFlux 和 Spring Data Redis 的依赖:<dependencies> ... <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency> ... </dependencies>2. 配置Redis连接信息在 application.properties 文件中添加Redis连接的配置信息:spring.redis.host=127.0.0.1 spring.redis.port=63793. 创建缓存管理器在项目的配置类中创建一个 RedisCacheManager 来管理缓存:@Configuration public class CacheConfig { @Bean public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) { RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofMinutes(5)) .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new JdkSerializationRedisSerializer())); return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(connectionFactory) .cacheDefaults(cacheConfiguration) .build(); } }在上述代码中,我们使用 RedisCacheConfiguration 配置了缓存的默认过期时间、键和值的序列化方式。4. 编写缓存逻辑定义一个Service类来处理缓存操作:@Service public class UserService { @Autowired private UserRepository userRepository; @Autowired private ReactiveRedisOperations<String, User> redisOperations; @Cacheable(cacheNames = "users", key = "#id") public Mono<User> getUserById(String id) { return userRepository.findById(id) .flatMap(user -> redisOperations.opsForValue().set(id, user) .then(Mono.just(user))); } @CachePut(cacheNames = "users", key = "#user.id") public Mono<User> saveUser(User user) { return userRepository.save(user) .flatMap(savedUser -> redisOperations.opsForValue().set(savedUser.getId(), savedUser) .then(Mono.just(savedUser))); } @CacheEvict(cacheNames = "users", key = "#id") public Mono<Void> deleteUserById(String id) { return userRepository.deleteById(id) .then(redisOperations.opsForValue().delete(id)); } }在上述代码中,我们使用 Spring 框架的缓存注解来定义缓存的逻辑。@Cacheable 用于读取缓存,@CachePut 用于更新缓存,@CacheEvict 用于清除缓存。同时,我们使用 ReactiveRedisOperations 来执行Redis的操作。5. 创建WebFlux控制器编写一个WebFlux控制器来处理请求:@RestController public class UserController { @Autowired private UserService userService; @GetMapping("/users/{id}") public Mono<User> getUserById(@PathVariable String id) { return userService.getUserById(id); } @PostMapping("/users") public Mono<User> saveUser(@RequestBody User user) { return userService.saveUser(user); } @DeleteMapping("/users/{id}") public Mono<Void> deleteUserById(@PathVariable String id) { return userService.deleteUserById(id); } }在上述代码中,我们使用 @GetMapping、@PostMapping 和 @DeleteMapping 来映射 URL,并调用 UserService 中的相应方法来处理具体的业务逻辑。总结本文介绍了如何使用 Reactor 和 WebFlux 集成 Redis 来处理缓存操作。通过使用 ReactiveRedisOperations 和 Spring 框架的缓存注解,我们可以方便地实现响应式的缓存逻辑。这种方式可以提升系统的性能和扩展性,特别适用于高并发和大数据量的场景。希望本文对您在使用 Reactor 和 WebFlux 集成 Redis 方面有所帮助。
绝对勇士
我们使用 subscribeOn 和 publishOn 操作符在响应链中切换执行上下文(Reactor 中叫 Scheduler)。上一篇文章中,我们说到 Reactor 默认行为是执行订阅的同一线程将用于整个管道执行。如果要切换执行线程怎么办?可以使用 publishOn 和 SubscribeOn让我们看个简单的例子:class ReactiveJavaTutorial { public static void main(String[] args) { Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam") .map(String::toUpperCase) .filter(cityName -> cityName.length() <= 8) .map(cityName -> cityName.concat(" City")) .log(); cities.subscribe(); } 输出:17:39:41.693 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 17:39:41.712 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber) 17:39:41.714 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded) 17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(NEW YORK City) 17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(LONDON City) 17:39:41.715 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(PARIS City) 17:39:41.716 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()中括号中的就是线程名称,在这个例子中,都是 main。可以看到整个管道执行器中都是使用的 main 线程。有的时候,我们可能想告诉 Reactor 别在整个管道中使用同一个线程。我可以使用 subscribeOn() 和 publishOn() 方法达到效果。subscribeOn() 方法subscribeOn() 方法适用于订阅过程。我们可以把它放在响应链条中的任意位置。它接收 Scheduler 参数,且在提供的线程池中选择线程执行。在下面的例子中,我们使用有界弹性线程池(Schedulers.boundElastic())。@Test public void testSubscribeThread() { Flux<String> cities = Flux.just("New York", "London", "Paris", "Amsterdam") .subscribeOn(Schedulers.boundedElastic()) .map(String::toUpperCase) .filter(cityName -> cityName.length() <= 8) .map(cityName -> cityName.concat(" City")) .map(TestCase::concat) .map(TestCase::stringToUpperCase) .log(); // cities.subscribe(); System.out.println(cities.blockFirst()); }PS: 原文提供的case,没有输出,简单修改了一下。输出:20:07:53.517 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 20:07:53.558 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber) 20:07:53.560 [main] INFO reactor.Flux.Map.1 - request(unbounded) concat: boundedElastic-1 stringToUpperCase: boundedElastic-1 20:07:53.564 [boundedElastic-1] INFO reactor.Flux.Map.1 - onNext(NEW YORK CITY CITY) 20:07:53.565 [boundedElastic-1] INFO reactor.Flux.Map.1 - cancel() NEW YORK CITY CITY可以看到 main 线程开始订阅,但是被切换成 boundedElastic-1 线程。我们提供了一个 Scheduler (Schedulers.boundedElastic()),然后这个线程池中的一个线程被选中来替换 main 线程。publishOn() 方法publishOn() 方法跟 subscribeOn() 很类似,但是有一个主要区别。来看个例子:class ReactiveJavaTutorial { public static void main(String[] args) { Flux.just("New York", "London", "Paris", "Amsterdam") .map(ReactiveJavaTutorial::stringToUpperCase) .publishOn(Schedulers.boundedElastic()) .map(ReactiveJavaTutorial::concat) .subscribe(); } private static String stringToUpperCase(String name) { System.out.println("stringToUpperCase: " + Thread.currentThread().getName()); return name.toUpperCase(); } private static String concat(String name) { System.out.println("concat: " + Thread.currentThread().getName()); return name.concat(" City"); } }这里,我们在两个 map 操作中放一个 publishOn()。我们来看输出:stringToUpperCase: main stringToUpperCase: main stringToUpperCase: main concat: boundedElastic-1 concat: boundedElastic-1 concat: boundedElastic-1可以看到,所有在 publishOn 操作之前的都是 main 线程执行,所有 publishOn 之后的都是 boundedElastic-1 执行。这是因为 publishOn 充当任何其他操作符。它从上游接收信号,并在关联的 Scheduler 上对一个 worker 执行回调时向下游重播。这就是 publishOn 和 subscribeOn() 的主要区别。无论我们把 subscribeOn() 放在哪里,它提供的 Scheduler 都会应用到整条响应链。subscribeOn and publishOn operators in Project Reactor
绝对勇士
WebFlux 服务编排是指使用 WebFlux 框架来编排多个异步服务的执行顺序和数据流动,从而构建出一个完整的、基于事件驱动的响应式应用程序。WebFlux服务编排的优势如下:高性能:WebFlux基于响应式编程模型,可以使用少量的线程处理大量的请求,从而提高系统的并发能力和吞吐量。异步处理:WebFlux可以异步处理请求和响应,避免线程的阻塞和等待,提高系统的并发能力和性能。高可靠性:WebFlux基于事件驱动的编程模型,可以更好地处理错误和异常,从而提高系统的可靠性和稳定性。简洁清晰:WebFlux的代码简洁清晰,可以使用函数式编程风格来编写业务逻辑,提高代码的可读性和可维护性。可扩展性:WebFlux可以轻松地集成其他的响应式组件和服务,例如Reactive Streams、Spring Cloud、RSocket等,从而提高系统的可扩展性和灵活性。综上所述,WebFlux服务编排可以帮助我们构建高性能、高可靠性、可扩展性强的响应式应用程序,提高系统的并发能力和性能,从而更好地满足现代应用程序的需求。一个示例public Mono> getOrderDetails(String orderId) { return Mono.fromCallable(() -> { // 查询订单基本信息 return "order info"; }) .flatMap(orderInfo -> { // 查询订单商品信息 return Mono.fromCallable(() -> { return "order item info"; }); }) .flatMap(orderItemInfo -> { // 查询订单配送信息 return Mono.fromCallable(() -> { return "order delivery info"; }); }) .flatMap(orderDeliveryInfo -> { // 查询订单支付信息 return Mono.fromCallable(() -> { return "order payment info"; }); }); }为什么使用 fromCallable,就是上面说的,WebFlux 编排的是异步服务,而不是同步服务。但是实际线上不要使用 fromCallable,会导致创建很多个线程,高并发场景下会导致资源竞争激烈,从而服务性能急剧下降。1 串行1.1 不需要 invoker1 的结果long start = System.currentTimeMillis(); Mono<String> invoke1 = Invoker1.invoke1(); Mono<String> result = invoke1.flatMap(p -> Invoker2.invoke2()) .map(s -> { return s.toString(); }); // result: invoker2, 耗时:3592(串行) System.out.println("result: " + result.block() + ", 耗时:" + (System.currentTimeMillis() - start));1.2 需要返回 invoker1 的结果long start = System.currentTimeMillis(); Mono<String> invoke1 = Invoker1.invoke1(); Mono<String> result = invoke1.flatMap(p -> { return Invoker2.invoke2().map(s -> { return p + s; }); }); // result: invoker1invoker2, 耗时:3554(串行) System.out.println("result: " + result.block() + ", 耗时:" + (System.currentTimeMillis() - start));2 并行2.1 zip 方法zip() 方法可以一次组装任意个Mono,适用于有多个Mono的情况long start = System.currentTimeMillis(); Mono<String> invoke1 = Invoker1.invoke1(); Mono<String> invoker2 = Invoker2.invoke2(); Mono<String> result = Mono.zip(invoke1, invoker2) .map(s-> { String t1 = s.getT1(); String t2 = s.getT2(); return String.format("invoke1:%s, invoke2: %s", t1, t2); }); // invoker1invoker2耗时:2650 (并行) System.out.println("result: " + result.block() + ",耗时:" + (System.currentTimeMillis() - start));2.2 zipWith 方法zipWith() 每次组装一个Mono对象,使用于组装Mono个数比较少的情况。long start = System.currentTimeMillis(); Mono<String> invoke1 = Invoker1.invoke1(); Mono<String> invoker2 = Invoker2.invoke2(); Mono<String> result = invoke1.zipWith(invoker2) .map(s -> { return String.format("invoke1:%s, invoke2: %s", s.getT1(), s.getT2()); }); // invoker1invoker2耗时:2469 (并行) System.out.println(result.block() + ",耗时:" + (System.currentTimeMillis() - start));3 前提这里的 invoker 就是第三方系统调用。保证 invoker 是在独立的线程中执行,这样 invoker 不会影响业务处理。public class Invoker1 { public static Mono<String> invoke1() { return Mono. fromSupplier(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "invoker1"; }) .subscribeOn(Schedulers.parallel()) .doOnError(e -> { System.out.println("error invoker1"); }); } }public class Invoker2 { public static Mono<String> invoke2() { return Mono.fromSupplier(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } return "invoker2"; }) .subscribeOn(Schedulers.parallel()) .doOnError(e -> { System.out.println("error invoker2"); }); } }