
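🎤 Interviewer: How did you implement multi-condition product search on the consumer-facing (C-end) side? Please cover how you use ES and how the data is kept in sync.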

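Candidate: Sure. I'll walk through our product search system, built on Spring Boot + spring-boot-starter-data-elasticsearch + Canal + RabbitMQ, from five angles: architecture, ES search, Canal listening, RabbitMQ transport, and incremental sync.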


I. Overall architecture

MySQL → Canal (parses binlog) → RabbitMQ → search service (consumes RabbitMQ) → Elasticsearch ← full sync

✅ Core components:

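  • Canal: poses as a MySQL replica (slave) and reads the binlog
  • RabbitMQ: receives the change messages produced from Canal (used instead of Kafka)
  • Elasticsearch: serves search and filtering
  • Search service: handles queries, consumes RabbitMQ messages, and keeps ES in sync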

II. Elasticsearch implementation (using spring-boot-starter-data-elasticsearch)

1. Maven dependency

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

Configuration

yaml
spring:
  elasticsearch:
    uris: http://192.168.1.100:9200  # documented form is a full URI including the scheme
    username: elastic
    password: changeme
  data:
    elasticsearch:
      repositories:
        enabled: false  # disable automatic repository scanning (we use the template instead)

2. Product entity (@Document)

java
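import java.util.List;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

@Document(indexName = "product_index")
public class Product {

    @Id
    private Long id;

    // ik_max_word / ik_smart come from the IK analysis plugin, which must be installed on the ES nodes
    @Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
    private String title;

    @Field(type = FieldType.Keyword)
    private String brand;

    @Field(type = FieldType.Integer)
    private Integer categoryId; // indexed under the property name "categoryId"

    @Field(type = FieldType.Long)
    private Long price; // unit: fen (1/100 yuan); an integer avoids floating-point rounding

    @Field(type = FieldType.Keyword)
    private List<String> tags;

    @Field(type = FieldType.Keyword)
    private String status;

    // getters / setters
}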
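Since `price` is indexed in fen while the request DTO and, judging by the `* 100` conversions in this design, MySQL work in yuan, the unit conversion is easy to get wrong. A minimal sketch, assuming prices arrive as decimal strings such as `"199.90"` (Canal delivers column values as strings); `PriceUnits` is a hypothetical helper, not part of the original code:

```java
import java.math.BigDecimal;

// Hypothetical helper: converts between yuan (decimal strings / BigDecimal) and fen (long).
final class PriceUnits {

    private PriceUnits() {}

    /** "199.90" yuan -> 19990 fen; throws ArithmeticException if there are more than 2 decimals. */
    static long yuanToFen(String yuan) {
        return new BigDecimal(yuan.trim()).movePointRight(2).longValueExact();
    }

    /** 19990 fen -> 199.90 yuan, always with two decimal places. */
    static BigDecimal fenToYuan(long fen) {
        return BigDecimal.valueOf(fen, 2);
    }

    public static void main(String[] args) {
        System.out.println(yuanToFen("199.90")); // 19990
        System.out.println(fenToYuan(19990));    // 199.90
        // Why not double: binary floating point cannot represent 0.1 or 0.2 exactly
        System.out.println(0.1 + 0.2);           // 0.30000000000000004
    }
}
```

Using `longValueExact()` makes an unexpected third decimal fail loudly instead of being silently truncated.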

3. Multi-condition search (using ElasticsearchRestTemplate, Spring Data Elasticsearch 4.x API)

java
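import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

// SearchRequest is the application's own DTO (query, brand, categoryId, tags, minPrice/maxPrice
// in yuan as Long, 1-based page, size), not org.elasticsearch.action.search.SearchRequest.
@Service
public class SearchService {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    public SearchHits<Product> searchProducts(SearchRequest request) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        // Full-text search on title (boost 3) and tags
        if (StringUtils.hasText(request.getQuery())) {
            boolQuery.must(
                QueryBuilders.multiMatchQuery(request.getQuery())
                    .field("title", 3f)
                    .field("tags")
            );
        }

        // Filters: evaluated in filter context (no scoring, cacheable)
        BoolQueryBuilder filterBool = QueryBuilders.boolQuery();
        if (request.getBrand() != null) {
            filterBool.must(QueryBuilders.termQuery("brand", request.getBrand()));
        }
        if (request.getCategoryId() != null) {
            // ES field is the entity property "categoryId", not the MySQL column "category_id"
            filterBool.must(QueryBuilders.termQuery("categoryId", request.getCategoryId()));
        }
        if (!CollectionUtils.isEmpty(request.getTags())) {
            // matches products that carry at least one of the given tags
            filterBool.must(QueryBuilders.termsQuery("tags", request.getTags()));
        }
        if (request.getMinPrice() != null || request.getMaxPrice() != null) {
            // request prices are in yuan, the index stores fen
            RangeQueryBuilder priceRange = QueryBuilders.rangeQuery("price");
            if (request.getMinPrice() != null) priceRange.gte(request.getMinPrice() * 100);
            if (request.getMaxPrice() != null) priceRange.lte(request.getMaxPrice() * 100);
            filterBool.must(priceRange);
        }
        filterBool.must(QueryBuilders.termQuery("status", "on_sale"));

        boolQuery.filter(filterBool);

        // Build the query
        NativeSearchQuery query = new NativeSearchQueryBuilder()
            .withQuery(boolQuery)
            // PageRequest is 0-based; request.getPage() is 1-based
            .withPageable(PageRequest.of(request.getPage() - 1, request.getSize()))
            .withSort(SortBuilders.scoreSort().order(SortOrder.DESC))
            // "sales" is not in the Product mapping shown above; unmappedType("long") keeps the
            // query from failing on the missing field until one is added
            .withSort(SortBuilders.fieldSort("sales").order(SortOrder.DESC).unmappedType("long"))
            // Aggregations feed the linked filter options
            .addAggregation(AggregationBuilders.terms("brand_agg").field("brand").size(10))
            .addAggregation(AggregationBuilders.terms("tag_agg").field("tags").size(20))
            // range() takes only the aggregation name; the field is set separately.
            // Bounds are in fen; each bucket includes "from" and excludes "to".
            .addAggregation(AggregationBuilders.range("price_agg").field("price")
                    .addUnboundedTo("0-500", 50000)
                    .addRange("500-1000", 50000, 100000)
                    .addRange("1000-3000", 100000, 300000)
                    .addUnboundedFrom("3000+", 300000)
            )
            .build();

        return elasticsearchTemplate.search(query, Product.class);
    }
}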
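The `price_agg` buckets and the `price` filter in `searchProducts` follow different boundary rules: a range-aggregation bucket includes its `from` value and excludes its `to` value, whereas the filter uses `gte`/`lte`, which are both inclusive. This plain-Java sketch mirrors those bounds (in fen) so the edge cases can be checked without a cluster; `PriceBuckets` is a hypothetical name and the last bucket's key is written here as `3000+`:

```java
// Mirrors the price_agg bucket bounds (in fen). ES range aggregations treat "from" as
// inclusive and "to" as exclusive, so 50000 fen (500 yuan) lands in "500-1000".
final class PriceBuckets {

    private PriceBuckets() {}

    static String bucketOf(long priceFen) {
        if (priceFen < 50_000) return "0-500";
        if (priceFen < 100_000) return "500-1000";
        if (priceFen < 300_000) return "1000-3000";
        return "3000+";
    }

    /** Same inclusive semantics as rangeQuery("price").gte(min * 100).lte(max * 100). */
    static boolean inFilter(long priceFen, Long minYuan, Long maxYuan) {
        if (minYuan != null && priceFen < minYuan * 100) return false;
        if (maxYuan != null && priceFen > maxYuan * 100) return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(bucketOf(49_999));           // 0-500
        System.out.println(bucketOf(50_000));           // 500-1000
        System.out.println(inFilter(50_000, 0L, 500L)); // true: lte is inclusive
    }
}
```

So a product priced at exactly 500 yuan matches a `maxPrice=500` filter but is counted in the `500-1000` bucket, which matters when bucket labels are shown next to the filter.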

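✅ Return value:

The returned SearchHits<Product> contains:

  • getSearchHits(): the list of matching products
  • getAggregations(): brand, price, and tag aggregations (used for linked filters)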

✅ How do we parse the aggregation results?

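The point is that the filter options themselves come from the current results: some conditions (for example, which brands or price ranges to offer) depend on what the other conditions already match, so the aggregated values are offered back to the user as selectable filters for the next search.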

java
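SearchHits<Product> hits = searchService.searchProducts(request);
// Spring Data Elasticsearch 4.3+ (Spring Boot 2.6+) wraps the result in an AggregationsContainer;
// on 4.0 to 4.2, hits.getAggregations() already returns Aggregations.
Aggregations aggs = ((ElasticsearchAggregations) hits.getAggregations()).aggregations();
// Parse the brand aggregation
Terms brandAgg = aggs.get("brand_agg");
// AggregateBean is the application's own (key, docCount) DTO
List<AggregateBean> brandList = brandAgg.getBuckets().stream()
    .map(bucket -> new AggregateBean(bucket.getKeyAsString(), bucket.getDocCount()))
    .collect(Collectors.toList());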

The frontend uses this data to render options such as “Huawei (120)” and “Xiaomi (98)”.

III. Canal client code (listening to the MySQL binlog)

The Canal client connects to the Canal Server (deployed on a separate host) and listens for changes to specific tables.

1. Maven dependency

xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.6</version>
</dependency>

2. Canal listener thread (runs standalone or embedded in Spring Boot)

java
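// CanalClient.java
import java.net.InetSocketAddress;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

@Component
@DependsOn("rabbitMQConfig") // make sure the MQ exchange/queue beans exist first
public class CanalClient implements InitializingBean, DisposableBean {

    private static final Logger log = LoggerFactory.getLogger(CanalClient.class);

    private final String canalHost = "192.168.1.101";
    private final int canalPort = 11111;
    private final String destination = "example"; // Canal instance name

    private volatile boolean running = true;

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Override
    public void afterPropertiesSet() {
        Thread worker = new Thread(this::run, "canal-client");
        worker.setDaemon(true); // do not block JVM shutdown
        worker.start();
    }

    @Override
    public void destroy() {
        running = false; // stop the polling loop when the Spring context closes
    }

    private void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(canalHost, canalPort), destination, "", "");
        try {
            connector.connect();
            connector.subscribe("shop_db\\.product"); // regex filter: only the product table
            connector.rollback(); // restart from the last un-acked position

            while (running) {
                Message message = connector.getWithoutAck(1000); // pull up to 1000 entries
                long batchId = message.getId();
                if (batchId == -1) {
                    Thread.sleep(100); // no new data yet
                    continue;
                }
                try {
                    for (CanalEntry.Entry entry : message.getEntries()) {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            publishToRabbitMQ(entry);
                        }
                    }
                    connector.ack(batchId); // ack only after every row has been published
                } catch (Exception e) {
                    connector.rollback(batchId); // the batch will be fetched again
                    log.error("Canal batch processing failed", e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            log.error("Canal client stopped", e); // e.g. Canal Server unreachable
        } finally {
            connector.disconnect();
        }
    }

    private void publishToRabbitMQ(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        if (rowChange.getIsDdl()) {
            return; // ignore DDL such as ALTER TABLE
        }

        String tableName = entry.getHeader().getTableName();
        String eventType = entry.getHeader().getEventType().name(); // INSERT, UPDATE, DELETE

        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            // DELETE only carries the "before" image; INSERT/UPDATE use the "after" image
            List<CanalEntry.Column> columns = "DELETE".equals(eventType)
                ? rowData.getBeforeColumnsList() : rowData.getAfterColumnsList();

            JSONObject data = new JSONObject(); // fastjson
            for (CanalEntry.Column column : columns) {
                data.put(column.getName(), column.getValue());
            }

            // Build the message
            JSONObject msg = new JSONObject();
            msg.put("database", entry.getHeader().getSchemaName());
            msg.put("table", tableName);
            msg.put("type", eventType);
            msg.put("ts", entry.getHeader().getExecuteTime()); // binlog event time, epoch millis
            msg.put("data", data);

            // Send to RabbitMQ
            rabbitMQSender.sendMessage("canal.product.change", msg.toJSONString());
        }
    }
}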
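The core of `publishToRabbitMQ` is choosing the right row image and flattening it into the `data` map. The sketch below isolates that step in plain Java so it can be tested without a Canal server. `Col` is a hypothetical stand-in for `CanalEntry.Column` (name, value, isNull), and mapping SQL NULL to `null` is an assumption: the client above forwards Canal's raw string value instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java mirror of the row-to-message step in publishToRabbitMQ (no Canal dependency).
final class RowMessages {

    /** Hypothetical stand-in for CanalEntry.Column. */
    static final class Col {
        final String name;
        final String value;
        final boolean isNull;

        Col(String name, String value, boolean isNull) {
            this.name = name;
            this.value = value;
            this.isNull = isNull;
        }
    }

    private RowMessages() {}

    /** DELETE carries only a "before" image; INSERT/UPDATE use the "after" image. */
    static Map<String, Object> toMessage(String db, String table, String type, long ts,
                                         List<Col> before, List<Col> after) {
        List<Col> cols = "DELETE".equals(type) ? before : after;
        Map<String, Object> data = new LinkedHashMap<>();
        for (Col c : cols) {
            data.put(c.name, c.isNull ? null : c.value); // SQL NULL -> null (assumption)
        }
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("database", db);
        msg.put("table", table);
        msg.put("type", type);
        msg.put("ts", ts);
        msg.put("data", data);
        return msg;
    }

    public static void main(String[] args) {
        List<Col> before = new ArrayList<>();
        before.add(new Col("id", "42", false));
        before.add(new Col("tags", "", true));
        Map<String, Object> msg = toMessage("shop_db", "product", "DELETE", 0L, before, new ArrayList<>());
        System.out.println(msg.get("data")); // {id=42, tags=null}
    }
}
```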

IV. RabbitMQ configuration and usage

1. Maven dependency

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Configuration file (application.yml)

yaml
spring:
  rabbitmq:
    host: 192.168.1.102
    port: 5672
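    username: guest   # the built-in guest user may only connect from localhost by default;
    password: guest   # use a dedicated account for a broker on another host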
    virtual-host: /

3. RabbitMQ sender

java
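import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Publishes to the direct exchange declared in RabbitMQConfig. RabbitTemplate converts the
    // String into a message whose delivery mode is PERSISTENT by default.
    public void sendMessage(String routingKey, String message) {
        rabbitTemplate.convertAndSend("canal.exchange", routingKey, message);
    }
}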

4. RabbitMQ configuration class (declares the exchange and queue)

java
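import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public DirectExchange canalExchange() {
        return new DirectExchange("canal.exchange", true, false); // durable, not auto-delete
    }

    @Bean
    public Queue productChangeQueue() {
        return new Queue("product.change.queue", true); // durable
    }

    @Bean
    public Binding binding() {
        // messages with routing key "canal.product.change" go to the product queue
        return BindingBuilder.bind(productChangeQueue())
            .to(canalExchange())
            .with("canal.product.change");
    }
}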

V. Search service: consuming RabbitMQ messages and syncing ES

java
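import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

@Component
public class ProductChangeConsumer {

    private static final Logger log = LoggerFactory.getLogger(ProductChangeConsumer.class);

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    @RabbitListener(queues = "product.change.queue")
    public void handleMessage(String message) {
        try {
            JSONObject msg = JSON.parseObject(message);
            String type = msg.getString("type");
            JSONObject data = msg.getJSONObject("data");

            switch (type) {
                case "INSERT":
                case "UPDATE": {
                    Product product = convertToProduct(data);
                    if ("on_sale".equals(product.getStatus())) {
                        elasticsearchTemplate.save(product); // upsert by id
                    } else {
                        // taken off sale: remove from the index (no error if it is absent)
                        elasticsearchTemplate.delete(product.getId().toString(), Product.class);
                    }
                    break;
                }
                case "DELETE":
                    elasticsearchTemplate.delete(data.getString("id"), Product.class);
                    break;
                default:
                    break; // other event types are ignored
            }
        } catch (Exception e) {
            log.error("Failed to sync ES, message={}", message, e);
            // Reject without requeue to avoid an endless redelivery loop; if the queue has a
            // dead-letter exchange configured, the message is routed there for retry/inspection
            throw new AmqpRejectAndDontRequeueException("ES sync failed", e);
        }
    }

    private Product convertToProduct(JSONObject data) {
        Product p = new Product();
        p.setId(data.getLong("id"));
        p.setTitle(data.getString("title"));
        p.setBrand(data.getString("brand"));
        p.setCategoryId(data.getInteger("category_id")); // MySQL column name
        // Assumes MySQL stores price as DECIMAL yuan (e.g. "199.90"); ES stores fen
        p.setPrice(new BigDecimal(data.getString("price")).movePointRight(2).longValueExact());
        // tags are comma-separated in MySQL; Canal sends "" for NULL
        String tags = data.getString("tags");
        p.setTags(StringUtils.hasText(tags)
            ? Arrays.stream(tags.split(",")).map(String::trim).filter(s -> !s.isEmpty())
                .collect(Collectors.toList())
            : Collections.emptyList());
        p.setStatus(data.getString("status"));
        return p;
    }
}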
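Stripped of Spring and ES, the consumer's rules reduce to a small pure function, which is convenient for unit tests. A sketch that follows the same rules as `handleMessage` (index only `on_sale` products, remove everything else) plus the tag parsing; `SyncRules` and `SyncAction` are hypothetical names:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, framework-free version of the consumer's decision logic.
final class SyncRules {

    enum SyncAction { INDEX, REMOVE, IGNORE }

    private SyncRules() {}

    /** INSERT/UPDATE: index on-sale products, remove the rest; DELETE: remove; others: ignore. */
    static SyncAction decide(String type, String status) {
        switch (type) {
            case "INSERT":
            case "UPDATE":
                return "on_sale".equals(status) ? SyncAction.INDEX : SyncAction.REMOVE;
            case "DELETE":
                return SyncAction.REMOVE;
            default:
                return SyncAction.IGNORE;
        }
    }

    /** Comma-separated MySQL tags -> list; null or blank (Canal's "" for NULL) -> empty list. */
    static List<String> parseTags(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.stream(raw.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(decide("UPDATE", "off_sale")); // REMOVE
        System.out.println(parseTags("5G, phone,,"));      // [5G, phone]
    }
}
```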

VI. Full sync (backfilling data or rebuilding the index)

java
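import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Service;

@Service
public class FullSyncService {

    private static final Logger log = LoggerFactory.getLogger(FullSyncService.class);

    // ProductRepository (not shown) reads the MySQL product table and is assumed to return
    // Product objects already in ES form (price in fen, tags as a list)
    @Autowired
    private ProductRepository productRepository;

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;

    public void syncAll() {
        int pageSize = 1000;
        // without an explicit ORDER BY, page boundaries are not guaranteed to be consistent
        Pageable pageable = PageRequest.of(0, pageSize, Sort.by("id"));
        Page<Product> page; // declared outside the loop so the while condition can see it
        long total = 0;

        do {
            page = productRepository.findByStatus("on_sale", pageable);
            if (page.isEmpty()) break;

            elasticsearchTemplate.save(page.getContent()); // bulk upsert of one page
            total += page.getNumberOfElements();
            pageable = page.nextPageable();
        } while (page.hasNext());

        log.info("Full sync finished: {} products written", total);
    }
}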
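Offset paging (`page`/`size`) gets slower at deep offsets and can skip or repeat rows if the table changes while the sync runs. A common alternative, not what the code above does, is keyset (seek) pagination: remember the last `id` and ask for the next rows with `id > lastId ORDER BY id LIMIT n`. The sketch below simulates that loop in plain Java; the in-memory `TreeMap` stands in for the MySQL table and the `sink` callback for `elasticsearchTemplate.save(...)`, and `KeysetSync` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

// Keyset-pagination sketch: the TreeMap plays the role of the product table ordered by id.
final class KeysetSync {

    private KeysetSync() {}

    /** Equivalent of: SELECT ... WHERE id > :lastId ORDER BY id LIMIT :size */
    static List<Map.Entry<Long, String>> fetchAfter(NavigableMap<Long, String> table, long lastId, int size) {
        List<Map.Entry<Long, String>> rows = new ArrayList<>();
        for (Map.Entry<Long, String> e : table.tailMap(lastId, false).entrySet()) {
            if (rows.size() == size) break;
            rows.add(e);
        }
        return rows;
    }

    /** Streams the whole table to the sink in id order; returns the number of rows written. */
    static long syncAll(NavigableMap<Long, String> table, int pageSize,
                        Consumer<List<Map.Entry<Long, String>>> sink) {
        long lastId = Long.MIN_VALUE;
        long total = 0;
        while (true) {
            List<Map.Entry<Long, String>> batch = fetchAfter(table, lastId, pageSize);
            if (batch.isEmpty()) break;
            sink.accept(batch);                            // e.g. a bulk save to ES
            total += batch.size();
            lastId = batch.get(batch.size() - 1).getKey(); // seek past the last id seen
            if (batch.size() < pageSize) break;            // short page: nothing left
        }
        return total;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> table = new TreeMap<>();
        for (long id = 1; id <= 2500; id++) table.put(id, "product-" + id);
        long written = syncAll(table, 1000, batch -> System.out.println("batch of " + batch.size()));
        System.out.println(written); // 2500
    }
}
```

Each query starts from an indexed `id` position instead of scanning and discarding an offset, so the cost per page stays roughly constant.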

VII. Data consistency safeguards

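  • Canal batch pull + ACK: a batch is acknowledged only after all of its rows have been published to RabbitMQ; if publishing fails, the batch is rolled back and fetched again, so no change is lost (delivery is at-least-once, so duplicates are possible).
  • RabbitMQ persistence: a durable exchange and queue plus persistent messages are written to disk and survive a broker restart; a strict no-loss guarantee on the publishing side additionally needs publisher confirms.
  • Idempotent consumption: elasticsearchTemplate.save() is an upsert keyed by id, so redelivering the same message is harmless. It does not protect against reordering: if several consumers handle updates to the same product concurrently, an older change can overwrite a newer one.
  • Daily reconciliation: compare the number of on-sale products in MySQL with the document count in ES and raise an alert if the difference is large.
  • Manual compensation endpoint: /sync/full triggers a full sync on demand.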
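Comparing counts only shows that something drifted, not what. A minimal sketch of a finer-grained check, assuming the job can load the set of on-sale ids from MySQL and the set of document ids from ES (how they are fetched is left out); `Reconciler` and the alert threshold are hypothetical:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical reconciliation helper: compares on-sale ids in MySQL with document ids in ES.
final class Reconciler {

    private Reconciler() {}

    /** Ids that should be searchable but are missing from ES: re-index these. */
    static Set<Long> missingInEs(Set<Long> dbIds, Set<Long> esIds) {
        Set<Long> diff = new TreeSet<>(dbIds);
        diff.removeAll(esIds);
        return diff;
    }

    /** Ids still in ES but no longer on sale (or deleted) in MySQL: delete these. */
    static Set<Long> staleInEs(Set<Long> dbIds, Set<Long> esIds) {
        Set<Long> diff = new TreeSet<>(esIds);
        diff.removeAll(dbIds);
        return diff;
    }

    /** Alert when the count mismatch exceeds a fraction of the MySQL count (threshold is tunable). */
    static boolean shouldAlert(long dbCount, long esCount, double threshold) {
        if (dbCount == 0) return esCount > 0;
        return Math.abs(dbCount - esCount) / (double) dbCount > threshold;
    }

    public static void main(String[] args) {
        Set<Long> db = new TreeSet<>(Arrays.asList(1L, 2L, 3L, 4L));
        Set<Long> es = new TreeSet<>(Arrays.asList(2L, 3L, 4L, 9L));
        System.out.println(missingInEs(db, es));               // [1]
        System.out.println(staleInEs(db, es));                 // [9]
        System.out.println(shouldAlert(10_000, 9_900, 0.005)); // true (1% > 0.5%)
    }
}
```

The two id sets tell the compensation step exactly which documents to re-index or delete, instead of always falling back to a full sync.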

Summary: the main advantages of this design are:

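  • Decoupling: Canal is deployed independently, and the search service only consumes messages without touching the binlog
  • Reliability: RabbitMQ persistence plus the ACK mechanism
  • Maintainability: ElasticsearchRestTemplate keeps the code in idiomatic Spring style
  • Extensibility: more consumers (such as a recommendation system) can be attached later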