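🎤 Interviewer: How did you implement multi-condition product search on the consumer-facing (C-end) side? Please cover how you use ES and how the data is kept in sync.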
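Interviewee: Sure. I'll walk through our product search system, built on Spring Boot + spring-boot-starter-data-elasticsearch + Canal + RabbitMQ, at five levels: architecture design, the ES search implementation, Canal binlog listening, RabbitMQ transport, and incremental sync.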
I. Overall Architecture
MySQL → Canal (parses the binlog) → RabbitMQ → Search service (consumes RabbitMQ) → Elasticsearch ← Full sync

✅ Core components:
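- Canal: poses as a MySQL replica (slave) and listens to the binlog
- RabbitMQ: carries the change messages published by the Canal client (used instead of Kafka)
- Elasticsearch: serves search and filtering
- Search service: handles queries, consumes RabbitMQ messages, and syncs the changes into ES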
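To make the data flow concrete, here is a toy, in-memory model of the pipeline. It is a sketch, not the production code: a `BlockingQueue` stands in for RabbitMQ, a `HashMap` stands in for the ES index, and the names `PipelineModel` and `ChangeEvent` are hypothetical. It applies the same rule as the consumer shown later: an INSERT or UPDATE of an on-sale product indexes it, anything else removes it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model: MySQL -> Canal client -> "RabbitMQ" -> search service -> "ES index".
// BlockingQueue and HashMap are stand-ins, not real clients.
class PipelineModel {

    // One row change, as the Canal client would publish it
    record ChangeEvent(String type, long id, String title, String status) {}

    final BlockingQueue<ChangeEvent> mq = new LinkedBlockingQueue<>(); // stands in for RabbitMQ
    final Map<Long, String> esIndex = new HashMap<>();                 // stands in for ES: id -> title

    // Canal client side: publish a row change
    void publish(ChangeEvent event) {
        mq.add(event);
    }

    // Search service side: drain the queue and apply each change to the index
    void consumeAll() {
        ChangeEvent e;
        while ((e = mq.poll()) != null) {
            switch (e.type()) {
                case "INSERT", "UPDATE" -> {
                    if ("on_sale".equals(e.status())) {
                        esIndex.put(e.id(), e.title());
                    } else {
                        esIndex.remove(e.id()); // taken off sale: no longer searchable
                    }
                }
                case "DELETE" -> esIndex.remove(e.id());
                default -> { } // other event types are ignored
            }
        }
    }
}
```

Because the queue decouples the two sides, the Canal client never waits for ES, and the search service can be restarted without losing changes that are already queued.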
II. Elasticsearch Implementation (using spring-boot-starter-data-elasticsearch)
1. Maven dependency
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
```

Configuration
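```yaml
spring:
  elasticsearch:
    uris: http://192.168.1.100:9200   # Spring Boot 2.6+ property; the URI needs a scheme
    username: elastic
    password: changeme
  data:
    elasticsearch:
      repositories:
        enabled: false # disable automatic ES repository scanning (we use the template)
```

2. Product entity (@Document)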
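```java
import java.util.List;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

@Document(indexName = "product_index")
public class Product {

    @Id
    private Long id;

    // ik_max_word / ik_smart come from the IK analysis plugin, which must be installed on the cluster
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String title;

    @Field(type = FieldType.Keyword)
    private String brand;

    // indexed as "category_id" so it matches the term query in the search service
    @Field(name = "category_id", type = FieldType.Integer)
    private Integer categoryId;

    @Field(type = FieldType.Long)
    private Long price; // unit: fen (1/100 yuan)

    @Field(type = FieldType.Keyword)
    private List<String> tags;

    @Field(type = FieldType.Keyword)
    private String status;

    // getters / setters
}
```

3. Multi-condition search (using ElasticsearchRestTemplate)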
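```java
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Service
public class SearchService {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    // SearchRequest is our own request DTO (query, brand, categoryId, tags, minPrice/maxPrice in yuan,
    // 1-based page, size), not the Elasticsearch class of the same name
    public SearchHits<Product> searchProducts(SearchRequest request) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        // Full-text match; title is boosted 3x over tags
        if (StringUtils.hasText(request.getQuery())) {
            boolQuery.must(QueryBuilders.multiMatchQuery(request.getQuery())
                    .field("title", 3f)
                    .field("tags"));
        }

        // Filters run in filter context: they do not affect scoring and can be cached
        if (request.getBrand() != null) {
            boolQuery.filter(QueryBuilders.termQuery("brand", request.getBrand()));
        }
        if (request.getCategoryId() != null) {
            boolQuery.filter(QueryBuilders.termQuery("category_id", request.getCategoryId()));
        }
        if (!CollectionUtils.isEmpty(request.getTags())) {
            boolQuery.filter(QueryBuilders.termsQuery("tags", request.getTags())); // matches any listed tag
        }
        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
            RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price"); // indexed in fen
            if (request.getMinPrice() != null) priceRange.gte(request.getMinPrice() * 100);
            if (request.getMaxPrice() != null) priceRange.lte(request.getMaxPrice() * 100);
            boolQuery.filter(priceRange);
        }
        boolQuery.filter(QueryBuilders.termQuery("status", "on_sale"));

        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQuery)
                .withPageable(PageRequest.of(request.getPage() - 1, request.getSize()))
                .withSort(SortBuilders.scoreSort().order(SortOrder.DESC))
                // "sales" is not mapped on Product; unmappedType avoids a "No mapping found" error
                .withSort(SortBuilders.fieldSort("sales").order(SortOrder.DESC).unmappedType("long"))
                // Aggregations feed the linked filter facets
                .addAggregation(AggregationBuilders.terms("brand_agg").field("brand").size(10))
                .addAggregation(AggregationBuilders.terms("tag_agg").field("tags").size(20))
                .addAggregation(AggregationBuilders.range("price_agg").field("price")
                        .addUnboundedTo("0-500", 50000)
                        .addRange("500-1000", 50000, 100000)
                        .addRange("1000-3000", 100000, 300000)
                        .addUnboundedFrom("3000+", 300000))
                .build();

        return elasticsearchTemplate.search(query, Product.class);
    }
}
```

✅ Return value:
The returned SearchHits<Product> contains:
- getSearchHits(): the product list
- getAggregations(): the brand, price and tag aggregations (used for the linked filter facets)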
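The `price_agg` buckets follow Elasticsearch range-aggregation semantics: each bucket includes its `from` value and excludes its `to` value, and every value is in fen. The plain-Java sketch below (the `PriceBuckets` class is hypothetical) mirrors the same boundaries and the yuan-to-fen conversion, which makes edge cases easy to check: a product priced at exactly 500 yuan lands in "500-1000", not "0-500".

```java
import java.math.BigDecimal;

// Mirrors the price_agg boundaries. Prices are in fen; each ES range bucket
// includes its `from` value and excludes its `to` value.
class PriceBuckets {

    static String bucketOf(long priceFen) {
        if (priceFen < 50_000) return "0-500";      // unbounded below, to = 50000 (exclusive)
        if (priceFen < 100_000) return "500-1000";  // [50000, 100000)
        if (priceFen < 300_000) return "1000-3000"; // [100000, 300000)
        return "3000+";                             // from = 300000, unbounded above
    }

    // Yuan string (as typed by the user or read from a DECIMAL column) to fen.
    // BigDecimal avoids binary floating-point error; longValueExact() throws
    // ArithmeticException if the amount has more than two decimal places.
    static long yuanToFen(String yuan) {
        return new BigDecimal(yuan).movePointRight(2).longValueExact();
    }
}
```

Doing the conversion with `double` is the usual pitfall here: `0.29 * 100` evaluates to `28.999999999999996`, which truncates to the wrong number of fen.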
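✅ How do we parse the aggregation results?
The point is to feed the results back into the search conditions. Several filter options (brands, tags, price bands) are not fixed lists: they are derived from what the current query matched, and clicking one adds it as a new filter to the next query.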
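```java
import java.util.List;
import java.util.stream.Collectors;

import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.data.elasticsearch.core.SearchHits;

SearchHits<Product> hits = searchService.searchProducts(request);

// Parse the brand aggregation. On Spring Data Elasticsearch 4.0 to 4.2, getAggregations() returns
// org.elasticsearch.search.aggregations.Aggregations; from 4.3 it returns AggregationsContainer<?>,
// so use ((Aggregations) hits.getAggregations().aggregations()).get("brand_agg") there instead.
Terms brandAgg = hits.getAggregations().get("brand_agg");
// AggregateBean is our own (key, count) DTO for the front end
List<AggregateBean> brandList = brandAgg.getBuckets().stream()
        .map(bucket -> new AggregateBean(bucket.getKeyAsString(), bucket.getDocCount()))
        .collect(Collectors.toList());
```

The front end renders this data as facets such as "Huawei (120)" and "Xiaomi (98)".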
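The facet round trip can be sketched in plain Java. The `AggregateBean` record below is an assumed shape for the DTO in the snippet above (a key plus a document count), and `FacetSketch.applyFacet` is a hypothetical helper: it formats a bucket the way the front end displays it, then turns a clicked bucket into a filter for the next request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FacetSketch {

    // Assumed shape of AggregateBean: bucket key plus document count
    record AggregateBean(String key, long docCount) {}

    // Label as the front end shows it, e.g. "Huawei (120)"
    static String label(AggregateBean bucket) {
        return bucket.key() + " (" + bucket.docCount() + ")";
    }

    // Clicking a facet adds (or replaces) that field's filter for the next request.
    // The map is a stand-in for the request DTO's filter fields (brand, categoryId, ...).
    static Map<String, String> applyFacet(Map<String, String> filters, String field, AggregateBean bucket) {
        Map<String, String> next = new LinkedHashMap<>(filters); // leave the current request untouched
        next.put(field, bucket.key());
        return next;
    }
}
```

The next search then runs with the extra filter, and its aggregations describe only the narrowed result set, which is what keeps the facets "linked".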
III. Canal Client Code (listening to the MySQL binlog)
The Canal client connects to the Canal Server (deployed on a separate machine) and subscribes to changes on the specified table.
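The client below relies on Canal's batch contract: `getWithoutAck` hands out a batch, `ack` confirms it, and `rollback` makes it available again. The toy model here is plain Java, not the real `CanalConnector`, and `BatchAckModel` and `drain` are hypothetical names. It runs the same loop shape with a publisher that fails once and shows the consequence: delivery is at-least-once, so a row can reach RabbitMQ twice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Toy model of the getWithoutAck / ack / rollback contract (not the real CanalConnector API).
class BatchAckModel {

    private final Deque<List<String>> pending = new ArrayDeque<>(); // batches waiting to be fetched
    private List<String> inFlight;                                  // fetched, not yet acked

    BatchAckModel(List<List<String>> batches) {
        pending.addAll(batches);
    }

    // Next batch, or null when there is none (the real client reports batchId == -1
    // and the loop sleeps and polls again; this model simply stops)
    List<String> getWithoutAck() {
        inFlight = pending.poll();
        return inFlight;
    }

    void ack() {
        inFlight = null; // confirmed: never handed out again
    }

    void rollback() {
        if (inFlight != null) {
            pending.addFirst(inFlight); // the same batch will be fetched again
        }
        inFlight = null;
    }

    // Same loop shape as CanalClient.run(): publish every row, ack on success, roll back on failure.
    // Returns every row that reached the "MQ", duplicates included.
    static List<String> drain(BatchAckModel canal, Predicate<String> publish) {
        List<String> delivered = new ArrayList<>();
        List<String> batch;
        while ((batch = canal.getWithoutAck()) != null) {
            boolean ok = true;
            for (String row : batch) {
                if (!publish.test(row)) {
                    ok = false;
                    break;
                }
                delivered.add(row);
            }
            if (ok) {
                canal.ack();
            } else {
                canal.rollback();
            }
        }
        return delivered;
    }
}
```

With a publisher that fails on `r2` once, the output is `[r1, r1, r2, r3]`: `r1` is published twice because its batch was rolled back. This is why the consumer side has to be idempotent.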
1. Maven dependency
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>
```

2. Canal listener thread (run standalone or embedded in Spring Boot)
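```java
// CanalClient.java
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

@Component
@DependsOn("rabbitMQConfig") // make sure the exchange and queue are declared first
public class CanalClient implements InitializingBean {

    private static final Logger log = LoggerFactory.getLogger(CanalClient.class);

    private final String canalHost = "192.168.1.101";
    private final int canalPort = 11111;
    private final String destination = "example"; // Canal instance name

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Override
    public void afterPropertiesSet() {
        // the pull loop runs on its own thread so it does not block application startup
        new Thread(this::run, "canal-client").start();
    }

    private void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalHost, canalPort), destination, "", "");
        try {
            connector.connect();
            connector.subscribe("shop_db\\.product"); // only the product table
            connector.rollback(); // re-deliver anything left un-acked by a previous run
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(1000); // pull up to 1000 entries per batch
                long batchId = message.getId();
                if (batchId == -1) { // nothing new yet
                    Thread.sleep(100);
                    continue;
                }
                try {
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            publishToRabbitMQ(entry);
                        }
                    }
                    connector.ack(batchId); // ack only after every row has been published
                } catch (Exception e) {
                    connector.rollback(batchId); // the whole batch will be fetched again
                    log.error("Canal batch {} failed", batchId, e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            connector.disconnect();
        }
    }

    private void publishToRabbitMQ(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        if (rowChange.getIsDdl()) {
            return; // skip DDL such as ALTER TABLE
        }
        String tableName = entry.getHeader().getTableName();
        String eventType = entry.getHeader().getEventType().name(); // INSERT, UPDATE, DELETE
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            // DELETE carries the row before deletion; INSERT/UPDATE carry the row after the change
            List<CanalEntry.Column> columns = "DELETE".equals(eventType)
                    ? rowData.getBeforeColumnsList() : rowData.getAfterColumnsList();
            JSONObject data = new JSONObject();
            for (CanalEntry.Column column : columns) {
                // values arrive as strings; keep SQL NULL as null instead of ""
                data.put(column.getName(), column.getIsNull() ? null : column.getValue());
            }
            JSONObject msg = new JSONObject();
            msg.put("database", entry.getHeader().getSchemaName());
            msg.put("table", tableName);
            msg.put("type", eventType);
            msg.put("ts", entry.getHeader().getExecuteTime()); // execution time, epoch milliseconds
            msg.put("data", data);
            rabbitMQSender.sendMessage("canal.product.change", msg.toJSONString());
        }
    }
}
```

IV. RabbitMQ Configuration and Usage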
1. Maven dependency
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```

2. Configuration file (application.yml)
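```yaml
spring:
  rabbitmq:
    host: 192.168.1.102
    port: 5672
    # by default RabbitMQ only lets the built-in guest user connect from localhost;
    # use a dedicated user when the broker runs on another host
    username: guest
    password: guest
    virtual-host: /
```

3. RabbitMQ sender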
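```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String routingKey, String message) {
        // Spring AMQP sends messages with PERSISTENT delivery mode by default,
        // so once they reach the durable queue they survive a broker restart
        rabbitTemplate.convertAndSend("canal.exchange", routingKey, message);
    }
}
```

4. RabbitMQ configuration class (declares the exchange and queue)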
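```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange("canal.exchange", true, false); // durable, not auto-deleted
    }

    @Bean
    public Queue productChangeQueue() {
        return new Queue("product.change.queue", true); // durable
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(productChangeQueue())
                .to(canalExchange())
                .with("canal.product.change");
    }
}
```

V. Search Service: Consuming RabbitMQ Messages and Syncing ES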
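```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class ProductChangeConsumer {

    private static final Logger log = LoggerFactory.getLogger(ProductChangeConsumer.class);

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    @RabbitListener(queues = "product.change.queue")
    public void handleMessage(String message) {
        try {
            JSONObject msg = JSON.parseObject(message);
            String type = msg.getString("type");
            Product product = convertToProduct(msg.getJSONObject("data"));
            String id = String.valueOf(product.getId());
            switch (type) {
                case "INSERT":
                case "UPDATE":
                    if ("on_sale".equals(product.getStatus())) {
                        elasticsearchTemplate.save(product); // full overwrite by id
                    } else {
                        elasticsearchTemplate.delete(id, Product.class); // off sale: hide from search
                    }
                    break;
                case "DELETE":
                    elasticsearchTemplate.delete(id, Product.class);
                    break;
                default:
                    break; // other event types are ignored
            }
        } catch (Exception e) {
            log.error("Failed to sync message to ES: {}", message, e);
            // Reject without requeue: if the queue has a dead-letter exchange configured, the message
            // goes to the dead-letter queue for retry; otherwise it is dropped
            throw new AmqpRejectAndDontRequeueException("ES sync failed", e);
        }
    }

    private Product convertToProduct(JSONObject data) {
        Product p = new Product();
        p.setId(data.getLong("id"));
        p.setTitle(data.getString("title"));
        p.setBrand(data.getString("brand"));
        p.setCategoryId(data.getInteger("category_id"));
        // MySQL stores price in yuan (possibly with decimals, e.g. DECIMAL(10,2)); ES stores fen
        BigDecimal priceYuan = data.getBigDecimal("price");
        p.setPrice(priceYuan == null ? null : priceYuan.movePointRight(2).longValue());
        String tags = data.getString("tags"); // comma-separated in MySQL
        p.setTags(StringUtils.hasText(tags) ? Arrays.asList(tags.split(",")) : Collections.emptyList());
        p.setStatus(data.getString("status"));
        return p;
    }
}
```

VI. Full Sync (backfilling data or rebuilding the index)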
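```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Service;

@Service
public class FullSyncService {

    private static final Logger log = LoggerFactory.getLogger(FullSyncService.class);

    // MySQL-side repository; rows must be mapped to Product with the same conventions
    // as the incremental consumer (price in fen, tags as a list)
    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    public void syncAll() {
        Pageable pageable = PageRequest.of(0, 1000);
        Page<Product> page; // declared outside the loop so the while condition can see it
        do {
            page = productRepository.findByStatus("on_sale", pageable);
            if (!page.isEmpty()) {
                elasticsearchTemplate.save(page.getContent()); // bulk-index this page
            }
            pageable = page.nextPageable();
        } while (page.hasNext());
        log.info("Full sync finished");
    }
}
```

VII. Data Consistency Guarantees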
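| Mechanism | What it guarantees |
|---|---|
| ✅ Canal batch pull + ACK | A batch is acked only after all its rows are published to RabbitMQ; on failure it is rolled back and fetched again, so no change is lost (at the cost of possible duplicates) |
| ✅ RabbitMQ persistence | Durable exchange and queue plus persistent messages: messages that reach the queue are written to disk and survive a broker restart |
| ✅ Idempotent consumption | elasticsearchTemplate.save() overwrites the document by id (an upsert), so replaying the same message yields the same result |
| ✅ Daily reconciliation | Compare the number of on-sale products in MySQL with the document count in ES, and alert if the gap is large |
| ✅ Manual compensation endpoint | A /sync/full endpoint triggers a full sync on demand |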
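Idempotent `save()` makes replays harmless, but it does not protect ordering: if an older message is redelivered after a newer one, overwriting by id rolls the document back. One possible guard, an assumption rather than part of the design above, is to use the `ts` field every message already carries and skip changes older than the last one applied, keeping a tombstone for deletes so a late INSERT cannot resurrect a product. The sketch models the index as a `HashMap`; `VersionedIndex` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch (assumption): apply a change only if its ts is not older than the last one
// applied for that id. Deletes leave a tombstone that remembers their ts.
class VersionedIndex {

    record Doc(String title, long ts, boolean deleted) {}

    private final Map<Long, Doc> docs = new HashMap<>();

    // Returns true if the change was applied, false if it was stale and skipped
    boolean apply(String type, long id, String title, long ts) {
        Doc current = docs.get(id);
        if (current != null && ts < current.ts()) {
            return false; // older than what is already indexed
        }
        boolean delete = "DELETE".equals(type);
        docs.put(id, new Doc(delete ? null : title, ts, delete));
        return true;
    }

    // Title visible to search, or null if absent or deleted
    String visibleTitle(long id) {
        Doc d = docs.get(id);
        return (d == null || d.deleted()) ? null : d.title();
    }
}
```

Elasticsearch can enforce a similar rule server-side: indexing with `version_type=external` and the timestamp as the version rejects any write whose version is not greater than the stored one. Since `ts` has millisecond resolution, two changes in the same millisecond compare as equal (this sketch lets the later-arriving one win); the binlog position is a finer-grained ordering key.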
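✅ Summary: the advantages of this design:
- Decoupling: Canal is deployed independently, and the search service does not need to know about it
- Reliability: RabbitMQ persistence plus the ACK mechanism
- Maintainability: uses ElasticsearchRestTemplate, in idiomatic Spring style
- Extensibility: more consumers (e.g. a recommendation system) can be added later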