初识Flux:数据世界的自来水管
1.1 当数据变成水流
想象你家的自来水管就是Flux的化身。传统的水桶打水(List)需要一次性搬完所有水,而Flux就像安装了智能水龙头:- 打开开关就持续出水(数据流)
- 可以随时调节水流大小(背压控制)
- 水压不足时会自动通知你(异步回调)
Flux<String> 厨房水管 = Flux.just("清水", "洗洁精", "洗碗水");
厨房水管.subscribe(水分子 -> System.out.println("正在处理:" + 水分子));
1.2 快递站 vs 实体店
传统集合像超市购物(List):- 必须等所有商品上架才能购买
- 买牛奶也得先等面包生产完
Flux<快递包裹> 每日快递 = Flux.create(快递站 -> {
快递站.next(晨报包裹());
快递站.next(鲜奶包裹());
快递站.next(网购包裹());
});
每日快递中的包裹就像Flux数据流中的元素,快递员每准备好一个包裹就立即派送,不需要等待所有货物都到达快递站后才进行配送,这样更加灵活高效。
Flux核心操作
2.1 咖啡店的点单艺术
假设你是星巴克的咖啡师Flux:操作符
现实对应
代码示例
map
美式→卡布奇诺
.map(咖啡豆 → 研磨(咖啡豆))
filter
拒绝过期的牛奶
.filter(牛奶 → 牛奶.保质期 > 今天)
buffer
外卖订单批量处理
.buffer(5) → 每5杯打包一次
Flux.just("拿铁","摩卡","浓缩")
.map(订单 -> 订单 + "加糖")
.filter(订单 -> !订单.contains("无糖"))
.buffer(2)
.subscribe(批量订单 -> System.out.println("制作:" + 批量订单));
2.2 异常处理三连招
像经验丰富的外卖骑手处理突发状况:- onErrorReturn:爆胎时换备用自行车
.onErrorReturn("外卖延误通知单")
- onErrorResume:改派其他骑手接单
.onErrorResume(e -> 备用骑手.get订单流())
- retry:重新尝试配送
.retry(3) // 最多重试3次
深入数据快递站:订阅者的智慧
3.1 杂志订阅的四种信号
订阅Flux就像订杂志:厨房水管.subscribe(
最新期刊 -> System.out.println("收到:" + 最新期刊), // onNext
退订通知 -> System.err.println("因为:" + 退订通知), // onError
() -> System.out.println("本年期刊已全部送达"), // onComplete
订阅订单 -> { // onSubscribe
订阅订单.request(3); // 每次只收3期杂志
}
);
3.2 背压危机处理:快递仓库的智慧
当消费者处理速度跟不上时:- 直接丢弃(DROP):爆仓时扔掉旧包裹
.onBackpressureDrop(包裹 -> 记录丢弃(包裹))
- 缓存策略(BUFFER):租用临时仓库
.onBackpressureBuffer(100) // 最大缓存100个
- 最新优先(LATEST):只保留最新包裹
.onBackpressureLatest()
进阶技巧:Flux的七十二变
4.1 冷热Flux之争:电影院 vs 录像带
类型类比
特点
创建方式
冷Flux
DVD租赁店
每次播放都是全新开始
Flux.just()
热Flux
电影院直播
后来观众只能看实时画面
Flux.share()
实例场景:
Flux<String> 冷直播 = Flux.interval(Duration.ofSeconds(1))
.map(i -> "比赛第"+i+"分钟");
// 第一个观众看到完整比赛
冷直播.subscribe(观众A);
Thread.sleep(5000);
// 第二个观众从第5分钟开始看
冷直播.subscribe(观众B);
4.2 Flux组合套餐:数据交响乐团
- zip操作:咖啡+甜点套餐
Flux<String> 咖啡 = Flux.just("美式", "拿铁");
Flux<String> 甜点 = Flux.just("蛋糕", "马卡龙");
Flux.zip(咖啡, 甜点)
.map(tuple -> tuple.getT1() + "+" + tuple.getT2())
.subscribe(System.out:
rintln);
// 输出:美式+蛋糕,拿铁+马卡龙
Flux.merge(中餐厨房, 西餐厨房)
.subscribe(服务员::上菜);
实战演练:外卖平台订单系统
场景需求:
- 实时接收订单(每秒100+)
- 智能过滤无效订单
- 批量处理(每10单打包)
- 动态限流(根据餐厅产能)
Flux<订单> 订单流 = 骑手APP.get实时订单流()
.filter(订单 → 订单.验证有效性())
.map(订单 → 订单.添加时间戳())
.bufferTimeout(10, Duration.ofSeconds(5))
.onBackpressureDrop(爆单处理::记录丢弃订单)
.doOnNext(批量订单 → 厨房系统.接收订单(批量订单))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
- 像智能水流控制阀门一样管理订单
- 高峰期自动开启限流保护
- 网络波动时自动重试
- 厨房永远只处理合理数量的订单
结语:成为数据管道的掌控者
Flux就像城市地下的智能水管网络:- 知道何时加速(concatMap)
- 懂得何时分流(flatMap)
- 具备自我修复能力(retry)
- 拥有流量感知(背压控制)
原文
部分内容由AI优化原文来源于本人博客:Java Flux流 从0到精通|Handsome